#!/bin/bash
#
# Queue daemon: polls "$QUEUE_ITEMS_DIR" for *.json queue items and, on each
# pass,
#   1. checks every item in state "notified" for completion or timeout, and
#   2. hands every item in state "pending" to cmd_continue.
#
# Environment read by this script:
#   QUEUE_DIR, QUEUE_ITEMS_DIR, WORKTREES_DIR, SESSIONS_DIR, LOGS_DIR
#       (presumably set by the sourced kugetsu-config.sh -- TODO confirm)
#   TASK_TIMEOUT_HOURS              hours before a notified task is timed out
#                                   (default 1)
#   QUEUE_DAEMON_INTERVAL_MINUTES   pause between polling passes (default 5)
#
# Helpers such as load_agent_env, update_queue_item_state,
# issue_ref_to_worktree_path, issue_ref_to_filename, log_warn, log_error and
# cmd_continue are not defined here; they are expected to come from the
# sourced kugetsu-*.sh files.
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

source "$SCRIPT_DIR/kugetsu-config.sh"
source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh"

load_agent_env "pm-agent"

#######################################
# Print one top-level field of a JSON file.
# The path and key are passed as argv rather than spliced into the Python
# source, so quotes or backslashes in a path cannot break (or inject into)
# the interpreter command.
# Arguments: $1 - path to the JSON file
#            $2 - key to look up
# Outputs:   the value as Python's print() renders it ("" when the key is
#            missing, "None" for JSON null)
# Returns:   python3's exit status (non-zero on unreadable/invalid JSON)
#######################################
json_field() {
  local file="$1"
  local key="$2"
  python3 -c 'import json, sys
with open(sys.argv[1]) as fh:
    print(json.load(fh).get(sys.argv[2], ""))' "$file" "$key" 2>/dev/null
}

#######################################
# Print the lock-file path for an issue reference.
# Every "/" and ":" and the first "#" in the reference become "-".
# Globals:   QUEUE_DIR (read)
# Arguments: $1 - issue reference
# Outputs:   lock-file path on stdout
#######################################
lock_file_for() {
  local issue_ref="$1"
  local safe_ref
  safe_ref=$(printf '%s\n' "$issue_ref" | sed -e 's/[\/:]/-/g' -e 's/#/-/')
  printf '%s\n' "$QUEUE_DIR/locks/${safe_ref}.lock"
}

#######################################
# Try to take the per-issue lock by writing this shell's PID into it.
# A lock whose recorded PID is empty or no longer alive is treated as stale
# and removed before the attempt.
# Arguments: $1 - issue reference
# Returns:   0 if the lock was acquired, 1 if a live process holds it or the
#            lock file could not be created
#######################################
acquire_lock() {
  local issue_ref="$1"
  local lock_file
  lock_file=$(lock_file_for "$issue_ref")
  mkdir -p "$(dirname "$lock_file")"

  if [ -f "$lock_file" ]; then
    local pid
    pid=$(cat "$lock_file" 2>/dev/null) || pid=""
    if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
      return 1
    fi
    rm -f "$lock_file"
  fi

  # noclobber makes the redirection fail if the file already exists, so two
  # processes racing past the check above cannot both believe they won.
  if ! (set -o noclobber; echo "$$" > "$lock_file") 2>/dev/null; then
    return 1
  fi
  return 0
}

#######################################
# Remove the per-issue lock file (no error if it is already gone).
# Arguments: $1 - issue reference
#######################################
release_lock() {
  local issue_ref="$1"
  local lock_file
  lock_file=$(lock_file_for "$issue_ref")
  rm -f "$lock_file"
}

#######################################
# Settle a task whose process/session has ended: mark it "completed" when
# its worktree has commits that origin/main lacks, "error" otherwise, then
# release the issue lock.
# Globals:   WORKTREES_DIR (read)
# Arguments: $1 - queue id
#            $2 - issue reference
#######################################
finalize_ended_task() {
  local queue_id="$1"
  local issue_ref="$2"
  local worktree_path
  # Best effort, as before: a failing helper leaves whatever it printed and
  # the directory tests below decide.
  worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$WORKTREES_DIR") || true

  local has_commits=false
  # -e rather than -d: a linked worktree created by `git worktree add` has a
  # .git *file*, while a regular clone has a .git directory. Accept both.
  if [ -d "$worktree_path" ] && [ -e "$worktree_path/.git" ]; then
    if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
      has_commits=true
    fi
  fi

  if [ "$has_commits" = true ]; then
    update_queue_item_state "$queue_id" "completed"
    echo "Task $queue_id ($issue_ref) completed — new commits found"
  else
    update_queue_item_state "$queue_id" "error"
    echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
  fi
  release_lock "$issue_ref"
}

