#!/usr/bin/env python3
"""
Parallel Capacity Test Tool for Hermes/OpenCode

Tests concurrent agent capacity by spawning N parallel `opencode run` tasks
and sampling system load from /proc while they execute.

Uses the standard library only - no psutil required.
"""

import argparse
import json
import os
import statistics
import subprocess
import sys
import threading
import time
from dataclasses import asdict, dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple


@dataclass
class AgentResult:
    """Outcome of one `opencode run` subprocess."""
    agent_id: int
    duration: float   # wall-clock seconds (set to the timeout value on timeout)
    status: str       # 'success' | 'failed' | 'timeout'
    return_code: int  # subprocess exit code; -1 for timeout or launch failure
    output: str = ""  # first 500 chars of combined stdout+stderr
    # Exception text when the subprocess could not run at all. This field was
    # missing originally: the generic except handler passed error=... and
    # raised TypeError, masking the real failure.
    error: str = ""


@dataclass
class ResourceSample:
    """One point-in-time snapshot collected by ResourceMonitor."""
    timestamp: float
    cpu_percent: float
    memory_percent: float
    opencode_processes: int
    agent_count: int  # how many agents the current test configuration runs


@dataclass
class TestRun:
    """Aggregated metrics for one N-agent test configuration."""
    agent_count: int
    total_duration: float
    success_count: int
    failed_count: int
    timeout_count: int
    avg_response_time: float
    stddev_response_time: float
    min_response_time: float
    max_response_time: float
    peak_cpu_percent: float
    avg_cpu_percent: float
    peak_memory_percent: float
    avg_memory_percent: float
    peak_opencode_procs: int


def get_memory_percent() -> float:
    """Return used-memory percent parsed from /proc/meminfo (Linux).

    Returns 0.0 when /proc is unavailable (non-Linux) or unreadable.
    NOTE(review): kernels without MemAvailable (< 3.14) leave available=0,
    which reports 100% here - confirm target kernels expose MemAvailable.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        total = 0
        available = 0
        for line in meminfo.splitlines():
            if line.startswith("MemTotal:"):
                total = int(line.split()[1])
            elif line.startswith("MemAvailable:"):
                available = int(line.split()[1])
                break
        if total > 0:
            used = total - available
            return (used / total) * 100
    except (FileNotFoundError, PermissionError, ValueError):
        pass
    return 0.0


def count_opencode_processes() -> int:
    """Count running `opencode` processes.

    Tries `pgrep -c -x opencode` first, then falls back to scanning
    /proc/<pid>/comm. Returns 0 when neither source is available.
    """
    try:
        result = subprocess.run(
            ["pgrep", "-c", "-x", "opencode"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            return int(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, subprocess.SubprocessError):
        pass
    try:
        count = 0
        for pid_dir in os.listdir("/proc"):
            if not pid_dir.isdigit():
                continue
            try:
                with open(f"/proc/{pid_dir}/comm", "r") as f:
                    if "opencode" in f.read().lower():
                        count += 1
            except (PermissionError, FileNotFoundError):
                continue
        return count
    except FileNotFoundError:
        return 0


def _read_cpu_times() -> Optional[Tuple[int, int]]:
    """Return (idle, total) jiffies from the aggregate 'cpu' line of /proc/stat.

    Returns None when /proc/stat is unavailable or malformed.
    """
    try:
        with open("/proc/stat", "r") as f:
            parts = f.readline().split()
        if parts and parts[0] == "cpu":
            values = [int(x) for x in parts[1:8]]
            return values[3], sum(values)  # field 4 (index 3) is idle time
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        pass
    return None


def get_cpu_percent(interval: float = 0.1) -> float:
    """Return CPU utilization percent measured over `interval` seconds.

    BUG FIX: the original read /proc/stat once, which yields the average
    utilization since boot (a near-constant), not current load. Sampling
    twice and diffing the counters gives the instantaneous utilization.
    Returns 0.0 when /proc/stat is unavailable or no time has elapsed.
    """
    first = _read_cpu_times()
    if first is None:
        return 0.0
    if interval > 0:
        time.sleep(interval)
    second = _read_cpu_times()
    if second is None:
        return 0.0
    idle_delta = second[0] - first[0]
    total_delta = second[1] - first[1]
    if total_delta > 0:
        return ((total_delta - idle_delta) / total_delta) * 100
    return 0.0


class ResourceMonitor:
    """Background thread that samples system resources during a test run."""

    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.samples: List[ResourceSample] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int):
        """Begin sampling; discards samples from any previous run."""
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List[ResourceSample]:
        """Stop the sampler thread and return the collected samples."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self):
        """Sample until stopped; a failed sample is logged, not fatal."""
        while not self._stop_event.is_set():
            try:
                sample = self._collect_sample()
                self.samples.append(sample)
            except Exception as e:
                print(f"[WARN] Error collecting resource sample: {e}")
            # wait() doubles as an interruptible sleep so stop() is prompt.
            self._stop_event.wait(self.sample_interval)

    def _collect_sample(self) -> ResourceSample:
        """Take one snapshot using the stdlib /proc helpers.

        BUG FIX: the original referenced `psutil` and `HAS_PSUTIL`, neither of
        which is imported or defined (the module is stdlib-only by design), so
        every sample raised NameError and the /proc helpers were dead code.
        """
        return ResourceSample(
            timestamp=time.time(),
            cpu_percent=get_cpu_percent(interval=0.1),
            memory_percent=get_memory_percent(),
            opencode_processes=count_opencode_processes(),
            agent_count=self._current_agent_count,
        )


