fix(kugetsu): queue daemon improvements - locking, error handling, cmd_delegate enqueue #164
@@ -8,86 +8,148 @@ source "$SCRIPT_DIR/kugetsu-index.sh"
|
||||
source "$SCRIPT_DIR/kugetsu-worktree.sh"
|
||||
source "$SCRIPT_DIR/kugetsu-log.sh"
|
||||
|
||||
# Load GITEA_TOKEN from default.env
|
||||
if [ -f "$HOME/.kugetsu/env/default.env" ]; then
|
||||
source "$HOME/.kugetsu/env/default.env"
|
||||
fi
|
||||
load_agent_env "pm-agent"
|
||||
|
||||
acquire_lock() {
|
||||
local issue_ref="$1"
|
||||
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
|
||||
mkdir -p "$(dirname "$lock_file")"
|
||||
if [ -f "$lock_file" ]; then
|
||||
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
|
||||
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
|
||||
return 1
|
||||
fi
|
||||
rm -f "$lock_file"
|
||||
fi
|
||||
echo $$ > "$lock_file"
|
||||
return 0
|
||||
}
|
||||
|
||||
release_lock() {
|
||||
local issue_ref="$1"
|
||||
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
|
||||
rm -f "$lock_file"
|
||||
}
|
||||
|
||||
# Check if a notified task has completed (forked session ended or has new commits)
|
||||
check_task_completion() {
|
||||
local item="$1"
|
||||
local queue_id=$(basename "$item" .json)
|
||||
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
||||
|
||||
|
||||
[ "$state" = "notified" ] || return 0
|
||||
|
||||
# Use opencode_session_id (the forked session, not the parent pm_session)
|
||||
|
||||
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
|
||||
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
||||
|
||||
# If no session tracked, skip
|
||||
[ -n "$session_id" ] || return 0
|
||||
|
||||
# Check if forked session still exists in opencode
|
||||
if ! opencode session list 2>/dev/null | grep -q "$session_id"; then
|
||||
# Forked session ended — check if work was done
|
||||
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
||||
local has_commits=false
|
||||
|
||||
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
||||
# Check if worktree has new commits beyond origin/main
|
||||
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
||||
has_commits=true
|
||||
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
|
||||
|
||||
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
|
||||
if ! kill -0 "$pid" 2>/dev/null; then
|
||||
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
||||
local has_commits=false
|
||||
|
||||
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
||||
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
||||
has_commits=true
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$has_commits" = true ]; then
|
||||
update_queue_item_state "$queue_id" "completed"
|
||||
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
||||
else
|
||||
update_queue_item_state "$queue_id" "error"
|
||||
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
||||
fi
|
||||
release_lock "$issue_ref"
|
||||
fi
|
||||
|
||||
if [ "$has_commits" = true ]; then
|
||||
update_queue_item_state "$queue_id" "completed"
|
||||
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
||||
else
|
||||
update_queue_item_state "$queue_id" "error"
|
||||
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
||||
else
|
||||
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
|
||||
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
||||
local has_commits=false
|
||||
|
||||
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
||||
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
||||
has_commits=true
|
||||
fi
|
||||
fi
|
||||
|
||||
if [ "$has_commits" = true ]; then
|
||||
update_queue_item_state "$queue_id" "completed"
|
||||
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
||||
else
|
||||
update_queue_item_state "$queue_id" "error"
|
||||
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
||||
fi
|
||||
release_lock "$issue_ref"
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
get_session_id_for_issue() {
|
||||
local issue_ref="$1"
|
||||
local session_file=$(issue_ref_to_filename "$issue_ref")
|
||||
local session_path="$SESSIONS_DIR/$session_file"
|
||||
if [ -f "$session_path" ]; then
|
||||
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
|
||||
else
|
||||
echo ""
|
||||
fi
|
||||
}
|
||||
|
||||
process_task() {
|
||||
local item="$1"
|
||||
local queue_id=$(basename "$item" .json)
|
||||
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
||||
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
|
||||
|
||||
if ! acquire_lock "$issue_ref"; then
|
||||
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
|
||||
return
|
||||
fi
|
||||
|
||||
source "$SCRIPT_DIR/kugetsu-session.sh"
|
||||
|
||||
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
|
||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
|
||||
sleep 1
|
||||
local session_id=$(get_session_id_for_issue "$issue_ref")
|
||||
update_queue_item_state "$queue_id" "notified" "$session_id" ""
|
||||
echo "Task $queue_id continued for $issue_ref"
|
||||
else
|
||||
update_queue_item_state "$queue_id" "error"
|
||||
echo "Task $queue_id ($issue_ref) failed to continue"
|
||||
fi
|
||||
else
|
||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
|
||||
sleep 1
|
||||
local session_id=$(get_session_id_for_issue "$issue_ref")
|
||||
update_queue_item_state "$queue_id" "notified" "$session_id" ""
|
||||
echo "Task $queue_id started for $issue_ref"
|
||||
else
|
||||
update_queue_item_state "$queue_id" "error"
|
||||
echo "Task $queue_id ($issue_ref) failed to start"
|
||||
fi
|
||||
fi
|
||||
|
||||
release_lock "$issue_ref"
|
||||
}
|
||||
|
||||
while true; do
|
||||
# Check completion of notified tasks
|
||||
if [ -d "$QUEUE_ITEMS_DIR" ]; then
|
||||
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||
[ -f "$item" ] || continue
|
||||
check_task_completion "$item"
|
||||
done
|
||||
|
||||
# Process pending tasks
|
||||
|
||||
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||
[ -f "$item" ] || continue
|
||||
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
||||
if [ "$state" = "pending" ]; then
|
||||
queue_id=$(basename "$item" .json)
|
||||
issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
||||
message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
|
||||
|
||||
# Source session management and use cmd_start/cmd_continue
|
||||
source "$SCRIPT_DIR/kugetsu-session.sh"
|
||||
|
||||
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
|
||||
# Continue existing session
|
||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 &
|
||||
pid=$!
|
||||
update_queue_item_state "$queue_id" "notified" "" "$pid"
|
||||
echo "Task $queue_id continued for $issue_ref"
|
||||
else
|
||||
# Start new session
|
||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 &
|
||||
pid=$!
|
||||
update_queue_item_state "$queue_id" "notified" "" "$pid"
|
||||
echo "Task $queue_id started for $issue_ref"
|
||||
fi
|
||||
process_task "$item"
|
||||
fi
|
||||
done
|
||||
fi
|
||||
sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
|
||||
done
|
||||
done
|
||||
Reference in New Issue
Block a user