From bda762b7741ae06c876c43477444a9f122c57d3c Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Sun, 5 Apr 2026 08:43:59 +0000 Subject: [PATCH] fix(kugetsu): prevent excess agent spawning with flock + sequential processing - count_active_dev_sessions() now excludes pm-agent.json from count - process_queue() now calls kugetsu start directly (not opencode run) - process_queue() uses dynamic batch size = available_slots - process_queue() has retry logic (max 3 attempts) on failure - cmd_start() now uses flock around critical section - Added notification types: task_queued, task_dequeued, task_started, task_completed, task_error - Removed QUEUE_DAEMON_BATCH_SIZE config (no longer needed) Fixes issue #146 --- .../ISSUES/fix-queue-daemon-excess-agents.md | 67 ++++++ skills/kugetsu/SKILL.md | 1 - skills/kugetsu/scripts/kugetsu | 199 ++++++++++-------- 3 files changed, 177 insertions(+), 90 deletions(-) create mode 100644 .github/ISSUES/fix-queue-daemon-excess-agents.md diff --git a/.github/ISSUES/fix-queue-daemon-excess-agents.md b/.github/ISSUES/fix-queue-daemon-excess-agents.md new file mode 100644 index 0000000..11ce151 --- /dev/null +++ b/.github/ISSUES/fix-queue-daemon-excess-agents.md @@ -0,0 +1,67 @@ +# Fix: Queue daemon spawning excess agents due to race condition + +## Problem + +When enqueueing multiple tasks (e.g., 6 tasks), the queue daemon was spawning many more subagents than expected, eventually exhausting container memory. + +**Root Cause:** The combination of: +1. `process_queue()` calling `opencode run` directly instead of `kugetsu start`, bypassing all concurrency logic +2. `count_active_dev_sessions()` counting `pm-agent.json` toward `MAX_CONCURRENT_AGENTS`, reducing effective dev agent slots +3. No atomic locking around session count check + session file creation (TOCTOU race condition) +4. Background spawning of multiple concurrent processes in `process_queue()` + +**Expected behavior:** With `MAX_CONCURRENT_AGENTS=3` and 6 tasks: +- Tasks should be processed sequentially via `kugetsu start` +- Only 3 dev agents should run at a time +- Tasks should queue and wait for slots to free up + +## Solution + +### 1. `count_active_dev_sessions()` - Exclude pm-agent +Only count actual dev agent session files (exclude `pm-agent.json`). + +### 2. `process_queue()` - Call `kugetsu start` directly + retry logic +- Call `kugetsu start` directly (foreground, sequential) instead of spawning `opencode run` background process +- Dynamic batch size = available slots (removes need for `QUEUE_DAEMON_BATCH_SIZE`) +- Retry logic (max 3 attempts) on failure +- On failure: cleanup worktree/session and revert to `pending` state +- Save `fork_pid` to queue item for timeout handling + +### 3. `cmd_start()` - Add flock +- Add flock around critical section (count check + fork) +- Track `fork_pid` for queue item timeout handling + +### 4. Notification System +New notification types: +| Event | Type | +|-------|------| +| Task enqueued | `task_queued` | +| Task dequeued | `task_dequeued` | +| Task started | `task_started` | +| Task completed | `task_completed` | +| Task error | `task_error` | + +### 5. Config +- Remove `QUEUE_DAEMON_BATCH_SIZE` (no longer needed - batch size is now dynamic) + +## Notification Flow + +| Event | Location | Type | +|-------|----------|------| +| Task enqueued | `enqueue_task()` | `task_queued` | +| Task dequeued | `process_queue()` after state change to `notified` | `task_dequeued` | +| Task started | `cmd_start()` after session file created | `task_started` | +| Task completed | `update_queue_item_state()` | `task_completed` | +| Task error | `update_queue_item_state()` | `task_error` | + +## Out of Scope + +- Re-check loop in cmd_start (checking if session DB is reliable) - deferred to separate research issue +- Buffer mechanism for excess forking (safety failsafe only) + +## Status + +- [x] Issue created +- [x] Implementation +- [ ] PR created +- [ ] Merged diff --git a/skills/kugetsu/SKILL.md b/skills/kugetsu/SKILL.md index fb2f26e..298c385 100644 --- a/skills/kugetsu/SKILL.md +++ b/skills/kugetsu/SKILL.md @@ -50,7 +50,6 @@ A default config file is created during `kugetsu init` with commented examples: | `KUGETSU_TEMP_DIR` | `~/.local/share/opencode/tool-output` | Temp directory for subagent tool output (useful in headless environments where /tmp is restricted) | | `KUGETSU_VERBOSITY` | `default` | PM agent verbosity level: `verbose`, `default`, or `quiet` | | `QUEUE_DAEMON_INTERVAL_MINUTES` | 5 | How often daemon polls queue (in minutes) | -| `QUEUE_DAEMON_BATCH_SIZE` | 2 | How many tasks daemon picks per poll | | `QUEUE_CLEANUP_AGE_DAYS` | 7 | Auto-cleanup completed/error items older than N days | ### Environment Variables for Agents diff --git a/skills/kugetsu/scripts/kugetsu b/skills/kugetsu/scripts/kugetsu index 2129cad..5023cc6 100755 --- a/skills/kugetsu/scripts/kugetsu +++ b/skills/kugetsu/scripts/kugetsu @@ -23,7 +23,6 @@ QUEUE_DAEMON_PID_FILE="${QUEUE_DAEMON_PID_FILE:-$QUEUE_DIR/daemon.pid}" QUEUE_DAEMON_LOCK_FILE="${QUEUE_DAEMON_LOCK_FILE:-$QUEUE_DIR/daemon.lock}" QUEUE_DAEMON_LOG_FILE="${QUEUE_DAEMON_LOG_FILE:-$QUEUE_DIR/daemon.log}" QUEUE_DAEMON_INTERVAL_MINUTES="${QUEUE_DAEMON_INTERVAL_MINUTES:-5}" -QUEUE_DAEMON_BATCH_SIZE="${QUEUE_DAEMON_BATCH_SIZE:-2}" QUEUE_CLEANUP_AGE_DAYS="${QUEUE_CLEANUP_AGE_DAYS:-7}" TASK_TIMEOUT_HOURS="${TASK_TIMEOUT_HOURS:-1}" @@ -63,7 +62,7 @@ count_active_dev_sessions() { for session_file in "$SESSIONS_DIR"/*.json; do if [ -f "$session_file" ]; then local filename=$(basename "$session_file") - if [ "$filename" != "base.json" ]; then + if [ "$filename" != "base.json" ] && [ "$filename" != "pm-agent.json" ]; then count=$((count + 1)) fi fi @@ -532,6 +531,8 @@ with open("$QUEUE_ITEMS_DIR/${queue_id}.json", "w") as f: print(f"Enqueued: $queue_id") PYEOF + + kugetsu_add_notification "task_queued" "Task queued: $issue_ref" "$issue_ref" } get_pending_tasks() { @@ -588,6 +589,7 @@ update_queue_item_state() { python3 << PYEOF import json +import os from datetime import datetime item_file = "$item_file" @@ -598,6 +600,8 @@ pid = "$pid" with open(item_file, 'r') as f: item = json.load(f) +issue_ref = item.get('issue_ref', '') + item['state'] = new_state if new_state == "notified": @@ -608,8 +612,10 @@ if new_state == "notified": item['pid'] = int(pid) if pid.isdigit() else None elif new_state == "completed": item['completed_at'] = datetime.now().isoformat() + "Z" + os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'") elif new_state == "error": item['error'] = datetime.now().isoformat() + "Z" + os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'") with open(item_file, 'w') as f: json.dump(item, f, indent=2) @@ -1366,22 +1372,15 @@ process_queue() { fi local available_slots=$((MAX_CONCURRENT_AGENTS - active_count)) - local batch_size=$QUEUE_DAEMON_BATCH_SIZE - [ "$batch_size" -gt "$available_slots" ] && batch_size=$available_slots - if [ "$batch_size" -le 0 ]; then - return - fi - - local pm_session=$(get_pm_agent_session_id) - if [ -z "$pm_session" ] || [ "$pm_session" = "null" ]; then + if [ "$available_slots" -le 0 ]; then return fi local count=0 for item in $(ls -t "$QUEUE_ITEMS_DIR"/*.json 2>/dev/null | head -20); do + [ $count -ge "$available_slots" ] && break [ -f "$item" ] || continue - [ $count -ge "$batch_size" ] && break local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null) if [ "$state" != "pending" ]; then @@ -1397,25 +1396,41 @@ process_queue() { fi update_queue_item_state "$queue_id" "notified" + kugetsu_add_notification "task_dequeued" "Task dequeued: $issue_ref" "$issue_ref" local log_file="$LOGS_DIR/delegate-${queue_id}.log" mkdir -p "$LOGS_DIR" - local env_sh="set -a; " - if [ -f "$ENV_DIR/pm-agent.env" ]; then - env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; " - elif [ -f "$ENV_DIR/default.env" ]; then - env_sh="${env_sh}source '$ENV_DIR/default.env'; " + local max_retries=3 + local attempt=1 + local success=false + local fork_pid="" + + while [ $attempt -le $max_retries ]; do + if kugetsu start "$issue_ref" "$message" >> "$log_file" 2>&1; then + success=true + break + fi + + echo "Attempt $attempt failed for $queue_id, cleaning up..." >> "$log_file" + + local session_file="$(issue_ref_to_filename "$issue_ref").json" + local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$PWD") + + [ -f "$SESSIONS_DIR/$session_file" ] && rm -f "$SESSIONS_DIR/$session_file" + worktree_exists "$issue_ref" "$PWD" && remove_worktree_for_issue "$issue_ref" "$PWD" + remove_issue_from_index "$issue_ref" 2>/dev/null || true + + attempt=$((attempt + 1)) + done + + if [ "$success" = true ]; then + echo "Started task $queue_id: $issue_ref" + count=$((count + 1)) + else + echo "Failed to start task $queue_id after $max_retries attempts" + update_queue_item_state "$queue_id" "pending" fi - env_sh="${env_sh}set +a; " - - nohup sh -c "${env_sh}opencode run 'Delegate task: ${message}' --continue --session '$pm_session'" >> "$log_file" 2>&1 & - local fork_pid=$! - - update_queue_item_state "$queue_id" "notified" "" "$fork_pid" - - echo "Queued task $queue_id for PM agent (PID: $fork_pid)" - count=$((count + 1)) done } @@ -2061,20 +2076,10 @@ cmd_start() { create_worktree "$issue_ref" "$parent_dir" local session_file="$(issue_ref_to_filename "$issue_ref").json" - - echo "Forking session for '$issue_ref'..." - - # Session-counting: count actual dev sessions, reject if at limit - local active_count=$(count_active_dev_sessions) - if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then - echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached" >&2 - echo "Active sessions: $active_count" >&2 - remove_worktree_for_issue "$issue_ref" "$parent_dir" - exit 1 - fi - local fork_log="$SESSIONS_DIR/$session_file.fork.log" local opencode_db="${OPENCODE_DB:-$HOME/.local/share/opencode/opencode.db}" + local lock_file="$KUGETSU_DIR/.session_lock" + local lock_fd=200 > "$fork_log" @@ -2087,25 +2092,38 @@ ${previous_context} ## YOUR TASK $message" - fix_session_permissions - - if [ "$DEBUG_MODE" = true ]; then - (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) | tee "$fork_log" & - else - (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) >> "$fork_log" & - fi - - local fork_pid=$! - - local max_attempts=10 - local attempt=1 - local new_session_id="" - local fork_log_output="" - - while [ $attempt -le $max_attempts ]; do - sleep 1 + ( + flock -x $lock_fd - new_session_id=$(python3 -c " + local active_count=$(count_active_dev_sessions) + if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then + echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached" >&2 + echo "Active sessions: $active_count" >&2 + remove_worktree_for_issue "$issue_ref" "$parent_dir" + exit 1 + fi + + echo "Forking session for '$issue_ref'..." + + fix_session_permissions + + if [ "$DEBUG_MODE" = true ]; then + (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) | tee "$fork_log" & + else + (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) >> "$fork_log" & + fi + + local fork_pid=$! + + local max_attempts=10 + local attempt=1 + local new_session_id="" + local fork_log_output="" + + while [ $attempt -le $max_attempts ]; do + sleep 1 + + new_session_id=$(python3 -c " import sqlite3 conn = sqlite3.connect('$opencode_db') cursor = conn.cursor() @@ -2114,31 +2132,31 @@ result = cursor.fetchone() if result: print(result[0]) " 2>/dev/null || echo "") - - if [ -n "$new_session_id" ] && [ "$new_session_id" != "$base_session_id" ] && [ "$new_session_id" != "$pm_agent_session_id" ]; then - break - fi - - if ! kill -0 $fork_pid 2>/dev/null; then - fork_log_output=$(tail -20 "$fork_log" 2>/dev/null || echo "(log empty or unavailable)") - break - fi - - attempt=$((attempt + 1)) - done + + if [ -n "$new_session_id" ] && [ "$new_session_id" != "$base_session_id" ] && [ "$new_session_id" != "$pm_agent_session_id" ]; then + break + fi + + if ! kill -0 $fork_pid 2>/dev/null; then + fork_log_output=$(tail -20 "$fork_log" 2>/dev/null || echo "(log empty or unavailable)") + break + fi + + attempt=$((attempt + 1)) + done - if [ -z "$new_session_id" ]; then - echo "Error: Could not find newly created session after ${max_attempts}s" >&2 - if [ -n "$fork_log_output" ]; then - echo "Fork log output:" >&2 - echo "$fork_log_output" >&2 + if [ -z "$new_session_id" ]; then + echo "Error: Could not find newly created session after ${max_attempts}s" >&2 + if [ -n "$fork_log_output" ]; then + echo "Fork log output:" >&2 + echo "$fork_log_output" >&2 + fi + remove_worktree_for_issue "$issue_ref" + exit 1 fi - remove_worktree_for_issue "$issue_ref" - exit 1 - fi - echo "Updating permissions for new session: $new_session_id" - python3 -c " + echo "Updating permissions for new session: $new_session_id" + python3 -c " import sqlite3 conn = sqlite3.connect('$opencode_db') cursor = conn.cursor() @@ -2148,9 +2166,9 @@ conn.commit() print('[OK] Session permissions updated') " - if [ "$DEBUG_MODE" = true ]; then - echo "[DEBUG] Forked session permissions check:" - python3 -c " + if [ "$DEBUG_MODE" = true ]; then + echo "[DEBUG] Forked session permissions check:" + python3 -c " import sqlite3 conn = sqlite3.connect('$opencode_db') cursor = conn.cursor() @@ -2160,11 +2178,11 @@ for row in cursor.fetchall(): print(' Directory:', row[1]) print(' Permission:', row[2]) " 2>/dev/null || echo " (failed to query DB)" - fi + fi - local branch_name=$(issue_ref_to_branch_name "$issue_ref") - - python3 << PYEOF > "$SESSIONS_DIR/$session_file" + local branch_name=$(issue_ref_to_branch_name "$issue_ref") + + python3 << PYEOF > "$SESSIONS_DIR/$session_file" import json session = { @@ -2182,12 +2200,15 @@ with open("$SESSIONS_DIR/$session_file", "w") as f: json.dump(session, f, indent=2) PYEOF - add_issue_to_index "$issue_ref" "$session_file" - - kugetsu_context_dump "$issue_ref" "$message" "$branch_name" + add_issue_to_index "$issue_ref" "$session_file" + + kugetsu_context_dump "$issue_ref" "$message" "$branch_name" + + kugetsu_add_notification "task_started" "Task started: $issue_ref" "$issue_ref" - echo "Session started for '$issue_ref': $new_session_id" - echo "Worktree: $worktree_path" + echo "Session started for '$issue_ref': $new_session_id" + echo "Worktree: $worktree_path" + ) 200>"$lock_file" } cmd_continue() {