Files
kugetsu/skills/kugetsu/scripts/kugetsu-queue-daemon.sh
shokollm d240000088 refactor(session): make cmd_continue idempotent with ensure_* functions
- Add ensure_worktree() - creates worktree if missing, returns status
- Add ensure_session() - creates session if missing, handles inconsistent states
- Add fork_agent() - extracted agent forking logic
- Refactor cmd_continue() to use ensure_* functions (idempotent)
- Make cmd_start() a thin wrapper calling cmd_continue()
- Simplify daemon to always call cmd_continue (no existence check)

This makes cmd_continue truly idempotent - it will:
- Continue existing session if it exists
- Create session and worktree if they don't exist
- Clean and recreate if state is inconsistent

Closes #168
2026-04-07 22:25:53 +00:00

142 lines
5.1 KiB
Bash

#!/bin/bash
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "$SCRIPT_DIR/kugetsu-config.sh"
source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh"
load_agent_env "pm-agent"
acquire_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
mkdir -p "$(dirname "$lock_file")"
if [ -f "$lock_file" ]; then
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
return 1
fi
rm -f "$lock_file"
fi
echo $$ > "$lock_file"
return 0
}
release_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
rm -f "$lock_file"
}
check_task_completion() {
local item="$1"
local queue_id=$(basename "$item" .json)
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
[ "$state" = "notified" ] || return 0
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
if ! kill -0 "$pid" 2>/dev/null; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$WORKTREES_DIR")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
else
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$WORKTREES_DIR")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
fi
}
get_session_id_for_issue() {
local issue_ref="$1"
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
if [ -f "$session_path" ]; then
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
else
echo ""
fi
}
process_task() {
local item="$1"
local queue_id=$(basename "$item" .json)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
if ! acquire_lock "$issue_ref"; then
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
return
fi
source "$SCRIPT_DIR/kugetsu-session.sh"
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
fi
release_lock "$issue_ref"
}
while true; do
if [ -d "$QUEUE_ITEMS_DIR" ]; then
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
check_task_completion "$item"
done
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" = "pending" ]; then
process_task "$item"
fi
done
fi
sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
done