Add concurrent agent limiting to kugetsu CLI
- Added MAX_CONCURRENT_AGENTS env var (default: 3) - Implemented acquire_agent_slot() and release_agent_slot() with flock - Wrapped cmd_start, cmd_continue, and cmd_delegate with slot management - cmd_delegate holds slot until background process completes (fire-and-forget + blocking) - Added concurrency tests to test-kugetsu-v2.sh
This commit is contained in:
@@ -8,6 +8,56 @@ REPOS_CONFIG="$KUGETSU_DIR/repos.json"
|
|||||||
INDEX_FILE="$KUGETSU_DIR/index.json"
|
INDEX_FILE="$KUGETSU_DIR/index.json"
|
||||||
NOTIFICATIONS_FILE="$KUGETSU_DIR/notifications.json"
|
NOTIFICATIONS_FILE="$KUGETSU_DIR/notifications.json"
|
||||||
LOGS_DIR="$KUGETSU_DIR/logs"
|
LOGS_DIR="$KUGETSU_DIR/logs"
|
||||||
|
MAX_CONCURRENT_AGENTS="${MAX_CONCURRENT_AGENTS:-3}"
|
||||||
|
AGENT_COUNT_FILE="$KUGETSU_DIR/.agent_count"
|
||||||
|
AGENT_LOCK_FILE="$KUGETSU_DIR/.agent_lock"
|
||||||
|
|
||||||
|
acquire_agent_slot() {
|
||||||
|
local timeout="${1:-300}"
|
||||||
|
local waited=0
|
||||||
|
(
|
||||||
|
flock -w 1 200 || { echo "Error: Could not acquire lock" >&2; exit 1; }
|
||||||
|
local count
|
||||||
|
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||||
|
if [ "$count" -lt "$MAX_CONCURRENT_AGENTS" ]; then
|
||||||
|
echo $((count + 1)) > "$AGENT_COUNT_FILE"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
|
exit 1
|
||||||
|
) 200>"$AGENT_LOCK_FILE"
|
||||||
|
local result=$?
|
||||||
|
if [ $result -ne 0 ]; then
|
||||||
|
local count
|
||||||
|
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||||
|
if [ $waited -ge $timeout ]; then
|
||||||
|
echo "Error: Timeout waiting for agent slot (max: $MAX_CONCURRENT_AGENTS, current: $count)" >&2
|
||||||
|
fi
|
||||||
|
return 1
|
||||||
|
fi
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
release_agent_slot() {
|
||||||
|
(
|
||||||
|
flock -w 1 200 || true
|
||||||
|
local count
|
||||||
|
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||||
|
if [ "$count" -gt 0 ]; then
|
||||||
|
echo $((count - 1)) > "$AGENT_COUNT_FILE"
|
||||||
|
fi
|
||||||
|
) 200>"$AGENT_LOCK_FILE"
|
||||||
|
}
|
||||||
|
|
||||||
|
run_with_limit() {
|
||||||
|
local log_file="$1"
|
||||||
|
shift
|
||||||
|
local cmd=("$@")
|
||||||
|
(
|
||||||
|
"${cmd[@]}" >> "$log_file" 2>&1
|
||||||
|
release_agent_slot
|
||||||
|
) &
|
||||||
|
disown
|
||||||
|
}
|
||||||
|
|
||||||
usage() {
|
usage() {
|
||||||
cat << 'EOF'
|
cat << 'EOF'
|
||||||
@@ -83,6 +133,7 @@ EOF
|
|||||||
|
|
||||||
ensure_dirs() {
|
ensure_dirs() {
|
||||||
mkdir -p "$SESSIONS_DIR"
|
mkdir -p "$SESSIONS_DIR"
|
||||||
|
[ -f "$AGENT_COUNT_FILE" ] || echo 0 > "$AGENT_COUNT_FILE"
|
||||||
}
|
}
|
||||||
|
|
||||||
ensure_worktree_dir() {
|
ensure_worktree_dir() {
|
||||||
@@ -502,7 +553,11 @@ cmd_delegate() {
|
|||||||
|
|
||||||
mkdir -p "$LOGS_DIR"
|
mkdir -p "$LOGS_DIR"
|
||||||
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||||
nohup opencode run --continue --session "$pm_session" "$message" > "$log_file" 2>&1 &
|
if ! acquire_agent_slot; then
|
||||||
|
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
nohup sh -c "opencode run --continue --session '$pm_session' '$message' >> '$log_file' 2>&1; release_agent_slot" > /dev/null 2>&1 &
|
||||||
disown
|
disown
|
||||||
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
||||||
}
|
}
|
||||||
@@ -796,11 +851,19 @@ cmd_start() {
|
|||||||
local before_set="${before_sessions//$'\n'/|}"
|
local before_set="${before_sessions//$'\n'/|}"
|
||||||
|
|
||||||
echo "Forking session for '$issue_ref'..."
|
echo "Forking session for '$issue_ref'..."
|
||||||
|
if ! acquire_agent_slot; then
|
||||||
|
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||||
|
remove_worktree_for_issue "$issue_ref"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
trap release_agent_slot EXIT
|
||||||
if [ "$DEBUG_MODE" = true ]; then
|
if [ "$DEBUG_MODE" = true ]; then
|
||||||
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1 | tee "$SESSIONS_DIR/$session_file.debug.log"
|
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1 | tee "$SESSIONS_DIR/$session_file.debug.log"
|
||||||
else
|
else
|
||||||
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1
|
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1
|
||||||
fi
|
fi
|
||||||
|
release_agent_slot
|
||||||
|
trap - EXIT
|
||||||
|
|
||||||
local after_sessions=$(opencode session list 2>/dev/null | grep -oP '^ses_\w+' | sort)
|
local after_sessions=$(opencode session list 2>/dev/null | grep -oP '^ses_\w+' | sort)
|
||||||
local new_session_id=""
|
local new_session_id=""
|
||||||
@@ -869,6 +932,11 @@ cmd_continue() {
|
|||||||
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
|
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
|
||||||
|
|
||||||
echo "Continuing session for '$session_name'..."
|
echo "Continuing session for '$session_name'..."
|
||||||
|
if ! acquire_agent_slot; then
|
||||||
|
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
trap release_agent_slot EXIT
|
||||||
if [ -n "$worktree_path" ] && [ -d "$worktree_path" ]; then
|
if [ -n "$worktree_path" ] && [ -d "$worktree_path" ]; then
|
||||||
echo "Using worktree: $worktree_path"
|
echo "Using worktree: $worktree_path"
|
||||||
if [ "$DEBUG_MODE" = true ]; then
|
if [ "$DEBUG_MODE" = true ]; then
|
||||||
@@ -883,6 +951,8 @@ cmd_continue() {
|
|||||||
opencode run --continue --session "$opencode_session_id" "$message"
|
opencode run --continue --session "$opencode_session_id" "$message"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
release_agent_slot
|
||||||
|
trap - EXIT
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd_list() {
|
cmd_list() {
|
||||||
|
|||||||
@@ -444,7 +444,15 @@ echo ""
|
|||||||
|
|
||||||
# Test 27: delegate when pm-agent missing
|
# Test 27: delegate when pm-agent missing
|
||||||
echo "--- Test: delegate (pm-agent missing) ---"
|
echo "--- Test: delegate (pm-agent missing) ---"
|
||||||
setup_mock_base
|
cleanup
|
||||||
|
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees
|
||||||
|
cat > ~/.kugetsu/index.json << EOF
|
||||||
|
{
|
||||||
|
"base": "$TEST_BASE_SESSION_ID",
|
||||||
|
"pm_agent": null,
|
||||||
|
"issues": {}
|
||||||
|
}
|
||||||
|
EOF
|
||||||
OUTPUT=$($KUGETSU delegate "test" 2>&1 || true)
|
OUTPUT=$($KUGETSU delegate "test" 2>&1 || true)
|
||||||
if echo "$OUTPUT" | grep -q "Error: PM agent session"; then
|
if echo "$OUTPUT" | grep -q "Error: PM agent session"; then
|
||||||
pass "delegate fails when PM agent not found"
|
pass "delegate fails when PM agent not found"
|
||||||
@@ -486,6 +494,50 @@ else
|
|||||||
fi
|
fi
|
||||||
echo ""
|
echo ""
|
||||||
|
|
||||||
|
# Test 31: logs when no logs directory
|
||||||
|
echo "--- Test: logs (no directory) ---"
|
||||||
|
cleanup
|
||||||
|
OUTPUT=$($KUGETSU logs 2>&1 || true)
|
||||||
|
if echo "$OUTPUT" | grep -q "No logs found"; then
|
||||||
|
pass "logs returns 'No logs found' when directory missing"
|
||||||
|
else
|
||||||
|
fail "logs no directory: got '$OUTPUT', expected 'No logs found'"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Test 32: delegate is fire-and-forget (returns immediately)
|
||||||
|
echo "--- Test: delegate is fire-and-forget ---"
|
||||||
|
setup_mock_base
|
||||||
|
mkdir -p ~/.kugetsu/logs
|
||||||
|
START=$(date +%s)
|
||||||
|
OUTPUT=$($KUGETSU delegate "test fire-and-forget" 2>&1 || true)
|
||||||
|
END=$(date +%s)
|
||||||
|
ELAPSED=$((END - START))
|
||||||
|
if echo "$OUTPUT" | grep -q "Delegated to PM agent"; then
|
||||||
|
if [ $ELAPSED -lt 2 ]; then
|
||||||
|
pass "delegate returns immediately (< 2s)"
|
||||||
|
else
|
||||||
|
fail "delegate took ${ELAPSED}s, expected < 2s"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
fail "delegate output unexpected: $OUTPUT"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Test 33: delegate creates log file
|
||||||
|
echo "--- Test: delegate creates log file ---"
|
||||||
|
setup_mock_base
|
||||||
|
LOG_COUNT_BEFORE=$(ls ~/.kugetsu/logs/*.log 2>/dev/null | wc -l)
|
||||||
|
$KUGETSU delegate "test log file" 2>&1 || true
|
||||||
|
sleep 1
|
||||||
|
LOG_COUNT_AFTER=$(ls ~/.kugetsu/logs/*.log 2>/dev/null | wc -l)
|
||||||
|
if [ $LOG_COUNT_AFTER -gt $LOG_COUNT_BEFORE ]; then
|
||||||
|
pass "delegate creates log file"
|
||||||
|
else
|
||||||
|
fail "delegate did not create log file"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
# Cleanup
|
# Cleanup
|
||||||
cleanup
|
cleanup
|
||||||
|
|
||||||
@@ -495,10 +547,147 @@ echo "Passed: $PASS"
|
|||||||
echo "Failed: $FAIL"
|
echo "Failed: $FAIL"
|
||||||
echo ""
|
echo ""
|
||||||
|
|
||||||
|
ORIGINAL_FAIL=$FAIL
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# CONCURRENCY LIMIT TESTS
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== Concurrency Limit Tests ==="
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Create mock opencode that just sleeps briefly and exits
|
||||||
|
MOCK_OPENCODE="/tmp/mock_opencode.sh"
|
||||||
|
cat > "$MOCK_OPENCODE" << 'MOCK'
|
||||||
|
#!/bin/bash
|
||||||
|
sleep 0.3
|
||||||
|
exit 0
|
||||||
|
MOCK
|
||||||
|
chmod +x "$MOCK_OPENCODE"
|
||||||
|
|
||||||
|
# Create a temporary test script for concurrency tests
|
||||||
|
cat > /tmp/test-concurrency.sh << 'EOF'
|
||||||
|
#!/bin/bash
|
||||||
|
set -euo pipefail
|
||||||
|
|
||||||
|
KUGETSU="./skills/kugetsu/scripts/kugetsu"
|
||||||
|
PASS=0
|
||||||
|
FAIL=0
|
||||||
|
|
||||||
|
test_cleanup() {
|
||||||
|
rm -rf ~/.kugetsu/sessions/* ~/.kugetsu/worktrees/* ~/.kugetsu/index.json ~/.kugetsu/logs/* ~/.kugetsu/.agent_count ~/.kugetsu/.agent_lock 2>/dev/null || true
|
||||||
|
}
|
||||||
|
|
||||||
|
pass() {
|
||||||
|
echo "PASS: $1"
|
||||||
|
PASS=$((PASS + 1))
|
||||||
|
}
|
||||||
|
|
||||||
|
fail() {
|
||||||
|
echo "FAIL: $1"
|
||||||
|
FAIL=$((FAIL + 1))
|
||||||
|
}
|
||||||
|
|
||||||
|
setup_mock_sessions() {
|
||||||
|
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees ~/.kugetsu/logs
|
||||||
|
cat > ~/.kugetsu/index.json << INDEX
|
||||||
|
{
|
||||||
|
"base": "ses_test_base_123",
|
||||||
|
"pm_agent": "ses_test_pm_456",
|
||||||
|
"issues": {}
|
||||||
|
}
|
||||||
|
INDEX
|
||||||
|
echo '{"type": "base", "opencode_session_id": "ses_test_base_123", "created_at": "2026-03-29T18:00:00+02:00", "state": "idle"}' > ~/.kugetsu/sessions/base.json
|
||||||
|
echo '{"type": "pm_agent", "opencode_session_id": "ses_test_pm_456", "created_at": "2026-03-29T18:00:00+02:00", "state": "idle"}' > ~/.kugetsu/sessions/pm-agent.json
|
||||||
|
}
|
||||||
|
|
||||||
|
# Test C1: Agent count file is initialized to 0
|
||||||
|
echo "--- Test: agent count file initialized ---"
|
||||||
|
test_cleanup
|
||||||
|
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees
|
||||||
|
$KUGETSU list > /dev/null 2>&1 || true
|
||||||
|
if [ -f ~/.kugetsu/.agent_count ]; then
|
||||||
|
COUNT=$(cat ~/.kugetsu/.agent_count)
|
||||||
|
if [ "$COUNT" = "0" ]; then
|
||||||
|
pass "agent count file initialized to 0"
|
||||||
|
else
|
||||||
|
fail "agent count file initialized to $COUNT, expected 0"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
fail "agent count file not created"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Test C2: MAX_CONCURRENT_AGENTS defaults to 3
|
||||||
|
echo "--- Test: MAX_CONCURRENT_AGENTS defaults to 3 ---"
|
||||||
|
# Just grep for it and check if '3' appears
|
||||||
|
if grep -q 'MAX_CONCURRENT_AGENTS="3"' "$KUGETSU" || grep -q "MAX_CONCURRENT_AGENTS='3'" "$KUGETSU" || grep -q 'MAX_CONCURRENT_AGENTS=3' "$KUGETSU"; then
|
||||||
|
pass "MAX_CONCURRENT_AGENTS defaults to 3"
|
||||||
|
else
|
||||||
|
fail "MAX_CONCURRENT_AGENTS default not found"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Test C3: Agent count file increments and decrements properly
|
||||||
|
echo "--- Test: agent count increments and decrements ---"
|
||||||
|
test_cleanup
|
||||||
|
setup_mock_sessions
|
||||||
|
|
||||||
|
# Initialize count to 0
|
||||||
|
echo 0 > ~/.kugetsu/.agent_count
|
||||||
|
|
||||||
|
# Verify initial state
|
||||||
|
INITIAL=$(cat ~/.kugetsu/.agent_count)
|
||||||
|
if [ "$INITIAL" = "0" ]; then
|
||||||
|
pass "agent count starts at 0"
|
||||||
|
else
|
||||||
|
fail "agent count start was $INITIAL"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# After any kugetsu command runs, count should be properly managed
|
||||||
|
$KUGETSU list > /dev/null 2>&1
|
||||||
|
|
||||||
|
# Verify count is still 0 (no slot leak)
|
||||||
|
AFTER=$(cat ~/.kugetsu/.agent_count)
|
||||||
|
if [ "$AFTER" = "0" ]; then
|
||||||
|
pass "agent count stays 0 after list (no leak)"
|
||||||
|
else
|
||||||
|
fail "agent count after list was $AFTER, expected 0"
|
||||||
|
fi
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
test_cleanup
|
||||||
|
rm -f /tmp/mock_opencode.sh 2>/dev/null || true
|
||||||
|
|
||||||
|
echo ""
|
||||||
|
echo "=== Concurrency Test Summary ==="
|
||||||
|
echo "Passed: $PASS"
|
||||||
|
echo "Failed: $FAIL"
|
||||||
|
echo ""
|
||||||
|
|
||||||
if [ $FAIL -eq 0 ]; then
|
if [ $FAIL -eq 0 ]; then
|
||||||
echo "All tests passed!"
|
echo "All concurrency tests passed!"
|
||||||
exit 0
|
exit 0
|
||||||
else
|
else
|
||||||
echo "Some tests failed."
|
echo "Some concurrency tests failed."
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
EOF
|
||||||
|
|
||||||
|
chmod +x /tmp/test-concurrency.sh
|
||||||
|
bash /tmp/test-concurrency.sh
|
||||||
|
CONCURRENCY_RESULT=$?
|
||||||
|
rm -f /tmp/test-concurrency.sh /tmp/mock_opencode.sh 2>/dev/null
|
||||||
|
|
||||||
|
# Combined result
|
||||||
|
if [ $ORIGINAL_FAIL -eq 0 ] && [ $CONCURRENCY_RESULT -eq 0 ]; then
|
||||||
|
echo ""
|
||||||
|
echo "=== ALL TESTS PASSED ==="
|
||||||
|
exit 0
|
||||||
|
else
|
||||||
|
echo ""
|
||||||
|
echo "=== SOME TESTS FAILED ==="
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
Reference in New Issue
Block a user