feat(queue): add queue system with background daemon
Implements #134 - Queue system with background daemon. ## Changes ### Configuration - QUEUE_DIR, QUEUE_ITEMS_DIR for queue storage - QUEUE_DAEMON_PID_FILE, LOCK_FILE, LOG_FILE for daemon management - QUEUE_DAEMON_INTERVAL_MINUTES (default: 5) - QUEUE_DAEMON_BATCH_SIZE (default: 2) - QUEUE_CLEANUP_AGE_DAYS (default: 7) ### Queue System - File-based queue at ~/.kugetsu/queue/items/ - One JSON file per queue item - States: pending, notified, completed, error ### New Commands - kugetsu queue [list|stats|clear] - View queue status - kugetsu queue enqueue <issue-ref> <message> - Manually enqueue - kugetsu queue-daemon [start|stop|restart|status|logs] - Daemon management ### Behavior Change - kugetsu delegate now always enqueues (fire-and-forget) - Queue daemon polls queue and invokes PM when slots available ### Queue Item Format ```json { "id": "q_xxx", "issue_ref": "github.com/user/repo#123", "message": "task description", "state": "pending", "pending_since": "...", "notified_at": null, "completed_at": null, "error": null } ``` Closes #134
This commit is contained in:
@@ -17,6 +17,15 @@ CONTEXT_DIR="${CONTEXT_DIR:-$KUGETSU_DIR/context}"
|
||||
ENABLE_CONTEXT_DUMP="${ENABLE_CONTEXT_DUMP:-true}"
|
||||
WORKTREE_CHECK_PR_STATUS="${WORKTREE_CHECK_PR_STATUS:-true}"
|
||||
|
||||
QUEUE_DIR="${QUEUE_DIR:-$KUGETSU_DIR/queue}"
|
||||
QUEUE_ITEMS_DIR="${QUEUE_ITEMS_DIR:-$QUEUE_DIR/items}"
|
||||
QUEUE_DAEMON_PID_FILE="${QUEUE_DAEMON_PID_FILE:-$QUEUE_DIR/daemon.pid}"
|
||||
QUEUE_DAEMON_LOCK_FILE="${QUEUE_DAEMON_LOCK_FILE:-$QUEUE_DIR/daemon.lock}"
|
||||
QUEUE_DAEMON_LOG_FILE="${QUEUE_DAEMON_LOG_FILE:-$QUEUE_DIR/daemon.log}"
|
||||
QUEUE_DAEMON_INTERVAL_MINUTES="${QUEUE_DAEMON_INTERVAL_MINUTES:-5}"
|
||||
QUEUE_DAEMON_BATCH_SIZE="${QUEUE_DAEMON_BATCH_SIZE:-2}"
|
||||
QUEUE_CLEANUP_AGE_DAYS="${QUEUE_CLEANUP_AGE_DAYS:-7}"
|
||||
|
||||
# Load user config overrides (~/.kugetsu/config)
|
||||
if [ -f "$KUGETSU_DIR/config" ]; then
|
||||
source "$KUGETSU_DIR/config"
|
||||
@@ -82,6 +91,9 @@ Usage:
|
||||
kugetsu destroy --base [-y] Delete base session
|
||||
kugetsu set-pr <issue-ref> <pr-url> Set PR URL for session (for PR tracking)
|
||||
kugetsu context <issue-ref> Show context for issue
|
||||
kugetsu queue [list|stats|clear] Show queue status or statistics
|
||||
kugetsu queue enqueue <issue-ref> <message> Enqueue a task (normally via delegate)
|
||||
kugetsu queue-daemon [start|stop|restart|status|logs] Manage queue daemon
|
||||
kugetsu help Show this help
|
||||
|
||||
Issue Ref Format:
|
||||
@@ -477,6 +489,142 @@ except Exception as e:
|
||||
PYEOF
|
||||
}
|
||||
|
||||
ensure_queue_dirs() {
|
||||
mkdir -p "$QUEUE_ITEMS_DIR"
|
||||
}
|
||||
|
||||
generate_queue_id() {
|
||||
echo "q_$(date +%s)_$$_$RANDOM"
|
||||
}
|
||||
|
||||
enqueue_task() {
|
||||
local issue_ref="$1"
|
||||
local message="$2"
|
||||
|
||||
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
|
||||
echo "Error: enqueue_task requires <issue-ref> and <message>" >&2
|
||||
return 1
|
||||
fi
|
||||
|
||||
validate_issue_ref "$issue_ref"
|
||||
ensure_queue_dirs
|
||||
|
||||
local queue_id=$(generate_queue_id)
|
||||
local pending_since=$(date -Iseconds)
|
||||
|
||||
python3 << PYEOF
|
||||
import json
|
||||
|
||||
queue_item = {
|
||||
"id": "$queue_id",
|
||||
"issue_ref": "$issue_ref",
|
||||
"message": """$message""",
|
||||
"state": "pending",
|
||||
"pending_since": "$pending_since",
|
||||
"notified_at": None,
|
||||
"completed_at": None,
|
||||
"error": None
|
||||
}
|
||||
|
||||
with open("$QUEUE_ITEMS_DIR/${queue_id}.json", "w") as f:
|
||||
json.dump(queue_item, f, indent=2)
|
||||
|
||||
print(f"Enqueued: $queue_id")
|
||||
PYEOF
|
||||
}
|
||||
|
||||
get_pending_tasks() {
|
||||
local limit="${1:-10}"
|
||||
|
||||
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
|
||||
echo "[]"
|
||||
return
|
||||
fi
|
||||
|
||||
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f 2>/dev/null | while read -r file; do
|
||||
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
|
||||
if [ "$state" = "pending" ]; then
|
||||
cat "$file"
|
||||
fi
|
||||
done | head -"$limit"
|
||||
}
|
||||
|
||||
get_queue_stats() {
|
||||
local total=0
|
||||
local pending=0
|
||||
local notified=0
|
||||
local completed=0
|
||||
local error=0
|
||||
|
||||
if [ -d "$QUEUE_ITEMS_DIR" ]; then
|
||||
for file in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||
[ -f "$file" ] || continue
|
||||
total=$((total + 1))
|
||||
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
|
||||
case "$state" in
|
||||
pending) pending=$((pending + 1)) ;;
|
||||
notified) notified=$((notified + 1)) ;;
|
||||
completed) completed=$((completed + 1)) ;;
|
||||
error) error=$((error + 1)) ;;
|
||||
esac
|
||||
done
|
||||
fi
|
||||
|
||||
echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}"
|
||||
}
|
||||
|
||||
update_queue_item_state() {
|
||||
local queue_id="$1"
|
||||
local new_state="$2"
|
||||
|
||||
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
|
||||
if [ ! -f "$item_file" ]; then
|
||||
echo "Error: Queue item not found: $queue_id" >&2
|
||||
return 1
|
||||
fi
|
||||
|
||||
python3 << PYEOF
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
item_file = "$item_file"
|
||||
new_state = "$new_state"
|
||||
|
||||
with open(item_file, 'r') as f:
|
||||
item = json.load(f)
|
||||
|
||||
item['state'] = new_state
|
||||
|
||||
if new_state == "notified":
|
||||
item['notified_at'] = datetime.now().isoformat() + "Z"
|
||||
elif new_state == "completed":
|
||||
item['completed_at'] = datetime.now().isoformat() + "Z"
|
||||
elif new_state == "error":
|
||||
item['error'] = datetime.now().isoformat() + "Z"
|
||||
|
||||
with open(item_file, 'w') as f:
|
||||
json.dump(item, f, indent=2)
|
||||
|
||||
print(f"Updated $queue_id to state: $new_state")
|
||||
PYEOF
|
||||
}
|
||||
|
||||
cleanup_old_queue_items() {
|
||||
local days="${QUEUE_CLEANUP_AGE_DAYS:-7}"
|
||||
|
||||
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f -mtime "+$days" 2>/dev/null | while read -r file; do
|
||||
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
|
||||
if [ "$state" = "completed" ] || [ "$state" = "error" ]; then
|
||||
rm -f "$file"
|
||||
echo "Cleaned up: $(basename "$file")"
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
update_session_pr_url() {
|
||||
local issue_ref="$1"
|
||||
local pr_url="$2"
|
||||
@@ -984,9 +1132,205 @@ find_sessions_by_issue_number() {
|
||||
echo "$results"
|
||||
}
|
||||
|
||||
cmd_queue() {
|
||||
local action="${1:-list}"
|
||||
shift
|
||||
|
||||
case "$action" in
|
||||
list)
|
||||
ensure_queue_dirs
|
||||
local stats=$(get_queue_stats)
|
||||
echo "Queue Statistics:"
|
||||
echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(f\" Total: {d['total']}\n Pending: {d['pending']}\n Notified: {d['notified']}\n Completed: {d['completed']}\n Error: {d['error']}\")"
|
||||
echo ""
|
||||
echo "Pending tasks:"
|
||||
local count=0
|
||||
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||
[ -f "$item" ] || continue
|
||||
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', '')" 2>/dev/null)
|
||||
if [ "$state" = "pending" ]; then
|
||||
count=$((count + 1))
|
||||
python3 -c "import json; d=json.load(open('$item')); print(f\" [{d['id']}] {d['issue_ref']}: {d['message'][:50]}...\n pending since: {d['pending_since']}\")" 2>/dev/null
|
||||
fi
|
||||
done
|
||||
if [ $count -eq 0 ]; then
|
||||
echo " (none)"
|
||||
fi
|
||||
;;
|
||||
stats)
|
||||
local stats=$(get_queue_stats)
|
||||
echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(json.dumps(d, indent=2))"
|
||||
;;
|
||||
clear)
|
||||
echo "Cleaning up old queue items..."
|
||||
cleanup_old_queue_items
|
||||
;;
|
||||
enqueue)
|
||||
local issue_ref="${1:-}"
|
||||
local message="${2:-}"
|
||||
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
|
||||
echo "Usage: kugetsu queue enqueue <issue-ref> <message>" >&2
|
||||
exit 1
|
||||
fi
|
||||
enqueue_task "$issue_ref" "$message"
|
||||
;;
|
||||
*)
|
||||
echo "Usage: kugetsu queue [list|stats|clear|enqueue <issue-ref> <message>]" >&2
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
cmd_queue_daemon() {
|
||||
local action="${1:-status}"
|
||||
shift
|
||||
|
||||
case "$action" in
|
||||
start)
|
||||
if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then
|
||||
local old_pid=$(cat "$QUEUE_DAEMON_PID_FILE" 2>/dev/null)
|
||||
if [ -n "$old_pid" ] && kill -0 "$old_pid" 2>/dev/null; then
|
||||
echo "Daemon is already running with PID $old_pid"
|
||||
exit 1
|
||||
fi
|
||||
rm -f "$QUEUE_DAEMON_PID_FILE"
|
||||
fi
|
||||
|
||||
mkdir -p "$(dirname "$QUEUE_DAEMON_LOG_FILE")"
|
||||
nohup bash "$0" queue-daemon run >> "$QUEUE_DAEMON_LOG_FILE" 2>&1 &
|
||||
local daemon_pid=$!
|
||||
echo "$daemon_pid" > "$QUEUE_DAEMON_PID_FILE"
|
||||
echo "Queue daemon started with PID $daemon_pid"
|
||||
echo "Log file: $QUEUE_DAEMON_LOG_FILE"
|
||||
;;
|
||||
stop)
|
||||
if [ ! -f "$QUEUE_DAEMON_PID_FILE" ]; then
|
||||
echo "Daemon PID file not found. Is the daemon running?"
|
||||
exit 1
|
||||
fi
|
||||
local pid=$(cat "$QUEUE_DAEMON_PID_FILE")
|
||||
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
|
||||
kill "$pid"
|
||||
rm -f "$QUEUE_DAEMON_PID_FILE"
|
||||
echo "Daemon stopped (PID $pid)"
|
||||
else
|
||||
echo "Daemon not running (stale PID file)"
|
||||
rm -f "$QUEUE_DAEMON_PID_FILE"
|
||||
fi
|
||||
;;
|
||||
restart)
|
||||
cmd_queue_daemon stop
|
||||
sleep 1
|
||||
cmd_queue_daemon start
|
||||
;;
|
||||
status)
|
||||
if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then
|
||||
local pid=$(cat "$QUEUE_DAEMON_PID_FILE")
|
||||
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
|
||||
echo "Queue daemon is running (PID $pid)"
|
||||
else
|
||||
echo "Daemon not running (stale PID file)"
|
||||
rm -f "$QUEUE_DAEMON_PID_FILE"
|
||||
fi
|
||||
else
|
||||
echo "Queue daemon is not running"
|
||||
fi
|
||||
;;
|
||||
logs)
|
||||
local lines="${1:-50}"
|
||||
if [ -f "$QUEUE_DAEMON_LOG_FILE" ]; then
|
||||
tail -"$lines" "$QUEUE_DAEMON_LOG_FILE"
|
||||
else
|
||||
echo "No daemon log file found"
|
||||
fi
|
||||
;;
|
||||
run)
|
||||
queue_daemon_loop
|
||||
;;
|
||||
*)
|
||||
echo "Usage: kugetsu queue-daemon [start|stop|restart|status|logs]" >&2
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
}
|
||||
|
||||
queue_daemon_loop() {
|
||||
local pid=$$
|
||||
echo "$pid" > "$QUEUE_DAEMON_PID_FILE"
|
||||
echo "Queue daemon started (PID $pid) at $(date)"
|
||||
|
||||
while true; do
|
||||
sleep $((QUEUE_DAEMON_INTERVAL_MINUTES * 60))
|
||||
|
||||
if [ ! -f "$QUEUE_DAEMON_PID_FILE" ] || [ "$(cat "$QUEUE_DAEMON_PID_FILE")" != "$pid" ]; then
|
||||
echo "PID file changed, stopping daemon"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
process_queue
|
||||
done
|
||||
}
|
||||
|
||||
process_queue() {
|
||||
local active_count=$(count_active_dev_sessions)
|
||||
|
||||
if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
local available_slots=$((MAX_CONCURRENT_AGENTS - active_count))
|
||||
local batch_size=$QUEUE_DAEMON_BATCH_SIZE
|
||||
[ "$batch_size" -gt "$available_slots" ] && batch_size=$available_slots
|
||||
|
||||
if [ "$batch_size" -le 0 ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
local pm_session=$(get_pm_agent_session_id)
|
||||
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ]; then
|
||||
return
|
||||
fi
|
||||
|
||||
local count=0
|
||||
for item in $(ls -t "$QUEUE_ITEMS_DIR"/*.json 2>/dev/null | head -20); do
|
||||
[ -f "$item" ] || continue
|
||||
[ $count -ge "$batch_size" ] && break
|
||||
|
||||
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
||||
if [ "$state" != "pending" ]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
local queue_id=$(basename "$item" .json)
|
||||
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', '')" 2>/dev/null)
|
||||
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', '')" 2>/dev/null)
|
||||
|
||||
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
|
||||
continue
|
||||
fi
|
||||
|
||||
update_queue_item_state "$queue_id" "notified"
|
||||
|
||||
local log_file="$LOGS_DIR/delegate-${queue_id}.log"
|
||||
mkdir -p "$LOGS_DIR"
|
||||
|
||||
local env_sh="set -a; "
|
||||
if [ -f "$ENV_DIR/pm-agent.env" ]; then
|
||||
env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; "
|
||||
elif [ -f "$ENV_DIR/default.env" ]; then
|
||||
env_sh="${env_sh}source '$ENV_DIR/default.env'; "
|
||||
fi
|
||||
env_sh="${env_sh}set +a; "
|
||||
|
||||
nohup sh -c "${env_sh}opencode run 'Delegate task: ${message}' --continue --session '$pm_session'" >> "$log_file" 2>&1 &
|
||||
|
||||
echo "Queued task $queue_id for PM agent"
|
||||
count=$((count + 1))
|
||||
done
|
||||
}
|
||||
|
||||
cmd_delegate() {
|
||||
local message="${1:-}"
|
||||
local verbosity="${KUGETSU_VERBOSITY:-default}"
|
||||
|
||||
if [ -z "$message" ]; then
|
||||
echo "Error: message is required" >&2
|
||||
@@ -994,94 +1338,22 @@ cmd_delegate() {
|
||||
exit 1
|
||||
fi
|
||||
|
||||
local pm_session=$(get_pm_agent_session_id)
|
||||
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then
|
||||
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
mkdir -p "$LOGS_DIR"
|
||||
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
|
||||
local parsed=$(parse_issue_ref_from_message "$message")
|
||||
local gitserver=$(echo "$parsed" | cut -d'|' -f1)
|
||||
local owner=$(echo "$parsed" | cut -d'|' -f2)
|
||||
local repo=$(echo "$parsed" | cut -d'|' -f3)
|
||||
local issue_number=$(echo "$parsed" | cut -d'|' -f4)
|
||||
|
||||
local missing_info=$(get_missing_info "$parsed")
|
||||
local context_injection=""
|
||||
if [ -n "$missing_info" ]; then
|
||||
context_injection=$(build_missing_info_context "$missing_info")
|
||||
echo "NOTE: Delegation missing information: ${missing_info}"
|
||||
if [ -z "$issue_number" ] || [ -z "$gitserver" ] || [ -z "$owner" ] || [ -z "$repo" ]; then
|
||||
echo "Error: Could not parse issue reference from message" >&2
|
||||
echo "Message should contain an issue reference like 'github.com/user/repo#123'" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
local candidates=""
|
||||
local candidate_count=0
|
||||
local issue_ref="${gitserver}/${owner}/${repo}#${issue_number}"
|
||||
|
||||
if [ -n "$issue_number" ]; then
|
||||
local worktrees=$(find_worktrees_by_issue_number "$issue_number")
|
||||
local sessions=$(find_sessions_by_issue_number "$issue_number")
|
||||
|
||||
while IFS=: read -r path type; do
|
||||
if [ -n "$path" ]; then
|
||||
candidate_count=$((candidate_count + 1))
|
||||
candidates="${candidates}${candidate_count}) ${path} (${type})
|
||||
"
|
||||
fi
|
||||
done <<< "$worktrees"
|
||||
|
||||
while IFS=: read -r path type; do
|
||||
if [ -n "$path" ]; then
|
||||
candidate_count=$((candidate_count + 1))
|
||||
candidates="${candidates}${candidate_count}) ${path} (${type})
|
||||
"
|
||||
fi
|
||||
done <<< "$sessions"
|
||||
fi
|
||||
|
||||
local use_worktree=""
|
||||
if [ $candidate_count -gt 0 ]; then
|
||||
echo "Found $candidate_count existing worktree(s)/session(s) for issue #${issue_number}:"
|
||||
echo "$candidates"
|
||||
echo "r) Delegate anyway (without routing)"
|
||||
echo "Which one to use? [1-${candidate_count}/r]: "
|
||||
read -r choice
|
||||
|
||||
if [ "$choice" = "r" ] || [ -z "$choice" ]; then
|
||||
use_worktree=""
|
||||
elif [ "$choice" -ge 1 ] && [ "$choice" -le "$candidate_count" ]; then
|
||||
local selected=$(echo "$candidates" | sed -n "${choice}p")
|
||||
use_worktree=$(echo "$selected" | sed 's/) .*//')
|
||||
fi
|
||||
fi
|
||||
|
||||
local final_message="${message}${context_injection}"
|
||||
|
||||
if [ -n "$use_worktree" ]; then
|
||||
if [ -d "$use_worktree" ]; then
|
||||
echo "Using worktree: $use_worktree"
|
||||
final_message="${final_message}
|
||||
|
||||
NOTE: Worktree selected: ${use_worktree}"
|
||||
fi
|
||||
fi
|
||||
|
||||
local temp_dir="${KUGETSU_TEMP_DIR:-$HOME/.local/share/opencode/tool-output}"
|
||||
|
||||
mkdir -p "$ENV_DIR"
|
||||
local env_sh="set -a; export KUGETSU_TEMP_DIR='$temp_dir'; export KUGETSU_VERBOSITY='$verbosity'; "
|
||||
if [ -f "$ENV_DIR/pm-agent.env" ]; then
|
||||
env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; "
|
||||
elif [ -f "$ENV_DIR/default.env" ]; then
|
||||
env_sh="${env_sh}source '$ENV_DIR/default.env'; "
|
||||
fi
|
||||
env_sh="${env_sh}set +a; "
|
||||
|
||||
nohup sh -c "${env_sh}opencode run '${final_message}' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
|
||||
disown
|
||||
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
||||
echo "Verbosity: $verbosity"
|
||||
enqueue_task "$issue_ref" "$message"
|
||||
echo "Task enqueued. The queue daemon will process it when a slot is available."
|
||||
}
|
||||
|
||||
cmd_logs() {
|
||||
@@ -2218,6 +2490,16 @@ main() {
|
||||
exit 1
|
||||
fi
|
||||
;;
|
||||
queue)
|
||||
local action="${1:-list}"
|
||||
shift
|
||||
cmd_queue "$action" "$@"
|
||||
;;
|
||||
queue-daemon)
|
||||
local action="${1:-status}"
|
||||
shift
|
||||
cmd_queue_daemon "$action" "$@"
|
||||
;;
|
||||
*)
|
||||
echo "Error: unknown command '$command'" >&2
|
||||
usage
|
||||
|
||||
Reference in New Issue
Block a user