Compare commits

...

15 Commits

Author SHA1 Message Date
98a31070a7 Merge pull request '[FIX] process_queue: add missing closing parentheses' (#143) from fix/issue-142-process-queue-missing-parens into main 2026-04-05 08:56:59 +02:00
26346235c9 fix: add missing closing parenthesis in process_queue Python extraction
Fixes #142 - process_queue silently skips all queue items because
issue_ref and message Python extraction commands were missing a closing
parenthesis. The error was silently swallowed by 2>/dev/null causing
both variables to be empty, so every queue item was skipped.
2026-04-05 06:51:57 +00:00
2212fabf22 Merge pull request 'feat(timeout): add agent timeout handling' (#141) from feat/agent-timeout into main 2026-04-05 06:59:05 +02:00
shokollm
0fa778353b feat(timeout): add agent timeout handling
Implements #137 - Agent timeout handling.

Changes:
- Add TASK_TIMEOUT_HOURS config (default: 1 hour)
- Update queue item to track opencode_session_id and pid
- Add check_task_timeouts() function that:
  - Checks notified tasks against timeout threshold
  - Kills process if exceeded
  - Marks session as 'timeout' state
- Integrate timeout check into queue daemon loop

Timeout behavior:
- Task is marked 'notified' when PM receives it
- If not completed within TASK_TIMEOUT_HOURS, task is killed
- Queue item marked 'error', session marked 'timeout'
2026-04-05 04:53:27 +00:00
151efadca3 Merge pull request 'feat(queue): add queue system with background daemon' (#140) from feat/queue-daemon into main 2026-04-05 06:49:13 +02:00
shokollm
379d53cedc docs: update SKILL.md with queue system documentation
- Update kugetsu delegate section to explain queue-based processing
- Add queue-daemon command documentation
- Update queue command with proper list/stats/clear/enqueue
- Add queue-related config options
- Update directory structure to include queue/
- Update workflow example with queue daemon setup
2026-04-05 04:45:56 +00:00
shokollm
043542344a feat(queue): add queue system with background daemon
Implements #134 - Queue system with background daemon.

## Changes

### Configuration
- QUEUE_DIR, QUEUE_ITEMS_DIR for queue storage
- QUEUE_DAEMON_PID_FILE, LOCK_FILE, LOG_FILE for daemon management
- QUEUE_DAEMON_INTERVAL_MINUTES (default: 5)
- QUEUE_DAEMON_BATCH_SIZE (default: 2)
- QUEUE_CLEANUP_AGE_DAYS (default: 7)

### Queue System
- File-based queue at ~/.kugetsu/queue/items/
- One JSON file per queue item
- States: pending, notified, completed, error

### New Commands
- kugetsu queue [list|stats|clear] - View queue status
- kugetsu queue enqueue <issue-ref> <message> - Manually enqueue
- kugetsu queue-daemon [start|stop|restart|status|logs] - Daemon management

### Behavior Change
- kugetsu delegate now always enqueues (fire-and-forget)
- Queue daemon polls queue and invokes PM when slots available

### Queue Item Format
```json
{
  "id": "q_xxx",
  "issue_ref": "github.com/user/repo#123",
  "message": "task description",
  "state": "pending",
  "pending_since": "...",
  "notified_at": null,
  "completed_at": null,
  "error": null
}
```

Closes #134
2026-04-05 04:28:41 +00:00
e763ceb0ad Merge pull request 'feat(context): add context dump/load for session isolation' (#139) from feat/context-dump-load into main 2026-04-05 06:23:40 +02:00
shokollm
61f06f825f Add context dump/load feature
Adds session context management to prevent session poisoning:
- CONTEXT_DIR and ENABLE_CONTEXT_DUMP config options
- issue_ref_to_context_file() - derive context file path
- kugetsu_context_load() - load previous context
- kugetsu_context_dump() - save context on session start
- kugetsu_context_update_message() - append to conversation history
- Integration in cmd_start and cmd_continue
- New 'kugetsu context' command
2026-04-05 04:23:26 +00:00
b76a9b883a Merge pull request 'feat(worktree-lifecycle): add PR tracking and safe destroy' (#138) from feat/worktree-lifecycle into main 2026-04-05 06:00:15 +02:00
shokollm
ac850869fd fix(worktree-lifecycle): use github.com as example in set-pr help
- Remove accidentally committed worktree directory
2026-04-05 03:56:48 +00:00
shokollm
3107dbf1e5 fix(worktree-lifecycle): use GIT_SERVERS config for check_pr_status
- Extract hostname from pr_url instead of hardcoding domains
- Look up server base URL from GIT_SERVERS config
- Append /api/v1 to derive API URL (configurable per server)
- Works with any server configured in GIT_SERVERS
2026-04-05 03:41:41 +00:00
shokollm
b8b97e3c09 fix(worktree-lifecycle): address PR review feedback
- Rename update-pr to set-pr for clarity (it's setting the PR URL, not updating PR)
- Add optional pr-url argument to kugetsu start command
  Usage: kugetsu start <issue-ref> <message> [pr-url]
- If pr-url is provided at start, it's stored directly in session file
2026-04-05 03:16:05 +00:00
shokollm
d8af560e6d feat(worktree-lifecycle): add PR tracking and safe destroy
- Add WORKTREE_CHECK_PR_STATUS config (default: true)
- Add pr_url and branch_name fields to session files
- Add check_pr_status() to query PR status via API (Gitea/GitHub)
- Add update_session_pr_url() to update PR URL in session
- Add kugetsu update-pr command to set PR URL
- Modify cmd_destroy to check PR status before destroying worktree

Closes #135
2026-04-05 02:50:09 +00:00
5d12f6ca42 Merge pull request 'feat(kugetsu): smart delegate with worktree awareness' (#130) from feature/smart-delegate-worktree-awareness into main 2026-04-03 16:31:30 +02:00
2 changed files with 877 additions and 107 deletions

View File

@@ -49,6 +49,9 @@ A default config file is created during `kugetsu init` with commented examples:
| `MAX_CONCURRENT_AGENTS` | 3 | Maximum number of concurrent dev agents | | `MAX_CONCURRENT_AGENTS` | 3 | Maximum number of concurrent dev agents |
| `KUGETSU_TEMP_DIR` | `~/.local/share/opencode/tool-output` | Temp directory for subagent tool output (useful in headless environments where /tmp is restricted) | | `KUGETSU_TEMP_DIR` | `~/.local/share/opencode/tool-output` | Temp directory for subagent tool output (useful in headless environments where /tmp is restricted) |
| `KUGETSU_VERBOSITY` | `default` | PM agent verbosity level: `verbose`, `default`, or `quiet` | | `KUGETSU_VERBOSITY` | `default` | PM agent verbosity level: `verbose`, `default`, or `quiet` |
| `QUEUE_DAEMON_INTERVAL_MINUTES` | 5 | How often daemon polls queue (in minutes) |
| `QUEUE_DAEMON_BATCH_SIZE` | 2 | How many tasks daemon picks per poll |
| `QUEUE_CLEANUP_AGE_DAYS` | 7 | Auto-cleanup completed/error items older than N days |
### Environment Variables for Agents ### Environment Variables for Agents
@@ -111,6 +114,10 @@ Each issue session gets its own git worktree to prevent conflicts:
├── worktrees/ ├── worktrees/
│ ├── github.com-shoko-kugetsu-14/ # Isolated workdir for issue #14 │ ├── github.com-shoko-kugetsu-14/ # Isolated workdir for issue #14
│ └── github.com-shoko-kugetsu-15/ # Isolated workdir for issue #15 │ └── github.com-shoko-kugetsu-15/ # Isolated workdir for issue #15
├── queue/
│ ├── items/ # Queue item JSON files
│ ├── daemon.pid # Daemon process ID
│ └── daemon.log # Daemon log output
└── index.json # Maps session IDs and issue refs to session files └── index.json # Maps session IDs and issue refs to session files
``` ```
@@ -258,16 +265,17 @@ kugetsu destroy --base -y
### kugetsu delegate `<message>` ### kugetsu delegate `<message>`
Send a message to the PM agent for task coordination (fire-and-forget): Send a message to the PM agent for task coordination via queue:
```bash ```bash
kugetsu delegate "work on issue #14" kugetsu delegate "work on issue #14"
kugetsu delegate "review PR #92" kugetsu delegate "review PR #92"
``` ```
- Non-blocking: returns immediately, runs in background - **Always enqueues** (fire-and-forget): returns immediately
- PM agent processes the message asynchronously - Queue daemon polls queue and invokes PM when slots available
- Uses `KUGETSU_VERBOSITY` env var to control PM agent output verbosity - Tasks are processed FIFO (first-in-first-out)
- Log output stored in `~/.kugetsu/logs/delegate-<timestamp>.log` - Use `kugetsu queue list` to see pending tasks
- Use `kugetsu queue-daemon logs` to debug queue processing
### kugetsu logs [n] ### kugetsu logs [n]
@@ -328,35 +336,79 @@ kugetsu server default github # Set default server
kugetsu server get github # Get server URL kugetsu server get github # Get server URL
``` ```
### kugetsu queue <list|enqueue|dequeue|clear> ### kugetsu queue <list|stats|clear>
Manage task queue for autonomous PM operation: Manage task queue for autonomous PM operation:
```bash ```bash
kugetsu queue list # Show queued tasks kugetsu queue list # Show queued tasks with status
kugetsu queue enqueue "task" # Add task to queue kugetsu queue stats # Show queue statistics (total, pending, notified, completed, error)
kugetsu queue dequeue # Remove next task from queue kugetsu queue clear # Clean up old completed/error items
kugetsu queue clear # Clear all queued tasks kugetsu queue enqueue <issue-ref> <message> # Manually enqueue a task
``` ```
- Queue stored in `~/.kugetsu/queue.json` **Queue Item States:**
- `pending` - Waiting in queue, daemon can pick up
- `notified` - PM agent has picked up the task
- `completed` - Dev agent finished, PR created
- `error` - Timeout or failure
### kugetsu queue-daemon <start|stop|restart|status|logs>
Manage the queue daemon background process:
```bash
kugetsu queue-daemon start # Start daemon in background
kugetsu queue-daemon stop # Stop daemon
kugetsu queue-daemon restart # Restart daemon
kugetsu queue-daemon status # Check if daemon is running
kugetsu queue-daemon logs # Show recent daemon logs
```
**Daemon Behavior:**
1. Runs at configurable interval (default: 5 minutes)
2. Checks if active agents < MAX_CONCURRENT_AGENTS
3. Picks 1-N pending items (configurable batch size)
4. Forks PM session for each picked item
5. PM decides whether to use `start` or `continue`
**Queue Directory:**
```
~/.kugetsu/queue/
├── items/ # Queue item JSON files
│ ├── q_1234567890.json # One file per queued task
│ └── q_1234567891.json
├── daemon.pid # Daemon process ID
├── daemon.lock # Daemon lock file
└── daemon.log # Daemon log output
```
## Workflow Example ## Workflow Example
### First-time Setup
```bash ```bash
# First-time setup (requires TTY) # Initialize kugetsu (requires TTY)
kugetsu init kugetsu init
# Creates: base session + pm-agent session
# Start work on issue # Start the queue daemon (for autonomous operation)
kugetsu start github.com/shoko/kugetsu#14 "implement feature X" kugetsu queue-daemon start
# Creates: worktree at ~/.kugetsu/worktrees/github.com-shoko-kugetsu-14/ ```
# Continue later ### Normal Workflow
```bash
# Enqueue tasks via delegate - agents will process them automatically
kugetsu delegate "work on issue #14"
kugetsu delegate "review PR #92"
# Check queue status
kugetsu queue list # See pending tasks
kugetsu queue stats # See statistics
# Debug queue daemon
kugetsu queue-daemon status # Is daemon running?
kugetsu queue-daemon logs # See daemon logs
# Continue work on existing issue
kugetsu continue github.com/shoko/kugetsu#14 "add tests" kugetsu continue github.com/shoko/kugetsu#14 "add tests"
# Continue again
kugetsu continue github.com/shoko/kugetsu#14 "fix failing test"
# List all sessions # List all sessions
kugetsu list kugetsu list
@@ -367,6 +419,21 @@ kugetsu prune --force
kugetsu destroy github.com/shoko/kugetsu#14 kugetsu destroy github.com/shoko/kugetsu#14
``` ```
### Queue Daemon Management
```bash
# Check if daemon is running
kugetsu queue-daemon status
# View daemon logs for debugging
kugetsu queue-daemon logs
# Restart daemon if needed
kugetsu queue-daemon restart
# Stop daemon
kugetsu queue-daemon stop
```
## Headless Operation ## Headless Operation
This design solves the headless CLI limitation discovered in Issue #14: This design solves the headless CLI limitation discovered in Issue #14:

View File

@@ -13,6 +13,19 @@ VERBOSITY_DIR="$KUGETSU_DIR/verbosity"
MAX_CONCURRENT_AGENTS="${MAX_CONCURRENT_AGENTS:-3}" MAX_CONCURRENT_AGENTS="${MAX_CONCURRENT_AGENTS:-3}"
KUGETSU_VERBOSITY="${KUGETSU_VERBOSITY:-default}" KUGETSU_VERBOSITY="${KUGETSU_VERBOSITY:-default}"
CONTEXT_DIR="${CONTEXT_DIR:-$KUGETSU_DIR/context}"
ENABLE_CONTEXT_DUMP="${ENABLE_CONTEXT_DUMP:-true}"
WORKTREE_CHECK_PR_STATUS="${WORKTREE_CHECK_PR_STATUS:-true}"
QUEUE_DIR="${QUEUE_DIR:-$KUGETSU_DIR/queue}"
QUEUE_ITEMS_DIR="${QUEUE_ITEMS_DIR:-$QUEUE_DIR/items}"
QUEUE_DAEMON_PID_FILE="${QUEUE_DAEMON_PID_FILE:-$QUEUE_DIR/daemon.pid}"
QUEUE_DAEMON_LOCK_FILE="${QUEUE_DAEMON_LOCK_FILE:-$QUEUE_DIR/daemon.lock}"
QUEUE_DAEMON_LOG_FILE="${QUEUE_DAEMON_LOG_FILE:-$QUEUE_DIR/daemon.log}"
QUEUE_DAEMON_INTERVAL_MINUTES="${QUEUE_DAEMON_INTERVAL_MINUTES:-5}"
QUEUE_DAEMON_BATCH_SIZE="${QUEUE_DAEMON_BATCH_SIZE:-2}"
QUEUE_CLEANUP_AGE_DAYS="${QUEUE_CLEANUP_AGE_DAYS:-7}"
TASK_TIMEOUT_HOURS="${TASK_TIMEOUT_HOURS:-1}"
# Load user config overrides (~/.kugetsu/config) # Load user config overrides (~/.kugetsu/config)
if [ -f "$KUGETSU_DIR/config" ]; then if [ -f "$KUGETSU_DIR/config" ]; then
@@ -77,6 +90,11 @@ Usage:
kugetsu destroy <issue-ref> [-y] Delete session for issue kugetsu destroy <issue-ref> [-y] Delete session for issue
kugetsu destroy --pm-agent [-y] Delete pm-agent session (not recommended) kugetsu destroy --pm-agent [-y] Delete pm-agent session (not recommended)
kugetsu destroy --base [-y] Delete base session kugetsu destroy --base [-y] Delete base session
kugetsu set-pr <issue-ref> <pr-url> Set PR URL for session (for PR tracking)
kugetsu context <issue-ref> Show context for issue
kugetsu queue [list|stats|clear] Show queue status or statistics
kugetsu queue enqueue <issue-ref> <message> Enqueue a task (normally via delegate)
kugetsu queue-daemon [start|stop|restart|status|logs] Manage queue daemon
kugetsu help Show this help kugetsu help Show this help
Issue Ref Format: Issue Ref Format:
@@ -256,6 +274,54 @@ get_worktree_path_for_session() {
fi fi
} }
check_pr_status() {
local pr_url="$1"
if [ -z "$pr_url" ]; then
echo "no_pr_url"
return 1
fi
local hostname=$(echo "$pr_url" | sed -E 's|https://([^/]+)/.*|\1|')
local server_base="${GIT_SERVERS[$hostname]:-}"
if [ -z "$server_base" ]; then
echo "unknown_server"
return 1
fi
local api_base="${server_base}/api/v1"
local api_url=$(echo "$pr_url" | sed -E 's|https://[^/]+/([^/]+)/([^/]+)/(pulls|merge_requests)/([0-9]+)|'"${api_base}"'/repos/\1/\2/\3/\4|')
local token=""
if [[ "$hostname" == "github.com" ]]; then
token="${GITHUB_TOKEN:-}"
else
token="${GITEA_TOKEN:-}"
fi
local response
if [ -n "$token" ]; then
response=$(curl -s -H "Authorization: token $token" "$api_url" 2>/dev/null || echo "{}")
else
response=$(curl -s "$api_url" 2>/dev/null || echo "{}")
fi
local state=$(echo "$response" | python3 -c "import json, sys; d=json.load(sys.stdin); print(d.get('state', 'unknown'))" 2>/dev/null || echo "unknown")
local merged=$(echo "$response" | python3 -c "import json, sys; d=json.load(sys.stdin); print('true' if d.get('merged', False) else 'false')" 2>/dev/null || echo "false")
if [ "$merged" = "true" ]; then
echo "merged"
elif [ "$state" = "closed" ]; then
echo "closed"
elif [ "$state" = "open" ]; then
echo "open"
else
echo "unknown"
fi
}
issue_ref_to_filename() { issue_ref_to_filename() {
local issue_ref="$1" local issue_ref="$1"
echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/' echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/'
@@ -267,6 +333,424 @@ filename_to_issue_ref() {
echo "$name" | sed 's/-\([0-9]*\)$/#\1' | sed 's/-/\//g' echo "$name" | sed 's/-\([0-9]*\)$/#\1' | sed 's/-/\//g'
} }
issue_ref_to_context_file() {
local issue_ref="$1"
local context_filename=$(issue_ref_to_filename "$issue_ref")
echo "$CONTEXT_DIR/${context_filename}.json"
}
kugetsu_context_load() {
local issue_ref="$1"
if [ "$ENABLE_CONTEXT_DUMP" != "true" ]; then
echo ""
return
fi
local context_file=$(issue_ref_to_context_file "$issue_ref")
if [ ! -f "$context_file" ]; then
echo ""
return
fi
python3 << PYEOF
import json
import sys
context_file = "$context_file"
try:
with open(context_file, 'r') as f:
ctx = json.load(f)
lines = []
lines.append("## PREVIOUS CONTEXT")
lines.append(f"Issue: {ctx.get('issue_ref', 'unknown')}")
lines.append(f"Last updated: {ctx.get('updated_at', 'unknown')}")
lines.append(f"Current branch: {ctx.get('current_branch', 'unknown')}")
lines.append("")
lines.append("### Previous work summary:")
lines.append(ctx.get('last_message', '(no previous message)'))
lines.append("")
history = ctx.get('conversation_history', [])
if history:
lines.append("### Conversation history:")
for msg in history[-5:]:
role = msg.get('role', 'unknown')
content = msg.get('content', '')
ts = msg.get('timestamp', '')
lines.append(f"- [{ts}] {role}: {content[:200]}...")
print('\n'.join(lines))
except Exception as e:
print(f"Warning: Failed to load context: {e}", file=sys.stderr)
print("", file=sys.stderr)
PYEOF
}
kugetsu_context_dump() {
local issue_ref="$1"
local message="$2"
local branch_name="${3:-}"
if [ "$ENABLE_CONTEXT_DUMP" != "true" ]; then
return
fi
local context_file=$(issue_ref_to_context_file "$issue_ref")
mkdir -p "$CONTEXT_DIR"
python3 << PYEOF
import json
import os
from datetime import datetime
context_file = "$context_file"
issue_ref = "$issue_ref"
message = """$message"""
branch_name = "$branch_name"
context = {
"issue_ref": issue_ref,
"current_branch": branch_name,
"updated_at": datetime.now().isoformat() + "Z",
"last_message": message[:500] if message else "",
"conversation_history": []
}
if os.path.exists(context_file):
try:
with open(context_file, 'r') as f:
existing = json.load(f)
history = existing.get('conversation_history', [])
history.append({
"role": "user",
"content": message[:1000] if message else "",
"timestamp": datetime.now().isoformat() + "Z"
})
history = history[-20:]
context["conversation_history"] = history
context["created_at"] = existing.get("created_at", context["updated_at"])
except:
context["created_at"] = datetime.now().isoformat() + "Z"
else:
context["created_at"] = datetime.now().isoformat() + "Z"
with open(context_file, 'w') as f:
json.dump(context, f, indent=2)
PYEOF
}
kugetsu_context_update_message() {
local issue_ref="$1"
local message="$2"
if [ "$ENABLE_CONTEXT_DUMP" != "true" ]; then
return
fi
local context_file=$(issue_ref_to_context_file "$issue_ref")
if [ ! -f "$context_file" ]; then
return
fi
python3 << PYEOF
import json
from datetime import datetime
context_file = "$context_file"
message = """$message"""
try:
with open(context_file, 'r') as f:
ctx = json.load(f)
history = ctx.get('conversation_history', [])
history.append({
"role": "assistant",
"content": message[:1000] if message else "",
"timestamp": datetime.now().isoformat() + "Z"
})
history = history[-20:]
ctx["conversation_history"] = history
ctx["last_message"] = message[:500] if message else ""
ctx["updated_at"] = datetime.now().isoformat() + "Z"
with open(context_file, 'w') as f:
json.dump(ctx, f, indent=2)
except Exception as e:
pass
PYEOF
}
ensure_queue_dirs() {
mkdir -p "$QUEUE_ITEMS_DIR"
}
generate_queue_id() {
echo "q_$(date +%s)_$$_$RANDOM"
}
enqueue_task() {
local issue_ref="$1"
local message="$2"
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
echo "Error: enqueue_task requires <issue-ref> and <message>" >&2
return 1
fi
validate_issue_ref "$issue_ref"
ensure_queue_dirs
local queue_id=$(generate_queue_id)
local pending_since=$(date -Iseconds)
python3 << PYEOF
import json
queue_item = {
"id": "$queue_id",
"issue_ref": "$issue_ref",
"message": """$message""",
"state": "pending",
"pending_since": "$pending_since",
"notified_at": None,
"completed_at": None,
"error": None
}
with open("$QUEUE_ITEMS_DIR/${queue_id}.json", "w") as f:
json.dump(queue_item, f, indent=2)
print(f"Enqueued: $queue_id")
PYEOF
}
get_pending_tasks() {
local limit="${1:-10}"
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
echo "[]"
return
fi
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f 2>/dev/null | while read -r file; do
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
if [ "$state" = "pending" ]; then
cat "$file"
fi
done | head -"$limit"
}
get_queue_stats() {
local total=0
local pending=0
local notified=0
local completed=0
local error=0
if [ -d "$QUEUE_ITEMS_DIR" ]; then
for file in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$file" ] || continue
total=$((total + 1))
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
case "$state" in
pending) pending=$((pending + 1)) ;;
notified) notified=$((notified + 1)) ;;
completed) completed=$((completed + 1)) ;;
error) error=$((error + 1)) ;;
esac
done
fi
echo "{\"total\": $total, \"pending\": $pending, \"notified\": $notified, \"completed\": $completed, \"error\": $error}"
}
update_queue_item_state() {
local queue_id="$1"
local new_state="$2"
local session_id="${3:-}"
local pid="${4:-}"
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
if [ ! -f "$item_file" ]; then
echo "Error: Queue item not found: $queue_id" >&2
return 1
fi
python3 << PYEOF
import json
from datetime import datetime
item_file = "$item_file"
new_state = "$new_state"
session_id = "$session_id"
pid = "$pid"
with open(item_file, 'r') as f:
item = json.load(f)
item['state'] = new_state
if new_state == "notified":
item['notified_at'] = datetime.now().isoformat() + "Z"
if session_id:
item['opencode_session_id'] = session_id
if pid:
item['pid'] = int(pid) if pid.isdigit() else None
elif new_state == "completed":
item['completed_at'] = datetime.now().isoformat() + "Z"
elif new_state == "error":
item['error'] = datetime.now().isoformat() + "Z"
with open(item_file, 'w') as f:
json.dump(item, f, indent=2)
print(f"Updated $queue_id to state: $new_state")
PYEOF
}
check_task_timeouts() {
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
return
fi
local timeout_hours="${TASK_TIMEOUT_HOURS:-1}"
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" != "notified" ]; then
continue
fi
local notified_at=$(python3 -c "import json; print(json.load(open('$item')).get('notified_at', ''))" 2>/dev/null)
if [ -z "$notified_at" ]; then
continue
fi
local queue_id=$(basename "$item" .json)
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
local notified_epoch=$(date -d "$notified_at" +%s 2>/dev/null || echo "0")
local now_epoch=$(date +%s)
local hours_elapsed=$(( (now_epoch - notified_epoch) / 3600 ))
if [ "$hours_elapsed" -ge "$timeout_hours" ]; then
echo "Task $queue_id timed out after ${hours_elapsed}h (limit: ${timeout_hours}h)"
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
echo "Killing process $pid"
kill "$pid" 2>/dev/null || true
fi
if [ -n "$session_id" ]; then
local worktree_path=""
for session_file in "$SESSIONS_DIR"/*.json; do
[ -f "$session_file" ] || continue
local sess_id=$(python3 -c "import json; print(json.load(open('$session_file')).get('opencode_session_id', ''))" 2>/dev/null)
if [ "$sess_id" = "$session_id" ]; then
worktree_path=$(python3 -c "import json; print(json.load(open('$session_file')).get('worktree_path', ''))" 2>/dev/null)
break
fi
done
if [ -n "$worktree_path" ]; then
pkill -f "opencode.*$worktree_path" 2>/dev/null || true
fi
fi
update_queue_item_state "$queue_id" "error"
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
if [ -n "$issue_ref" ]; then
local session_file=$(get_session_for_issue "$issue_ref")
if [ -n "$session_file" ] && [ "$session_file" != "null" ]; then
python3 << PYEOF
import json
session_path = "$SESSIONS_DIR/$session_file"
try:
with open(session_path, 'r') as f:
session = json.load(f)
session['state'] = 'timeout'
with open(session_path, 'w') as f:
json.dump(session, f, indent=2)
print(f"Marked session for $issue_ref as timeout")
except Exception as e:
print(f"Error marking session: {e}")
PYEOF
fi
fi
fi
done
}
cleanup_old_queue_items() {
local days="${QUEUE_CLEANUP_AGE_DAYS:-7}"
if [ ! -d "$QUEUE_ITEMS_DIR" ]; then
return
fi
find "$QUEUE_ITEMS_DIR" -name "*.json" -type f -mtime "+$days" 2>/dev/null | while read -r file; do
local state=$(python3 -c "import json; print(json.load(open('$file')).get('state', ''))" 2>/dev/null || echo "")
if [ "$state" = "completed" ] || [ "$state" = "error" ]; then
rm -f "$file"
echo "Cleaned up: $(basename "$file")"
fi
done
}
update_session_pr_url() {
local issue_ref="$1"
local pr_url="$2"
if [ -z "$issue_ref" ] || [ -z "$pr_url" ]; then
echo "Error: update_session_pr_url requires <issue-ref> and <pr-url>" >&2
return 1
fi
local session_file=$(get_session_for_issue "$issue_ref")
if [ -z "$session_file" ] || [ "$session_file" = "null" ]; then
echo "Error: No session found for '$issue_ref'" >&2
return 1
fi
local session_path="$SESSIONS_DIR/$session_file"
if [ ! -f "$session_path" ]; then
echo "Error: Session file not found: $session_path" >&2
return 1
fi
python3 << PYEOF
import json
session_path = "$session_path"
pr_url = "$pr_url"
with open(session_path, 'r') as f:
session = json.load(f)
session['pr_url'] = pr_url
with open(session_path, 'w') as f:
json.dump(session, f, indent=2)
print(f"Updated pr_url to: {pr_url}")
PYEOF
}
read_index() { read_index() {
if [ -f "$INDEX_FILE" ]; then if [ -f "$INDEX_FILE" ]; then
cat "$INDEX_FILE" cat "$INDEX_FILE"
@@ -734,93 +1218,190 @@ find_sessions_by_issue_number() {
echo "$results" echo "$results"
} }
cmd_delegate() { cmd_queue() {
local message="${1:-}" local action="${1:-list}"
local verbosity="${KUGETSU_VERBOSITY:-default}" shift
if [ -z "$message" ]; then case "$action" in
echo "Error: message is required" >&2 list)
echo "Usage: kugetsu delegate <message>" >&2 ensure_queue_dirs
local stats=$(get_queue_stats)
echo "Queue Statistics:"
echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(f\" Total: {d['total']}\n Pending: {d['pending']}\n Notified: {d['notified']}\n Completed: {d['completed']}\n Error: {d['error']}\")"
echo ""
echo "Pending tasks:"
local count=0
for item in "$QUEUE_ITEMS_DIR"/*.json; do
[ -f "$item" ] || continue
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', '')" 2>/dev/null)
if [ "$state" = "pending" ]; then
count=$((count + 1))
python3 -c "import json; d=json.load(open('$item')); print(f\" [{d['id']}] {d['issue_ref']}: {d['message'][:50]}...\n pending since: {d['pending_since']}\")" 2>/dev/null
fi
done
if [ $count -eq 0 ]; then
echo " (none)"
fi
;;
stats)
local stats=$(get_queue_stats)
echo "$stats" | python3 -c "import json, sys; d=json.load(sys.stdin); print(json.dumps(d, indent=2))"
;;
clear)
echo "Cleaning up old queue items..."
cleanup_old_queue_items
;;
enqueue)
local issue_ref="${1:-}"
local message="${2:-}"
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
echo "Usage: kugetsu queue enqueue <issue-ref> <message>" >&2
exit 1 exit 1
fi fi
enqueue_task "$issue_ref" "$message"
;;
*)
echo "Usage: kugetsu queue [list|stats|clear|enqueue <issue-ref> <message>]" >&2
exit 1
;;
esac
}
cmd_queue_daemon() {
local action="${1:-status}"
shift
case "$action" in
start)
if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then
local old_pid=$(cat "$QUEUE_DAEMON_PID_FILE" 2>/dev/null)
if [ -n "$old_pid" ] && kill -0 "$old_pid" 2>/dev/null; then
echo "Daemon is already running with PID $old_pid"
exit 1
fi
rm -f "$QUEUE_DAEMON_PID_FILE"
fi
mkdir -p "$(dirname "$QUEUE_DAEMON_LOG_FILE")"
nohup bash "$0" queue-daemon run >> "$QUEUE_DAEMON_LOG_FILE" 2>&1 &
local daemon_pid=$!
echo "$daemon_pid" > "$QUEUE_DAEMON_PID_FILE"
echo "Queue daemon started with PID $daemon_pid"
echo "Log file: $QUEUE_DAEMON_LOG_FILE"
;;
stop)
if [ ! -f "$QUEUE_DAEMON_PID_FILE" ]; then
echo "Daemon PID file not found. Is the daemon running?"
exit 1
fi
local pid=$(cat "$QUEUE_DAEMON_PID_FILE")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
kill "$pid"
rm -f "$QUEUE_DAEMON_PID_FILE"
echo "Daemon stopped (PID $pid)"
else
echo "Daemon not running (stale PID file)"
rm -f "$QUEUE_DAEMON_PID_FILE"
fi
;;
restart)
cmd_queue_daemon stop
sleep 1
cmd_queue_daemon start
;;
status)
if [ -f "$QUEUE_DAEMON_PID_FILE" ]; then
local pid=$(cat "$QUEUE_DAEMON_PID_FILE")
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
echo "Queue daemon is running (PID $pid)"
else
echo "Daemon not running (stale PID file)"
rm -f "$QUEUE_DAEMON_PID_FILE"
fi
else
echo "Queue daemon is not running"
fi
;;
logs)
local lines="${1:-50}"
if [ -f "$QUEUE_DAEMON_LOG_FILE" ]; then
tail -"$lines" "$QUEUE_DAEMON_LOG_FILE"
else
echo "No daemon log file found"
fi
;;
run)
queue_daemon_loop
;;
*)
echo "Usage: kugetsu queue-daemon [start|stop|restart|status|logs]" >&2
exit 1
;;
esac
}
queue_daemon_loop() {
local pid=$$
echo "$pid" > "$QUEUE_DAEMON_PID_FILE"
echo "Queue daemon started (PID $pid) at $(date)"
while true; do
sleep $((QUEUE_DAEMON_INTERVAL_MINUTES * 60))
if [ ! -f "$QUEUE_DAEMON_PID_FILE" ] || [ "$(cat "$QUEUE_DAEMON_PID_FILE")" != "$pid" ]; then
echo "PID file changed, stopping daemon"
exit 0
fi
check_task_timeouts
process_queue
done
}
process_queue() {
local active_count=$(count_active_dev_sessions)
if [ "$active_count" -ge "$MAX_CONCURRENT_AGENTS" ]; then
return
fi
local available_slots=$((MAX_CONCURRENT_AGENTS - active_count))
local batch_size=$QUEUE_DAEMON_BATCH_SIZE
[ "$batch_size" -gt "$available_slots" ] && batch_size=$available_slots
if [ "$batch_size" -le 0 ]; then
return
fi
local pm_session=$(get_pm_agent_session_id) local pm_session=$(get_pm_agent_session_id)
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then if [ -z "$pm_session" ] || [ "$pm_session" = "null" ]; then
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2 return
exit 1
fi fi
local count=0
for item in $(ls -t "$QUEUE_ITEMS_DIR"/*.json 2>/dev/null | head -20); do
[ -f "$item" ] || continue
[ $count -ge "$batch_size" ] && break
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
if [ "$state" != "pending" ]; then
continue
fi
local queue_id=$(basename "$item" .json)
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
if [ -z "$issue_ref" ] || [ -z "$message" ]; then
continue
fi
update_queue_item_state "$queue_id" "notified"
local log_file="$LOGS_DIR/delegate-${queue_id}.log"
mkdir -p "$LOGS_DIR" mkdir -p "$LOGS_DIR"
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
local parsed=$(parse_issue_ref_from_message "$message") local env_sh="set -a; "
local gitserver=$(echo "$parsed" | cut -d'|' -f1)
local owner=$(echo "$parsed" | cut -d'|' -f2)
local repo=$(echo "$parsed" | cut -d'|' -f3)
local issue_number=$(echo "$parsed" | cut -d'|' -f4)
local missing_info=$(get_missing_info "$parsed")
local context_injection=""
if [ -n "$missing_info" ]; then
context_injection=$(build_missing_info_context "$missing_info")
echo "NOTE: Delegation missing information: ${missing_info}"
fi
local candidates=""
local candidate_count=0
if [ -n "$issue_number" ]; then
local worktrees=$(find_worktrees_by_issue_number "$issue_number")
local sessions=$(find_sessions_by_issue_number "$issue_number")
while IFS=: read -r path type; do
if [ -n "$path" ]; then
candidate_count=$((candidate_count + 1))
candidates="${candidates}${candidate_count}) ${path} (${type})
"
fi
done <<< "$worktrees"
while IFS=: read -r path type; do
if [ -n "$path" ]; then
candidate_count=$((candidate_count + 1))
candidates="${candidates}${candidate_count}) ${path} (${type})
"
fi
done <<< "$sessions"
fi
local use_worktree=""
if [ $candidate_count -gt 0 ]; then
echo "Found $candidate_count existing worktree(s)/session(s) for issue #${issue_number}:"
echo "$candidates"
echo "r) Delegate anyway (without routing)"
echo "Which one to use? [1-${candidate_count}/r]: "
read -r choice
if [ "$choice" = "r" ] || [ -z "$choice" ]; then
use_worktree=""
elif [ "$choice" -ge 1 ] && [ "$choice" -le "$candidate_count" ]; then
local selected=$(echo "$candidates" | sed -n "${choice}p")
use_worktree=$(echo "$selected" | sed 's/) .*//')
fi
fi
local final_message="${message}${context_injection}"
if [ -n "$use_worktree" ]; then
if [ -d "$use_worktree" ]; then
echo "Using worktree: $use_worktree"
final_message="${final_message}
NOTE: Worktree selected: ${use_worktree}"
fi
fi
local temp_dir="${KUGETSU_TEMP_DIR:-$HOME/.local/share/opencode/tool-output}"
mkdir -p "$ENV_DIR"
local env_sh="set -a; export KUGETSU_TEMP_DIR='$temp_dir'; export KUGETSU_VERBOSITY='$verbosity'; "
if [ -f "$ENV_DIR/pm-agent.env" ]; then if [ -f "$ENV_DIR/pm-agent.env" ]; then
env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; " env_sh="${env_sh}source '$ENV_DIR/pm-agent.env'; "
elif [ -f "$ENV_DIR/default.env" ]; then elif [ -f "$ENV_DIR/default.env" ]; then
@@ -828,10 +1409,41 @@ NOTE: Worktree selected: ${use_worktree}"
fi fi
env_sh="${env_sh}set +a; " env_sh="${env_sh}set +a; "
nohup sh -c "${env_sh}opencode run '${final_message}' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 & nohup sh -c "${env_sh}opencode run 'Delegate task: ${message}' --continue --session '$pm_session'" >> "$log_file" 2>&1 &
disown local fork_pid=$!
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
echo "Verbosity: $verbosity" update_queue_item_state "$queue_id" "notified" "" "$fork_pid"
echo "Queued task $queue_id for PM agent (PID: $fork_pid)"
count=$((count + 1))
done
}
cmd_delegate() {
local message="${1:-}"
if [ -z "$message" ]; then
echo "Error: message is required" >&2
echo "Usage: kugetsu delegate <message>" >&2
exit 1
fi
local parsed=$(parse_issue_ref_from_message "$message")
local gitserver=$(echo "$parsed" | cut -d'|' -f1)
local owner=$(echo "$parsed" | cut -d'|' -f2)
local repo=$(echo "$parsed" | cut -d'|' -f3)
local issue_number=$(echo "$parsed" | cut -d'|' -f4)
if [ -z "$issue_number" ] || [ -z "$gitserver" ] || [ -z "$owner" ] || [ -z "$repo" ]; then
echo "Error: Could not parse issue reference from message" >&2
echo "Message should contain an issue reference like 'github.com/user/repo#123'" >&2
exit 1
fi
local issue_ref="${gitserver}/${owner}/${repo}#${issue_number}"
enqueue_task "$issue_ref" "$message"
echo "Task enqueued. The queue daemon will process it when a slot is available."
} }
cmd_logs() { cmd_logs() {
@@ -1401,6 +2013,7 @@ EOF
cmd_start() { cmd_start() {
local issue_ref="" local issue_ref=""
local message="" local message=""
local pr_url=""
local args=("$@") local args=("$@")
args=$(set_debug_mode "${args[@]}") args=$(set_debug_mode "${args[@]}")
@@ -1410,11 +2023,14 @@ cmd_start() {
issue_ref="$arg" issue_ref="$arg"
elif [ -z "$message" ]; then elif [ -z "$message" ]; then
message="$arg" message="$arg"
elif [ -z "$pr_url" ]; then
pr_url="$arg"
fi fi
done done
if [ -z "$issue_ref" ] || [ -z "$message" ]; then if [ -z "$issue_ref" ] || [ -z "$message" ]; then
echo "Error: start requires <issue-ref> and <message>" >&2 echo "Error: start requires <issue-ref> and <message>" >&2
echo "Usage: kugetsu start <issue-ref> <message> [pr-url]" >&2
exit 1 exit 1
fi fi
@@ -1463,7 +2079,10 @@ cmd_start() {
> "$fork_log" > "$fork_log"
local fork_context=$(kugetsu_get_fork_context "$issue_ref") local fork_context=$(kugetsu_get_fork_context "$issue_ref")
local previous_context=$(kugetsu_context_load "$issue_ref")
local branch_name=$(issue_ref_to_branch_name "$issue_ref")
local full_message="${fork_context} local full_message="${fork_context}
${previous_context}
## YOUR TASK ## YOUR TASK
$message" $message"
@@ -1543,11 +2162,30 @@ for row in cursor.fetchall():
" 2>/dev/null || echo " (failed to query DB)" " 2>/dev/null || echo " (failed to query DB)"
fi fi
printf '{"type": "forked", "issue_ref": "%s", "opencode_session_id": "%s", "worktree_path": "%s", "created_at": "%s", "state": "idle"}\n' \ local branch_name=$(issue_ref_to_branch_name "$issue_ref")
"$issue_ref" "$new_session_id" "$worktree_path" "$(date -Iseconds)" > "$SESSIONS_DIR/$session_file"
python3 << PYEOF > "$SESSIONS_DIR/$session_file"
import json
session = {
"type": "forked",
"issue_ref": "$issue_ref",
"opencode_session_id": "$new_session_id",
"worktree_path": "$worktree_path",
"created_at": "$(date -Iseconds)",
"state": "idle",
"branch_name": "$branch_name",
"pr_url": "$pr_url" if "$pr_url" else None
}
with open("$SESSIONS_DIR/$session_file", "w") as f:
json.dump(session, f, indent=2)
PYEOF
add_issue_to_index "$issue_ref" "$session_file" add_issue_to_index "$issue_ref" "$session_file"
kugetsu_context_dump "$issue_ref" "$message" "$branch_name"
echo "Session started for '$issue_ref': $new_session_id" echo "Session started for '$issue_ref': $new_session_id"
echo "Worktree: $worktree_path" echo "Worktree: $worktree_path"
} }
@@ -1595,22 +2233,31 @@ cmd_continue() {
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "") local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
echo "Continuing session for '$session_name'..." echo "Continuing session for '$session_name'..."
local previous_context=$(kugetsu_context_load "$session_name")
local full_message="${previous_context}
## CONTINUE TASK
$message"
# Note: --continue always allowed (existing sessions don't count toward limit) # Note: --continue always allowed (existing sessions don't count toward limit)
# Wrap in subshell with cd to ensure worktree directory is set correctly in session DB # Wrap in subshell with cd to ensure worktree directory is set correctly in session DB
if [ -n "$worktree_path" ] && [ -d "$worktree_path" ]; then if [ -n "$worktree_path" ] && [ -d "$worktree_path" ]; then
echo "Using worktree: $worktree_path" echo "Using worktree: $worktree_path"
if [ "$DEBUG_MODE" = true ]; then if [ "$DEBUG_MODE" = true ]; then
(cd "$worktree_path" && opencode run "$message" --continue --session "$opencode_session_id" --dir "$worktree_path" 2>&1) | tee "$session_path.debug.log" & (cd "$worktree_path" && opencode run "$full_message" --continue --session "$opencode_session_id" --dir "$worktree_path" 2>&1) | tee "$session_path.debug.log" &
else else
(cd "$worktree_path" && opencode run "$message" --continue --session "$opencode_session_id" --dir "$worktree_path" 2>&1) & (cd "$worktree_path" && opencode run "$full_message" --continue --session "$opencode_session_id" --dir "$worktree_path" 2>&1) &
fi fi
else else
if [ "$DEBUG_MODE" = true ]; then if [ "$DEBUG_MODE" = true ]; then
opencode run "$message" --continue --session "$opencode_session_id" 2>&1 | tee "$session_path.debug.log" & opencode run "$full_message" --continue --session "$opencode_session_id" 2>&1 | tee "$session_path.debug.log" &
else else
opencode run "$message" --continue --session "$opencode_session_id" 2>&1 & opencode run "$full_message" --continue --session "$opencode_session_id" 2>&1 &
fi fi
fi fi
kugetsu_context_update_message "$session_name" "$message"
} }
cmd_list() { cmd_list() {
@@ -1820,6 +2467,25 @@ cmd_destroy() {
remove_issue_from_index "$target" remove_issue_from_index "$target"
echo "Session for '$target' destroyed" echo "Session for '$target' destroyed"
else else
if [ "$WORKTREE_CHECK_PR_STATUS" = "true" ]; then
local pr_url=$(python3 -c "import json; print(json.load(open('$session_path')).get('pr_url', '') or '')" 2>/dev/null || echo "")
if [ -n "$pr_url" ] && [ "$pr_url" != "None" ]; then
echo "Checking PR status at '$pr_url'..."
local pr_status=$(check_pr_status "$pr_url")
if [ "$pr_status" = "open" ]; then
echo "Error: PR is still open at $pr_url" >&2
echo "Use --force to destroy anyway, or close the PR first" >&2
exit 1
elif [ "$pr_status" = "merged" ]; then
echo "PR has been merged. Safe to destroy."
elif [ "$pr_status" = "closed" ]; then
echo "PR has been closed. Safe to destroy."
else
echo "Warning: Could not determine PR status (got: $pr_status). Proceeding anyway." >&2
fi
fi
fi
echo "Delete session and worktree for '$target'? [y/N] " echo "Delete session and worktree for '$target'? [y/N] "
local reply local reply
read reply read reply
@@ -1887,6 +2553,43 @@ main() {
destroy) destroy)
cmd_destroy "$@" cmd_destroy "$@"
;; ;;
set-pr)
local issue_ref="${1:-}"
local pr_url="${2:-}"
if [ -z "$issue_ref" ] || [ -z "$pr_url" ]; then
echo "Usage: kugetsu set-pr <issue-ref> <pr-url>" >&2
echo "Example: kugetsu set-pr github.com/shoko/kugetsu#14 https://github.com/shoko/kugetsu/pulls/123" >&2
exit 1
fi
validate_issue_ref "$issue_ref"
update_session_pr_url "$issue_ref" "$pr_url"
;;
context)
local issue_ref="${1:-}"
if [ -z "$issue_ref" ]; then
echo "Usage: kugetsu context <issue-ref>" >&2
echo "Show context for an issue" >&2
exit 1
fi
validate_issue_ref "$issue_ref"
local context_file=$(issue_ref_to_context_file "$issue_ref")
if [ -f "$context_file" ]; then
cat "$context_file"
else
echo "No context found for '$issue_ref'" >&2
exit 1
fi
;;
queue)
local action="${1:-list}"
shift
cmd_queue "$action" "$@"
;;
queue-daemon)
local action="${1:-status}"
shift
cmd_queue_daemon "$action" "$@"
;;
*) *)
echo "Error: unknown command '$command'" >&2 echo "Error: unknown command '$command'" >&2
usage usage