Compare commits

..

11 Commits

Author SHA1 Message Date
shokollm
59f6a4883e fix(kugetsu): remove duplicate update_queue_item_state to use fixed version from kugetsu-index.sh
The main kugetsu script had its own update_queue_item_state() definition
with broken os.system() calls that overwrote the fixed version in kugetsu-index.sh.

Now kugetsu will use the fixed version from kugetsu-index.sh (which sources
kugetsu-log.sh) since that module is sourced first.

Fixes #170
2026-04-06 00:40:55 +00:00
9667c3e800 Merge pull request 'fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()' (#169) from fix/issue-167-notification-bash into main 2026-04-06 02:35:36 +02:00
shokollm
796e1fe454 fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()
os.system() spawns a new subprocess that cannot access bash functions.
Now calling kugetsu_add_notification directly from bash after Python updates JSON.

Fixes #167
2026-04-06 00:33:35 +00:00
84c59a3b64 Merge pull request 'fix(kugetsu): queue daemon improvements - locking, error handling, cmd_delegate enqueue' (#164) from fix/issue-156-queue-fixes into main 2026-04-06 02:06:36 +02:00
shokollm
dc9d4d7327 Merge origin/main into fix/issue-156-queue-fixes 2026-04-06 00:02:47 +00:00
shokollm
bdcb7a476c fix(queue-daemon): add locking, proper state updates, and error handling
- Add acquire_lock/release_lock to prevent daemon vs manual conflicts
- Check cmd_start/cmd_continue success before updating state to 'notified'
- Set state to 'error' if command fails
- Track actual session_id from session file after cmd_start completes
- Release lock when task completes (success or error)
- Use load_agent_env 'pm-agent' for GITEA_TOKEN

Fixes critical race conditions and failure handling in queue processing
2026-04-06 00:01:41 +00:00
77cf817568 Merge pull request 'fix(kugetsu): return proper JSON array from get_pending_tasks()' (#163) from fix/issue-155-queue-list-json into main 2026-04-06 01:45:09 +02:00
77b0963fa4 Merge pull request 'fix(queue-daemon): source pm-agent.env for GITEA_TOKEN instead of default.env' (#162) from fix/issue-160-gitea-token-from-pm-agent into main 2026-04-06 01:42:04 +02:00
shokollm
5a0a54898b fix(kugetsu): kugetsu-session.sh needs to source required modules
When daemon sources kugetsu-session.sh to call cmd_start/cmd_continue,
it needs access to functions from kugetsu-config.sh, kugetsu-index.sh,
kugetsu-worktree.sh, and kugetsu-log.sh. Add sourcing at top of
kugetsu-session.sh.
2026-04-05 22:20:49 +00:00
shokollm
b1028a6556 fix(kugetsu): move queue functions to kugetsu-index.sh for daemon access
The daemon (kugetsu-queue-daemon.sh) sources kugetsu-index.sh but not the main kugetsu script.
Move update_queue_item_state and kugetsu_add_notification to kugetsu-index.sh
so the daemon can use these functions when processing tasks.
2026-04-05 22:17:59 +00:00
shokollm
270219873f fix(kugetsu): cmd_delegate should enqueue instead of calling cmd_start
When cmd_delegate detects an issue ref with number (e.g. git.fbrns.co/shoko/kugetsu#158),
it was calling cmd_start directly which tries to create worktree and clone.
This breaks the queue-based workflow where daemon should handle task execution.

Now cmd_delegate calls enqueue_task to add to queue, and daemon processes
tasks by calling cmd_start/cmd_continue as appropriate.
2026-04-05 22:05:18 +00:00
4 changed files with 222 additions and 102 deletions

View File

@@ -361,55 +361,6 @@ get_queue_stats() {
echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}"
}
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
python3 << PYEOF
import json
import os
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
issue_ref = item.get('issue_ref', '')
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
}
check_task_timeouts() {
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
return

View File

@@ -146,3 +146,98 @@ filename_to_issue_ref() {
local name="${filename%.json}"
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
}
# Add notification to notifications file
kugetsu_add_notification() {
local type="$1"
local message="$2"
local issue_ref="${3:-}"
local gitea_url="${4:-}"
mkdir -p "$(dirname "$NOTIFICATIONS_FILE")"
python3 << PYEOF
import json
import os
from datetime import datetime
notification = {
"type": "$type",
"message": "$message",
"issue_ref": "$issue_ref" if "$issue_ref" else None,
"gitea_url": "$gitea_url" if "$gitea_url" else None,
"timestamp": datetime.now().isoformat(),
"read": False
}
file_path = os.path.expanduser("$NOTIFICATIONS_FILE")
notifications = []
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
notifications = json.load(f)
except:
notifications = []
notifications.append(notification)
with open(file_path, 'w') as f:
json.dump(notifications, f, indent=2)
print("Notification added")
PYEOF
}
# Update queue item state
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
local issue_ref=$(python3 -c "import json; print(json.load(open('$item_file')).get('issue_ref', ''))" 2>/dev/null || echo "")
python3 << PYEOF
import json
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
if [ "$new_state" = "completed" ]; then
kugetsu_add_notification "task_completed" "Task completed: $issue_ref" "$issue_ref"
elif [ "$new_state" = "error" ]; then
kugetsu_add_notification "task_error" "Task error: $issue_ref" "$issue_ref"
fi
}

View File

@@ -10,7 +10,27 @@ source "$SCRIPT_DIR/kugetsu-log.sh"
load_agent_env "pm-agent"
# Check if a notified task has completed (forked session ended or has new commits)
acquire_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
mkdir -p "$(dirname "$lock_file")"
if [ -f "$lock_file" ]; then
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
return 1
fi
rm -f "$lock_file"
fi
echo $$ > "$lock_file"
return 0
}
release_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
rm -f "$lock_file"
}
check_task_completion() {
local item="$1"
local queue_id=$(basename "$item" .json)
@@ -18,21 +38,16 @@ check_task_completion() {
[ "$state" = "notified" ] || return 0
# Use opencode_session_id (the forked session, not the parent pm_session)
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
# If no session tracked, skip
[ -n "$session_id" ] || return 0
# Check if forked session still exists in opencode
if ! opencode session list 2>/dev/null | grep -q "$session_id"; then
# Forked session ended — check if work was done
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
if ! kill -0 "$pid" 2>/dev/null; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
# Check if worktree has new commits beyond origin/main
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
@@ -45,44 +60,94 @@ check_task_completion() {
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
else
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
fi
}
get_session_id_for_issue() {
local issue_ref="$1"
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
if [ -f "$session_path" ]; then
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
else
echo ""
fi
}
process_task() {
local item="$1"
local queue_id=$(basename "$item" .json)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
if ! acquire_lock "$issue_ref"; then
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
return
fi
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
fi
else
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id started for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to start"
fi
fi
release_lock "$issue_ref"
}
while true; do
# Check completion of notified tasks
if [ -d "$QUEUE_ITEMS_DIR" ]; then
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
check_task_completion "$item"
done
# Process pending tasks
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" = "pending" ]; then
queue_id=$(basename "$item" .json)
issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
# Source session management and use cmd_start/cmd_continue
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
# Continue existing session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id continued for $issue_ref"
else
# Start new session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id started for $issue_ref"
fi
process_task "$item"
fi
done
fi

View File

@@ -1,6 +1,13 @@
#!/bin/bash
set -euo pipefail
# Source required modules for session management functions
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "$SCRIPT_DIR/kugetsu-config.sh"
source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh"
count_active_dev_sessions() {
local count=0
if [ -d "$SESSIONS_DIR" ]; then
@@ -153,10 +160,12 @@ cmd_delegate() {
local issue_ref=$(extract_issue_ref_from_message "$message")
if [ -n "$issue_ref" ] && [[ "$issue_ref" =~ \#[0-9]+$ ]]; then
cmd_start "$issue_ref" "$message"
# Enqueue for daemon to process via cmd_start/cmd_continue
enqueue_task "$issue_ref" "$message"
return
fi
# No issue ref detected — delegate directly to PM agent (legacy path)
local pm_session=$(get_pm_agent_session_id)
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2
@@ -165,7 +174,7 @@ cmd_delegate() {
mkdir -p "$LOGS_DIR"
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
nohup sh -c "GITEA_TOKEN='***' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
disown
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
}