Compare commits

...

17 Commits

Author SHA1 Message Date
shokollm
85a4239383 fix: init script captures wrong session IDs when old sessions exist
The init script used 'tail -1' to find newly created sessions, which
fails when old sessions exist because it picks an existing session
instead of the newly created one.

The fix captures session IDs before and after creating new sessions,
then diffs to identify newly created sessions.

Fixes #172
2026-04-06 01:08:09 +00:00
shokollm
91b51f62c0 docs: update changelog for v0.2.4 2026-04-06 00:49:00 +00:00
7234837284 Merge pull request 'fix(kugetsu): remove duplicate update_queue_item_state to use fixed version from kugetsu-index.sh' (#171) from fix/issue-170-duplicate-update-queue into main 2026-04-06 02:42:24 +02:00
shokollm
59f6a4883e fix(kugetsu): remove duplicate update_queue_item_state to use fixed version from kugetsu-index.sh
The main kugetsu script had its own update_queue_item_state() definition
with broken os.system() calls that overwrote the fixed version in kugetsu-index.sh.

Now kugetsu will use the fixed version from kugetsu-index.sh (which sources
kugetsu-log.sh) since that module is sourced first.

Fixes #170
2026-04-06 00:40:55 +00:00
9667c3e800 Merge pull request 'fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()' (#169) from fix/issue-167-notification-bash into main 2026-04-06 02:35:36 +02:00
shokollm
796e1fe454 fix(kugetsu-index): call kugetsu_add_notification from bash instead of os.system()
os.system() spawns a new subprocess that cannot access bash functions.
Now calling kugetsu_add_notification directly from bash after Python updates JSON.

Fixes #167
2026-04-06 00:33:35 +00:00
84c59a3b64 Merge pull request 'fix(kugetsu): queue daemon improvements - locking, error handling, cmd_delegate enqueue' (#164) from fix/issue-156-queue-fixes into main 2026-04-06 02:06:36 +02:00
shokollm
dc9d4d7327 Merge origin/main into fix/issue-156-queue-fixes 2026-04-06 00:02:47 +00:00
shokollm
bdcb7a476c fix(queue-daemon): add locking, proper state updates, and error handling
- Add acquire_lock/release_lock to prevent daemon vs manual conflicts
- Check cmd_start/cmd_continue success before updating state to 'notified'
- Set state to 'error' if command fails
- Track actual session_id from session file after cmd_start completes
- Release lock when task completes (success or error)
- Use load_agent_env 'pm-agent' for GITEA_TOKEN

Fixes critical race conditions and failure handling in queue processing
2026-04-06 00:01:41 +00:00
77cf817568 Merge pull request 'fix(kugetsu): return proper JSON array from get_pending_tasks()' (#163) from fix/issue-155-queue-list-json into main 2026-04-06 01:45:09 +02:00
shokollm
0f6a30f01c fix(kugetsu): return proper JSON array from get_pending_tasks() 2026-04-05 23:43:52 +00:00
77b0963fa4 Merge pull request 'fix(queue-daemon): source pm-agent.env for GITEA_TOKEN instead of default.env' (#162) from fix/issue-160-gitea-token-from-pm-agent into main 2026-04-06 01:42:04 +02:00
shokollm
f39e39156a fix(queue-daemon): source pm-agent.env for GITEA_TOKEN instead of default.env 2026-04-05 23:38:07 +00:00
shokollm
5a0a54898b fix(kugetsu): kugetsu-session.sh needs to source required modules
When daemon sources kugetsu-session.sh to call cmd_start/cmd_continue,
it needs access to functions from kugetsu-config.sh, kugetsu-index.sh,
kugetsu-worktree.sh, and kugetsu-log.sh. Add sourcing at top of
kugetsu-session.sh.
2026-04-05 22:20:49 +00:00
shokollm
b1028a6556 fix(kugetsu): move queue functions to kugetsu-index.sh for daemon access
The daemon (kugetsu-queue-daemon.sh) sources kugetsu-index.sh but not the main kugetsu script.
Move update_queue_item_state and kugetsu_add_notification to kugetsu-index.sh
so the daemon can use these functions when processing tasks.
2026-04-05 22:17:59 +00:00
shokollm
270219873f fix(kugetsu): cmd_delegate should enqueue instead of calling cmd_start
When cmd_delegate detects an issue ref with number (e.g. git.fbrns.co/shoko/kugetsu#158),
it was calling cmd_start directly which tries to create worktree and clone.
This breaks the queue-based workflow where daemon should handle task execution.

Now cmd_delegate calls enqueue_task to add to queue, and daemon processes
tasks by calling cmd_start/cmd_continue as appropriate.
2026-04-05 22:05:18 +00:00
deb18f1e32 Merge pull request 'fix(kugetsu): queue daemon runs PM agent in correct worktree with proper token' (#157) from fix/issue-156 into main 2026-04-05 23:39:35 +02:00
5 changed files with 291 additions and 114 deletions

View File

@@ -6,6 +6,25 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## [Unreleased]
## [v0.2.4] - 2026-04-06
### Fixed
- Queue daemon: Locking to prevent daemon vs manual conflicts
- Queue daemon: Proper error handling for failed tasks
- Queue daemon: Fix GITEA_TOKEN loading from pm-agent.env
- cmd_delegate: Enqueue tasks instead of bypassing queue
- Notifications: Call kugetsu_add_notification from bash instead of os.system()
- kugetsu: Remove duplicate update_queue_item_state that overwrote fixed version
### Added
- Queue functions moved to kugetsu-index.sh for daemon access
- kugetsu-session.sh sources required modules for daemon use
## [v0.2.3] - 2026-04-06
### Fixed
- get_pending_tasks() returns proper JSON array instead of concatenated JSON objects
## [v0.2.1] - 2026-04-03
### Fixed

View File

@@ -310,12 +310,31 @@ get_pending_tasks() {
return
fi
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f 2>/dev/null | while read -r file; do
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
if [ "$state" = "pending" ]; then
cat "$file"
fi
done | head -"$limit"
python3 -c "
import json
import os
import sys
queue_dir = os.environ.get('QUEUE_ITEMS_DIR', '')
limit = int(sys.argv[1]) if len(sys.argv) > 1 else 10
items = []
if os.path.isdir(queue_dir):
for filename in os.listdir(queue_dir):
if filename.endswith('.json'):
filepath = os.path.join(queue_dir, filename)
try:
with open(filepath) as f:
data = json.load(f)
if data.get('state') == 'pending':
items.append(data)
if len(items) >= limit:
break
except:
pass
print(json.dumps(items))
" "$limit"
}
get_queue_stats() {
@@ -342,55 +361,6 @@ get_queue_stats() {
echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}"
}
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
python3 << PYEOF
import json
import os
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
issue_ref = item.get('issue_ref', '')
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
}
check_task_timeouts() {
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
return

View File

@@ -146,3 +146,98 @@ filename_to_issue_ref() {
local name="${filename%.json}"
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
}
# Add notification to notifications file
kugetsu_add_notification() {
local type="$1"
local message="$2"
local issue_ref="${3:-}"
local gitea_url="${4:-}"
mkdir -p "$(dirname "$NOTIFICATIONS_FILE")"
python3 << PYEOF
import json
import os
from datetime import datetime
notification = {
"type": "$type",
"message": "$message",
"issue_ref": "$issue_ref" if "$issue_ref" else None,
"gitea_url": "$gitea_url" if "$gitea_url" else None,
"timestamp": datetime.now().isoformat(),
"read": False
}
file_path = os.path.expanduser("$NOTIFICATIONS_FILE")
notifications = []
if os.path.exists(file_path):
try:
with open(file_path, 'r') as f:
notifications = json.load(f)
except:
notifications = []
notifications.append(notification)
with open(file_path, 'w') as f:
json.dump(notifications, f, indent=2)
print("Notification added")
PYEOF
}
# Update queue item state
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
local issue_ref=$(python3 -c "import json; print(json.load(open('$item_file')).get('issue_ref', ''))" 2>/dev/null || echo "")
python3 << PYEOF
import json
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
if [ "$new_state" = "completed" ]; then
kugetsu_add_notification "task_completed" "Task completed: $issue_ref" "$issue_ref"
elif [ "$new_state" = "error" ]; then
kugetsu_add_notification "task_error" "Task error: $issue_ref" "$issue_ref"
fi
}

View File

@@ -8,86 +8,148 @@ source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh"
# Load GITEA_TOKEN from default.env
if [ -f "$HOME/.kugetsu/env/default.env" ]; then
source "$HOME/.kugetsu/env/default.env"
fi
load_agent_env "pm-agent"
acquire_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
mkdir -p "$(dirname "$lock_file")"
if [ -f "$lock_file" ]; then
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
return 1
fi
rm -f "$lock_file"
fi
echo $$ > "$lock_file"
return 0
}
release_lock() {
local issue_ref="$1"
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
rm -f "$lock_file"
}
# Check if a notified task has completed (forked session ended or has new commits)
check_task_completion() {
local item="$1"
local queue_id=$(basename "$item" .json)
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
[ "$state" = "notified" ] || return 0
# Use opencode_session_id (the forked session, not the parent pm_session)
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
# If no session tracked, skip
[ -n "$session_id" ] || return 0
# Check if forked session still exists in opencode
if ! opencode session list 2>/dev/null | grep -q "$session_id"; then
# Forked session ended — check if work was done
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
# Check if worktree has new commits beyond origin/main
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
if ! kill -0 "$pid" 2>/dev/null; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
else
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
local has_commits=false
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
has_commits=true
fi
fi
if [ "$has_commits" = true ]; then
update_queue_item_state "$queue_id" "completed"
echo "Task $queue_id ($issue_ref) completed — new commits found"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
fi
release_lock "$issue_ref"
fi
fi
}
get_session_id_for_issue() {
local issue_ref="$1"
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
if [ -f "$session_path" ]; then
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
else
echo ""
fi
}
process_task() {
local item="$1"
local queue_id=$(basename "$item" .json)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
if ! acquire_lock "$issue_ref"; then
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
return
fi
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
fi
else
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id started for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to start"
fi
fi
release_lock "$issue_ref"
}
while true; do
# Check completion of notified tasks
if [ -d "$QUEUE_ITEMS_DIR" ]; then
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
check_task_completion "$item"
done
# Process pending tasks
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" = "pending" ]; then
queue_id=$(basename "$item" .json)
issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
# Source session management and use cmd_start/cmd_continue
source "$SCRIPT_DIR/kugetsu-session.sh"
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
# Continue existing session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id continued for $issue_ref"
else
# Start new session
log_file="$LOGS_DIR/delegate-$(date +%s).log"
cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 &
pid=$!
update_queue_item_state "$queue_id" "notified" "" "$pid"
echo "Task $queue_id started for $issue_ref"
fi
process_task "$item"
fi
done
fi
sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
done
done

View File

@@ -1,6 +1,13 @@
#!/bin/bash
set -euo pipefail
# Source required modules for session management functions
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "$SCRIPT_DIR/kugetsu-config.sh"
source "$SCRIPT_DIR/kugetsu-index.sh"
source "$SCRIPT_DIR/kugetsu-worktree.sh"
source "$SCRIPT_DIR/kugetsu-log.sh"
count_active_dev_sessions() {
local count=0
if [ -d "$SESSIONS_DIR" ]; then
@@ -74,9 +81,20 @@ EOF
echo "Press Ctrl+C to cancel or wait for session to be created"
sleep 2
local before_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
opencode
local session_ids=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' | tail -1)
local after_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
local session_ids=""
while IFS= read -r line; do
local sid=$(echo "$line" | awk '{print $1}')
if [ -n "$sid" ] && ! echo "$before_sessions" | grep -q "^${sid}$"; then
session_ids="$sid"
break
fi
done <<< "$after_sessions"
if [ -z "$session_ids" ]; then
echo "Error: Could not find newly created session" >&2
exit 1
@@ -88,9 +106,20 @@ EOF
echo "Base session created: $session_ids"
echo "Starting PM agent..."
before_sessions="$after_sessions"
opencode
local pm_session_ids=$(opencode session list 2>/dev/null | grep -E '^ses_' | grep -v "$session_ids" | tail -1)
after_sessions=$(opencode session list 2>/dev/null | grep -E '^ses_' | awk '{print $1}' || true)
local pm_session_ids=""
while IFS= read -r line; do
local sid=$(echo "$line" | awk '{print $1}')
if [ -n "$sid" ] && ! echo "$before_sessions" | grep -q "^${sid}$"; then
pm_session_ids="$sid"
break
fi
done <<< "$after_sessions"
if [ -z "$pm_session_ids" ]; then
echo "Warning: Could not find separate PM agent session" >&2
pm_session_ids="$session_ids"
@@ -153,10 +182,12 @@ cmd_delegate() {
local issue_ref=$(extract_issue_ref_from_message "$message")
if [ -n "$issue_ref" ] && [[ "$issue_ref" =~ \#[0-9]+$ ]]; then
cmd_start "$issue_ref" "$message"
# Enqueue for daemon to process via cmd_start/cmd_continue
enqueue_task "$issue_ref" "$message"
return
fi
# No issue ref detected — delegate directly to PM agent (legacy path)
local pm_session=$(get_pm_agent_session_id)
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2
@@ -165,7 +196,7 @@ cmd_delegate() {
mkdir -p "$LOGS_DIR"
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
nohup sh -c "GITEA_TOKEN='***' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
disown
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
}