fix(kugetsu): queue daemon improvements - locking, error handling, cmd_delegate enqueue #164
@@ -146,3 +146,95 @@ filename_to_issue_ref() {
|
|||||||
local name="${filename%.json}"
|
local name="${filename%.json}"
|
||||||
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
|
echo "$name" | sed 's-\([0-9]*\)$-#\1-' | sed 's/-/\//g'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Add notification to notifications file
|
||||||
|
kugetsu_add_notification() {
|
||||||
|
local type="$1"
|
||||||
|
local message="$2"
|
||||||
|
local issue_ref="${3:-}"
|
||||||
|
local gitea_url="${4:-}"
|
||||||
|
|
||||||
|
mkdir -p "$(dirname "$NOTIFICATIONS_FILE")"
|
||||||
|
|
||||||
|
python3 << PYEOF
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
notification = {
|
||||||
|
"type": "$type",
|
||||||
|
"message": "$message",
|
||||||
|
"issue_ref": "$issue_ref" if "$issue_ref" else None,
|
||||||
|
"gitea_url": "$gitea_url" if "$gitea_url" else None,
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
"read": False
|
||||||
|
}
|
||||||
|
|
||||||
|
file_path = os.path.expanduser("$NOTIFICATIONS_FILE")
|
||||||
|
notifications = []
|
||||||
|
|
||||||
|
if os.path.exists(file_path):
|
||||||
|
try:
|
||||||
|
with open(file_path, 'r') as f:
|
||||||
|
notifications = json.load(f)
|
||||||
|
except:
|
||||||
|
notifications = []
|
||||||
|
|
||||||
|
notifications.append(notification)
|
||||||
|
|
||||||
|
with open(file_path, 'w') as f:
|
||||||
|
json.dump(notifications, f, indent=2)
|
||||||
|
|
||||||
|
print("Notification added")
|
||||||
|
PYEOF
|
||||||
|
}
|
||||||
|
|
||||||
|
# Update queue item state
|
||||||
|
update_queue_item_state() {
|
||||||
|
local queue_id="$1"
|
||||||
|
local new_state="$2"
|
||||||
|
local session_id="${3:-}"
|
||||||
|
local pid="${4:-}"
|
||||||
|
|
||||||
|
local item_file="$QUEUE_ITEMS_DIR/${queue_id}.json"
|
||||||
|
if [ ! -f "$item_file" ]; then
|
||||||
|
echo "Error: Queue item not found: $queue_id" >&2
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
python3 << PYEOF
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
item_file = "$item_file"
|
||||||
|
new_state = "$new_state"
|
||||||
|
session_id = "$session_id"
|
||||||
|
pid = "$pid"
|
||||||
|
|
||||||
|
with open(item_file, 'r') as f:
|
||||||
|
item = json.load(f)
|
||||||
|
|
||||||
|
issue_ref = item.get('issue_ref', '')
|
||||||
|
|
||||||
|
item['state'] = new_state
|
||||||
|
|
||||||
|
if new_state == "notified":
|
||||||
|
item['notified_at'] = datetime.now().isoformat() + "Z"
|
||||||
|
if session_id:
|
||||||
|
item['opencode_session_id'] = session_id
|
||||||
|
if pid:
|
||||||
|
item['pid'] = int(pid) if pid.isdigit() else None
|
||||||
|
elif new_state == "completed":
|
||||||
|
item['completed_at'] = datetime.now().isoformat() + "Z"
|
||||||
|
os.system(f"kugetsu_add_notification 'task_completed' 'Task completed: {issue_ref}' '{issue_ref}'")
|
||||||
|
elif new_state == "error":
|
||||||
|
item['error'] = datetime.now().isoformat() + "Z"
|
||||||
|
os.system(f"kugetsu_add_notification 'task_error' 'Task error: {issue_ref}' '{issue_ref}'")
|
||||||
|
|
||||||
|
with open(item_file, 'w') as f:
|
||||||
|
json.dump(item, f, indent=2)
|
||||||
|
|
||||||
|
print(f"Updated $queue_id to state: $new_state")
|
||||||
|
PYEOF
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user