#!/usr/bin/env python3 """ Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu Tests concurrent agent capacity by spawning N parallel tasks. Supports two modes: - opencode: Direct opencode run (legacy) - kugetsu: Via kugetsu CLI (tests full orchestration stack) """ import argparse import json import os import subprocess import sys import time import threading import statistics import uuid from dataclasses import dataclass, asdict from datetime import datetime from pathlib import Path from typing import List, Optional try: import psutil HAS_PSUTIL = True except ImportError: HAS_PSUTIL = False print("[WARN] psutil not available - resource monitoring will be limited") @dataclass class AgentResult: agent_id: int duration: float status: str return_code: int output: str = "" @dataclass class ResourceSample: timestamp: float cpu_percent: float memory_mb: float memory_percent: float opencode_processes: int agent_count: int @dataclass class TestRun: agent_count: int total_duration: float success_count: int failed_count: int timeout_count: int avg_response_time: float stddev_response_time: float min_response_time: float max_response_time: float peak_cpu_percent: float avg_cpu_percent: float peak_memory_mb: float avg_memory_mb: float peak_memory_percent: float avg_memory_percent: float peak_opencode_procs: int baseline_memory_mb: float = 0.0 memory_per_agent_mb: float = 0.0 total_cost_score: float = 0.0 class ResourceMonitor: def __init__(self, sample_interval: float = 1.0): self.sample_interval = sample_interval self.samples: List[ResourceSample] = [] self._stop_event = threading.Event() self._thread: Optional[threading.Thread] = None self._current_agent_count = 0 def start(self, agent_count: int): self._current_agent_count = agent_count self.samples = [] self._stop_event.clear() self._thread = threading.Thread(target=self._monitor_loop) self._thread.daemon = True self._thread.start() def stop(self) -> List[ResourceSample]: self._stop_event.set() if self._thread: self._thread.join(timeout=5) return self.samples def _monitor_loop(self): while not self._stop_event.is_set(): try: sample = self._collect_sample() self.samples.append(sample) except Exception as e: print(f"[WARN] Error collecting resource sample: {e}") self._stop_event.wait(self.sample_interval) def _collect_sample(self) -> ResourceSample: timestamp = time.time() try: opencode_procs = len( [ p for p in psutil.process_iter(["name"]) if "opencode" in p.info["name"].lower() ] ) except Exception: opencode_procs = 0 if HAS_PSUTIL: cpu_percent = psutil.cpu_percent(interval=0.1) virt_mem = psutil.virtual_memory() memory_percent = virt_mem.percent memory_mb = virt_mem.used / (1024 * 1024) else: cpu_percent = 0.0 memory_percent = 0.0 memory_mb = get_memory_mb_stdlib() return ResourceSample( timestamp=timestamp, cpu_percent=cpu_percent, memory_mb=memory_mb, memory_percent=memory_percent, opencode_processes=opencode_procs, agent_count=self._current_agent_count, ) def get_memory_mb_stdlib() -> float: try: with open("/proc/meminfo", "r") as f: meminfo = f.read() total_kb = 0 avail_kb = 0 for line in meminfo.splitlines(): if line.startswith("MemTotal:"): total_kb = int(line.split()[1]) elif line.startswith("MemAvailable:"): avail_kb = int(line.split()[1]) if total_kb > 0: used_kb = total_kb - avail_kb return used_kb / 1024 except Exception: pass return 0.0 class ParallelCapacityTester: def __init__( self, timeout: int = 120, workdir: Optional[str] = None, use_kugetsu: bool = False, memory_limit_mb: int = 1024, test_repo: str = "git.example.com/test/kugetsu", ): self.timeout = timeout self.workdir = workdir or "/tmp/parallel_test" self.use_kugetsu = use_kugetsu self.memory_limit_mb = memory_limit_mb self.test_repo = test_repo self.monitor = ResourceMonitor(sample_interval=1.0) self.results: List[TestRun] = [] self.baseline_memory_mb = 0.0 def _measure_baseline_memory(self) -> float: if HAS_PSUTIL: return psutil.virtual_memory().used / (1024 * 1024) return get_memory_mb_stdlib() def _create_test_workdir(self, agent_id: int) -> str: agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}") os.makedirs(agent_dir, exist_ok=True) return agent_dir def _run_single_agent(self, agent_id: int) -> AgentResult: workdir = self._create_test_workdir(agent_id) start_time = time.time() task = "Respond with exactly: PARALLEL_TEST_OK" try: if self.use_kugetsu: unique_id = uuid.uuid4().hex[:8] issue_ref = f"{self.test_repo}#{agent_id}-{unique_id}" result = subprocess.run( ["kugetsu", "start", issue_ref, task], capture_output=True, text=True, timeout=self.timeout, ) else: result = subprocess.run( ["opencode", "run", task, "--dir", workdir], capture_output=True, text=True, timeout=self.timeout, ) duration = time.time() - start_time output = result.stdout + result.stderr success = "PARALLEL_TEST_OK" in output or result.returncode == 0 return AgentResult( agent_id=agent_id, duration=duration, status="success" if success else "failed", return_code=result.returncode, output=output[:500], ) except subprocess.TimeoutExpired: return AgentResult( agent_id=agent_id, duration=self.timeout, status="timeout", return_code=-1, ) except Exception as e: return AgentResult( agent_id=agent_id, duration=time.time() - start_time, status="failed", return_code=-1, ) def _run_parallel_agents(self, num_agents: int) -> TestRun: print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...") self.baseline_memory_mb = self._measure_baseline_memory() print(f"[INFO] Baseline memory: {self.baseline_memory_mb:.1f} MB") self.monitor.start(num_agents) threads = [] results = [] results_lock = threading.Lock() memory_exceeded = False def run_and_record(agent_id: int): nonlocal memory_exceeded if not memory_exceeded: current_mem = self._measure_baseline_memory() if current_mem > self.baseline_memory_mb + self.memory_limit_mb: memory_exceeded = True print( f"[WARN] Memory limit ({self.memory_limit_mb}MB) approached, not spawning more agents" ) return result = self._run_single_agent(agent_id) with results_lock: results.append(result) start_time = time.time() for i in range(1, num_agents + 1): current_mem = self._measure_baseline_memory() if current_mem > self.baseline_memory_mb + self.memory_limit_mb: print( f"[WARN] Memory limit ({self.memory_limit_mb}MB) would be exceeded, stopping spawn at {i - 1} agents" ) memory_exceeded = True break t = threading.Thread(target=run_and_record, args=(i,)) t.start() threads.append(t) all_done = False elapsed = 0 while elapsed < self.timeout and not all_done: time.sleep(1) elapsed = int(time.time() - start_time) all_done = all(not t.is_alive() for t in threads) subprocess.run(["pkill", "-f", "opencode run"], capture_output=True) for t in threads: t.join(timeout=5) resource_samples = self.monitor.stop() total_duration = time.time() - start_time success_count = sum(1 for r in results if r.status == "success") failed_count = sum(1 for r in results if r.status == "failed") timeout_count = sum(1 for r in results if r.status == "timeout") durations = [r.duration for r in results] avg_duration = statistics.mean(durations) if durations else 0 stddev = statistics.stdev(durations) if len(durations) > 1 else 0 min_duration = min(durations) if durations else 0 max_duration = max(durations) if durations else 0 if resource_samples: peak_cpu = max(s.cpu_percent for s in resource_samples) avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples) peak_mem_pct = max(s.memory_percent for s in resource_samples) avg_mem_pct = statistics.mean(s.memory_percent for s in resource_samples) peak_mem_mb = max(s.memory_mb for s in resource_samples) avg_mem_mb = statistics.mean(s.memory_mb for s in resource_samples) peak_procs = max(s.opencode_processes for s in resource_samples) else: peak_cpu = avg_cpu = peak_mem_pct = avg_mem_pct = peak_mem_mb = ( avg_mem_mb ) = peak_procs = 0 actual_agents = len(results) if results else num_agents memory_per_agent = ( (peak_mem_mb - self.baseline_memory_mb) / actual_agents if actual_agents > 0 else 0 ) total_cost = ( (peak_mem_mb - self.baseline_memory_mb) * total_duration / 1000 if peak_mem_mb > self.baseline_memory_mb else 0 ) print( f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout" ) print( f"[COST] Memory per agent: {memory_per_agent:.1f} MB, Total cost score: {total_cost:.2f}" ) return TestRun( agent_count=num_agents, total_duration=total_duration, success_count=success_count, failed_count=failed_count, timeout_count=timeout_count, avg_response_time=avg_duration, stddev_response_time=stddev, min_response_time=min_duration, max_response_time=max_duration, peak_cpu_percent=peak_cpu, avg_cpu_percent=avg_cpu, peak_memory_mb=peak_mem_mb, avg_memory_mb=avg_mem_mb, peak_memory_percent=peak_mem_pct, avg_memory_percent=avg_mem_pct, peak_opencode_procs=peak_procs, baseline_memory_mb=self.baseline_memory_mb, memory_per_agent_mb=memory_per_agent, total_cost_score=total_cost, ) def run_capacity_test( self, max_agents: int = 10, step: int = 1, quick: bool = False ) -> List[TestRun]: if quick: agent_counts = [1, 2, 3, 5, 8] else: agent_counts = list(range(1, max_agents + 1, step)) print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations") print(f"[INFO] Agent counts: {agent_counts}") self.results = [] for count in agent_counts: subprocess.run(["pkill", "-f", "opencode run"], capture_output=True) time.sleep(2) result = self._run_parallel_agents(count) self.results.append(result) return self.results def save_results(self, output_dir: str): output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") json_file = output_path / f"results_{timestamp}.json" with open(json_file, "w") as f: data = [asdict(run) for run in self.results] json.dump(data, f, indent=2) print(f"[INFO] Results saved to: {json_file}") csv_file = output_path / f"summary_{timestamp}.csv" with open(csv_file, "w") as f: f.write( "agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem_mb,avg_mem_mb,peak_mem_pct,avg_mem_pct,peak_procs,baseline_mem,mem_per_agent,cost_score\n" ) for run in self.results: f.write( f"{run.agent_count},{run.total_duration:.2f},{run.success_count}," f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f}," f"{run.stddev_response_time:.2f},{run.min_response_time:.2f}," f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f}," f"{run.avg_cpu_percent:.1f},{run.peak_memory_mb:.1f}," f"{run.avg_memory_mb:.1f},{run.peak_memory_percent:.1f}," f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}," f"{run.baseline_memory_mb:.1f},{run.memory_per_agent_mb:.1f},{run.total_cost_score:.2f}\n" ) print(f"[INFO] Summary saved to: {csv_file}") report_file = output_path / f"report_{timestamp}.md" self._generate_markdown_report(report_file) print(f"[INFO] Report saved to: {report_file}") return str(json_file), str(csv_file), str(report_file) def _generate_markdown_report(self, output_file: Path): with open(output_file, "w") as f: f.write("# Parallel Capacity Test Report\n\n") f.write( f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" ) f.write("## Summary\n\n") f.write( "| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak Mem (MB) | Mem/Agent | Cost Score |\n" ) f.write( "|--------|----------|---------|--------|---------|--------------|---------------|-----------|------------|\n" ) for run in self.results: f.write( f"| {run.agent_count} | {run.total_duration:.1f}s | " f"{run.success_count} | {run.failed_count} | " f"{run.timeout_count} | {run.avg_response_time:.1f}s | " f"{run.peak_memory_mb:.0f}MB | {run.memory_per_agent_mb:.1f}MB | {run.total_cost_score:.2f} |\n" ) f.write("\n## Cost Analysis\n\n") f.write("| Metric | Value |\n") f.write("|--------|-------|\n") if self.results: baseline = self.results[0].baseline_memory_mb f.write(f"| Baseline Memory | {baseline:.1f} MB |\n") avg_mem_per = sum(r.memory_per_agent_mb for r in self.results) / len( self.results ) f.write(f"| Avg Memory per Agent | {avg_mem_per:.1f} MB |\n") f.write(f"| Memory Limit | {self.memory_limit_mb} MB |\n") max_capacity = ( int(self.memory_limit_mb / avg_mem_per) if avg_mem_per > 0 else 0 ) f.write(f"| Estimated Max Capacity | {max_capacity} agents |\n") f.write("\n## Key Findings\n\n") successful_runs = [ r for r in self.results if r.success_count == r.agent_count ] optimal = max(successful_runs, key=lambda r: r.agent_count, default=None) if optimal: f.write(f"### Optimal Configuration\n") f.write( f"- **{optimal.agent_count} agents** achieved perfect success rate\n" ) f.write( f" - Average response time: {optimal.avg_response_time:.1f}s\n" ) f.write(f" - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n") f.write( f" - Peak Memory: {optimal.peak_memory_mb:.1f}MB ({optimal.peak_memory_percent:.1f}%)\n" ) f.write(f" - Memory per agent: {optimal.memory_per_agent_mb:.1f}MB\n") f.write(f" - Cost score: {optimal.total_cost_score:.2f}\n\n") f.write("## Recommendations\n\n") if optimal: f.write( f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n" ) f.write("2. **Monitor closely:** 5+ agents\n") f.write( "3. **Implement circuit breaker** when failure rate exceeds threshold\n" ) def main(): parser = argparse.ArgumentParser( description="Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu" ) parser.add_argument("--agents", "-n", type=int, default=10) parser.add_argument("--timeout", "-t", type=int, default=120) parser.add_argument("--step", "-s", type=int, default=1) parser.add_argument("--quick", "-q", action="store_true") parser.add_argument("--output", "-o", type=str, default=None) parser.add_argument( "--use-kugetsu", "-k", action="store_true", help="Use kugetsu CLI instead of raw opencode (tests full orchestration)", ) parser.add_argument( "--memory-limit", "-m", type=int, default=1024, help="Memory limit per agent in MB (default: 1024 = 1GB)", ) parser.add_argument( "--test-repo", "-r", type=str, default="git.example.com/test/kugetsu", help="Repository for kugetsu issue refs (default: git.example.com/test/kugetsu)", ) args = parser.parse_args() script_dir = Path(__file__).parent output_dir = args.output or str(script_dir / "results") mode = "kugetsu" if args.use_kugetsu else "opencode" print("=" * 60) print(f"Parallel Capacity Test Tool ({mode} mode)") print("=" * 60) print(f"Max agents: {args.agents}") print(f"Timeout: {args.timeout}s") print(f"Memory limit: {args.memory_limit}MB") if args.use_kugetsu: print(f"Test repo: {args.test_repo}") print() tester = ParallelCapacityTester( timeout=args.timeout, use_kugetsu=args.use_kugetsu, memory_limit_mb=args.memory_limit, test_repo=args.test_repo, ) try: tester.run_capacity_test( max_agents=args.agents, step=args.step, quick=args.quick ) json_file, csv_file, report_file = tester.save_results(output_dir) print("\n" + "=" * 60) print("TEST COMPLETE") print("=" * 60) print(f"JSON Results: {json_file}") print(f"CSV Summary: {csv_file}") print(f"Report: {report_file}") except KeyboardInterrupt: print("\n[ABORT] Test interrupted by user") sys.exit(1) if __name__ == "__main__": main()