#######################################
# Inspect one queue item; if it is in state "notified", either time it out
# or detect that its worker has ended and finalize it.
# Items in any other state are ignored.
# Globals:   TASK_TIMEOUT_HOURS (read, default 1)
# Arguments: $1 - path to the queue item JSON file
# Returns:   0 (must not return non-zero: the caller runs under set -e)
#######################################
check_task_completion() {
  local item="$1"
  local queue_id="${item##*/}"
  queue_id="${queue_id%.json}"

  local state
  state=$(json_field "$item" state) || state=""
  [ "$state" = "notified" ] || return 0

  local session_id issue_ref pid notified_at
  session_id=$(json_field "$item" opencode_session_id) || session_id=""
  issue_ref=$(json_field "$item" issue_ref) || issue_ref=""
  pid=$(json_field "$item" pid) || pid=""
  notified_at=$(json_field "$item" notified_at) || notified_at=""

  # --- timeout handling -------------------------------------------------
  if [ -n "$notified_at" ]; then
    local notified_epoch
    if notified_epoch=$(date -d "$notified_at" +%s 2>/dev/null); then
      local now_epoch
      now_epoch=$(date +%s)
      local hours_elapsed=$(((now_epoch - notified_epoch) / 3600))
      if [ "$hours_elapsed" -ge "${TASK_TIMEOUT_HOURS:-1}" ]; then
        log_warn "queue-daemon" "Task $queue_id ($issue_ref) timed out after ${hours_elapsed}h"
        # json_field prints "None" for a JSON null pid.
        if [ -n "$pid" ] && [ "$pid" != "None" ]; then
          kill "$pid" 2>/dev/null || true # best effort; it may already be gone
        fi
        if [ -n "$session_id" ]; then
          opencode session stop "$session_id" 2>/dev/null || true
        fi
        update_queue_item_state "$queue_id" "error"
        log_error "queue-daemon" "Task $queue_id ($issue_ref) marked error — timeout after ${hours_elapsed}h"
        release_lock "$issue_ref"
        return 0
      fi
    else
      # Previously a parse failure fell back to epoch 0, which made the task
      # look decades old and killed it on the spot. Skip the timeout check
      # instead; the liveness checks below still apply.
      log_warn "queue-daemon" "Task $queue_id ($issue_ref) has unparseable notified_at '$notified_at' — skipping timeout check"
    fi
  fi

  # --- liveness by PID --------------------------------------------------
  if [ -n "$pid" ] && [ "$pid" != "None" ]; then
    if ! kill -0 "$pid" 2>/dev/null; then
      finalize_ended_task "$queue_id" "$issue_ref"
    fi
    return 0
  fi

  # --- liveness by opencode session (only when no PID is recorded) -------
  [ -n "$session_id" ] || return 0

  # Capture the listing first instead of piping into `grep -q`: under
  # pipefail, grep exiting at the first match can SIGPIPE the producer and
  # make the whole pipeline "fail", which would report a live session as
  # ended.
  # NOTE(review): a failing `opencode session list` still counts as "session
  # not listed", as in the original — confirm that is the desired policy.
  local session_list
  session_list=$(opencode session list 2>/dev/null) || true
  # -F: the session id is a literal string, not a regular expression.
  if ! grep -qF -- "$session_id" <<<"$session_list"; then
    finalize_ended_task "$queue_id" "$issue_ref"
  fi
  return 0
}

#######################################
# Look up the opencode session id recorded for an issue.
# Globals:   SESSIONS_DIR (read)
# Arguments: $1 - issue reference
# Outputs:   the session id, or an empty line when the session file is
#            missing or unreadable
#######################################
get_session_id_for_issue() {
  local issue_ref="$1"
  local session_file
  session_file=$(issue_ref_to_filename "$issue_ref") || true
  local session_path="$SESSIONS_DIR/$session_file"

  if [ -f "$session_path" ]; then
    json_field "$session_path" opencode_session_id || echo ""
  else
    echo ""
  fi
}

#######################################
# Dispatch one pending queue item: take the issue lock, run cmd_continue
# with the item's message (output appended to a fresh delegate log), and
# record the outcome as "notified" or "error".
# Globals:   SCRIPT_DIR, LOGS_DIR (read)
# Arguments: $1 - path to the queue item JSON file
#######################################
process_task() {
  local item="$1"
  local queue_id="${item##*/}"
  queue_id="${queue_id%.json}"

  local issue_ref message
  issue_ref=$(json_field "$item" issue_ref) || issue_ref=""
  message=$(json_field "$item" message) || message=""

  if ! acquire_lock "$issue_ref"; then
    echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
    return 0
  fi

  # Sourced here rather than at the top of the file, as in the original;
  # presumably this provides cmd_continue — TODO confirm.
  source "$SCRIPT_DIR/kugetsu-session.sh"

  # local (it used to leak as a global); functions called from here still
  # see it through bash's dynamic scoping.
  local log_file
  log_file="$LOGS_DIR/delegate-$(date +%s).log"

  if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
    sleep 1
    local session_id
    session_id=$(get_session_id_for_issue "$issue_ref") || session_id=""
    update_queue_item_state "$queue_id" "notified" "$session_id" ""
    echo "Task $queue_id continued for $issue_ref"
  else
    update_queue_item_state "$queue_id" "error"
    echo "Task $queue_id ($issue_ref) failed to continue"
  fi
  release_lock "$issue_ref"
}

#######################################
# Polling loop: completion checks first, then dispatch of pending items,
# then sleep. Runs until the process is killed.
# Globals:   QUEUE_ITEMS_DIR, QUEUE_DAEMON_INTERVAL_MINUTES (read)
#######################################
main() {
  local item state
  while true; do
    if [ -d "$QUEUE_ITEMS_DIR" ]; then
      for item in "$QUEUE_ITEMS_DIR"/*.json; do
        # An unmatched glob stays literal; skip it.
        [ -f "$item" ] || continue
        check_task_completion "$item"
      done

      for item in "$QUEUE_ITEMS_DIR"/*.json; do
        [ -f "$item" ] || continue
        # A malformed item must not take the daemon down under set -e.
        state=$(json_field "$item" state) || state=""
        if [ "$state" = "pending" ]; then
          process_task "$item"
        fi
      done
    fi
    sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
  done
}

main "$@"