#!/usr/bin/env python3
|
|
"""
|
|
Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu
|
|
Tests concurrent agent capacity by spawning N parallel tasks.
|
|
|
|
Supports two modes:
|
|
- opencode: Direct opencode run (legacy)
|
|
- kugetsu: Via kugetsu CLI (tests full orchestration stack)
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import threading
|
|
import statistics
|
|
import uuid
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
|
|
# psutil is optional: with it we get real CPU/memory/process sampling,
# without it the tool falls back to /proc/meminfo (see get_memory_mb_stdlib)
# and reports 0.0 for CPU and memory-percent figures.
try:
    import psutil

    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False
    print("[WARN] psutil not available - resource monitoring will be limited")
|
|
|
|
|
|
@dataclass
class AgentResult:
    """Outcome of a single agent invocation."""

    agent_id: int  # 1-based id assigned by the spawn loop
    duration: float  # seconds from spawn to completion (timeout value on timeout)
    status: str  # "success", "failed", or "timeout"
    return_code: int  # subprocess exit code; -1 on timeout or exception
    output: str = ""  # first 500 chars of combined stdout+stderr (empty on error paths)
|
|
|
|
|
|
@dataclass
class ResourceSample:
    """One point-in-time system snapshot taken by ResourceMonitor."""

    timestamp: float  # epoch seconds when the sample was taken
    cpu_percent: float  # system-wide CPU utilization; 0.0 when psutil is absent
    memory_mb: float  # used system memory in MB
    memory_percent: float  # used memory as a percentage; 0.0 when psutil is absent
    opencode_processes: int  # count of processes whose name contains "opencode"
    agent_count: int  # number of agents the current test run was started with
|
|
|
|
|
|
@dataclass
class TestRun:
    """Aggregated metrics for one capacity-test configuration (N parallel agents)."""

    agent_count: int  # agents requested for this configuration
    total_duration: float  # wall-clock seconds for the whole run
    success_count: int  # agents whose status was "success"
    failed_count: int  # agents whose status was "failed"
    timeout_count: int  # agents whose status was "timeout"
    avg_response_time: float  # mean of per-agent durations (0 if no results)
    stddev_response_time: float  # stdev of durations (0 with fewer than 2 results)
    min_response_time: float
    max_response_time: float
    # Peak/avg figures below come from ResourceMonitor samples; all zero when
    # no samples were collected.
    peak_cpu_percent: float
    avg_cpu_percent: float
    peak_memory_mb: float
    avg_memory_mb: float
    peak_memory_percent: float
    avg_memory_percent: float
    peak_opencode_procs: int
    baseline_memory_mb: float = 0.0  # used memory measured before spawning agents
    memory_per_agent_mb: float = 0.0  # (peak - baseline) / agents with results
    total_cost_score: float = 0.0  # excess MB * duration / 1000 (0 if no growth)
|
|
|
|
|
|
class ResourceMonitor:
    """Samples system CPU/memory and opencode process counts on a background thread.

    Call start() before launching agents and stop() afterwards to retrieve the
    list of ResourceSample snapshots collected during the run.
    """

    def __init__(self, sample_interval: float = 1.0):
        # Seconds between successive samples.
        self.sample_interval = sample_interval
        self.samples: List[ResourceSample] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int):
        """Begin sampling in a daemon thread, discarding samples from prior runs."""
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        # Daemon thread so a stuck monitor never blocks interpreter shutdown.
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List[ResourceSample]:
        """Signal the monitor thread to finish and return the collected samples."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self):
        # One sample per interval until stop() is called; a failing sample is
        # logged and skipped rather than killing the loop.
        while not self._stop_event.is_set():
            try:
                self.samples.append(self._collect_sample())
            except Exception as e:
                print(f"[WARN] Error collecting resource sample: {e}")
            # wait() doubles as an interruptible sleep.
            self._stop_event.wait(self.sample_interval)

    def _count_opencode_processes(self) -> int:
        """Count running processes whose name contains "opencode" (0 without psutil)."""
        # Explicit guard: process enumeration needs psutil. (Previously psutil
        # was referenced unconditionally and the NameError was silently
        # swallowed by the broad except below.)
        if not HAS_PSUTIL:
            return 0
        try:
            count = 0
            for proc in psutil.process_iter(["name"]):
                name = proc.info.get("name")
                # Some processes report no name (None); skip them instead of
                # letting an AttributeError zero the whole count.
                if name and "opencode" in name.lower():
                    count += 1
            return count
        except Exception:
            return 0

    def _collect_sample(self) -> ResourceSample:
        """Take one snapshot of CPU, memory, and opencode process count."""
        timestamp = time.time()
        opencode_procs = self._count_opencode_processes()

        if HAS_PSUTIL:
            cpu_percent = psutil.cpu_percent(interval=0.1)
            virt_mem = psutil.virtual_memory()
            memory_percent = virt_mem.percent
            memory_mb = virt_mem.used / (1024 * 1024)
        else:
            # Stdlib fallback: CPU and percent figures unavailable, memory
            # comes from /proc/meminfo.
            cpu_percent = 0.0
            memory_percent = 0.0
            memory_mb = get_memory_mb_stdlib()

        return ResourceSample(
            timestamp=timestamp,
            cpu_percent=cpu_percent,
            memory_mb=memory_mb,
            memory_percent=memory_percent,
            opencode_processes=opencode_procs,
            agent_count=self._current_agent_count,
        )
