From 270219873f22a01f7d092595c25663beca24cb55 Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Sun, 5 Apr 2026 22:05:18 +0000 Subject: [PATCH 1/4] fix(kugetsu): cmd_delegate should enqueue instead of calling cmd_start When cmd_delegate detects an issue ref with number (e.g. git.fbrns.co/shoko/kugetsu#158), it was calling cmd_start directly which tries to create worktree and clone. This breaks the queue-based workflow where daemon should handle task execution. Now cmd_delegate calls enqueue_task to add to queue, and daemon processes tasks by calling cmd_start/cmd_continue as appropriate. --- skills/kugetsu/scripts/kugetsu-session.sh | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/skills/kugetsu/scripts/kugetsu-session.sh b/skills/kugetsu/scripts/kugetsu-session.sh index 1abdf67..b355e6c 100755 --- a/skills/kugetsu/scripts/kugetsu-session.sh +++ b/skills/kugetsu/scripts/kugetsu-session.sh @@ -153,10 +153,12 @@ cmd_delegate() { local issue_ref=$(extract_issue_ref_from_message "$message") if [ -n "$issue_ref" ] && [[ "$issue_ref" =~ \#[0-9]+$ ]]; then - cmd_start "$issue_ref" "$message" + # Enqueue for daemon to process via cmd_start/cmd_continue + enqueue_task "$issue_ref" "$message" return fi + # No issue ref detected — delegate directly to PM agent (legacy path) local pm_session=$(get_pm_agent_session_id) if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2 @@ -165,7 +167,7 @@ cmd_delegate() { mkdir -p "$LOGS_DIR" local log_file="$LOGS_DIR/delegate-$(date +%s).log" - nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 & + nohup sh -c "GITEA_TOKEN='***' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 & disown echo "Delegated to PM agent (logged to $(basename "$log_file"))" } -- 2.49.1 From b1028a65560e9c76ffc700141c29b0473eb030ea Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Sun, 5 Apr 2026 22:17:59 +0000 Subject: [PATCH 2/4] fix(kugetsu): move queue functions to kugetsu-index.sh for daemon access The daemon (kugetsu-queue-daemon.sh) sources kugetsu-index.sh but not the main kugetsu script. Move update_queue_item_state and kugetsu_add_notification to kugetsu-index.sh so the daemon can use these functions when processing tasks. --- skills/kugetsu/scripts/kugetsu-index.sh | 92 +++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/skills/kugetsu/scripts/kugetsu-index.sh b/skills/kugetsu/scripts/kugetsu-index.sh index eb85d26..eba8a90 100755 --- a/skills/kugetsu/scripts/kugetsu-index.sh +++ b/skills/kugetsu/scripts/kugetsu-index.sh @@ -146,3 +146,95 @@ filename_to_issue_ref() { local name="${filename%.json}" echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g' } + +# Add notification to notifications file +kugetsu_add_notification() { + local type="$1" + local message="$2" + local issue_ref="${3:-}" + local gitea_url="${4:-}" + + mkdir -p "$(dirname "$NOTIFICATIONS_FILE")" + + python3 << PYEOF +import json +import os +from datetime import datetime + +notification = { + "type": "$type", + "message": "$message", + "issue_ref": "$issue_ref" if "$issue_ref" else None, + "gitea_url": "$gitea_url" if "$gitea_url" else None, + "timestamp": datetime.now().isoformat(), + "read": False +} + +file_path = os.path.expanduser("$NOTIFICATIONS_FILE") +notifications = [] + +if os.path.exists(file_path): + try: + with open(file_path, 'r') as f: + notifications = json.load(f) + except: + notifications = [] + +notifications.append(notification) + +with open(file_path, 'w') as f: + json.dump(notifications, f, indent=2) + +print("Notification added") +PYEOF +} + +# Update queue item state +update_queue_item_state() { + local queue_id="$1" + local new_state="$2" + local session_id="${3:-}" + local pid="${4:-}" + + local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json" + if [ ! -f "$item_file" ]; then + echo "Error: Queue item not found: $queue_id" >&2 + return 1 + fi + + python3 << PYEOF +import json +import os +from datetime import datetime + +item_file = "$item_file" +new_state = "$new_state" +session_id = "$session_id" +pid = "$pid" + +with open(item_file, 'r') as f: + item = json.load(f) + +issue_ref = item.get('issue_ref', '') + +item['state'] = new_state + +if new_state == "notified": + item['notified_at'] = datetime.now().isoformat() + "Z" + if session_id: + item['opencode_session_id'] = session_id + if pid: + item['pid'] = int(pid) if pid.isdigit() else None +elif new_state == "completed": + item['completed_at'] = datetime.now().isoformat() + "Z" + os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'") +elif new_state == "error": + item['error'] = datetime.now().isoformat() + "Z" + os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'") + +with open(item_file, 'w') as f: + json.dump(item, f, indent=2) + +print(f"Updated $queue_id to state: $new_state") +PYEOF +} -- 2.49.1 From 5a0a54898bdcb5bfed557f104ebe67787bc3b91f Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Sun, 5 Apr 2026 22:20:49 +0000 Subject: [PATCH 3/4] fix(kugetsu): kugetsu-session.sh needs to source required modules When daemon sources kugetsu-session.sh to call cmd_start/cmd_continue, it needs access to functions from kugetsu-config.sh, kugetsu-index.sh, kugetsu-worktree.sh, and kugetsu-log.sh. Add sourcing at top of kugetsu-session.sh. --- skills/kugetsu/scripts/kugetsu-session.sh | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/skills/kugetsu/scripts/kugetsu-session.sh b/skills/kugetsu/scripts/kugetsu-session.sh index b355e6c..dcbb1f3 100755 --- a/skills/kugetsu/scripts/kugetsu-session.sh +++ b/skills/kugetsu/scripts/kugetsu-session.sh @@ -1,6 +1,13 @@ #!/bin/bash set -euo pipefail +# Source required modules for session management functions +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "$SCRIPT_DIR/kugetsu-config.sh" +source "$SCRIPT_DIR/kugetsu-index.sh" +source "$SCRIPT_DIR/kugetsu-worktree.sh" +source "$SCRIPT_DIR/kugetsu-log.sh" + count_active_dev_sessions() { local count=0 if [ -d "$SESSIONS_DIR" ]; then -- 2.49.1 From bdcb7a476c3837f283b80ab6e8a86eff0831fb4f Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Mon, 6 Apr 2026 00:01:41 +0000 Subject: [PATCH 4/4] fix(queue-daemon): add locking, proper state updates, and error handling - Add acquire_lock/release_lock to prevent daemon vs manual conflicts - Check cmd_start/cmd_continue success before updating state to 'notified' - Set state to 'error' if command fails - Track actual session_id from session file after cmd_start completes - Release lock when task completes (success or error) - Use load_agent_env 'pm-agent' for GITEA_TOKEN Fixes critical race conditions and failure handling in queue processing --- .../kugetsu/scripts/kugetsu-queue-daemon.sh | 172 ++++++++++++------ 1 file changed, 117 insertions(+), 55 deletions(-) diff --git a/skills/kugetsu/scripts/kugetsu-queue-daemon.sh b/skills/kugetsu/scripts/kugetsu-queue-daemon.sh index d1471f3..6a49602 100644 --- a/skills/kugetsu/scripts/kugetsu-queue-daemon.sh +++ b/skills/kugetsu/scripts/kugetsu-queue-daemon.sh @@ -8,86 +8,148 @@ source "$SCRIPT_DIR/kugetsu-index.sh" source "$SCRIPT_DIR/kugetsu-worktree.sh" source "$SCRIPT_DIR/kugetsu-log.sh" -# Load GITEA_TOKEN from default.env -if [ -f "$HOME/.kugetsu/env/default.env" ]; then - source "$HOME/.kugetsu/env/default.env" -fi +load_agent_env "pm-agent" + +acquire_lock() { + local issue_ref="$1" + local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock" + mkdir -p "$(dirname "$lock_file")" + if [ -f "$lock_file" ]; then + local pid=$(cat "$lock_file" 2>/dev/null || echo "") + if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then + return 1 + fi + rm -f "$lock_file" + fi + echo $$ > "$lock_file" + return 0 +} + +release_lock() { + local issue_ref="$1" + local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock" + rm -f "$lock_file" +} -# Check if a notified task has completed (forked session ended or has new commits) check_task_completion() { local item="$1" local queue_id=$(basename "$item" .json) local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null) - + [ "$state" = "notified" ] || return 0 - - # Use opencode_session_id (the forked session, not the parent pm_session) + local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null) local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null) - - # If no session tracked, skip - [ -n "$session_id" ] || return 0 - - # Check if forked session still exists in opencode - if ! opencode session list 2>/dev/null | grep -q "$session_id"; then - # Forked session ended — check if work was done - local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees") - local has_commits=false - - if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then - # Check if worktree has new commits beyond origin/main - if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then - has_commits=true + local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null) + + if [ -n "$pid" ] && [ "$pid" != "None" ]; then + if ! kill -0 "$pid" 2>/dev/null; then + local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees") + local has_commits=false + + if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then + if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then + has_commits=true + fi fi + + if [ "$has_commits" = true ]; then + update_queue_item_state "$queue_id" "completed" + echo "Task $queue_id ($issue_ref) completed — new commits found" + else + update_queue_item_state "$queue_id" "error" + echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended" + fi + release_lock "$issue_ref" fi - - if [ "$has_commits" = true ]; then - update_queue_item_state "$queue_id" "completed" - echo "Task $queue_id ($issue_ref) completed — new commits found" - else - update_queue_item_state "$queue_id" "error" - echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended" + else + if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then + local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees") + local has_commits=false + + if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then + if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then + has_commits=true + fi + fi + + if [ "$has_commits" = true ]; then + update_queue_item_state "$queue_id" "completed" + echo "Task $queue_id ($issue_ref) completed — new commits found" + else + update_queue_item_state "$queue_id" "error" + echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended" + fi + release_lock "$issue_ref" fi fi } +get_session_id_for_issue() { + local issue_ref="$1" + local session_file=$(issue_ref_to_filename "$issue_ref") + local session_path="$SESSIONS_DIR/$session_file" + if [ -f "$session_path" ]; then + python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo "" + else + echo "" + fi +} + +process_task() { + local item="$1" + local queue_id=$(basename "$item" .json) + local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null) + local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null) + + if ! acquire_lock "$issue_ref"; then + echo "Task $queue_id ($issue_ref) skipped — another process is handling it" + return + fi + + source "$SCRIPT_DIR/kugetsu-session.sh" + + if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then + log_file="$LOGS_DIR/delegate-$(date +%s).log" + if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then + sleep 1 + local session_id=$(get_session_id_for_issue "$issue_ref") + update_queue_item_state "$queue_id" "notified" "$session_id" "" + echo "Task $queue_id continued for $issue_ref" + else + update_queue_item_state "$queue_id" "error" + echo "Task $queue_id ($issue_ref) failed to continue" + fi + else + log_file="$LOGS_DIR/delegate-$(date +%s).log" + if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then + sleep 1 + local session_id=$(get_session_id_for_issue "$issue_ref") + update_queue_item_state "$queue_id" "notified" "$session_id" "" + echo "Task $queue_id started for $issue_ref" + else + update_queue_item_state "$queue_id" "error" + echo "Task $queue_id ($issue_ref) failed to start" + fi + fi + + release_lock "$issue_ref" +} + while true; do - # Check completion of notified tasks if [ -d "$QUEUE_ITEMS_DIR" ]; then for item in "$QUEUE_ITEMS_DIR"/*.json; do [ -f "$item" ] || continue check_task_completion "$item" done - - # Process pending tasks + for item in "$QUEUE_ITEMS_DIR"/*.json; do [ -f "$item" ] || continue state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null) if [ "$state" = "pending" ]; then - queue_id=$(basename "$item" .json) - issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null) - message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null) - - # Source session management and use cmd_start/cmd_continue - source "$SCRIPT_DIR/kugetsu-session.sh" - - if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then - # Continue existing session - log_file="$LOGS_DIR/delegate-$(date +%s).log" - cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 & - pid=$! - update_queue_item_state "$queue_id" "notified" "" "$pid" - echo "Task $queue_id continued for $issue_ref" - else - # Start new session - log_file="$LOGS_DIR/delegate-$(date +%s).log" - cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 & - pid=$! - update_queue_item_state "$queue_id" "notified" "" "$pid" - echo "Task $queue_id started for $issue_ref" - fi + process_task "$item" fi done fi sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m" -done +done \ No newline at end of file -- 2.49.1