diff --git a/skills/kugetsu/scripts/kugetsu b/skills/kugetsu/scripts/kugetsu index d100fae..6f337e1 100755 --- a/skills/kugetsu/scripts/kugetsu +++ b/skills/kugetsu/scripts/kugetsu @@ -17,6 +17,15 @@ CONTEXT_DIR="${CONTEXT_DIR:-$KUGETSU_DIR/context}" ENABLE_CONTEXT_DUMP="${ENABLE_CONTEXT_DUMP:-true}" WORKTREE_CHECK_PR_STATUS="${WORKTREE_CHECK_PR_STATUS:-true}" +QUEUE_DIR="${QUEUE_DIR:-$KUGETSU_DIR/queue}" +QUEUE_ITEMS_DIR="${QUEUE_ITEMS_DIR:-$QUEUE_DIR/items}" +QUEUE_DAEMON_PID_FILE="${QUEUE_DAEMON_PID_FILE:-$QUEUE_DIR/daemon.pid}" +QUEUE_DAEMON_LOCK_FILE="${QUEUE_DAEMON_LOCK_FILE:-$QUEUE_DIR/daemon.lock}" +QUEUE_DAEMON_LOG_FILE="${QUEUE_DAEMON_LOG_FILE:-$QUEUE_DIR/daemon.log}" +QUEUE_DAEMON_INTERVAL_MINUTES="${QUEUE_DAEMON_INTERVAL_MINUTES:-5}" +QUEUE_DAEMON_BATCH_SIZE="${QUEUE_DAEMON_BATCH_SIZE:-2}" +QUEUE_CLEANUP_AGE_DAYS="${QUEUE_CLEANUP_AGE_DAYS:-7}" + # Load user config overrides (~/.kugetsu/config) if [ -f "$KUGETSU_DIR/config" ]; then source "$KUGETSU_DIR/config" @@ -82,6 +91,9 @@ Usage: kugetsu destroy --base [-y] Delete base session kugetsu set-pr Set PR URL for session (for PR tracking) kugetsu context Show context for issue + kugetsu queue [list|stats|clear] Show queue status or statistics + kugetsu queue enqueue Enqueue a task (normally via delegate) + kugetsu queue-daemon [start|stop|restart|status|logs] Manage queue daemon kugetsu help Show this help Issue Ref Format: @@ -477,6 +489,142 @@ except Exception as e: PYEOF } +ensure_queue_dirs() { + mkdir -p "$QUEUE_ITEMS_DIR" +} + +generate_queue_id() { + echo "q_$(date +%s)_$$_$RANDOM" +} + +enqueue_task() { + local issue_ref="$1" + local message="$2" + + if [ -z "$issue_ref" ] || [ -z "$message" ]; then + echo "Error: enqueue_task requires and " >&2 + return 1 + fi + + validate_issue_ref "$issue_ref" + ensure_queue_dirs + + local queue_id=$(generate_queue_id) + local pending_since=$(date -Iseconds) + + python3 << PYEOF +import json + +queue_item = { + "id": "$queue_id", + "issue_ref": "$issue_ref", + "message": """$message""", + "state": "pending", + "pending_since": "$pending_since", + "notified_at": None, + "completed_at": None, + "error": None +} + +with open("$QUEUE_ITEMS_DIR/${queue_id}.json", "w") as f: + json.dump(queue_item, f, indent=2) + +print(f"Enqueued: $queue_id") +PYEOF +} + +get_pending_tasks() { + local limit="${1:-10}" + + if [ ! -d "$QUEUE_ITEMS_DIR" ]; then + echo "[]" + return + fi + + find "$QUEUE_ITEMS_DIR" -name "*.json" -type f 2>/dev/null | while read -r file; do + local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "") + if [ "$state" = "pending" ]; then + cat "$file" + fi + done | head -"$limit" +} + +get_queue_stats() { + local total=0 + local pending=0 + local notified=0 + local completed=0 + local error=0 + + if [ -d "$QUEUE_ITEMS_DIR" ]; then + for file in "$QUEUE_ITEMS_DIR"/*.json; do + [ -f "$file" ] || continue + total=$((total + 1)) + local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "") + case "$state" in + pending) pending=$((pending + 1)) ;; + notified) notified=$((notified + 1)) ;; + completed) completed=$((completed + 1)) ;; + error) error=$((error + 1)) ;; + esac + done + fi + + echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}" +} + +update_queue_item_state() { + local queue_id="$1" + local new_state="$2" + + local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json" + if [ ! -f "$item_file" ]; then + echo "Error: Queue item not found: $queue_id" >&2 + return 1 + fi + + python3 << PYEOF +import json +from datetime import datetime + +item_file = "$item_file" +new_state = "$new_state" + +with open(item_file, 'r') as f: + item = json.load(f) + +item['state'] = new_state + +if new_state == "notified": + item['notified_at'] = datetime.now().isoformat() + "Z" +elif new_state == "completed": + item['completed_at'] = datetime.now().isoformat() + "Z" +elif new_state == "error": + item['error'] = datetime.now().isoformat() + "Z" + +with open(item_file, 'w') as f: + json.dump(item, f, indent=2) + +print(f"Updated $queue_id to state: $new_state") +PYEOF +} + +cleanup_old_queue_items() { + local days="${QUEUE_CLEANUP_AGE_DAYS:-7}" + + if [ ! -d "$QUEUE_ITEMS_DIR" ]; then + return + fi + + find "$QUEUE_ITEMS_DIR" -name "*.json" -type f -mtime "+$days" 2>/dev/null | while read -r file; do + local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "") + if [ "$state" = "completed" ] || [ "$state" = "error" ]; then + rm -f "$file" + echo "Cleaned up: $(basename "$file")" + fi + done +} + update_session_pr_url() { local issue_ref="$1" local pr_url="$2" @@ -984,9 +1132,205 @@ find_sessions_by_issue_number() { echo "$results" } +cmd_queue() { + local action="${1:-list}" + shift + + case "$action" in + list) + ensure_queue_dirs + local stats=$(get_queue_stats) + echo "Queue Statistics:" + echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(f\" Total: {d['total']}\n Pending: {d['pending']}\n Notified: {d['notified']}\n Completed: {d['completed']}\n Error: {d['error']}\")" + echo "" + echo "Pending tasks:" + local count=0 + for item in "$QUEUE_ITEMS_DIR"/*.json; do + [ -f "$item" ] || continue + local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', '')" 2>/dev/null) + if [ "$state" = "pending" ]; then + count=$((count + 1)) + python3 -c "import json; d=json.load(open('$item')); print(f\" [{d['id']}] {d['issue_ref']}: {d['message'][:50]}...\n pending since: {d['pending_since']}\")" 2>/dev/null + fi + done + if [ $count -eq 0 ]; then + echo " (none)" + fi + ;; + stats) + local stats=$(get_queue_stats) + echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(json.dumps(d, indent=2))" + ;; + clear) + echo "Cleaning up old queue items..." + cleanup_old_queue_items + ;; + enqueue) + local issue_ref="${1:-}" + local message="${2:-}" + if [ -z "$issue_ref" ] || [ -z "$message" ]; then + echo "Usage: kugetsu queue enqueue " >&2 + exit 1 + fi + enqueue_task "$issue_ref" "$message" + ;; + *) + echo "Usage: kugetsu queue [list|stats|clear|enqueue ]" >&2 + exit 1 + ;; + esac +} + +cmd_queue_daemon() { + local action="${1:-status}" + shift + + case "$action" in + start) + if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then + local old_pid=$(cat "$QUEUE_DAEMON_PID_FILE" 2>/dev/null) + if [ -n "$old_pid" ] && kill -0 "$old_pid" 2>/dev/null; then + echo "Daemon is already running with PID $old_pid" + exit 1 + fi + rm -f "$QUEUE_DAEMON_PID_FILE" + fi + + mkdir -p "$(dirname "$QUEUE_DAEMON_LOG_FILE")" + nohup bash "$0" queue-daemon run >> "$QUEUE_DAEMON_LOG_FILE" 2>&1 & + local daemon_pid=$! + echo "$daemon_pid" > "$QUEUE_DAEMON_PID_FILE" + echo "Queue daemon started with PID $daemon_pid" + echo "Log file: $QUEUE_DAEMON_LOG_FILE" + ;; + stop) + if [ ! -f "$QUEUE_DAEMON_PID_FILE" ]; then + echo "Daemon PID file not found. Is the daemon running?" + exit 1 + fi + local pid=$(cat "$QUEUE_DAEMON_PID_FILE") + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + kill "$pid" + rm -f "$QUEUE_DAEMON_PID_FILE" + echo "Daemon stopped (PID $pid)" + else + echo "Daemon not running (stale PID file)" + rm -f "$QUEUE_DAEMON_PID_FILE" + fi + ;; + restart) + cmd_queue_daemon stop + sleep 1 + cmd_queue_daemon start + ;; + status) + if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then + local pid=$(cat "$QUEUE_DAEMON_PID_FILE") + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + echo "Queue daemon is running (PID $pid)" + else + echo "Daemon not running (stale PID file)" + rm -f "$QUEUE_DAEMON_PID_FILE" + fi + else + echo "Queue daemon is not running" + fi + ;; + logs) + local lines="${1:-50}" + if [ -f "$QUEUE_DAEMON_LOG_FILE" ]; then + tail -"$lines" "$QUEUE_DAEMON_LOG_FILE" + else + echo "No daemon log file found" + fi + ;; + run) + queue_daemon_loop + ;; + *) + echo "Usage: kugetsu queue-daemon [start|stop|restart|status|logs]" >&2 + exit 1 + ;; + esac +} + +queue_daemon_loop() { + local pid=$$ + echo "$pid" > "$QUEUE_DAEMON_PID_FILE" + echo "Queue daemon started (PID $pid) at $(date)" + + while true; do + sleep $((QUEUE_DAEMON_INTERVAL_MINUTES * 60)) + + if [ ! -f "$QUEUE_DAEMON_PID_FILE" ] || [ "$(cat "$QUEUE_DAEMON_PID_FILE")" != "$pid" ]; then + echo "PID file changed, stopping daemon" + exit 0 + fi + + process_queue + done +} + +process_queue() { + local active_count=$(count_active_dev_sessions) + + if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then + return + fi + + local available_slots=$((MAX_CONCURRENT_AGENTS - active_count)) + local batch_size=$QUEUE_DAEMON_BATCH_SIZE + [ "$batch_size" -gt "$available_slots" ] && batch_size=$available_slots + + if [ "$batch_size" -le 0 ]; then + return + fi + + local pm_session=$(get_pm_agent_session_id) + if [ -z "$pm_session" ] || [ "$pm_session" = "null" ]; then + return + fi + + local count=0 + for item in $(ls -t "$QUEUE_ITEMS_DIR"/*.json 2>/dev/null | head -20); do + [ -f "$item" ] || continue + [ $count -ge "$batch_size" ] && break + + local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null) + if [ "$state" != "pending" ]; then + continue + fi + + local queue_id=$(basename "$item" .json) + local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', '')" 2>/dev/null) + local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', '')" 2>/dev/null) + + if [ -z "$issue_ref" ] || [ -z "$message" ]; then + continue + fi + + update_queue_item_state "$queue_id" "notified" + + local log_file="$LOGS_DIR/delegate-${queue_id}.log" + mkdir -p "$LOGS_DIR" + + local env_sh="set -a; " + if [ -f "$ENV_DIR/pm-agent.env" ]; then + env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; " + elif [ -f "$ENV_DIR/default.env" ]; then + env_sh="${env_sh}source '$ENV_DIR/default.env'; " + fi + env_sh="${env_sh}set +a; " + + nohup sh -c "${env_sh}opencode run 'Delegate task: ${message}' --continue --session '$pm_session'" >> "$log_file" 2>&1 & + + echo "Queued task $queue_id for PM agent" + count=$((count + 1)) + done +} + cmd_delegate() { local message="${1:-}" - local verbosity="${KUGETSU_VERBOSITY:-default}" if [ -z "$message" ]; then echo "Error: message is required" >&2 @@ -994,94 +1338,22 @@ cmd_delegate() { exit 1 fi - local pm_session=$(get_pm_agent_session_id) - if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then - echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2 - exit 1 - fi - - mkdir -p "$LOGS_DIR" - local log_file="$LOGS_DIR/delegate-$(date +%s).log" - local parsed=$(parse_issue_ref_from_message "$message") local gitserver=$(echo "$parsed" | cut -d'|' -f1) local owner=$(echo "$parsed" | cut -d'|' -f2) local repo=$(echo "$parsed" | cut -d'|' -f3) local issue_number=$(echo "$parsed" | cut -d'|' -f4) - local missing_info=$(get_missing_info "$parsed") - local context_injection="" - if [ -n "$missing_info" ]; then - context_injection=$(build_missing_info_context "$missing_info") - echo "NOTE: Delegation missing information: ${missing_info}" + if [ -z "$issue_number" ] || [ -z "$gitserver" ] || [ -z "$owner" ] || [ -z "$repo" ]; then + echo "Error: Could not parse issue reference from message" >&2 + echo "Message should contain an issue reference like 'github.com/user/repo#123'" >&2 + exit 1 fi - local candidates="" - local candidate_count=0 + local issue_ref="${gitserver}/${owner}/${repo}#${issue_number}" - if [ -n "$issue_number" ]; then - local worktrees=$(find_worktrees_by_issue_number "$issue_number") - local sessions=$(find_sessions_by_issue_number "$issue_number") - - while IFS=: read -r path type; do - if [ -n "$path" ]; then - candidate_count=$((candidate_count + 1)) - candidates="${candidates}${candidate_count}) ${path} (${type}) -" - fi - done <<< "$worktrees" - - while IFS=: read -r path type; do - if [ -n "$path" ]; then - candidate_count=$((candidate_count + 1)) - candidates="${candidates}${candidate_count}) ${path} (${type}) -" - fi - done <<< "$sessions" - fi - - local use_worktree="" - if [ $candidate_count -gt 0 ]; then - echo "Found $candidate_count existing worktree(s)/session(s) for issue #${issue_number}:" - echo "$candidates" - echo "r) Delegate anyway (without routing)" - echo "Which one to use? [1-${candidate_count}/r]: " - read -r choice - - if [ "$choice" = "r" ] || [ -z "$choice" ]; then - use_worktree="" - elif [ "$choice" -ge 1 ] && [ "$choice" -le "$candidate_count" ]; then - local selected=$(echo "$candidates" | sed -n "${choice}p") - use_worktree=$(echo "$selected" | sed 's/) .*//') - fi - fi - - local final_message="${message}${context_injection}" - - if [ -n "$use_worktree" ]; then - if [ -d "$use_worktree" ]; then - echo "Using worktree: $use_worktree" - final_message="${final_message} - -NOTE: Worktree selected: ${use_worktree}" - fi - fi - - local temp_dir="${KUGETSU_TEMP_DIR:-$HOME/.local/share/opencode/tool-output}" - - mkdir -p "$ENV_DIR" - local env_sh="set -a; export KUGETSU_TEMP_DIR='$temp_dir'; export KUGETSU_VERBOSITY='$verbosity'; " - if [ -f "$ENV_DIR/pm-agent.env" ]; then - env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; " - elif [ -f "$ENV_DIR/default.env" ]; then - env_sh="${env_sh}source '$ENV_DIR/default.env'; " - fi - env_sh="${env_sh}set +a; " - - nohup sh -c "${env_sh}opencode run '${final_message}' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 & - disown - echo "Delegated to PM agent (logged to $(basename "$log_file"))" - echo "Verbosity: $verbosity" + enqueue_task "$issue_ref" "$message" + echo "Task enqueued. The queue daemon will process it when a slot is available." } cmd_logs() { @@ -2218,6 +2490,16 @@ main() { exit 1 fi ;; + queue) + local action="${1:-list}" + shift + cmd_queue "$action" "$@" + ;; + queue-daemon) + local action="${1:-status}" + shift + cmd_queue_daemon "$action" "$@" + ;; *) echo "Error: unknown command '$command'" >&2 usage