Compare commits
7 Commits
fix/issue-
...
fix/issue-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dc9d4d7327 | ||
|
|
bdcb7a476c | ||
| 77cf817568 | |||
| 77b0963fa4 | |||
|
|
5a0a54898b | ||
|
|
b1028a6556 | ||
|
|
270219873f |
@@ -146,3 +146,95 @@ filename_to_issue_ref() {
|
|||||||
local name="${filename%.json}"
|
local name="${filename%.json}"
|
||||||
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
|
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Add notification to notifications file
|
||||||
|
kugetsu_add_notification() {
|
||||||
|
local type="$1"
|
||||||
|
local message="$2"
|
||||||
|
local issue_ref="${3:-}"
|
||||||
|
local gitea_url="${4:-}"
|
||||||
|
|
||||||
|
mkdir -p "$(dirname "$NOTIFICATIONS_FILE")"
|
||||||
|
|
||||||
|
python3 << PYEOF
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
notification = {
|
||||||
|
"type": "$type",
|
||||||
|
"message": "$message",
|
||||||
|
"issue_ref": "$issue_ref" if "$issue_ref" else None,
|
||||||
|
"gitea_url": "$gitea_url" if "$gitea_url" else None,
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"read": False
|
||||||
|
}
|
||||||
|
|
||||||
|
file_path = os.path.expanduser("$NOTIFICATIONS_FILE")
|
||||||
|
notifications = []
|
||||||
|
|
||||||
|
if os.path.exists(file_path):
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r') as f:
|
||||||
|
notifications = json.load(f)
|
||||||
|
except:
|
||||||
|
notifications = []
|
||||||
|
|
||||||
|
notifications.append(notification)
|
||||||
|
|
||||||
|
with open(file_path, 'w') as f:
|
||||||
|
json.dump(notifications, f, indent=2)
|
||||||
|
|
||||||
|
print("Notification added")
|
||||||
|
PYEOF
|
||||||
|
}
|
||||||
|
|
||||||
|
# Update queue item state
|
||||||
|
update_queue_item_state() {
|
||||||
|
local queue_id="$1"
|
||||||
|
local new_state="$2"
|
||||||
|
local session_id="${3:-}"
|
||||||
|
local pid="${4:-}"
|
||||||
|
|
||||||
|
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
|
||||||
|
if [ ! -f "$item_file" ]; then
|
||||||
|
echo "Error: Queue item not found: $queue_id" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
python3 << PYEOF
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
item_file = "$item_file"
|
||||||
|
new_state = "$new_state"
|
||||||
|
session_id = "$session_id"
|
||||||
|
pid = "$pid"
|
||||||
|
|
||||||
|
with open(item_file, 'r') as f:
|
||||||
|
item = json.load(f)
|
||||||
|
|
||||||
|
issue_ref = item.get('issue_ref', '')
|
||||||
|
|
||||||
|
item['state'] = new_state
|
||||||
|
|
||||||
|
if new_state == "notified":
|
||||||
|
item['notified_at'] = datetime.now().isoformat() + "Z"
|
||||||
|
if session_id:
|
||||||
|
item['opencode_session_id'] = session_id
|
||||||
|
if pid:
|
||||||
|
item['pid'] = int(pid) if pid.isdigit() else None
|
||||||
|
elif new_state == "completed":
|
||||||
|
item['completed_at'] = datetime.now().isoformat() + "Z"
|
||||||
|
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
|
||||||
|
elif new_state == "error":
|
||||||
|
item['error'] = datetime.now().isoformat() + "Z"
|
||||||
|
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
|
||||||
|
|
||||||
|
with open(item_file, 'w') as f:
|
||||||
|
json.dump(item, f, indent=2)
|
||||||
|
|
||||||
|
print(f"Updated $queue_id to state: $new_state")
|
||||||
|
PYEOF
|
||||||
|
}
|
||||||
|
|||||||
@@ -10,81 +10,146 @@ source "$SCRIPT_DIR/kugetsu-log.sh"
|
|||||||
|
|
||||||
load_agent_env "pm-agent"
|
load_agent_env "pm-agent"
|
||||||
|
|
||||||
# Check if a notified task has completed (forked session ended or has new commits)
|
acquire_lock() {
|
||||||
|
local issue_ref="$1"
|
||||||
|
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
|
||||||
|
mkdir -p "$(dirname "$lock_file")"
|
||||||
|
if [ -f "$lock_file" ]; then
|
||||||
|
local pid=$(cat "$lock_file" 2>/dev/null || echo "")
|
||||||
|
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
rm -f "$lock_file"
|
||||||
|
fi
|
||||||
|
echo $$ > "$lock_file"
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
release_lock() {
|
||||||
|
local issue_ref="$1"
|
||||||
|
local lock_file="$QUEUE_DIR/locks/$(echo "$issue_ref" | sed 's/[\/:]/-/g' | sed 's/#/-/').lock"
|
||||||
|
rm -f "$lock_file"
|
||||||
|
}
|
||||||
|
|
||||||
check_task_completion() {
|
check_task_completion() {
|
||||||
local item="$1"
|
local item="$1"
|
||||||
local queue_id=$(basename "$item" .json)
|
local queue_id=$(basename "$item" .json)
|
||||||
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
local state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
||||||
|
|
||||||
[ "$state" = "notified" ] || return 0
|
[ "$state" = "notified" ] || return 0
|
||||||
|
|
||||||
# Use opencode_session_id (the forked session, not the parent pm_session)
|
|
||||||
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
|
local session_id=$(python3 -c "import json; print(json.load(open('$item')).get('opencode_session_id', ''))" 2>/dev/null)
|
||||||
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
||||||
|
local pid=$(python3 -c "import json; print(json.load(open('$item')).get('pid', ''))" 2>/dev/null)
|
||||||
# If no session tracked, skip
|
|
||||||
[ -n "$session_id" ] || return 0
|
if [ -n "$pid" ] && [ "$pid" != "None" ]; then
|
||||||
|
if ! kill -0 "$pid" 2>/dev/null; then
|
||||||
# Check if forked session still exists in opencode
|
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
||||||
if ! opencode session list 2>/dev/null | grep -q "$session_id"; then
|
local has_commits=false
|
||||||
# Forked session ended — check if work was done
|
|
||||||
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
||||||
local has_commits=false
|
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
||||||
|
has_commits=true
|
||||||
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
fi
|
||||||
# Check if worktree has new commits beyond origin/main
|
|
||||||
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
|
||||||
has_commits=true
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
if [ "$has_commits" = true ]; then
|
||||||
|
update_queue_item_state "$queue_id" "completed"
|
||||||
|
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
||||||
|
else
|
||||||
|
update_queue_item_state "$queue_id" "error"
|
||||||
|
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
||||||
|
fi
|
||||||
|
release_lock "$issue_ref"
|
||||||
fi
|
fi
|
||||||
|
else
|
||||||
if [ "$has_commits" = true ]; then
|
if [ -n "$session_id" ] && ! opencode session list 2>/dev/null | grep -q "$session_id"; then
|
||||||
update_queue_item_state "$queue_id" "completed"
|
local worktree_path=$(issue_ref_to_worktree_path "$issue_ref" "$HOME/.kugetsu-worktrees")
|
||||||
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
local has_commits=false
|
||||||
else
|
|
||||||
update_queue_item_state "$queue_id" "error"
|
if [ -d "$worktree_path" ] && [ -d "$worktree_path/.git" ]; then
|
||||||
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
if [ -n "$(git -C "$worktree_path" log --oneline origin/main..HEAD 2>/dev/null)" ]; then
|
||||||
|
has_commits=true
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ "$has_commits" = true ]; then
|
||||||
|
update_queue_item_state "$queue_id" "completed"
|
||||||
|
echo "Task $queue_id ($issue_ref) completed — new commits found"
|
||||||
|
else
|
||||||
|
update_queue_item_state "$queue_id" "error"
|
||||||
|
echo "Task $queue_id ($issue_ref) marked error — no commits found after session ended"
|
||||||
|
fi
|
||||||
|
release_lock "$issue_ref"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get_session_id_for_issue() {
|
||||||
|
local issue_ref="$1"
|
||||||
|
local session_file=$(issue_ref_to_filename "$issue_ref")
|
||||||
|
local session_path="$SESSIONS_DIR/$session_file"
|
||||||
|
if [ -f "$session_path" ]; then
|
||||||
|
python3 -c "import json; print(json.load(open('$session_path')).get('opencode_session_id', ''))" 2>/dev/null || echo ""
|
||||||
|
else
|
||||||
|
echo ""
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
process_task() {
|
||||||
|
local item="$1"
|
||||||
|
local queue_id=$(basename "$item" .json)
|
||||||
|
local issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
||||||
|
local message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
|
||||||
|
|
||||||
|
if ! acquire_lock "$issue_ref"; then
|
||||||
|
echo "Task $queue_id ($issue_ref) skipped — another process is handling it"
|
||||||
|
return
|
||||||
|
fi
|
||||||
|
|
||||||
|
source "$SCRIPT_DIR/kugetsu-session.sh"
|
||||||
|
|
||||||
|
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
|
||||||
|
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||||
|
if cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1; then
|
||||||
|
sleep 1
|
||||||
|
local session_id=$(get_session_id_for_issue "$issue_ref")
|
||||||
|
update_queue_item_state "$queue_id" "notified" "$session_id" ""
|
||||||
|
echo "Task $queue_id continued for $issue_ref"
|
||||||
|
else
|
||||||
|
update_queue_item_state "$queue_id" "error"
|
||||||
|
echo "Task $queue_id ($issue_ref) failed to continue"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||||
|
if cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1; then
|
||||||
|
sleep 1
|
||||||
|
local session_id=$(get_session_id_for_issue "$issue_ref")
|
||||||
|
update_queue_item_state "$queue_id" "notified" "$session_id" ""
|
||||||
|
echo "Task $queue_id started for $issue_ref"
|
||||||
|
else
|
||||||
|
update_queue_item_state "$queue_id" "error"
|
||||||
|
echo "Task $queue_id ($issue_ref) failed to start"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
release_lock "$issue_ref"
|
||||||
|
}
|
||||||
|
|
||||||
while true; do
|
while true; do
|
||||||
# Check completion of notified tasks
|
|
||||||
if [ -d "$QUEUE_ITEMS_DIR" ]; then
|
if [ -d "$QUEUE_ITEMS_DIR" ]; then
|
||||||
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||||
[ -f "$item" ] || continue
|
[ -f "$item" ] || continue
|
||||||
check_task_completion "$item"
|
check_task_completion "$item"
|
||||||
done
|
done
|
||||||
|
|
||||||
# Process pending tasks
|
|
||||||
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
for item in "$QUEUE_ITEMS_DIR"/*.json; do
|
||||||
[ -f "$item" ] || continue
|
[ -f "$item" ] || continue
|
||||||
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
state=$(python3 -c "import json; print(json.load(open('$item')).get('state', ''))" 2>/dev/null)
|
||||||
if [ "$state" = "pending" ]; then
|
if [ "$state" = "pending" ]; then
|
||||||
queue_id=$(basename "$item" .json)
|
process_task "$item"
|
||||||
issue_ref=$(python3 -c "import json; print(json.load(open('$item')).get('issue_ref', ''))" 2>/dev/null)
|
|
||||||
message=$(python3 -c "import json; print(json.load(open('$item')).get('message', ''))" 2>/dev/null)
|
|
||||||
|
|
||||||
# Source session management and use cmd_start/cmd_continue
|
|
||||||
source "$SCRIPT_DIR/kugetsu-session.sh"
|
|
||||||
|
|
||||||
if worktree_exists "$issue_ref" "$HOME/.kugetsu-worktrees" || [ -f "$SESSIONS_DIR/$(issue_ref_to_filename "$issue_ref").json" ]; then
|
|
||||||
# Continue existing session
|
|
||||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
|
||||||
cmd_continue "$issue_ref" "$message" >> "$log_file" 2>&1 &
|
|
||||||
pid=$!
|
|
||||||
update_queue_item_state "$queue_id" "notified" "" "$pid"
|
|
||||||
echo "Task $queue_id continued for $issue_ref"
|
|
||||||
else
|
|
||||||
# Start new session
|
|
||||||
log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
|
||||||
cmd_start "$issue_ref" "$message" >> "$log_file" 2>&1 &
|
|
||||||
pid=$!
|
|
||||||
update_queue_item_state "$queue_id" "notified" "" "$pid"
|
|
||||||
echo "Task $queue_id started for $issue_ref"
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
done
|
done
|
||||||
fi
|
fi
|
||||||
sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
|
sleep "${QUEUE_DAEMON_INTERVAL_MINUTES:-5}m"
|
||||||
done
|
done
|
||||||
@@ -1,6 +1,13 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
set -euo pipefail
|
set -euo pipefail
|
||||||
|
|
||||||
|
# Source required modules for session management functions
|
||||||
|
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
source "$SCRIPT_DIR/kugetsu-config.sh"
|
||||||
|
source "$SCRIPT_DIR/kugetsu-index.sh"
|
||||||
|
source "$SCRIPT_DIR/kugetsu-worktree.sh"
|
||||||
|
source "$SCRIPT_DIR/kugetsu-log.sh"
|
||||||
|
|
||||||
count_active_dev_sessions() {
|
count_active_dev_sessions() {
|
||||||
local count=0
|
local count=0
|
||||||
if [ -d "$SESSIONS_DIR" ]; then
|
if [ -d "$SESSIONS_DIR" ]; then
|
||||||
@@ -153,10 +160,12 @@ cmd_delegate() {
|
|||||||
local issue_ref=$(extract_issue_ref_from_message "$message")
|
local issue_ref=$(extract_issue_ref_from_message "$message")
|
||||||
|
|
||||||
if [ -n "$issue_ref" ] && [[ "$issue_ref" =~ \#[0-9]+$ ]]; then
|
if [ -n "$issue_ref" ] && [[ "$issue_ref" =~ \#[0-9]+$ ]]; then
|
||||||
cmd_start "$issue_ref" "$message"
|
# Enqueue for daemon to process via cmd_start/cmd_continue
|
||||||
|
enqueue_task "$issue_ref" "$message"
|
||||||
return
|
return
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
# No issue ref detected — delegate directly to PM agent (legacy path)
|
||||||
local pm_session=$(get_pm_agent_session_id)
|
local pm_session=$(get_pm_agent_session_id)
|
||||||
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then
|
if [ -z "$pm_session" ] || [ "$pm_session" = "null" ] || [ "$pm_session" = "None" ]; then
|
||||||
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2
|
echo "Error: PM agent session not found. Run 'kugetsu init' first." >&2
|
||||||
@@ -165,7 +174,7 @@ cmd_delegate() {
|
|||||||
|
|
||||||
mkdir -p "$LOGS_DIR"
|
mkdir -p "$LOGS_DIR"
|
||||||
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||||
nohup sh -c "GITEA_TOKEN='${GITEA_TOKEN:-}' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
|
nohup sh -c "GITEA_TOKEN='***' opencode run '$message' --continue --session '$pm_session' >> '$log_file' 2>&1" > /dev/null 2>&1 &
|
||||||
disown
|
disown
|
||||||
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user