Compare commits

..

2 Commits

Author SHA1 Message Date
shokollm
fa8b8467ee fix(queue-daemon): use kugetsu-log for timeout messages
Use log_warn and log_error instead of echo for unified log formatting.
2026-04-08 02:39:40 +00:00
shokollm
51ec844365 fix(queue-daemon): implement timeout handling for long-running tasks
Check notified_at timestamp in check_task_completion() and mark tasks
as error if they exceed TASK_TIMEOUT_HOURS (defaults to 1 hour).

When timeout is detected:
- Kill the task process (PID) if running
- Stop the opencode session if exists
- Mark queue item as error state

Fixes #166
2026-04-07 12:53:35 +00:00
2 changed files with 166 additions and 171 deletions

View File

@@ -41,6 +41,31 @@ check_task_completion() {
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
local notified_at=$(python3 -c "import json; print(json.load(open('$item')).get('notified_at', ''))" 2>/dev/null)
local timed_out=false
if [ -n "$notified_at" ]; then
local notified_epoch=$(date -d "$notified_at" +%s 2>/dev/null || echo "0")
local now_epoch=$(date +%s)
local hours_elapsed=$(( (now_epoch - notified_epoch) / 3600 ))
if [ "$hours_elapsed" -ge "${TASK_TIMEOUT_HOURS:-1}" ]; then
timed_out=true
log_warn "queue-daemon" "Task $queue_id ($issue_ref) timed out after ${hours_elapsed}h"
fi
fi
if [ "$timed_out" = true ]; then
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
kill "$pid" 2>/dev/null || true
fi
if [ -n "$session_id" ]; then
opencode session stop "$session_id" 2>/dev/null || true
fi
update_queue_item_state "$queue_id" "error"
log_error "queue-daemon" "Task $queue_id ($issue_ref) marked error — timeout after ${hours_elapsed}h"
release_lock "$issue_ref"
return
fi
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
if ! kill -0 "$pid" 2>/dev/null; then
@@ -109,15 +134,28 @@ process_task() {
source "$SCRIPT_DIR/kugetsu-session.sh"
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
if worktree_exists "$issue_ref" "$WORKTREES_DIR" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id continued for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
fi
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to continue"
log_file="$LOGS_DIR/delegate-$(date +%s).log"
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
sleep 1
local session_id=$(get_session_id_for_issue "$issue_ref")
update_queue_item_state "$queue_id" "notified" "$session_id" ""
echo "Task $queue_id started for $issue_ref"
else
update_queue_item_state "$queue_id" "error"
echo "Task $queue_id ($issue_ref) failed to start"
fi
fi
release_lock "$issue_ref"

View File

@@ -272,44 +272,29 @@ build_dev_agent_message() {
if [ -n "$user_message" ]; then
cat <<EOF
You are assigned to work on $issue_ref.
You are continuing work on $issue_ref. A PR likely already exists.
IMPORTANT: Follow the workflow below as your guideline, but prioritize the delegator's message.
IMPORTANT - Review workflow:
1. First, check if PR exists: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls?state=open" -H "Authorization: Bearer \$GITEA_TOKEN" | grep -i "$number"
2. Get PR comments: curl -s "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN"
3. Get PR reviews: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number/reviews" -H "Authorization: Bearer \$GITEA_TOKEN"
Workflow:
1. Read the issue at $instance/$owner/$repo/issues/$number AND all comments on that issue
2. Check if a PR already exists for this issue
- If PR exists and is open, review it and learn from it
- CRITICAL: Check if PR has merge conflicts before asking for review:
- Use: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number" -H "Authorization: Bearer \$GITEA_TOKEN"
- If "mergeable": false, there ARE conflicts - you MUST resolve them FIRST
- To resolve: cd to worktree, git fetch origin, git rebase origin/main, resolve conflicts, git rebase --continue, git push --force-with-lease
- Only after resolving conflicts (mergeable: true) can you ask for review
- If PR makes sense to continue, work on it instead
- If PR is not worth continuing, create a new branch/PR but explain in PR description why you're creating a new one instead of continuing the existing PR
3. Read README.md (if exists) to understand the general concept of this repository
4. Read CONTRIBUTING.md (if exists) to understand how to contribute
- If CONTRIBUTING.md doesn't exist, follow steps 5-9 as your guideline
5. Explore the repository to understand the codebase
6. If anything is unclear, post a comment on the issue asking for clarification before implementing
7. Implement the solution
8. Create a branch named fix/issue-$number and implement the fix
9. Create a PR when the implementation is complete using: tea pr create --repo $owner/$repo --title "Your PR title" --body "PR description"
- Make sure you are logged in with: tea login add --name gitea --token \$GITEA_TOKEN --url https://$instance
- If tea is not available, use: curl -X POST "https://$instance/api/v1/repos/$owner/$repo/pulls" -H "Authorization: Bearer \$GITEA_TOKEN" -H "Content-Type: application/json" -d '{"title":"PR Title","head":"branch-name","base":"main","body":"PR description"}'
You may need to:
- Make code changes and push to the same branch
- Reply to PR comments using: curl -X POST "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN" -H "Content-Type: application/json" -d '{"body":"Your reply here"}'
- Or do both
Tools for PR interaction:
- Post issue/PR comment: curl -X POST "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN" -H "Content-Type: application/json" -d '{"body":"Your comment"}'
- List PR comments: curl -s "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN"
- List PR reviews: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number/reviews" -H "Authorization: Bearer \$GITEA_TOKEN"
- Merge PR (only with approval): tea pr merge --repo $owner/$repo $number --style merge
- MERGING requires approval first! Check for: approval in reviews, OR "lgtm"/"approved" in comments
- If no approval, ask reviewer to approve first before merging
MERGING: If instructed to merge, you MUST confirm approval first before merging:
- Check for PR approval via: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number/reviews" -H "Authorization: Bearer \$GITEA_TOKEN"
- Check for "lgtm" or "approved" in comments: curl -s "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN"
- Only merge if you see approval OR the instruction explicitly says to merge (e.g., "merge the PR", "please merge", "go ahead and merge")
- To merge: tea pr merge --repo $owner/$repo $number --style merge
- If no approval yet, reply asking for review/approval first
Delegator's message:
$user_message
Work directory: $worktree_path
Work directory: $worktree_path (already on the fix branch)
EOF
else
cat <<EOF
@@ -319,11 +304,6 @@ Workflow:
1. Read the issue at $instance/$owner/$repo/issues/$number AND all comments on that issue
2. Check if a PR already exists for this issue
- If PR exists and is open, review it and learn from it
- CRITICAL: Check if PR has merge conflicts before asking for review:
- Use: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number" -H "Authorization: Bearer \$GITEA_TOKEN"
- If "mergeable": false, there ARE conflicts - you MUST resolve them FIRST
- To resolve: cd to worktree, git fetch origin, git rebase origin/main, resolve conflicts, git rebase --continue, git push --force-with-lease
- Only after resolving conflicts (mergeable: true) can you ask for review
- If PR makes sense to continue, work on it instead
- If PR is not worth continuing, create a new branch/PR but explain in PR description why you're creating a new one instead of continuing the existing PR
3. Read README.md (if exists) to understand the general concept of this repository
@@ -341,8 +321,6 @@ Tools for PR interaction:
- Post issue/PR comment: curl -X POST "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN" -H "Content-Type: application/json" -d '{"body":"Your comment"}'
- List PR comments: curl -s "https://$instance/api/v1/repos/$owner/$repo/issues/$number/comments" -H "Authorization: Bearer \$GITEA_TOKEN"
- List PR reviews: curl -s "https://$instance/api/v1/repos/$owner/$repo/pulls/$number/reviews" -H "Authorization: Bearer \$GITEA_TOKEN"
- IMPORTANT: After listing reviews, READ the review comments and incorporate feedback
- Check for review state: "APPROVED" means ready to merge, "COMMENT" means feedback to address
- Merge PR (only with approval): tea pr merge --repo $owner/$repo $number --style merge
- MERGING requires approval first! Check for: approval in reviews, OR "lgtm"/"approved" in comments
- If no approval, ask reviewer to approve first before merging
@@ -352,48 +330,29 @@ EOF
fi
}
ensure_worktree() {
local issue_ref="$1"
cmd_start() {
local issue_ref="${1:-}"
local message="${2:-}"
if worktree_exists "$issue_ref" "$WORKTREES_DIR"; then
log "info" "ensure_worktree" "Worktree already exists for $issue_ref"
echo "existed"
return 0
if [ -z "$issue_ref" ]; then
echo "Error: issue ref is required" >&2
echo "Usage: kugetsu start <issue-ref> [message]" >&2
exit 1
fi
validate_issue_ref "$issue_ref"
local base_session_id=$(get_base_session_id)
if [ -z "$base_session_id" ] || [ "$base_session_id" = "null" ]; then
log "error" "ensure_worktree" "Base session not found for $issue_ref"
echo "error"
return 1
echo "Error: Base session not found. Run 'kugetsu init' first." >&2
exit 1
fi
local active_count=$(count_active_dev_sessions)
if [ "$active_count" -ge "${MAX_CONCURRENT_AGENTS:-3}" ]; then
log "error" "ensure_worktree" "Max concurrent agents reached for $issue_ref"
echo "error"
return 1
fi
if create_worktree "$issue_ref" "$WORKTREES_DIR" 2>&1 | tee >(cat >&2); then
log "info" "ensure_worktree" "Created worktree for $issue_ref"
echo "created"
return 0
else
log "error" "ensure_worktree" "Failed to create worktree for $issue_ref"
echo "error"
return 1
fi
}
ensure_session() {
local issue_ref="$1"
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
local worktree_exists=false
if worktree_exists "$issue_ref" "$WORKTREES_DIR"; then
if worktree_exists "$issue_ref"; then
worktree_exists=true
fi
@@ -403,40 +362,38 @@ ensure_session() {
fi
if $worktree_exists && $session_exists; then
log "info" "ensure_session" "Session already exists for $issue_ref"
echo "continued"
return 0
echo "Issue '$issue_ref' already has a worktree and session." >&2
echo "Use 'kugetsu continue $issue_ref' to continue work." >&2
exit 1
fi
if $worktree_exists && ! $session_exists; then
echo "Warning: Worktree exists but session is missing. Removing worktree to recreate both..." >&2
remove_worktree_for_issue "$issue_ref"
worktree_exists=false
fi
if ! $worktree_exists && $session_exists; then
log "warn" "ensure_session" "Session exists but worktree is missing. Removing stale session..."
echo "Warning: Session exists but worktree is missing. Removing stale session to recreate both..." >&2
rm -f "$session_path"
remove_issue_from_index "$issue_ref"
session_exists=false
fi
if ! $worktree_exists; then
local wt_status=$(ensure_worktree "$issue_ref")
if [ "$wt_status" != "created" ] && [ "$wt_status" != "existed" ]; then
log "error" "ensure_session" "Failed to ensure worktree for $issue_ref"
echo "error"
return 1
fi
local active_count=$(count_active_dev_sessions)
if [ "$active_count" -ge "${MAX_CONCURRENT_AGENTS:-3}" ]; then
echo "Error: Max concurrent agents (${MAX_CONCURRENT_AGENTS:-3}) reached. Use 'kugetsu continue' or wait for an agent to finish." >&2
exit 1
fi
local base_session_id=$(get_base_session_id)
if [ -z "$base_session_id" ] || [ "$base_session_id" = "null" ]; then
log "error" "ensure_session" "Base session not found for $issue_ref"
echo "error"
return 1
fi
create_worktree "$issue_ref" "$WORKTREES_DIR"
local new_session_id=$(create_session "$base_session_id")
if [ -z "$new_session_id" ]; then
log "error" "ensure_session" "Could not create session for $issue_ref"
echo "error"
return 1
echo "Error: Could not create session" >&2
remove_worktree_for_issue "$issue_ref"
exit 1
fi
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref")
@@ -446,89 +403,89 @@ ensure_session() {
add_issue_to_index "$issue_ref" "$session_file"
log "info" "ensure_session" "Created session for $issue_ref: $new_session_id"
echo "created"
return 0
}
fork_agent() {
local session_id="$1"
local worktree_path="$2"
local message="$3"
if [ -z "$worktree_path" ] || [ ! -d "$worktree_path" ]; then
log "error" "fork_agent" "Invalid worktree path: $worktree_path"
echo "error"
return 1
fi
local dev_message=$(build_dev_agent_message "$issue_ref" "$message")
load_agent_env "dev"
cd "$worktree_path"
local sanitized_id=$(echo "$session_id" | sed 's/[^a-zA-Z0-9_-]/_/g')
local sanitized_id=$(echo "$new_session_id" | sed 's/[^a-zA-Z0-9_-]/_/g')
mkdir -p "$worktree_path/.kugetsu"
if [ ! -f "$worktree_path/.gitignore" ] || ! grep -q "^.kugetsu/" "$worktree_path/.gitignore"; then
echo ".kugetsu/" >> "$worktree_path/.gitignore" 2>/dev/null || true
fi
local msg_file="$worktree_path/.kugetsu/msg.txt"
printf '%s' "$dev_message" > "$msg_file"
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '@$msg_file' --session '$new_session_id'" >> "$LOGS_DIR/dev-$sanitized_id.log" 2>&1 &
echo "Session started for '$issue_ref': $new_session_id"
echo "Worktree: $worktree_path"
}
cmd_continue() {
local session_name=""
local message=""
local args=("$@")
args=$(set_debug_mode "${args[@]}")
for arg in $args; do
if [ -z "$session_name" ]; then
session_name="$arg"
else
message="$arg"
fi
done
if [ -z "$session_name" ]; then
echo "Error: issue ref is required" >&2
echo "Usage: kugetsu continue <issue-ref> [message]" >&2
exit 1
fi
validate_issue_ref "$session_name"
local session_file=$(get_session_for_issue "$session_name")
if [ -z "$session_file" ] || [ "$session_file" = "null" ]; then
echo "Error: No session found for '$session_name'" >&2
echo "Use 'kugetsu start $session_name' to create a new session." >&2
exit 1
fi
local session_path="$SESSIONS_DIR/$session_file"
if [ ! -f "$session_path" ]; then
echo "Error: Session file not found: $session_path" >&2
exit 1
fi
load_agent_env "dev"
local opencode_session_id=$(python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo "")
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
local issue_ref=$(python3 -c "import json; print(json.load(open('$session_path')).get('issue_ref', ''))" 2>/dev/null || echo "")
if [ -z "$worktree_path" ] || [ ! -d "$worktree_path" ]; then
echo "Warning: Worktree is missing for '$session_name'. Recovering..." >&2
rm -f "$session_path"
remove_issue_from_index "$session_name"
echo "Calling cmd_start to create new session and worktree..." >&2
cmd_start "$session_name" "$message"
return $?
fi
if [ -z "$message" ]; then
message=$(build_dev_agent_message "$issue_ref" "")
fi
cd "$worktree_path"
local sanitized_id=$(echo "$opencode_session_id" | sed 's/[^a-zA-Z0-9_-]/_/g')
mkdir -p "$worktree_path/.kugetsu"
if [ ! -f "$worktree_path/.gitignore" ] || ! grep -q "^.kugetsu/" "$worktree_path/.gitignore"; then
echo ".kugetsu/" >> "$worktree_path/.gitignore" 2>/dev/null || true
fi
local msg_file="$worktree_path/.kugetsu/msg.txt"
printf '%s' "$message" > "$msg_file"
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '@$msg_file' --session '$session_id'" >> "$LOGS_DIR/dev-$sanitized_id.log" 2>&1 &
log "info" "fork_agent" "Forked agent for session $session_id in $worktree_path"
echo "forked"
return 0
}
cmd_start() {
cmd_continue "$@"
}
cmd_continue() {
local issue_ref="${1:-}"
local message="${2:-}"
if [ -z "$issue_ref" ]; then
echo "Error: issue ref is required" >&2
echo "Usage: kugetsu continue <issue-ref> [message]" >&2
exit 1
fi
validate_issue_ref "$issue_ref"
if [ -z "$message" ]; then
message=$(build_dev_agent_message "$issue_ref" "")
else
message=$(build_dev_agent_message "$issue_ref" "$message")
fi
local worktree_status=$(ensure_worktree "$issue_ref")
if [ "$worktree_status" = "error" ]; then
echo "Error: Failed to ensure worktree for '$issue_ref'" >&2
exit 1
fi
local session_status=$(ensure_session "$issue_ref")
if [ "$session_status" = "error" ]; then
echo "Error: Failed to ensure session for '$issue_ref'" >&2
exit 1
fi
local session_file=$(issue_ref_to_filename "$issue_ref")
local session_path="$SESSIONS_DIR/$session_file"
local opencode_session_id=$(python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo "")
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
local fork_status=$(fork_agent "$opencode_session_id" "$worktree_path" "$message")
if [ "$fork_status" = "error" ]; then
echo "Error: Failed to fork agent for '$issue_ref'" >&2
exit 1
fi
log "info" "cmd_continue" "Result for $issue_ref: worktree=$worktree_status session=$session_status fork=$fork_status"
echo "Session continued for '$issue_ref': $opencode_session_id"
echo "Worktree: $worktree_path"
echo "${worktree_status}-${session_status}-${fork_status}"
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '@$msg_file' --session '$opencode_session_id'" >> "$LOGS_DIR/dev-$sanitized_id.log" 2>&1 &
}
cmd_list() {