Files
kugetsu/tools/parallel-capacity-test/parallel_capacity_test.py

547 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu
Tests concurrent agent capacity by spawning N parallel tasks.
Supports two modes:
- opencode: Direct opencode run (legacy)
- kugetsu: Via kugetsu CLI (tests full orchestration stack)
"""
import argparse
import json
import os
import subprocess
import sys
import time
import threading
import statistics
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Optional
# psutil is an optional third-party dependency. HAS_PSUTIL records whether the
# import succeeded so the rest of the module can fall back to stdlib-only
# sampling instead of failing at import time.
try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False
    print("[WARN] psutil not available - resource monitoring will be limited")
@dataclass
class AgentResult:
    """Outcome of one agent subprocess invocation."""

    # Index of the agent within its test run (the spawner counts from 1).
    agent_id: int
    # Wall-clock seconds spent on the invocation; the configured timeout value
    # is stored when the subprocess timed out.
    duration: float
    # One of "success", "failed" or "timeout".
    status: str
    # Subprocess exit code, or -1 when no exit code was obtained.
    return_code: int
    # Truncated text recorded for the run; empty when nothing was recorded.
    output: str = ""
@dataclass
class ResourceSample:
    """One point-in-time resource measurement taken by the monitor thread."""

    # Epoch seconds (time.time()) at which the sample was started.
    timestamp: float
    # CPU utilisation percentage; 0.0 when psutil is unavailable.
    cpu_percent: float
    # Used memory in MiB (psutil "used", or MemTotal - MemAvailable fallback).
    memory_mb: float
    # Used memory as a percentage; 0.0 when psutil is unavailable.
    memory_percent: float
    # Number of processes whose name contains "opencode".
    opencode_processes: int
    # Agent count configured for the run that was being monitored.
    agent_count: int
@dataclass
class TestRun:
    """Aggregated statistics for one batch of concurrently launched agents."""

    # Number of agents requested for this batch.
    agent_count: int
    # Wall-clock seconds from the first spawn until monitoring stopped.
    total_duration: float
    # Per-status tallies over the agent results that were recorded.
    success_count: int
    failed_count: int
    timeout_count: int
    # Summary statistics (seconds) over the recorded agent durations.
    avg_response_time: float
    stddev_response_time: float
    min_response_time: float
    max_response_time: float
    # Peak / mean values over the resource samples collected during the batch.
    peak_cpu_percent: float
    avg_cpu_percent: float
    peak_memory_mb: float
    avg_memory_mb: float
    peak_memory_percent: float
    avg_memory_percent: float
    peak_opencode_procs: int
    # Used memory (MiB) measured just before the batch started.
    baseline_memory_mb: float = 0.0
    # Peak memory growth over the baseline divided by the number of agents.
    memory_per_agent_mb: float = 0.0
    # Peak memory growth over the baseline (MiB) * total_duration / 1000.
    total_cost_score: float = 0.0
class ResourceMonitor:
    """Background sampler of CPU, memory and opencode process counts.

    ``start()`` launches a daemon thread that appends one ``ResourceSample``
    to ``self.samples`` roughly every ``sample_interval`` seconds until
    ``stop()`` is called.
    """

    def __init__(self, sample_interval: float = 1.0):
        """Create an idle monitor.

        Args:
            sample_interval: Seconds to wait between two samples.
        """
        self.sample_interval = sample_interval
        self.samples: List["ResourceSample"] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int) -> None:
        """Discard earlier samples and begin sampling in a daemon thread.

        Args:
            agent_count: Value stamped on every sample taken during this run.
        """
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List["ResourceSample"]:
        """Signal the sampler to stop, wait up to 5 seconds, return the samples."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self) -> None:
        """Thread body: collect samples until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                self.samples.append(self._collect_sample())
            except Exception as e:
                # Sampling is best-effort; one bad sample must not end monitoring.
                print(f"[WARN] Error collecting resource sample: {e}")
            # Event.wait doubles as an interruptible sleep so stop() returns promptly.
            self._stop_event.wait(self.sample_interval)

    @staticmethod
    def _is_opencode_name(name: Optional[str]) -> bool:
        """Return True when a process name contains "opencode" (case-insensitive).

        ``None`` and empty names are treated as non-matching; psutil reports
        ``None`` for processes whose name could not be read.
        """
        return bool(name) and "opencode" in name.lower()

    def _count_opencode_processes(self) -> int:
        """Count running processes whose name contains "opencode".

        Returns 0 when psutil is unavailable or process enumeration fails.
        """
        if not HAS_PSUTIL:
            return 0
        try:
            return sum(
                1
                for proc in psutil.process_iter(["name"])
                if self._is_opencode_name(proc.info.get("name"))
            )
        except Exception:
            # Best-effort: enumeration problems must not abort the whole sample.
            return 0

    def _collect_sample(self) -> "ResourceSample":
        """Take one measurement of CPU, memory and opencode process count."""
        timestamp = time.time()
        opencode_procs = self._count_opencode_processes()
        if HAS_PSUTIL:
            # interval=0.1 blocks briefly so the CPU figure covers a real window.
            cpu_percent = psutil.cpu_percent(interval=0.1)
            virt_mem = psutil.virtual_memory()
            memory_percent = virt_mem.percent
            memory_mb = virt_mem.used / (1024 * 1024)
        else:
            cpu_percent = 0.0
            memory_percent = 0.0
            memory_mb = get_memory_mb_stdlib()
        return ResourceSample(
            timestamp=timestamp,
            cpu_percent=cpu_percent,
            memory_mb=memory_mb,
            memory_percent=memory_percent,
            opencode_processes=opencode_procs,
            agent_count=self._current_agent_count,
        )
def get_memory_mb_stdlib() -> float:
    """Return used memory in MiB parsed from /proc/meminfo.

    Used memory is MemTotal minus MemAvailable. Returns 0.0 when the file
    cannot be read or parsed, or when MemTotal is missing or zero.
    """
    # Values are in kB as reported by the kernel; a field not found stays 0.
    fields_kb = {"MemTotal:": 0, "MemAvailable:": 0}
    try:
        with open("/proc/meminfo", "r") as handle:
            rows = handle.read().splitlines()
        for row in rows:
            for prefix in fields_kb:
                if row.startswith(prefix):
                    fields_kb[prefix] = int(row.split()[1])
                    break
    except Exception:
        # Best-effort fallback: any read/parse problem means "unknown".
        return 0.0
    total = fields_kb["MemTotal:"]
    if total <= 0:
        return 0.0
    return (total - fields_kb["MemAvailable:"]) / 1024
class ParallelCapacityTester:
    """Launches batches of concurrent agents and records timing and resource use.

    Each batch starts N agent subprocesses (``opencode run`` or
    ``kugetsu start``) in parallel threads while a ``ResourceMonitor`` samples
    the machine. Results can be written as JSON, CSV and a Markdown report.
    """

    def __init__(
        self,
        timeout: int = 120,
        workdir: Optional[str] = None,
        use_kugetsu: bool = False,
        memory_limit_mb: int = 1024,
        test_repo: str = "git.example.com/test/kugetsu",
    ):
        """Configure the tester.

        Args:
            timeout: Per-agent subprocess timeout in seconds; also bounds how
                long a batch is waited for.
            workdir: Parent directory for per-agent working directories
                (defaults to /tmp/parallel_test).
            use_kugetsu: Run agents through ``kugetsu start`` instead of
                ``opencode run``.
            memory_limit_mb: Allowed growth of used memory above the batch
                baseline before further spawning is stopped.
            test_repo: Repository prefix used to build kugetsu issue refs.
        """
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        self.use_kugetsu = use_kugetsu
        self.memory_limit_mb = memory_limit_mb
        self.test_repo = test_repo
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List["TestRun"] = []
        self.baseline_memory_mb = 0.0

    def _measure_baseline_memory(self) -> float:
        """Return currently used memory in MiB (psutil if available, else /proc)."""
        if HAS_PSUTIL:
            return psutil.virtual_memory().used / (1024 * 1024)
        return get_memory_mb_stdlib()

    def _memory_limit_exceeded(self) -> bool:
        """Return True when used memory grew past baseline + memory_limit_mb."""
        return (
            self._measure_baseline_memory()
            > self.baseline_memory_mb + self.memory_limit_mb
        )

    def _create_test_workdir(self, agent_id: int) -> str:
        """Create and return a working directory for one agent."""
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir

    def _run_single_agent(self, agent_id: int) -> "AgentResult":
        """Run one agent subprocess to completion and classify the outcome.

        Returns an ``AgentResult`` with status "success", "failed" or
        "timeout". Never raises: setup and launch errors are reported as a
        "failed" result whose ``output`` carries the error text.
        """
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"
        try:
            if self.use_kugetsu:
                # A random suffix keeps issue refs unique across repeated runs.
                unique_id = uuid.uuid4().hex[:8]
                issue_ref = f"{self.test_repo}#{agent_id}-{unique_id}"
                command = ["kugetsu", "start", issue_ref, task]
            else:
                # Only opencode consumes the directory, so create it only here;
                # doing it inside the try keeps a filesystem error from killing
                # the worker thread without a recorded result.
                workdir = self._create_test_workdir(agent_id)
                command = ["opencode", "run", task, "--dir", workdir]
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=self.timeout,
            )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            # NOTE(review): the prompt itself contains the marker, so a tool
            # that echoes its prompt counts as success even on a non-zero exit.
            success = "PARALLEL_TEST_OK" in output or result.returncode == 0
            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status="success" if success else "failed",
                return_code=result.returncode,
                output=output[:500],
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status="timeout",
                return_code=-1,
            )
        except Exception as e:
            # Keep the reason (e.g. missing executable) so failures are diagnosable.
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status="failed",
                return_code=-1,
                output=f"{type(e).__name__}: {e}"[:500],
            )

    @staticmethod
    def _cost_metrics(
        peak_mem_mb: float,
        baseline_mb: float,
        agent_count: int,
        total_duration: float,
    ) -> tuple:
        """Return ``(memory_per_agent_mb, total_cost_score)`` for one batch.

        Memory growth is clamped at zero so that a peak below the baseline
        (or a batch without samples) never yields negative figures.
        """
        growth_mb = max(peak_mem_mb - baseline_mb, 0.0)
        memory_per_agent = growth_mb / agent_count if agent_count > 0 else 0.0
        total_cost = growth_mb * total_duration / 1000
        return memory_per_agent, total_cost

    def _run_parallel_agents(self, num_agents: int) -> "TestRun":
        """Run one batch of ``num_agents`` concurrent agents and summarise it.

        Spawning stops early when used memory grows beyond the configured
        limit. After the wait, leftover ``opencode run`` processes are killed.
        """
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
        self.baseline_memory_mb = self._measure_baseline_memory()
        print(f"[INFO] Baseline memory: {self.baseline_memory_mb:.1f} MB")
        self.monitor.start(num_agents)
        threads = []
        results = []
        results_lock = threading.Lock()
        memory_exceeded = False

        def run_and_record(agent_id: int):
            nonlocal memory_exceeded
            # Once any thread has seen the limit hit, no further agent may
            # start; only the first detector prints the warning.
            if memory_exceeded:
                return
            if self._memory_limit_exceeded():
                memory_exceeded = True
                print(
                    f"[WARN] Memory limit ({self.memory_limit_mb}MB) approached, not spawning more agents"
                )
                return
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()
        for i in range(1, num_agents + 1):
            if self._memory_limit_exceeded():
                print(
                    f"[WARN] Memory limit ({self.memory_limit_mb}MB) would be exceeded, stopping spawn at {i - 1} agents"
                )
                memory_exceeded = True
                break
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)

        # Poll once per second until every worker finished or the timeout passed.
        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)
        # Kill stragglers so they cannot distort the next batch.
        subprocess.run(["pkill", "-f", "opencode run"], capture_output=True)
        for t in threads:
            t.join(timeout=5)
        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time

        # Workers that outlived the join may still append; work on a snapshot.
        with results_lock:
            completed = list(results)

        success_count = sum(1 for r in completed if r.status == "success")
        failed_count = sum(1 for r in completed if r.status == "failed")
        timeout_count = sum(1 for r in completed if r.status == "timeout")
        durations = [r.duration for r in completed]
        avg_duration = statistics.mean(durations) if durations else 0
        # stdev needs at least two data points.
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem_pct = max(s.memory_percent for s in resource_samples)
            avg_mem_pct = statistics.mean(s.memory_percent for s in resource_samples)
            peak_mem_mb = max(s.memory_mb for s in resource_samples)
            avg_mem_mb = statistics.mean(s.memory_mb for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            peak_cpu = avg_cpu = peak_mem_pct = avg_mem_pct = 0
            peak_mem_mb = avg_mem_mb = peak_procs = 0

        # Fall back to the requested count when no agent recorded a result.
        actual_agents = len(completed) if completed else num_agents
        memory_per_agent, total_cost = self._cost_metrics(
            peak_mem_mb, self.baseline_memory_mb, actual_agents, total_duration
        )
        print(
            f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout"
        )
        print(
            f"[COST] Memory per agent: {memory_per_agent:.1f} MB, Total cost score: {total_cost:.2f}"
        )
        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_mb=peak_mem_mb,
            avg_memory_mb=avg_mem_mb,
            peak_memory_percent=peak_mem_pct,
            avg_memory_percent=avg_mem_pct,
            peak_opencode_procs=peak_procs,
            baseline_memory_mb=self.baseline_memory_mb,
            memory_per_agent_mb=memory_per_agent,
            total_cost_score=total_cost,
        )

    def run_capacity_test(
        self, max_agents: int = 10, step: int = 1, quick: bool = False
    ) -> List["TestRun"]:
        """Run one batch per agent count and return all batch summaries.

        Args:
            max_agents: Largest agent count to test (ignored when ``quick``).
            step: Increment between tested agent counts (ignored when ``quick``).
            quick: Test the fixed counts 1, 2, 3, 5 and 8 instead of a range.

        Raises:
            ValueError: If ``step`` is smaller than 1 in non-quick mode.
        """
        if quick:
            agent_counts = [1, 2, 3, 5, 8]
        else:
            if step < 1:
                # A non-positive step would yield no configurations at all
                # (or an opaque range() error for 0).
                raise ValueError(f"step must be >= 1, got {step}")
            agent_counts = list(range(1, max_agents + 1, step))
        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")
        self.results = []
        for count in agent_counts:
            # Start every batch from a clean slate and let the system settle.
            subprocess.run(["pkill", "-f", "opencode run"], capture_output=True)
            time.sleep(2)
            self.results.append(self._run_parallel_agents(count))
        return self.results

    def save_results(self, output_dir: str) -> tuple:
        """Write JSON, CSV and Markdown outputs into ``output_dir``.

        The directory is created if needed. Returns the three file paths as
        strings: ``(json_file, csv_file, report_file)``.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, "w", encoding="utf-8") as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")

        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, "w", encoding="utf-8") as f:
            f.write(
                "agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem_mb,avg_mem_mb,peak_mem_pct,avg_mem_pct,peak_procs,baseline_mem,mem_per_agent,cost_score\n"
            )
            for run in self.results:
                f.write(
                    f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                    f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                    f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                    f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                    f"{run.avg_cpu_percent:.1f},{run.peak_memory_mb:.1f},"
                    f"{run.avg_memory_mb:.1f},{run.peak_memory_percent:.1f},"
                    f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs},"
                    f"{run.baseline_memory_mb:.1f},{run.memory_per_agent_mb:.1f},{run.total_cost_score:.2f}\n"
                )
        print(f"[INFO] Summary saved to: {csv_file}")

        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")
        return str(json_file), str(csv_file), str(report_file)

    def _generate_markdown_report(self, output_file: Path) -> None:
        """Write a Markdown report of ``self.results`` to ``output_file``."""
        with open(output_file, "w", encoding="utf-8") as f:
            f.write("# Parallel Capacity Test Report\n\n")
            f.write(
                f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )
            f.write("## Summary\n\n")
            f.write(
                "| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak Mem (MB) | Mem/Agent | Cost Score |\n"
            )
            f.write(
                "|--------|----------|---------|--------|---------|--------------|---------------|-----------|------------|\n"
            )
            for run in self.results:
                f.write(
                    f"| {run.agent_count} | {run.total_duration:.1f}s | "
                    f"{run.success_count} | {run.failed_count} | "
                    f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
                    f"{run.peak_memory_mb:.0f}MB | {run.memory_per_agent_mb:.1f}MB | {run.total_cost_score:.2f} |\n"
                )
            f.write("\n## Cost Analysis\n\n")
            f.write("| Metric | Value |\n")
            f.write("|--------|-------|\n")
            if self.results:
                baseline = self.results[0].baseline_memory_mb
                f.write(f"| Baseline Memory | {baseline:.1f} MB |\n")
                avg_mem_per = sum(r.memory_per_agent_mb for r in self.results) / len(
                    self.results
                )
                f.write(f"| Avg Memory per Agent | {avg_mem_per:.1f} MB |\n")
                f.write(f"| Memory Limit | {self.memory_limit_mb} MB |\n")
                # How many average-sized agents fit into the configured limit.
                max_capacity = (
                    int(self.memory_limit_mb / avg_mem_per) if avg_mem_per > 0 else 0
                )
                f.write(f"| Estimated Max Capacity | {max_capacity} agents |\n")
            f.write("\n## Key Findings\n\n")
            # "Optimal" is the largest batch in which every agent succeeded.
            successful_runs = [
                r for r in self.results if r.success_count == r.agent_count
            ]
            optimal = max(successful_runs, key=lambda r: r.agent_count, default=None)
            if optimal:
                f.write("### Optimal Configuration\n")
                f.write(
                    f"- **{optimal.agent_count} agents** achieved perfect success rate\n"
                )
                f.write(
                    f" - Average response time: {optimal.avg_response_time:.1f}s\n"
                )
                f.write(f" - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
                f.write(
                    f" - Peak Memory: {optimal.peak_memory_mb:.1f}MB ({optimal.peak_memory_percent:.1f}%)\n"
                )
                f.write(f" - Memory per agent: {optimal.memory_per_agent_mb:.1f}MB\n")
                f.write(f" - Cost score: {optimal.total_cost_score:.2f}\n\n")
            f.write("## Recommendations\n\n")
            if optimal:
                f.write(
                    f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n"
                )
            f.write("2. **Monitor closely:** 5+ agents\n")
            f.write(
                "3. **Implement circuit breaker** when failure rate exceeds threshold\n"
            )
def main(argv=None):
    """Command-line entry point: parse options, run the test, save results.

    Args:
        argv: Optional list of argument strings; ``None`` makes argparse read
            ``sys.argv[1:]``.

    Exits with status 1 when interrupted with Ctrl-C and with status 2 (via
    argparse) on invalid options.
    """
    parser = argparse.ArgumentParser(
        description="Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu"
    )
    parser.add_argument(
        "--agents",
        "-n",
        type=int,
        default=10,
        help="Maximum number of concurrent agents to test (default: 10)",
    )
    parser.add_argument(
        "--timeout",
        "-t",
        type=int,
        default=120,
        help="Per-agent timeout in seconds (default: 120)",
    )
    parser.add_argument(
        "--step",
        "-s",
        type=int,
        default=1,
        help="Increment between tested agent counts (default: 1)",
    )
    parser.add_argument(
        "--quick",
        "-q",
        action="store_true",
        help="Test the fixed agent counts 1, 2, 3, 5, 8 (ignores --agents and --step)",
    )
    parser.add_argument(
        "--output",
        "-o",
        type=str,
        default=None,
        help="Directory for result files (default: 'results' next to this script)",
    )
    parser.add_argument(
        "--use-kugetsu",
        "-k",
        action="store_true",
        help="Use kugetsu CLI instead of raw opencode (tests full orchestration)",
    )
    parser.add_argument(
        "--memory-limit",
        "-m",
        type=int,
        default=1024,
        # The tester compares total used memory against baseline + limit, so
        # this is a limit on overall growth, not a per-agent budget.
        help="Allowed total memory growth above baseline in MB before spawning stops (default: 1024 = 1GB)",
    )
    parser.add_argument(
        "--test-repo",
        "-r",
        type=str,
        default="git.example.com/test/kugetsu",
        help="Repository for kugetsu issue refs (default: git.example.com/test/kugetsu)",
    )
    args = parser.parse_args(argv)
    if not args.quick and args.step < 1:
        # Reject early with a usage message instead of an empty or crashing run.
        parser.error("--step must be >= 1")

    script_dir = Path(__file__).parent
    output_dir = args.output or str(script_dir / "results")
    mode = "kugetsu" if args.use_kugetsu else "opencode"
    print("=" * 60)
    print(f"Parallel Capacity Test Tool ({mode} mode)")
    print("=" * 60)
    print(f"Max agents: {args.agents}")
    print(f"Timeout: {args.timeout}s")
    print(f"Memory limit: {args.memory_limit}MB")
    if args.use_kugetsu:
        print(f"Test repo: {args.test_repo}")
    print()
    tester = ParallelCapacityTester(
        timeout=args.timeout,
        use_kugetsu=args.use_kugetsu,
        memory_limit_mb=args.memory_limit,
        test_repo=args.test_repo,
    )
    try:
        tester.run_capacity_test(
            max_agents=args.agents, step=args.step, quick=args.quick
        )
        json_file, csv_file, report_file = tester.save_results(output_dir)
        print("\n" + "=" * 60)
        print("TEST COMPLETE")
        print("=" * 60)
        print(f"JSON Results: {json_file}")
        print(f"CSV Summary: {csv_file}")
        print(f"Report: {report_file}")
    except KeyboardInterrupt:
        print("\n[ABORT] Test interrupted by user")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()