Add concurrent agent limiting to kugetsu CLI
- Add MAX_CONCURRENT_AGENTS (default: 3) to limit concurrent agents - Implement acquire_agent_slot() and release_agent_slot() with flock - Wrap cmd_start, cmd_continue, and cmd_delegate with slot management - cmd_delegate holds slot until background task completes (fire-and-forget + blocking) - Add basic concurrency tests to test suite
This commit is contained in:
@@ -8,6 +8,56 @@ REPOS_CONFIG="$KUGETSU_DIR/repos.json"
|
||||
INDEX_FILE="$KUGETSU_DIR/index.json"
|
||||
NOTIFICATIONS_FILE="$KUGETSU_DIR/notifications.json"
|
||||
LOGS_DIR="$KUGETSU_DIR/logs"
|
||||
MAX_CONCURRENT_AGENTS="${MAX_CONCURRENT_AGENTS:-3}"
|
||||
AGENT_COUNT_FILE="$KUGETSU_DIR/.agent_count"
|
||||
AGENT_LOCK_FILE="$KUGETSU_DIR/.agent_lock"
|
||||
|
||||
acquire_agent_slot() {
|
||||
local timeout="${1:-300}"
|
||||
local waited=0
|
||||
(
|
||||
flock -w 1 200 || { echo "Error: Could not acquire lock" >&2; exit 1; }
|
||||
local count
|
||||
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||
if [ "$count" -lt "$MAX_CONCURRENT_AGENTS" ]; then
|
||||
echo $((count + 1)) > "$AGENT_COUNT_FILE"
|
||||
exit 0
|
||||
fi
|
||||
exit 1
|
||||
) 200>"$AGENT_LOCK_FILE"
|
||||
local result=$?
|
||||
if [ $result -ne 0 ]; then
|
||||
local count
|
||||
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||
if [ $waited -ge $timeout ]; then
|
||||
echo "Error: Timeout waiting for agent slot (max: $MAX_CONCURRENT_AGENTS, current: $count)" >&2
|
||||
fi
|
||||
return 1
|
||||
fi
|
||||
return 0
|
||||
}
|
||||
|
||||
release_agent_slot() {
|
||||
(
|
||||
flock -w 1 200 || true
|
||||
local count
|
||||
count=$(cat "$AGENT_COUNT_FILE" 2>/dev/null || echo 0)
|
||||
if [ "$count" -gt 0 ]; then
|
||||
echo $((count - 1)) > "$AGENT_COUNT_FILE"
|
||||
fi
|
||||
) 200>"$AGENT_LOCK_FILE"
|
||||
}
|
||||
|
||||
run_with_limit() {
|
||||
local log_file="$1"
|
||||
shift
|
||||
local cmd=("$@")
|
||||
(
|
||||
"${cmd[@]}" >> "$log_file" 2>&1
|
||||
release_agent_slot
|
||||
) &
|
||||
disown
|
||||
}
|
||||
|
||||
usage() {
|
||||
cat << 'EOF'
|
||||
@@ -83,6 +133,7 @@ EOF
|
||||
|
||||
ensure_dirs() {
|
||||
mkdir -p "$SESSIONS_DIR"
|
||||
[ -f "$AGENT_COUNT_FILE" ] || echo 0 > "$AGENT_COUNT_FILE"
|
||||
}
|
||||
|
||||
ensure_worktree_dir() {
|
||||
@@ -502,7 +553,11 @@ cmd_delegate() {
|
||||
|
||||
mkdir -p "$LOGS_DIR"
|
||||
local log_file="$LOGS_DIR/delegate-$(date +%s).log"
|
||||
nohup opencode run --continue --session "$pm_session" "$message" > "$log_file" 2>&1 &
|
||||
if ! acquire_agent_slot; then
|
||||
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||
exit 1
|
||||
fi
|
||||
nohup sh -c "opencode run --continue --session '$pm_session' '$message' >> '$log_file' 2>&1; release_agent_slot" > /dev/null 2>&1 &
|
||||
disown
|
||||
echo "Delegated to PM agent (logged to $(basename "$log_file"))"
|
||||
}
|
||||
@@ -796,11 +851,19 @@ cmd_start() {
|
||||
local before_set="${before_sessions//$'\n'/|}"
|
||||
|
||||
echo "Forking session for '$issue_ref'..."
|
||||
if ! acquire_agent_slot; then
|
||||
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||
remove_worktree_for_issue "$issue_ref"
|
||||
exit 1
|
||||
fi
|
||||
trap release_agent_slot EXIT
|
||||
if [ "$DEBUG_MODE" = true ]; then
|
||||
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1 | tee "$SESSIONS_DIR/$session_file.debug.log"
|
||||
else
|
||||
opencode run --fork --session "$base_session_id" "$message" --workdir "$worktree_path" 2>&1
|
||||
fi
|
||||
release_agent_slot
|
||||
trap - EXIT
|
||||
|
||||
local after_sessions=$(opencode session list 2>/dev/null | grep -oP '^ses_\w+' | sort)
|
||||
local new_session_id=""
|
||||
@@ -869,6 +932,11 @@ cmd_continue() {
|
||||
local worktree_path=$(python3 -c "import json; print(json.load(open('$session_path')).get('worktree_path', ''))" 2>/dev/null || echo "")
|
||||
|
||||
echo "Continuing session for '$session_name'..."
|
||||
if ! acquire_agent_slot; then
|
||||
echo "Error: Max concurrent agents ($MAX_CONCURRENT_AGENTS) reached. Try again later." >&2
|
||||
exit 1
|
||||
fi
|
||||
trap release_agent_slot EXIT
|
||||
if [ -n "$worktree_path" ] && [ -d "$worktree_path" ]; then
|
||||
echo "Using worktree: $worktree_path"
|
||||
if [ "$DEBUG_MODE" = true ]; then
|
||||
@@ -883,6 +951,8 @@ cmd_continue() {
|
||||
opencode run --continue --session "$opencode_session_id" "$message"
|
||||
fi
|
||||
fi
|
||||
release_agent_slot
|
||||
trap - EXIT
|
||||
}
|
||||
|
||||
cmd_list() {
|
||||
|
||||
@@ -444,7 +444,15 @@ echo ""
|
||||
|
||||
# Test 27: delegate when pm-agent missing
|
||||
echo "--- Test: delegate (pm-agent missing) ---"
|
||||
setup_mock_base
|
||||
cleanup
|
||||
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees
|
||||
cat > ~/.kugetsu/index.json << EOF
|
||||
{
|
||||
"base": "$TEST_BASE_SESSION_ID",
|
||||
"pm_agent": null,
|
||||
"issues": {}
|
||||
}
|
||||
EOF
|
||||
OUTPUT=$($KUGETSU delegate "test" 2>&1 || true)
|
||||
if echo "$OUTPUT" | grep -q "Error: PM agent session"; then
|
||||
pass "delegate fails when PM agent not found"
|
||||
@@ -486,6 +494,50 @@ else
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 31: logs when no logs directory
|
||||
echo "--- Test: logs (no directory) ---"
|
||||
cleanup
|
||||
OUTPUT=$($KUGETSU logs 2>&1 || true)
|
||||
if echo "$OUTPUT" | grep -q "No logs found"; then
|
||||
pass "logs returns 'No logs found' when directory missing"
|
||||
else
|
||||
fail "logs no directory: got '$OUTPUT', expected 'No logs found'"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 32: delegate is fire-and-forget (returns immediately)
|
||||
echo "--- Test: delegate is fire-and-forget ---"
|
||||
setup_mock_base
|
||||
mkdir -p ~/.kugetsu/logs
|
||||
START=$(date +%s)
|
||||
OUTPUT=$($KUGETSU delegate "test fire-and-forget" 2>&1 || true)
|
||||
END=$(date +%s)
|
||||
ELAPSED=$((END - START))
|
||||
if echo "$OUTPUT" | grep -q "Delegated to PM agent"; then
|
||||
if [ $ELAPSED -lt 2 ]; then
|
||||
pass "delegate returns immediately (< 2s)"
|
||||
else
|
||||
fail "delegate took ${ELAPSED}s, expected < 2s"
|
||||
fi
|
||||
else
|
||||
fail "delegate output unexpected: $OUTPUT"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test 33: delegate creates log file
|
||||
echo "--- Test: delegate creates log file ---"
|
||||
setup_mock_base
|
||||
LOG_COUNT_BEFORE=$(ls ~/.kugetsu/logs/*.log 2>/dev/null | wc -l)
|
||||
$KUGETSU delegate "test log file" 2>&1 || true
|
||||
sleep 1
|
||||
LOG_COUNT_AFTER=$(ls ~/.kugetsu/logs/*.log 2>/dev/null | wc -l)
|
||||
if [ $LOG_COUNT_AFTER -gt $LOG_COUNT_BEFORE ]; then
|
||||
pass "delegate creates log file"
|
||||
else
|
||||
fail "delegate did not create log file"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Cleanup
|
||||
cleanup
|
||||
|
||||
@@ -495,10 +547,147 @@ echo "Passed: $PASS"
|
||||
echo "Failed: $FAIL"
|
||||
echo ""
|
||||
|
||||
ORIGINAL_FAIL=$FAIL
|
||||
|
||||
# ============================================================================
|
||||
# CONCURRENCY LIMIT TESTS
|
||||
# ============================================================================
|
||||
|
||||
echo ""
|
||||
echo "=== Concurrency Limit Tests ==="
|
||||
echo ""
|
||||
|
||||
# Create mock opencode that just sleeps briefly and exits
|
||||
MOCK_OPENCODE="/tmp/mock_opencode.sh"
|
||||
cat > "$MOCK_OPENCODE" << 'MOCK'
|
||||
#!/bin/bash
|
||||
sleep 0.3
|
||||
exit 0
|
||||
MOCK
|
||||
chmod +x "$MOCK_OPENCODE"
|
||||
|
||||
# Create a temporary test script for concurrency tests
|
||||
cat > /tmp/test-concurrency.sh << 'EOF'
|
||||
#!/bin/bash
|
||||
set -euo pipefail
|
||||
|
||||
KUGETSU="./skills/kugetsu/scripts/kugetsu"
|
||||
PASS=0
|
||||
FAIL=0
|
||||
|
||||
test_cleanup() {
|
||||
rm -rf ~/.kugetsu/sessions/* ~/.kugetsu/worktrees/* ~/.kugetsu/index.json ~/.kugetsu/logs/* ~/.kugetsu/.agent_count ~/.kugetsu/.agent_lock 2>/dev/null || true
|
||||
}
|
||||
|
||||
pass() {
|
||||
echo "PASS: $1"
|
||||
PASS=$((PASS + 1))
|
||||
}
|
||||
|
||||
fail() {
|
||||
echo "FAIL: $1"
|
||||
FAIL=$((FAIL + 1))
|
||||
}
|
||||
|
||||
setup_mock_sessions() {
|
||||
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees ~/.kugetsu/logs
|
||||
cat > ~/.kugetsu/index.json << INDEX
|
||||
{
|
||||
"base": "ses_test_base_123",
|
||||
"pm_agent": "ses_test_pm_456",
|
||||
"issues": {}
|
||||
}
|
||||
INDEX
|
||||
echo '{"type": "base", "opencode_session_id": "ses_test_base_123", "created_at": "2026-03-29T18:00:00+02:00", "state": "idle"}' > ~/.kugetsu/sessions/base.json
|
||||
echo '{"type": "pm_agent", "opencode_session_id": "ses_test_pm_456", "created_at": "2026-03-29T18:00:00+02:00", "state": "idle"}' > ~/.kugetsu/sessions/pm-agent.json
|
||||
}
|
||||
|
||||
# Test C1: Agent count file is initialized to 0
|
||||
echo "--- Test: agent count file initialized ---"
|
||||
test_cleanup
|
||||
mkdir -p ~/.kugetsu/sessions ~/.kugetsu/worktrees
|
||||
$KUGETSU list > /dev/null 2>&1 || true
|
||||
if [ -f ~/.kugetsu/.agent_count ]; then
|
||||
COUNT=$(cat ~/.kugetsu/.agent_count)
|
||||
if [ "$COUNT" = "0" ]; then
|
||||
pass "agent count file initialized to 0"
|
||||
else
|
||||
fail "agent count file initialized to $COUNT, expected 0"
|
||||
fi
|
||||
else
|
||||
fail "agent count file not created"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test C2: MAX_CONCURRENT_AGENTS defaults to 3
|
||||
echo "--- Test: MAX_CONCURRENT_AGENTS defaults to 3 ---"
|
||||
# Just grep for it and check if '3' appears
|
||||
if grep -q 'MAX_CONCURRENT_AGENTS="3"' "$KUGETSU" || grep -q "MAX_CONCURRENT_AGENTS='3'" "$KUGETSU" || grep -q 'MAX_CONCURRENT_AGENTS=3' "$KUGETSU"; then
|
||||
pass "MAX_CONCURRENT_AGENTS defaults to 3"
|
||||
else
|
||||
fail "MAX_CONCURRENT_AGENTS default not found"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Test C3: Agent count file increments and decrements properly
|
||||
echo "--- Test: agent count increments and decrements ---"
|
||||
test_cleanup
|
||||
setup_mock_sessions
|
||||
|
||||
# Initialize count to 0
|
||||
echo 0 > ~/.kugetsu/.agent_count
|
||||
|
||||
# Verify initial state
|
||||
INITIAL=$(cat ~/.kugetsu/.agent_count)
|
||||
if [ "$INITIAL" = "0" ]; then
|
||||
pass "agent count starts at 0"
|
||||
else
|
||||
fail "agent count start was $INITIAL"
|
||||
fi
|
||||
|
||||
# After any kugetsu command runs, count should be properly managed
|
||||
$KUGETSU list > /dev/null 2>&1
|
||||
|
||||
# Verify count is still 0 (no slot leak)
|
||||
AFTER=$(cat ~/.kugetsu/.agent_count)
|
||||
if [ "$AFTER" = "0" ]; then
|
||||
pass "agent count stays 0 after list (no leak)"
|
||||
else
|
||||
fail "agent count after list was $AFTER, expected 0"
|
||||
fi
|
||||
echo ""
|
||||
|
||||
# Cleanup
|
||||
test_cleanup
|
||||
rm -f /tmp/mock_opencode.sh 2>/dev/null || true
|
||||
|
||||
echo ""
|
||||
echo "=== Concurrency Test Summary ==="
|
||||
echo "Passed: $PASS"
|
||||
echo "Failed: $FAIL"
|
||||
echo ""
|
||||
|
||||
if [ $FAIL -eq 0 ]; then
|
||||
echo "All tests passed!"
|
||||
echo "All concurrency tests passed!"
|
||||
exit 0
|
||||
else
|
||||
echo "Some tests failed."
|
||||
echo "Some concurrency tests failed."
|
||||
exit 1
|
||||
fi
|
||||
EOF
|
||||
|
||||
chmod +x /tmp/test-concurrency.sh
|
||||
bash /tmp/test-concurrency.sh
|
||||
CONCURRENCY_RESULT=$?
|
||||
rm -f /tmp/test-concurrency.sh /tmp/mock_opencode.sh 2>/dev/null
|
||||
|
||||
# Combined result
|
||||
if [ $ORIGINAL_FAIL -eq 0 ] && [ $CONCURRENCY_RESULT -eq 0 ]; then
|
||||
echo ""
|
||||
echo "=== ALL TESTS PASSED ==="
|
||||
exit 0
|
||||
else
|
||||
echo ""
|
||||
echo "=== SOME TESTS FAILED ==="
|
||||
exit 1
|
||||
fi
|
||||
Reference in New Issue
Block a user