Files
kugetsu/tools/parallel-capacity-test/parallel_capacity_test.py
shokollm 74424c1f82 Add parallel capacity test tool for Hermes/OpenCode
This tool tests the practical limits of parallel agent execution
by spawning N concurrent opencode run tasks and measuring:
- Response time
- CPU and memory usage
- Success/failure rates

Includes both bash (run_test.sh) and Python (parallel_capacity_test.py)
implementations with full metrics collection and reporting.
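
Example invocation (flags as declared in the script's argparse
setup; paths are illustrative):

    ./parallel_capacity_test.py --agents 10 --timeout 120
    ./parallel_capacity_test.py --quick --output ./results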

Fixes #3
2026-03-27 10:29:34 +00:00

357 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Parallel Capacity Test Tool for Hermes/OpenCode
Tests concurrent agent capacity by spawning N parallel opencode run tasks.
"""
import argparse
import json
import os
import subprocess
import sys
import time
import threading
import statistics
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Optional
try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False
    print("[WARN] psutil not available - resource monitoring will be limited")

@dataclass
class AgentResult:
    agent_id: int
    duration: float
    status: str
    return_code: int
    output: str = ""
    error: str = ""

@dataclass
class ResourceSample:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    opencode_processes: int
    agent_count: int

@dataclass
class TestRun:
    agent_count: int
    total_duration: float
    success_count: int
    failed_count: int
    timeout_count: int
    avg_response_time: float
    stddev_response_time: float
    min_response_time: float
    max_response_time: float
    peak_cpu_percent: float
    avg_cpu_percent: float
    peak_memory_percent: float
    avg_memory_percent: float
    peak_opencode_procs: int

class ResourceMonitor:
    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.samples: List[ResourceSample] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int):
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List[ResourceSample]:
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self):
        while not self._stop_event.is_set():
            try:
                sample = self._collect_sample()
                self.samples.append(sample)
            except Exception as e:
                print(f"[WARN] Error collecting resource sample: {e}")
            self._stop_event.wait(self.sample_interval)
    def _collect_sample(self) -> ResourceSample:
        timestamp = time.time()
        cpu_percent = 0.0
        memory_percent = 0.0
        opencode_procs = 0
        if HAS_PSUTIL:
            try:
                # Some processes report a None name; skip them safely.
                opencode_procs = len([
                    p for p in psutil.process_iter(['name'])
                    if p.info['name'] and 'opencode' in p.info['name'].lower()
                ])
            except Exception:
                opencode_procs = 0
            cpu_percent = psutil.cpu_percent(interval=0.1)
            memory_percent = psutil.virtual_memory().percent
        return ResourceSample(
            timestamp=timestamp,
            cpu_percent=cpu_percent,
            memory_percent=memory_percent,
            opencode_processes=opencode_procs,
            agent_count=self._current_agent_count
        )

class ParallelCapacityTester:
    def __init__(self, timeout: int = 120, workdir: Optional[str] = None):
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List[TestRun] = []

    def _create_test_workdir(self, agent_id: int) -> str:
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir
    def _run_single_agent(self, agent_id: int) -> AgentResult:
        workdir = self._create_test_workdir(agent_id)
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"
        try:
            result = subprocess.run(
                ['opencode', 'run', task, '--workdir', workdir],
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            success = 'PARALLEL_TEST_OK' in output
            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status='success' if success else 'failed',
                return_code=result.returncode,
                output=output[:500]
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status='timeout',
                return_code=-1
            )
        except Exception as e:
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status='failed',
                return_code=-1,
                error=str(e)
            )
    def _run_parallel_agents(self, num_agents: int) -> TestRun:
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
        self.monitor.start(num_agents)
        threads = []
        results = []
        results_lock = threading.Lock()

        def run_and_record(agent_id: int):
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()
        for i in range(1, num_agents + 1):
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)
        # Wait until every worker finishes or the global timeout elapses.
        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)
        # Kill straggling opencode processes (a no-op when all agents finished).
        subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
        for t in threads:
            t.join(timeout=5)
        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time
        success_count = sum(1 for r in results if r.status == 'success')
        failed_count = sum(1 for r in results if r.status == 'failed')
        timeout_count = sum(1 for r in results if r.status == 'timeout')
        durations = [r.duration for r in results]
        avg_duration = statistics.mean(durations) if durations else 0
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0
        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem = max(s.memory_percent for s in resource_samples)
            avg_mem = statistics.mean(s.memory_percent for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0
        print(f"[RESULT] {num_agents} agents: {success_count} success, "
              f"{failed_count} failed, {timeout_count} timeout")
        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_percent=peak_mem,
            avg_memory_percent=avg_mem,
            peak_opencode_procs=peak_procs
        )
    def run_capacity_test(self, max_agents: int = 10, step: int = 1,
                          quick: bool = False) -> List[TestRun]:
        if quick:
            agent_counts = [1, 2, 3, 5, 8]
        else:
            agent_counts = list(range(1, max_agents + 1, step))
        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")
        self.results = []
        for count in agent_counts:
            # Start each configuration from a clean slate.
            subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
            time.sleep(2)
            result = self._run_parallel_agents(count)
            self.results.append(result)
        return self.results
    def save_results(self, output_dir: str):
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, 'w') as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")
        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, 'w') as f:
            f.write("agents,duration,success,failed,timeout,avg_response,stddev,"
                    "min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n")
            for run in self.results:
                f.write(f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                        f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                        f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                        f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                        f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f},"
                        f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n")
        print(f"[INFO] Summary saved to: {csv_file}")
        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")
        return str(json_file), str(csv_file), str(report_file)
    def _generate_markdown_report(self, output_file: Path):
        with open(output_file, 'w') as f:
            f.write("# Parallel Capacity Test Report\n\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("## Summary\n\n")
            f.write("| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n")
            f.write("|--------|----------|---------|--------|---------|--------------|----------|----------|\n")
            for run in self.results:
                f.write(f"| {run.agent_count} | {run.total_duration:.1f}s | "
                        f"{run.success_count} | {run.failed_count} | "
                        f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
                        f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n")
            f.write("\n## Key Findings\n\n")
            # Runs where every agent succeeded.
            successful_runs = [r for r in self.results if r.success_count == r.agent_count]
            optimal = max(successful_runs, key=lambda r: r.agent_count, default=None)
            if optimal:
                f.write("### Optimal Configuration\n")
                f.write(f"- **{optimal.agent_count} agents** achieved perfect success rate\n")
                f.write(f"  - Average response time: {optimal.avg_response_time:.1f}s\n")
                f.write(f"  - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
                f.write(f"  - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n")
            f.write("## Recommendations\n\n")
            if optimal:
                f.write(f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n")
                f.write("2. **Monitor closely:** 5+ agents\n")
                f.write("3. **Implement circuit breaker** when failure rate exceeds threshold\n")

def main():
    parser = argparse.ArgumentParser(description='Parallel Capacity Test Tool')
    parser.add_argument('--agents', '-n', type=int, default=10,
                        help='maximum number of concurrent agents to test')
    parser.add_argument('--timeout', '-t', type=int, default=120,
                        help='per-run timeout in seconds')
    parser.add_argument('--step', '-s', type=int, default=1,
                        help='increment between tested agent counts')
    parser.add_argument('--quick', '-q', action='store_true',
                        help='test only agent counts 1, 2, 3, 5, 8')
    parser.add_argument('--output', '-o', type=str, default=None,
                        help='output directory (default: <script dir>/results)')
    args = parser.parse_args()
    script_dir = Path(__file__).parent
    output_dir = args.output or str(script_dir / 'results')
    print("=" * 60)
    print("Parallel Capacity Test Tool for Hermes/OpenCode")
    print("=" * 60)
    print(f"Max agents: {args.agents}")
    print(f"Timeout: {args.timeout}s")
    print()
    tester = ParallelCapacityTester(timeout=args.timeout)
    try:
        tester.run_capacity_test(max_agents=args.agents, step=args.step, quick=args.quick)
        json_file, csv_file, report_file = tester.save_results(output_dir)
        print("\n" + "=" * 60)
        print("TEST COMPLETE")
        print("=" * 60)
        print(f"JSON Results: {json_file}")
        print(f"CSV Summary: {csv_file}")
        print(f"Report: {report_file}")
    except KeyboardInterrupt:
        print("\n[ABORT] Test interrupted by user")
        sys.exit(1)


if __name__ == '__main__':
    main()
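
The tester can also be driven programmatically instead of through the CLI. A minimal sketch, assuming the script is importable as parallel_capacity_test from the same directory (paths are illustrative):

    from parallel_capacity_test import ParallelCapacityTester

    # Probe 1..5 concurrent agents with a 60-second per-run timeout.
    tester = ParallelCapacityTester(timeout=60)
    runs = tester.run_capacity_test(max_agents=5)
    for run in runs:
        print(f"{run.agent_count} agents: {run.success_count} ok, "
              f"avg {run.avg_response_time:.1f}s")
    tester.save_results("./results")  # writes JSON, CSV, and a Markdown report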