Compare commits

...

5 Commits

Author SHA1 Message Date
shokollm
d0b100fca8 fix: add support for gitserver/owner/repo#number issue ref format
Add third pattern to parse_issue_ref_from_message() to support the mixed
format 'gitserver/owner/repo#number' (e.g., git.fbrns.co/shoko/kugetsu#116).

Previously only two formats were supported:
1. Full URL: #116
2. Short format: shoko/kugetsu#116

Now supports:
3. Mixed format: git.fbrns.co/shoko/kugetsu#116

Fixes #144
2026-04-05 10:22:31 +00:00
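The three formats can be told apart mechanically. A minimal sketch (the `classify_ref` helper is hypothetical, for illustration only — the actual parser uses `grep -E`, as the diff below shows):

```shell
# Hypothetical classifier for the three issue-ref formats; not kugetsu code.
classify_ref() {
    case "$1" in
        http://*|https://*)  echo "url" ;;      # 1. full URL
        */*/*'#'[0-9]*)      echo "mixed" ;;    # 3. gitserver/owner/repo#number
        */*'#'[0-9]*)        echo "short" ;;    # 2. owner/repo#number
        *)                   echo "unknown" ;;
    esac
}
```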
da0fa302de Merge pull request 'fix(kugetsu): prevent excess agent spawning with flock + sequential processing' (#147) from fix/issue-queue-daemon-excess-agents into main 2026-04-05 10:49:24 +02:00
shokollm
54aa6419eb fix(kugetsu): prevent excess agent spawning with flock + sequential processing
- count_active_dev_sessions() now excludes pm-agent.json from count
- process_queue() now calls kugetsu start directly (not opencode run)
- process_queue() uses dynamic batch size = available_slots
- process_queue() has retry logic (max 3 attempts) on failure
- cmd_start() now uses flock around critical section
- Added notification types: task_queued, task_dequeued, task_started, task_completed, task_error
- Removed QUEUE_DAEMON_BATCH_SIZE config (no longer needed)

Fixes issue #146
2026-04-05 08:44:45 +00:00
98a31070a7 Merge pull request '[FIX] process_queue: add missing closing parentheses' (#143) from fix/issue-142-process-queue-missing-parens into main 2026-04-05 08:56:59 +02:00
26346235c9 fix: add missing closing parenthesis in process_queue Python extraction
Fixes #142 - process_queue silently skips all queue items because
issue_ref and message Python extraction commands were missing a closing
parenthesis. The error was silently swallowed by 2>/dev/null causing
both variables to be empty, so every queue item was skipped.
2026-04-05 06:51:57 +00:00
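The silent failure mode described in that commit is easy to reproduce: a Python `SyntaxError` goes to stderr, `2>/dev/null` discards it, and command substitution yields an empty string:

```shell
# Same shape as the bug: a missing ')' makes python3 exit with a SyntaxError,
# but 2>/dev/null hides it and $bad ends up empty.
bad=$(python3 -c "print('x'" 2>/dev/null) || true
good=$(python3 -c "print('x')" 2>/dev/null)
```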
3 changed files with 184 additions and 92 deletions


@@ -0,0 +1,67 @@
# Fix: Queue daemon spawning excess agents due to race condition
## Problem
When enqueueing multiple tasks (e.g., 6 tasks), the queue daemon was spawning many more subagents than expected, eventually exhausting container memory.
**Root Cause:** The combination of:
1. `process_queue()` calling `opencode run` directly instead of `kugetsu start`, bypassing all concurrency logic
2. `count_active_dev_sessions()` counting `pm-agent.json` toward `MAX_CONCURRENT_AGENTS`, reducing effective dev agent slots
3. No atomic locking around session count check + session file creation (TOCTOU race condition)
4. Background spawning of multiple concurrent processes in `process_queue()`
**Expected behavior:** With `MAX_CONCURRENT_AGENTS=3` and 6 tasks:
- Tasks should be processed sequentially via `kugetsu start`
- Only 3 dev agents should run at a time
- Tasks should queue and wait for slots to free up
## Solution
### 1. `count_active_dev_sessions()` - Exclude pm-agent
Only count actual dev agent session files (exclude `pm-agent.json`).
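The counting rule can be sketched as follows (`count_dev_sessions` is an illustrative name, not the patched source):

```shell
# Every *.json in the sessions dir counts toward the limit except
# base.json (the fork template) and pm-agent.json (the PM agent itself).
count_dev_sessions() {
    dir=$1 count=0
    for f in "$dir"/*.json; do
        [ -f "$f" ] || continue
        name=$(basename "$f")
        if [ "$name" != "base.json" ] && [ "$name" != "pm-agent.json" ]; then
            count=$((count + 1))
        fi
    done
    echo "$count"
}
```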
### 2. `process_queue()` - Call `kugetsu start` directly + retry logic
- Call `kugetsu start` directly (foreground, sequential) instead of spawning `opencode run` background process
- Dynamic batch size = available slots (removes need for `QUEUE_DAEMON_BATCH_SIZE`)
- Retry logic (max 3 attempts) on failure
- On failure: cleanup worktree/session and revert to `pending` state
- Save `fork_pid` to queue item for timeout handling
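The retry shape described above, reduced to a sketch (`run_with_retries` is a hypothetical helper; on total failure the caller reverts the queue item to `pending`):

```shell
# Try a command up to 3 times; return 1 if every attempt fails.
run_with_retries() {
    cmd=$1 attempt=1
    while [ "$attempt" -le 3 ]; do
        if "$cmd"; then
            return 0
        fi
        # the real code cleans up the worktree/session between attempts
        attempt=$((attempt + 1))
    done
    return 1
}
```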
### 3. `cmd_start()` - Add flock
- Add flock around critical section (count check + fork)
- Track `fork_pid` for queue item timeout handling
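The flock pattern closes the TOCTOU window by making the slot check and the fork one atomic step. A stand-in sketch (`start_if_slot_free` and its session-file "fork" are illustrative, not the real `cmd_start`):

```shell
# Check the slot count and claim a slot under one exclusive lock on fd 9,
# so two concurrent starts cannot both pass the limit check.
start_if_slot_free() {
    sessions_dir=$1 max_agents=$2 name=$3
    (
        flock -x 9
        active=$(ls "$sessions_dir"/*.json 2>/dev/null | wc -l)
        if [ "$active" -ge "$max_agents" ]; then
            echo "Error: max concurrent agents ($max_agents) reached" >&2
            exit 1
        fi
        # the real code forks the agent here; the session file marks the slot taken
        touch "$sessions_dir/$name.json"
    ) 9>"$sessions_dir/.session_lock"
}
```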
### 4. Notification System
New notification types:
| Event | Type |
|-------|------|
| Task enqueued | `task_queued` |
| Task dequeued | `task_dequeued` |
| Task started | `task_started` |
| Task completed | `task_completed` |
| Task error | `task_error` |
### 5. Config
- Remove `QUEUE_DAEMON_BATCH_SIZE` (no longer needed - batch size is now dynamic)
## Notification Flow
| Event | Location | Type |
|-------|----------|------|
| Task enqueued | `enqueue_task()` | `task_queued` |
| Task dequeued | `process_queue()` after state change to `notified` | `task_dequeued` |
| Task started | `cmd_start()` after session file created | `task_started` |
| Task completed | `update_queue_item_state()` | `task_completed` |
| Task error | `update_queue_item_state()` | `task_error` |
## Out of Scope
- Re-check loop in cmd_start (checking if session DB is reliable) - deferred to separate research issue
- Buffer mechanism for excess forking (safety failsafe only)
## Status
- [x] Issue created
- [x] Implementation
- [x] PR created (#147)
- [ ] Merged


@@ -50,7 +50,6 @@ A default config file is created during `kugetsu init` with commented examples:
 | `KUGETSU_TEMP_DIR` | `~/.local/share/opencode/tool-output` | Temp directory for subagent tool output (useful in headless environments where /tmp is restricted) |
 | `KUGETSU_VERBOSITY` | `default` | PM agent verbosity level: `verbose`, `default`, or `quiet` |
 | `QUEUE_DAEMON_INTERVAL_MINUTES` | 5 | How often daemon polls queue (in minutes) |
-| `QUEUE_DAEMON_BATCH_SIZE` | 2 | How many tasks daemon picks per poll |
 | `QUEUE_CLEANUP_AGE_DAYS` | 7 | Auto-cleanup completed/error items older than N days |

 ### Environment Variables for Agents


@@ -23,7 +23,6 @@ QUEUE_DAEMON_PID_FILE="${QUEUE_DAEMON_PID_FILE:-$QUEUE_DIR/daemon.pid}"
 QUEUE_DAEMON_LOCK_FILE="${QUEUE_DAEMON_LOCK_FILE:-$QUEUE_DIR/daemon.lock}"
 QUEUE_DAEMON_LOG_FILE="${QUEUE_DAEMON_LOG_FILE:-$QUEUE_DIR/daemon.log}"
 QUEUE_DAEMON_INTERVAL_MINUTES="${QUEUE_DAEMON_INTERVAL_MINUTES:-5}"
-QUEUE_DAEMON_BATCH_SIZE="${QUEUE_DAEMON_BATCH_SIZE:-2}"
 QUEUE_CLEANUP_AGE_DAYS="${QUEUE_CLEANUP_AGE_DAYS:-7}"
 TASK_TIMEOUT_HOURS="${TASK_TIMEOUT_HOURS:-1}"
@@ -63,7 +62,7 @@ count_active_dev_sessions() {
     for session_file in "$SESSIONS_DIR"/*.json; do
         if [ -f "$session_file" ]; then
             local filename=$(basename "$session_file")
-            if [ "$filename" != "base.json" ]; then
+            if [ "$filename" != "base.json" ] && [ "$filename" != "pm-agent.json" ]; then
                 count=$((count + 1))
             fi
         fi
@@ -532,6 +531,8 @@ with open("$QUEUE_ITEMS_DIR/${queue_id}.json", "w") as f:
 print(f"Enqueued: $queue_id")
 PYEOF
+
+    kugetsu_add_notification "task_queued" "Task queued: $issue_ref" "$issue_ref"
 }

 get_pending_tasks() {
@@ -588,6 +589,7 @@ update_queue_item_state() {
     python3 << PYEOF
 import json
+import os
 from datetime import datetime

 item_file = "$item_file"
@@ -598,6 +600,8 @@ pid = "$pid"
 with open(item_file, 'r') as f:
     item = json.load(f)

+issue_ref = item.get('issue_ref', '')
+
 item['state'] = new_state

 if new_state == "notified":
@@ -608,8 +612,10 @@ if new_state == "notified":
     item['pid'] = int(pid) if pid.isdigit() else None
 elif new_state == "completed":
     item['completed_at'] = datetime.now().isoformat() + "Z"
+    os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
 elif new_state == "error":
     item['error'] = datetime.now().isoformat() + "Z"
+    os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")

 with open(item_file, 'w') as f:
     json.dump(item, f, indent=2)
@@ -1139,6 +1145,11 @@ parse_issue_ref_from_message() {
         owner=$(echo "$full_path" | cut -d'/' -f2)
         repo=$(echo "$full_path" | cut -d'/' -f3)
         issue_number=$(echo "$full_path" | grep -oE '[0-9]+$' | head -1)
+    elif echo "$message" | grep -qE '[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+#[0-9]+'; then
+        gitserver=$(echo "$message" | grep -oE '[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+' | head -1)
+        owner=$(echo "$gitserver" | cut -d'/' -f2)
+        repo=$(echo "$gitserver" | cut -d'/' -f3)
+        issue_number=$(echo "$message" | grep -oE '#[0-9]+' | grep -oE '[0-9]+' | head -1)
     elif echo "$message" | grep -qE '[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+#([0-9]+)'; then
         owner=$(echo "$message" | grep -oE '[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+#' | sed 's/#$//' | cut -d'/' -f1)
         repo=$(echo "$message" | grep -oE '[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+#' | sed 's/#$//' | cut -d'/' -f2)
@@ -1366,22 +1377,15 @@ process_queue() {
     fi

     local available_slots=$((MAX_CONCURRENT_AGENTS - active_count))
-    local batch_size=$QUEUE_DAEMON_BATCH_SIZE
-    [ "$batch_size" -gt "$available_slots" ] && batch_size=$available_slots
-    if [ "$batch_size" -le 0 ]; then
-        return
-    fi
-    local pm_session=$(get_pm_agent_session_id)
-    if [ -z "$pm_session" ] || [ "$pm_session" = "null" ]; then
+    if [ "$available_slots" -le 0 ]; then
         return
     fi

     local count=0
     for item in $(ls -t "$QUEUE_ITEMS_DIR"/*.json 2>/dev/null | head -20); do
+        [ $count -ge "$available_slots" ] && break
         [ -f "$item" ] || continue
-        [ $count -ge "$batch_size" ] && break

         local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
         if [ "$state" != "pending" ]; then
@@ -1389,33 +1393,49 @@ process_queue() {
         fi

         local queue_id=$(basename "$item" .json)
-        local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', '')" 2>/dev/null)
-        local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', '')" 2>/dev/null)
+        local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
+        local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)

         if [ -z "$issue_ref" ] || [ -z "$message" ]; then
             continue
         fi

         update_queue_item_state "$queue_id" "notified"
+        kugetsu_add_notification "task_dequeued" "Task dequeued: $issue_ref" "$issue_ref"

         local log_file="$LOGS_DIR/delegate-${queue_id}.log"
         mkdir -p "$LOGS_DIR"

-        local env_sh="set -a; "
-        if [ -f "$ENV_DIR/pm-agent.env" ]; then
-            env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; "
-        elif [ -f "$ENV_DIR/default.env" ]; then
-            env_sh="${env_sh}source '$ENV_DIR/default.env'; "
-        fi
-        env_sh="${env_sh}set +a; "
-        nohup sh -c "${env_sh}opencode run 'Delegate task: ${message}' --continue --session '$pm_session'" >> "$log_file" 2>&1 &
-        local fork_pid=$!
-        update_queue_item_state "$queue_id" "notified" "" "$fork_pid"
-        echo "Queued task $queue_id for PM agent (PID: $fork_pid)"
-        count=$((count + 1))
+        local max_retries=3
+        local attempt=1
+        local success=false
+        local fork_pid=""
+
+        while [ $attempt -le $max_retries ]; do
+            if kugetsu start "$issue_ref" "$message" >> "$log_file" 2>&1; then
+                success=true
+                break
+            fi
+            echo "Attempt $attempt failed for $queue_id, cleaning up..." >> "$log_file"
+            local session_file="$(issue_ref_to_filename "$issue_ref").json"
+            local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$PWD")
+            [ -f "$SESSIONS_DIR/$session_file" ] && rm -f "$SESSIONS_DIR/$session_file"
+            worktree_exists "$issue_ref" "$PWD" && remove_worktree_for_issue "$issue_ref" "$PWD"
+            remove_issue_from_index "$issue_ref" 2>/dev/null || true
+            attempt=$((attempt + 1))
+        done
+
+        if [ "$success" = true ]; then
+            echo "Started task $queue_id: $issue_ref"
+            count=$((count + 1))
+        else
+            echo "Failed to start task $queue_id after $max_retries attempts"
+            update_queue_item_state "$queue_id" "pending"
+        fi
     done
 }
@@ -2061,20 +2081,10 @@ cmd_start() {
     create_worktree "$issue_ref" "$parent_dir"

     local session_file="$(issue_ref_to_filename "$issue_ref").json"

-    echo "Forking session for '$issue_ref'..."
-
-    # Session-counting: count actual dev sessions, reject if at limit
-    local active_count=$(count_active_dev_sessions)
-    if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then
-        echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached" >&2
-        echo "Active sessions: $active_count" >&2
-        remove_worktree_for_issue "$issue_ref" "$parent_dir"
-        exit 1
-    fi
-
     local fork_log="$SESSIONS_DIR/$session_file.fork.log"
     local opencode_db="${OPENCODE_DB:-$HOME/.local/share/opencode/opencode.db}"
+    local lock_file="$KUGETSU_DIR/.session_lock"
+    local lock_fd=200

     > "$fork_log"
@@ -2087,25 +2097,38 @@ ${previous_context}
 ## YOUR TASK

 $message"

-    fix_session_permissions
-    if [ "$DEBUG_MODE" = true ]; then
-        (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) | tee "$fork_log" &
-    else
-        (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) >> "$fork_log" &
-    fi
-    local fork_pid=$!
-    local max_attempts=10
-    local attempt=1
-    local new_session_id=""
-    local fork_log_output=""
-    while [ $attempt -le $max_attempts ]; do
-        sleep 1
-        new_session_id=$(python3 -c "
+    (
+        flock -x $lock_fd
+
+        local active_count=$(count_active_dev_sessions)
+        if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then
+            echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached" >&2
+            echo "Active sessions: $active_count" >&2
+            remove_worktree_for_issue "$issue_ref" "$parent_dir"
+            exit 1
+        fi
+
+        echo "Forking session for '$issue_ref'..."
+        fix_session_permissions
+        if [ "$DEBUG_MODE" = true ]; then
+            (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) | tee "$fork_log" &
+        else
+            (cd "$worktree_path" && opencode run "$full_message" --fork --session "$base_session_id" --dir "$worktree_path" 2>&1) >> "$fork_log" &
+        fi
+        local fork_pid=$!
+        local max_attempts=10
+        local attempt=1
+        local new_session_id=""
+        local fork_log_output=""
+        while [ $attempt -le $max_attempts ]; do
+            sleep 1
+            new_session_id=$(python3 -c "
 import sqlite3
 conn = sqlite3.connect('$opencode_db')
 cursor = conn.cursor()
@@ -2114,31 +2137,31 @@ result = cursor.fetchone()
 if result:
     print(result[0])
 " 2>/dev/null || echo "")

             if [ -n "$new_session_id" ] && [ "$new_session_id" != "$base_session_id" ] && [ "$new_session_id" != "$pm_agent_session_id" ]; then
                 break
             fi
             if ! kill -0 $fork_pid 2>/dev/null; then
                 fork_log_output=$(tail -20 "$fork_log" 2>/dev/null || echo "(log empty or unavailable)")
                 break
             fi
             attempt=$((attempt + 1))
         done

         if [ -z "$new_session_id" ]; then
             echo "Error: Could not find newly created session after ${max_attempts}s" >&2
             if [ -n "$fork_log_output" ]; then
                 echo "Fork log output:" >&2
                 echo "$fork_log_output" >&2
             fi
             remove_worktree_for_issue "$issue_ref"
             exit 1
         fi

         echo "Updating permissions for new session: $new_session_id"
         python3 -c "
 import sqlite3
 conn = sqlite3.connect('$opencode_db')
 cursor = conn.cursor()
@@ -2148,9 +2171,9 @@ conn.commit()
 print('[OK] Session permissions updated')
 "

         if [ "$DEBUG_MODE" = true ]; then
             echo "[DEBUG] Forked session permissions check:"
             python3 -c "
 import sqlite3
 conn = sqlite3.connect('$opencode_db')
 cursor = conn.cursor()
@@ -2160,11 +2183,11 @@ for row in cursor.fetchall():
     print(' Directory:', row[1])
     print(' Permission:', row[2])
 " 2>/dev/null || echo " (failed to query DB)"
         fi

         local branch_name=$(issue_ref_to_branch_name "$issue_ref")

         python3 << PYEOF > "$SESSIONS_DIR/$session_file"
 import json

 session = {
@@ -2182,12 +2205,15 @@ with open("$SESSIONS_DIR/$session_file", "w") as f:
     json.dump(session, f, indent=2)
 PYEOF

         add_issue_to_index "$issue_ref" "$session_file"
         kugetsu_context_dump "$issue_ref" "$message" "$branch_name"
+        kugetsu_add_notification "task_started" "Task started: $issue_ref" "$issue_ref"

         echo "Session started for '$issue_ref': $new_session_id"
         echo "Worktree: $worktree_path"
+    ) 200>"$lock_file"
 }

 cmd_continue() {