Compare commits

...

15 Commits

Author SHA1 Message Date
1b19c9a92c Merge pull request 'fix: init script captures wrong session IDs when old sessions exist' (#173) from fix/issue-172-init-script-wrong-session-ids into main 2026-04-06 03:09:30 +02:00
shokollm
85a4239383 fix: init script captures wrong session IDs when old sessions exist
The init script used 'tail -1' to find newly created sessions, which
fails when old sessions exist because it picks an existing session
instead of the newly created one.

The fix captures session IDs before and after creating new sessions,
then diffs to identify newly created sessions.

Fixes #172
2026-04-06 01:08:09 +00:00
shokollm
91b51f62c0 docs: update changelog for v0.2.4 2026-04-06 00:49:00 +00:00
7234837284 Merge pull request 'fix(kugetsu): remove duplicate update_queue_item_state to use fixed version from kugetsu-index.sh' (#171) from fix/issue-170-duplicate-update-queue into main 2026-04-06 02:42:24 +02:00
shokollm
59f6a4883e fix(kugetsu): remove duplicate update_queue_item_state to use fixed version from kugetsu-index.sh
The main kugetsu script had its own update_queue_item_state() definition
with broken os.system() calls that overwrote the fixed version in kugetsu-index.sh.

Now kugetsu will use the fixed version from kugetsu-index.sh (which sources
kugetsu-log.sh) since that module is sourced first.

Fixes #170
2026-04-06 00:40:55 +00:00
9667c3e800 Merge pull request 'fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()' (#169) from fix/issue-167-notification-bash into main 2026-04-06 02:35:36 +02:00
shokollm
796e1fe454 fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()
os.system() spawns a new subprocess that cannot access bash functions.
Now calling kugetsu_add_notification directly from bash after Python updates JSON.

Fixes #167
2026-04-06 00:33:35 +00:00
84c59a3b64 Merge pull request 'fix(kugetsu): queue daemon improvements - locking, error handling, cmd_delegate enqueue' (#164) from fix/issue-156-queue-fixes into main 2026-04-06 02:06:36 +02:00
shokollm
dc9d4d7327 Merge origin/main into fix/issue-156-queue-fixes 2026-04-06 00:02:47 +00:00
shokollm
bdcb7a476c fix(queue-daemon): add locking, proper state updates, and error handling
- Add acquire_lock/release_lock to prevent daemon vs manual conflicts
- Check cmd_start/cmd_continue success before updating state to 'notified'
- Set state to 'error' if command fails
- Track actual session_id from session file after cmd_start completes
- Release lock when task completes (success or error)
- Use load_agent_env 'pm-agent' for GITEA_TOKEN

Fixes critical race conditions and failure handling in queue processing
2026-04-06 00:01:41 +00:00
77cf817568 Merge pull request 'fix(kugetsu): return proper JSON array from get_pending_tasks()' (#163) from fix/issue-155-queue-list-json into main 2026-04-06 01:45:09 +02:00
shokollm
0f6a30f01c fix(kugetsu): return proper JSON array from get_pending_tasks() 2026-04-05 23:43:52 +00:00
77b0963fa4 Merge pull request 'fix(queue-daemon): source pm-agent.env for GITEA_TOKEN instead of default.env' (#162) from fix/issue-160-gitea-token-from-pm-agent into main 2026-04-06 01:42:04 +02:00
shokollm
f39e39156a fix(queue-daemon): source pm-agent.env for GITEA_TOKEN instead of default.env 2026-04-05 23:38:07 +00:00
deb18f1e32 Merge pull request 'fix(kugetsu): queue daemon runs PM agent in correct worktree with proper token' (#157) from fix/issue-156 into main 2026-04-05 23:39:35 +02:00
5 changed files with 193 additions and 117 deletions

View File

@@ -6,6 +6,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased] ## [Unreleased]
## [v0.2.4] - 2026-04-06
### Fixed
- Queue daemon: Locking to prevent daemon vs manual conflicts
- Queue daemon: Proper error handling for failed tasks
- Queue daemon: Fix GITEA_TOKEN loading from pm-agent.env
- cmd_delegate: Enqueue tasks instead of bypassing queue
- Notifications: Call kugetsu_add_notification from bash instead of os.system()
- kugetsu: Remove duplicate update_queue_item_state that overwrote fixed version
### Added
- Queue functions moved to kugetsu-index.sh for daemon access
- kugetsu-session.sh sources required modules for daemon use
## [v0.2.3] - 2026-04-06
### Fixed
- get_pending_tasks() returns proper JSON array instead of concatenated JSON objects
## [v0.2.1] - 2026-04-03 ## [v0.2.1] - 2026-04-03
### Fixed ### Fixed

View File

@@ -310,12 +310,31 @@ get_pending_tasks() {
return return
fi fi
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f 2>/dev/null | while read -r file; do python3 -c "
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "") import json
if [ "$state" = "pending" ]; then import os
cat "$file" import sys
fi
done | head -"$limit" queue_dir = os.environ.get('QUEUE_ITEMS_DIR', '')
limit = int(sys.argv[1]) if len(sys.argv) > 1 else 10
items = []
if os.path.isdir(queue_dir):
for filename in os.listdir(queue_dir):
if filename.endswith('.json'):
filepath = os.path.join(queue_dir, filename)
try:
with open(filepath) as f:
data = json.load(f)
if data.get('state') == 'pending':
items.append(data)
if len(items) >= limit:
break
except:
pass
print(json.dumps(items))
" "$limit"
} }
get_queue_stats() { get_queue_stats() {
@@ -342,55 +361,6 @@ get_queue_stats() {
echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}" echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}"
} }
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
python3 << PYEOF
import json
import os
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
issue_ref = item.get('issue_ref', '')
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
}
check_task_timeouts() { check_task_timeouts() {
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
return return

View File

@@ -202,9 +202,10 @@ update_queue_item_state() {
return 1 return 1
fi fi
local issue_ref=$(python3 -c "import json; print(json.load(open('$item_file')).get('issue_ref', ''))" 2>/dev/null || echo "")
python3 << PYEOF python3 << PYEOF
import json import json
import os
from datetime import datetime from datetime import datetime
item_file = "$item_file" item_file = "$item_file"
@@ -215,8 +216,6 @@ pid = "$pid"
with open(item_file, 'r') as f: with open(item_file, 'r') as f:
item = json.load(f) item = json.load(f)
issue_ref = item.get('issue_ref', '')
item['state'] = new_state item['state'] = new_state
if new_state == "notified": if new_state == "notified":
@@ -227,14 +226,18 @@ if new_state == "notified":
item['pid'] = int(pid) if pid.isdigit() else None item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed": elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z" item['completed_at'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
elif new_state == "error": elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z" item['error'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
with open(item_file, 'w') as f: with open(item_file, 'w') as f:
json.dump(item, f, indent=2) json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state") print(f"Updated $queue_id to state: $new_state")
PYEOF PYEOF
if [ "$new_state" = "completed" ]; then
kugetsu_add_notification "task_completed" "Task completed: $issue_ref" "$issue_ref"
elif [ "$new_state" = "error" ]; then
kugetsu_add_notification "task_error" "Task error: $issue_ref" "$issue_ref"
fi
} }

View File

@@ -8,12 +8,29 @@ source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh" source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh" source "$SCRIPT_DIR/kugetsu-log.sh"
# Load GITEA_TOKEN from default.env load_agent_env "pm-agent"
if [ -f "$HOME/.kugetsu/env/default.env" ]; then
source "$HOME/.kugetsu/env/default.env" acquire_lock() {
fi local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
mkdir -p "$(dirname "$lock_file")"
if [ -f "$lock_file" ]; then
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
return 1
fi
rm -f "$lock_file"
fi
echo $$ > "$lock_file"
return 0
}
release_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
rm -f "$lock_file"
}
# Check if a notified task has completed (forked session ended or has new commits)
check_task_completion() { check_task_completion() {
local item="$1" local item="$1"
local queue_id=$(basename "$item" .json) local queue_id=$(basename "$item" .json)
@@ -21,71 +38,116 @@ check_task_completion() {
[ "$state" = "notified" ] || return 0 [ "$state" = "notified" ] || return 0
# Use opencode_session_id (the forked session, not the parent pm_session)
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null) local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null) local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
# If no session tracked, skip if [ -n "$pid" ] && [ "$pid" != "None" ]; then
[ -n "$session_id" ] || return 0 if ! kill -0 "$pid" 2>/dev/null; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
# Check if forked session still exists in opencode if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if ! opencode session list 2>/dev/null | grep -q "$session_id"; then if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
# Forked session ended — check if work was done has_commits=true
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees") fi
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
# Check if worktree has new commits beyond origin/main
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi fi
fi
if [ "$has_commits" = true ]; then if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed" update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found" echo "Task $queue_id ($issue_ref) completed — new commits found"
else else
update_queue_item_state "$queue_id" "error" update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended" echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
else
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi fi
fi fi
} }
get_session_id_for_issue() {
local issue_ref="$1"
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
if [ -f "$session_path" ]; then
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
else
echo ""
fi
}
process_task() {
local item="$1"
local queue_id=$(basename "$item" .json)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
if ! acquire_lock "$issue_ref"; then
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
return
fi
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
fi
else
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id started for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to start"
fi
fi
release_lock "$issue_ref"
}
while true; do while true; do
# Check completion of notified tasks
if [ -d "$QUEUE_ITEMS_DIR" ]; then if [ -d "$QUEUE_ITEMS_DIR" ]; then
for item in "$QUEUE_ITEMS_DIR"/*.json; do for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue [ -f "$item" ] || continue
check_task_completion "$item" check_task_completion "$item"
done done
# Process pending tasks
for item in "$QUEUE_ITEMS_DIR"/*.json; do for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue [ -f "$item" ] || continue
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null) state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" = "pending" ]; then if [ "$state" = "pending" ]; then
queue_id=$(basename "$item" .json) process_task "$item"
issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
# Source session management and use cmd_start/cmd_continue
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
# Continue existing session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id continued for $issue_ref"
else
# Start new session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id started for $issue_ref"
fi
fi fi
done done
fi fi

View File

@@ -81,9 +81,20 @@ EOF
echo "Press Ctrl+C to cancel or wait for session to be created" echo "Press Ctrl+C to cancel or wait for session to be created"
sleep 2 sleep 2
local before_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
opencode opencode
local session_ids=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' | tail -1) local after_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
local session_ids=""
while IFS= read -r line; do
local sid=$(echo "$line" | awk '{print $1}')
if [ -n "$sid" ] && ! echo "$before_sessions" | grep -q "^${sid}$"; then
session_ids="$sid"
break
fi
done <<< "$after_sessions"
if [ -z "$session_ids" ]; then if [ -z "$session_ids" ]; then
echo "Error: Could not find newly created session" >&2 echo "Error: Could not find newly created session" >&2
exit 1 exit 1
@@ -95,9 +106,20 @@ EOF
echo "Base session created: $session_ids" echo "Base session created: $session_ids"
echo "Starting PM agent..." echo "Starting PM agent..."
before_sessions="$after_sessions"
opencode opencode
local pm_session_ids=$(opencode session list 2>/dev/null | grep -E '^ses_' | grep -v "$session_ids" | tail -1) after_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
local pm_session_ids=""
while IFS= read -r line; do
local sid=$(echo "$line" | awk '{print $1}')
if [ -n "$sid" ] && ! echo "$before_sessions" | grep -q "^${sid}$"; then
pm_session_ids="$sid"
break
fi
done <<< "$after_sessions"
if [ -z "$pm_session_ids" ]; then if [ -z "$pm_session_ids" ]; then
echo "Warning: Could not find separate PM agent session" >&2 echo "Warning: Could not find separate PM agent session" >&2
pm_session_ids="$session_ids" pm_session_ids="$session_ids"