|
|
|
|
|
|
def get_memory_mb_stdlib() -> float:
    """Return used system memory in MB using only /proc/meminfo.

    Stdlib fallback for hosts without psutil. Used memory is computed as
    MemTotal - MemAvailable; on kernels predating MemAvailable (< 3.14) we
    fall back to MemFree instead of reporting *all* memory as used (the
    previous behavior when MemAvailable was missing). Returns 0.0 on any
    failure, e.g. on non-Linux systems where /proc/meminfo does not exist.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        # Parse "Key:  value kB" lines into a {key: int_kb} map.
        fields = {}
        for line in meminfo.splitlines():
            key, _, rest = line.partition(":")
            parts = rest.split()
            if parts:
                try:
                    fields[key] = int(parts[0])
                except ValueError:
                    pass
        total_kb = fields.get("MemTotal", 0)
        # MemAvailable is the kernel's accurate "usable" estimate; MemFree is
        # the conservative fallback on older kernels.
        avail_kb = fields.get("MemAvailable", fields.get("MemFree", 0))
        if total_kb > 0:
            return (total_kb - avail_kb) / 1024
    except Exception:
        pass
    return 0.0
|
|
|
|
|
|
class ParallelCapacityTester:
    """Runs capacity sweeps: for each agent count, spawns N parallel tasks and
    aggregates success rates plus resource usage from ResourceMonitor.
    """

    def __init__(
        self,
        timeout: int = 120,
        workdir: Optional[str] = None,
        use_kugetsu: bool = False,
        memory_limit_mb: int = 1024,
        test_repo: str = "git.example.com/test/kugetsu",
    ):
        # Per-agent subprocess timeout in seconds (also caps the poll loop).
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        # When True, tasks are routed through the kugetsu CLI instead of
        # running opencode directly.
        self.use_kugetsu = use_kugetsu
        # Soft budget: spawning stops once used memory grows this far above
        # the per-run baseline.
        self.memory_limit_mb = memory_limit_mb
        # Repository used to build kugetsu issue refs.
        self.test_repo = test_repo
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List[TestRun] = []
        self.baseline_memory_mb = 0.0

    def _measure_baseline_memory(self) -> float:
        """Return currently-used system memory in MB (psutil or /proc fallback)."""
        if HAS_PSUTIL:
            return psutil.virtual_memory().used / (1024 * 1024)
        return get_memory_mb_stdlib()

    def _create_test_workdir(self, agent_id: int) -> str:
        """Create and return a working directory for one agent.

        The epoch-seconds suffix separates repeated runs; exist_ok=True makes
        same-second collisions harmless.
        """
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir

    def _run_single_agent(self, agent_id: int) -> AgentResult:
        """Run one agent task to completion and classify the outcome.

        "Success" means the marker string appeared in combined output OR the
        process exited 0. TimeoutExpired becomes status "timeout"; any other
        exception becomes status "failed".
        """
        workdir = self._create_test_workdir(agent_id)
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"

        try:
            if self.use_kugetsu:
                # Random suffix keeps concurrent issue refs from colliding.
                unique_id = uuid.uuid4().hex[:8]
                issue_ref = f"{self.test_repo}#{agent_id}-{unique_id}"
                # NOTE(review): kugetsu mode ignores the per-agent workdir
                # created above — presumably kugetsu manages its own; confirm.
                result = subprocess.run(
                    ["kugetsu", "start", issue_ref, task],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            else:
                result = subprocess.run(
                    ["opencode", "run", task, "--dir", workdir],
                    capture_output=True,
                    text=True,
                    timeout=self.timeout,
                )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            success = "PARALLEL_TEST_OK" in output or result.returncode == 0

            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status="success" if success else "failed",
                return_code=result.returncode,
                # Truncate so JSON results stay compact.
                output=output[:500],
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status="timeout",
                return_code=-1,
            )
        except Exception as e:
            # NOTE(review): the exception detail `e` is discarded here; this
            # failure mode carries no output for diagnosis.
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status="failed",
                return_code=-1,
            )

    def _run_parallel_agents(self, num_agents: int) -> TestRun:
        """Spawn up to num_agents concurrent agents and aggregate one TestRun.

        Spawning stops early if used memory grows more than memory_limit_mb
        above the baseline measured at the start of this run.
        """
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")

        self.baseline_memory_mb = self._measure_baseline_memory()
        print(f"[INFO] Baseline memory: {self.baseline_memory_mb:.1f} MB")

        self.monitor.start(num_agents)

        threads = []
        results = []
        results_lock = threading.Lock()
        # Shared flag set once the memory budget is exhausted; read/written
        # without a lock — worst case is one redundant measurement.
        memory_exceeded = False

        def run_and_record(agent_id: int):
            # Worker body: re-check the memory budget, then run one agent and
            # record its result under the lock.
            nonlocal memory_exceeded
            if not memory_exceeded:
                current_mem = self._measure_baseline_memory()
                if current_mem > self.baseline_memory_mb + self.memory_limit_mb:
                    memory_exceeded = True
                    print(
                        f"[WARN] Memory limit ({self.memory_limit_mb}MB) approached, not spawning more agents"
                    )
                    return
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()

        for i in range(1, num_agents + 1):
            # Gate each spawn on current memory growth over baseline.
            current_mem = self._measure_baseline_memory()
            if current_mem > self.baseline_memory_mb + self.memory_limit_mb:
                print(
                    f"[WARN] Memory limit ({self.memory_limit_mb}MB) would be exceeded, stopping spawn at {i - 1} agents"
                )
                memory_exceeded = True
                break
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)

        # Poll once per second until every worker finishes or the overall
        # timeout elapses.
        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)

        # Hard cleanup: kill any opencode processes still running.
        subprocess.run(["pkill", "-f", "opencode run"], capture_output=True)

        for t in threads:
            t.join(timeout=5)

        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time

        success_count = sum(1 for r in results if r.status == "success")
        failed_count = sum(1 for r in results if r.status == "failed")
        timeout_count = sum(1 for r in results if r.status == "timeout")

        durations = [r.duration for r in results]
        avg_duration = statistics.mean(durations) if durations else 0
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem_pct = max(s.memory_percent for s in resource_samples)
            avg_mem_pct = statistics.mean(s.memory_percent for s in resource_samples)
            peak_mem_mb = max(s.memory_mb for s in resource_samples)
            avg_mem_mb = statistics.mean(s.memory_mb for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            # No samples collected (e.g. the monitor failed immediately).
            peak_cpu = avg_cpu = peak_mem_pct = avg_mem_pct = peak_mem_mb = (
                avg_mem_mb
            ) = peak_procs = 0

        # Per-agent memory: peak growth over baseline divided by the number
        # of agents that actually produced a result.
        actual_agents = len(results) if results else num_agents
        memory_per_agent = (
            (peak_mem_mb - self.baseline_memory_mb) / actual_agents
            if actual_agents > 0
            else 0
        )
        # Cost score: excess memory (MB) times wall-clock seconds, / 1000.
        total_cost = (
            (peak_mem_mb - self.baseline_memory_mb) * total_duration / 1000
            if peak_mem_mb > self.baseline_memory_mb
            else 0
        )

        print(
            f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout"
        )
        print(
            f"[COST] Memory per agent: {memory_per_agent:.1f} MB, Total cost score: {total_cost:.2f}"
        )

        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_mb=peak_mem_mb,
            avg_memory_mb=avg_mem_mb,
            peak_memory_percent=peak_mem_pct,
            avg_memory_percent=avg_mem_pct,
            peak_opencode_procs=peak_procs,
            baseline_memory_mb=self.baseline_memory_mb,
            memory_per_agent_mb=memory_per_agent,
            total_cost_score=total_cost,
        )

    def run_capacity_test(
        self, max_agents: int = 10, step: int = 1, quick: bool = False
    ) -> List[TestRun]:
        """Sweep agent counts, running one parallel test per configuration.

        quick=True uses a fixed short list of counts; otherwise counts go
        from 1 to max_agents in increments of step.
        """
        if quick:
            agent_counts = [1, 2, 3, 5, 8]
        else:
            agent_counts = list(range(1, max_agents + 1, step))

        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")

        self.results = []

        for count in agent_counts:
            # Kill stragglers from the previous configuration and let the
            # system settle briefly so runs are isolated.
            subprocess.run(["pkill", "-f", "opencode run"], capture_output=True)
            time.sleep(2)
            result = self._run_parallel_agents(count)
            self.results.append(result)

        return self.results

    def save_results(self, output_dir: str):
        """Write JSON, CSV, and Markdown reports; return their three paths as strings."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Full structured results, one object per TestRun.
        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, "w") as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")

        # Flat one-row-per-run CSV summary.
        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, "w") as f:
            f.write(
                "agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem_mb,avg_mem_mb,peak_mem_pct,avg_mem_pct,peak_procs,baseline_mem,mem_per_agent,cost_score\n"
            )
            for run in self.results:
                f.write(
                    f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                    f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                    f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                    f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                    f"{run.avg_cpu_percent:.1f},{run.peak_memory_mb:.1f},"
                    f"{run.avg_memory_mb:.1f},{run.peak_memory_percent:.1f},"
                    f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs},"
                    f"{run.baseline_memory_mb:.1f},{run.memory_per_agent_mb:.1f},{run.total_cost_score:.2f}\n"
                )
        print(f"[INFO] Summary saved to: {csv_file}")

        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")

        return str(json_file), str(csv_file), str(report_file)

    def _generate_markdown_report(self, output_file: Path):
        """Render a human-readable Markdown report of all recorded runs."""
        with open(output_file, "w") as f:
            f.write("# Parallel Capacity Test Report\n\n")
            f.write(
                f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            )
            f.write("## Summary\n\n")
            f.write(
                "| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak Mem (MB) | Mem/Agent | Cost Score |\n"
            )
            f.write(
                "|--------|----------|---------|--------|---------|--------------|---------------|-----------|------------|\n"
            )
            for run in self.results:
                f.write(
                    f"| {run.agent_count} | {run.total_duration:.1f}s | "
                    f"{run.success_count} | {run.failed_count} | "
                    f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
                    f"{run.peak_memory_mb:.0f}MB | {run.memory_per_agent_mb:.1f}MB | {run.total_cost_score:.2f} |\n"
                )
            f.write("\n## Cost Analysis\n\n")
            f.write("| Metric | Value |\n")
            f.write("|--------|-------|\n")
            if self.results:
                # Baseline reported from the first run; each run re-measures
                # its own baseline internally.
                baseline = self.results[0].baseline_memory_mb
                f.write(f"| Baseline Memory | {baseline:.1f} MB |\n")
                avg_mem_per = sum(r.memory_per_agent_mb for r in self.results) / len(
                    self.results
                )
                f.write(f"| Avg Memory per Agent | {avg_mem_per:.1f} MB |\n")
                f.write(f"| Memory Limit | {self.memory_limit_mb} MB |\n")
                # Rough estimate: memory budget divided by per-agent cost.
                max_capacity = (
                    int(self.memory_limit_mb / avg_mem_per) if avg_mem_per > 0 else 0
                )
                f.write(f"| Estimated Max Capacity | {max_capacity} agents |\n")
            f.write("\n## Key Findings\n\n")
            # "Optimal" = largest agent count where every agent succeeded.
            successful_runs = [
                r for r in self.results if r.success_count == r.agent_count
            ]
            optimal = max(successful_runs, key=lambda r: r.agent_count, default=None)
            if optimal:
                f.write(f"### Optimal Configuration\n")
                f.write(
                    f"- **{optimal.agent_count} agents** achieved perfect success rate\n"
                )
                f.write(
                    f"  - Average response time: {optimal.avg_response_time:.1f}s\n"
                )
                f.write(f"  - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
                f.write(
                    f"  - Peak Memory: {optimal.peak_memory_mb:.1f}MB ({optimal.peak_memory_percent:.1f}%)\n"
                )
                f.write(f"  - Memory per agent: {optimal.memory_per_agent_mb:.1f}MB\n")
                f.write(f"  - Cost score: {optimal.total_cost_score:.2f}\n\n")
            f.write("## Recommendations\n\n")
            if optimal:
                f.write(
                    f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n"
                )
                f.write("2. **Monitor closely:** 5+ agents\n")
                f.write(
                    "3. **Implement circuit breaker** when failure rate exceeds threshold\n"
                )
|
|
|
|
|
|
def main():
    """CLI entry point: parse options, run the capacity sweep, write reports."""
    arg_parser = argparse.ArgumentParser(
        description="Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu"
    )
    arg_parser.add_argument("--agents", "-n", type=int, default=10)
    arg_parser.add_argument("--timeout", "-t", type=int, default=120)
    arg_parser.add_argument("--step", "-s", type=int, default=1)
    arg_parser.add_argument("--quick", "-q", action="store_true")
    arg_parser.add_argument("--output", "-o", type=str, default=None)
    arg_parser.add_argument(
        "--use-kugetsu",
        "-k",
        action="store_true",
        help="Use kugetsu CLI instead of raw opencode (tests full orchestration)",
    )
    arg_parser.add_argument(
        "--memory-limit",
        "-m",
        type=int,
        default=1024,
        help="Memory limit per agent in MB (default: 1024 = 1GB)",
    )
    arg_parser.add_argument(
        "--test-repo",
        "-r",
        type=str,
        default="git.example.com/test/kugetsu",
        help="Repository for kugetsu issue refs (default: git.example.com/test/kugetsu)",
    )
    opts = arg_parser.parse_args()

    # Default output location is a "results" folder next to this script.
    results_dir = opts.output or str(Path(__file__).parent / "results")

    banner = "=" * 60
    mode = "kugetsu" if opts.use_kugetsu else "opencode"
    print(banner)
    print(f"Parallel Capacity Test Tool ({mode} mode)")
    print(banner)
    print(f"Max agents: {opts.agents}")
    print(f"Timeout: {opts.timeout}s")
    print(f"Memory limit: {opts.memory_limit}MB")
    if opts.use_kugetsu:
        print(f"Test repo: {opts.test_repo}")
    print()

    runner = ParallelCapacityTester(
        timeout=opts.timeout,
        use_kugetsu=opts.use_kugetsu,
        memory_limit_mb=opts.memory_limit,
        test_repo=opts.test_repo,
    )

    try:
        runner.run_capacity_test(
            max_agents=opts.agents, step=opts.step, quick=opts.quick
        )
        json_file, csv_file, report_file = runner.save_results(results_dir)
        print("\n" + banner)
        print("TEST COMPLETE")
        print(banner)
        print(f"JSON Results: {json_file}")
        print(f"CSV Summary: {csv_file}")
        print(f"Report: {report_file}")
    except KeyboardInterrupt:
        print("\n[ABORT] Test interrupted by user")
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when executed directly, not when imported as a module.
if __name__ == "__main__":
    main()
|