class ParallelCapacityTester:
    """Drives the capacity test: runs 1..N concurrent agents, records metrics."""

    def __init__(self, timeout: int = 120, workdir: Optional[str] = None):
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List[TestRun] = []

    def _create_test_workdir(self, agent_id: int) -> str:
        """Create and return a unique working directory for one agent."""
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir

    def _run_single_agent(self, agent_id: int) -> AgentResult:
        """Run one `opencode run` task and classify the outcome."""
        workdir = self._create_test_workdir(agent_id)
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"
        try:
            result = subprocess.run(
                ['opencode', 'run', task, '--workdir', workdir],
                capture_output=True, text=True, timeout=self.timeout
            )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            # Success is judged by the marker string, not the exit code.
            success = 'PARALLEL_TEST_OK' in output
            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status='success' if success else 'failed',
                return_code=result.returncode,
                output=output[:500]
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status='timeout',
                return_code=-1
            )
        except Exception as e:
            # `error` is now a real AgentResult field; previously this keyword
            # raised TypeError here and hid the underlying failure.
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status='failed',
                return_code=-1,
                error=str(e)
            )

    def _run_parallel_agents(self, num_agents: int) -> TestRun:
        """Run `num_agents` agents concurrently and aggregate one TestRun."""
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
        self.monitor.start(num_agents)
        threads = []
        results = []
        results_lock = threading.Lock()

        def run_and_record(agent_id: int):
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()
        for i in range(1, num_agents + 1):
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)

        # Poll until every worker finishes or the global timeout elapses.
        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)

        # Best-effort cleanup of any opencode processes left past the deadline.
        subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
        for t in threads:
            t.join(timeout=5)

        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time

        success_count = sum(1 for r in results if r.status == 'success')
        failed_count = sum(1 for r in results if r.status == 'failed')
        timeout_count = sum(1 for r in results if r.status == 'timeout')

        durations = [r.duration for r in results]
        avg_duration = statistics.mean(durations) if durations else 0
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem = max(s.memory_percent for s in resource_samples)
            avg_mem = statistics.mean(s.memory_percent for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0

        print(f"[RESULT] {num_agents} agents: {success_count} success, "
              f"{failed_count} failed, {timeout_count} timeout")

        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_percent=peak_mem,
            avg_memory_percent=avg_mem,
            peak_opencode_procs=peak_procs
        )

    def run_capacity_test(self, max_agents: int = 10, step: int = 1,
                          quick: bool = False) -> List[TestRun]:
        """Run the test for each configured agent count and return the results.

        `quick` uses a fixed ramp [1, 2, 3, 5, 8] and ignores max_agents/step.
        """
        if quick:
            agent_counts = [1, 2, 3, 5, 8]
        else:
            agent_counts = list(range(1, max_agents + 1, step))
        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")
        self.results = []
        for count in agent_counts:
            # Kill any leftovers from the previous round so counts start clean.
            subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
            time.sleep(2)
            result = self._run_parallel_agents(count)
            self.results.append(result)
        return self.results

    def save_results(self, output_dir: str):
        """Write JSON, CSV, and Markdown outputs; return their three paths."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, 'w') as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")

        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, 'w') as f:
            f.write("agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n")
            for run in self.results:
                f.write(f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                        f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                        f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                        f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                        f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f},"
                        f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n")
        print(f"[INFO] Summary saved to: {csv_file}")

        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")
        return str(json_file), str(csv_file), str(report_file)

    def _generate_markdown_report(self, output_file: Path):
        """Render self.results as a human-readable Markdown report."""
        with open(output_file, 'w') as f:
            f.write("# Parallel Capacity Test Report\n\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("## Summary\n\n")
            f.write("| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n")
            f.write("|--------|----------|---------|--------|---------|--------------|----------|----------|\n")
            for run in self.results:
                f.write(f"| {run.agent_count} | {run.total_duration:.1f}s | "
                        f"{run.success_count} | {run.failed_count} | "
                        f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
                        f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n")
            f.write("\n## Key Findings\n\n")
            # "Optimal" = the largest configuration with a perfect success rate.
            successful_runs = [r for r in self.results if r.success_count == r.agent_count]
            optimal = max(successful_runs, key=lambda r: r.agent_count, default=None)
            if optimal:
                f.write(f"### Optimal Configuration\n")
                f.write(f"- **{optimal.agent_count} agents** achieved perfect success rate\n")
                f.write(f"  - Average response time: {optimal.avg_response_time:.1f}s\n")
                f.write(f"  - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
                f.write(f"  - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n")
            f.write("## Recommendations\n\n")
            if optimal:
                f.write(f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n")
                f.write("2. **Monitor closely:** 5+ agents\n")
                f.write("3. **Implement circuit breaker** when failure rate exceeds threshold\n")


def main():
    """CLI entry point: parse arguments, run the capacity test, save reports."""
    parser = argparse.ArgumentParser(description='Parallel Capacity Test Tool')
    parser.add_argument('--agents', '-n', type=int, default=10)
    parser.add_argument('--timeout', '-t', type=int, default=120)
    parser.add_argument('--step', '-s', type=int, default=1)
    parser.add_argument('--quick', '-q', action='store_true')
    parser.add_argument('--output', '-o', type=str, default=None)
    args = parser.parse_args()

    script_dir = Path(__file__).parent
    output_dir = args.output or str(script_dir / 'results')

    print("=" * 60)
    print("Parallel Capacity Test Tool for Hermes/OpenCode")
    print("=" * 60)
    print(f"Max agents: {args.agents}")
    print(f"Timeout: {args.timeout}s")
    print()

    tester = ParallelCapacityTester(timeout=args.timeout)
    try:
        tester.run_capacity_test(max_agents=args.agents, step=args.step, quick=args.quick)
        json_file, csv_file, report_file = tester.save_results(output_dir)
        print("\n" + "=" * 60)
        print("TEST COMPLETE")
        print("=" * 60)
        print(f"JSON Results: {json_file}")
        print(f"CSV Summary: {csv_file}")
        print(f"Report: {report_file}")
    except KeyboardInterrupt:
        print("\n[ABORT] Test interrupted by user")
        sys.exit(1)


if __name__ == '__main__':
    main()