420 lines
15 KiB
Python
Executable File
420 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Parallel Capacity Test Tool for Hermes/OpenCode
|
|
Tests concurrent agent capacity by spawning N parallel opencode run tasks.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import threading
|
|
import statistics
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
# Using stdlib only - no psutil required
|
|
|
|
|
|
@dataclass
class AgentResult:
    """Outcome of a single `opencode run` invocation."""
    agent_id: int      # 1-based index of the agent within a test round
    duration: float    # wall-clock seconds the run took (capped at the timeout)
    status: str        # 'success' | 'failed' | 'timeout'
    return_code: int   # subprocess return code; -1 on timeout or exception
    output: str = ""   # truncated combined stdout+stderr of the run
    # Exception text when the invocation raised. BUG FIX: the generic
    # exception handler in _run_single_agent passes error=str(e), but this
    # field did not exist, so that handler raised TypeError instead of
    # returning a result.
    error: str = ""
|
|
@dataclass
class ResourceSample:
    """One point-in-time snapshot collected by ResourceMonitor."""
    timestamp: float          # epoch seconds when the sample was taken
    cpu_percent: float        # system-wide CPU utilization, 0-100
    memory_percent: float     # system memory in use, 0-100
    opencode_processes: int   # number of running 'opencode' processes
    agent_count: int          # agents configured for the round being sampled
|
|
@dataclass
class TestRun:
    """Aggregated metrics for one round of the capacity test."""
    agent_count: int               # number of concurrent agents in this round
    total_duration: float          # wall-clock seconds for the whole round
    success_count: int             # agents whose output contained the sentinel
    failed_count: int              # agents that finished without the sentinel or errored
    timeout_count: int             # agents that hit the per-agent timeout
    avg_response_time: float       # mean per-agent duration (seconds)
    stddev_response_time: float    # stddev of durations (0 when fewer than 2 samples)
    min_response_time: float       # fastest agent duration
    max_response_time: float       # slowest agent duration
    peak_cpu_percent: float        # max sampled system CPU during the round
    avg_cpu_percent: float         # mean sampled system CPU during the round
    peak_memory_percent: float     # max sampled system memory usage
    avg_memory_percent: float      # mean sampled system memory usage
    peak_opencode_procs: int       # max concurrent opencode processes observed
|
|
|
|
def get_memory_percent() -> float:
    """Return system memory utilization (0-100) from /proc/meminfo (Linux).

    Parses the MemTotal and MemAvailable lines (values in kB). Returns 0.0
    when the file is missing, unreadable, or unparsable (e.g. non-Linux).
    """
    total_kb = 0
    available_kb = 0
    try:
        with open("/proc/meminfo", "r") as f:
            contents = f.read()
        for entry in contents.splitlines():
            if entry.startswith("MemTotal:"):
                total_kb = int(entry.split()[1])
            elif entry.startswith("MemAvailable:"):
                available_kb = int(entry.split()[1])
                break  # both fields seen; no need to scan further
        if total_kb > 0:
            used_kb = total_kb - available_kb
            return (used_kb / total_kb) * 100
    except (FileNotFoundError, PermissionError, ValueError):
        pass
    return 0.0
|
|
|
|
def count_opencode_processes() -> int:
    """Count running 'opencode' processes.

    Tries ``pgrep -c -x opencode`` first; when pgrep is unavailable, fails,
    or finds no match (non-zero exit), falls back to scanning
    ``/proc/<pid>/comm``. Returns 0 when neither source is usable.
    """
    try:
        result = subprocess.run(
            ["pgrep", "-c", "-x", "opencode"],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0:
            return int(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, subprocess.SubprocessError):
        pass
    # Fallback: scan /proc. NOTE(review): this path matches 'opencode' as a
    # substring while the pgrep path (-x) matches the exact name, so the two
    # paths can disagree for names like 'opencode-helper' — confirm intent.
    try:
        count = 0
        for pid_dir in os.listdir("/proc"):
            if not pid_dir.isdigit():
                continue
            try:
                with open(f"/proc/{pid_dir}/comm", "r") as f:
                    if "opencode" in f.read().lower():
                        count += 1
            except (PermissionError, FileNotFoundError):
                continue
        return count
    except FileNotFoundError:
        # /proc itself does not exist (non-Linux).
        return 0
    # (The original had an unreachable trailing `return 0` here: the try
    # block either returns or raises, and the only escaping exception type
    # caught returns as well. Removed as dead code.)
|
|
|
|
def get_cpu_percent(interval: float = 0.1) -> float:
    """Return system-wide CPU utilization (0-100) measured over *interval* seconds.

    BUG FIX: the original read /proc/stat once, which yields the cumulative
    busy fraction since boot — not current load, so it barely changed during
    a test. This version samples twice and computes the delta.

    Args:
        interval: seconds to sleep between the two samples (default 0.1,
                  backward-compatible: existing zero-argument callers work).

    Returns 0.0 on non-Linux systems or parse failure.
    """
    def _read_cpu_times() -> Optional[List[int]]:
        # First /proc/stat line: "cpu user nice system idle iowait irq softirq ..."
        with open("/proc/stat", "r") as f:
            parts = f.readline().split()
        if parts and parts[0] == "cpu":
            return [int(x) for x in parts[1:8]]
        return None

    try:
        first = _read_cpu_times()
        if first is None:
            return 0.0
        time.sleep(interval)
        second = _read_cpu_times()
        if second is None:
            return 0.0
        total_delta = sum(second) - sum(first)
        idle_delta = second[3] - first[3]  # field 4 of the cpu line is idle
        if total_delta > 0:
            return ((total_delta - idle_delta) / total_delta) * 100
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        pass
    return 0.0
|
|
|
|
class ResourceMonitor:
    """Samples system resource usage on a background daemon thread.

    Collects one ResourceSample roughly every ``sample_interval`` seconds
    between start() and stop(), using this module's stdlib /proc helpers
    (no psutil dependency).
    """

    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.samples: List[ResourceSample] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int):
        """Begin sampling; discards samples from any previous run.

        Args:
            agent_count: agent count tagged onto each collected sample.
        """
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List[ResourceSample]:
        """Signal the sampler to stop, wait briefly, and return the samples."""
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self):
        # Runs on the daemon thread until stop() sets the event.
        while not self._stop_event.is_set():
            try:
                sample = self._collect_sample()
                self.samples.append(sample)
            except Exception as e:
                # Best-effort monitoring: never let a sampling error kill the loop.
                print(f"[WARN] Error collecting resource sample: {e}")
            # wait() doubles as the inter-sample sleep and wakes early on stop.
            self._stop_event.wait(self.sample_interval)

    def _collect_sample(self) -> ResourceSample:
        """Take one snapshot of CPU, memory, and opencode process count.

        BUG FIX: the original referenced ``psutil`` and ``HAS_PSUTIL``,
        neither of which is imported or defined anywhere in this file
        (NameError on every sample). The module's stdlib helpers
        get_cpu_percent / get_memory_percent / count_opencode_processes
        existed but were never called; use them.
        """
        return ResourceSample(
            timestamp=time.time(),
            cpu_percent=get_cpu_percent(),
            memory_percent=get_memory_percent(),
            opencode_processes=count_opencode_processes(),
            agent_count=self._current_agent_count
        )
|
|
|
|
class ParallelCapacityTester:
    """Drives the capacity test.

    Spawns N parallel ``opencode run`` tasks per round, records per-agent
    results plus resource samples, and writes JSON/CSV/Markdown reports.
    """

    def __init__(self, timeout: int = 120, workdir: Optional[str] = None):
        """
        Args:
            timeout: per-agent (and per-round watchdog) timeout in seconds.
            workdir: base directory for per-agent scratch dirs
                     (defaults to /tmp/parallel_test).
        """
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List[TestRun] = []

    def _create_test_workdir(self, agent_id: int) -> str:
        """Create and return a unique scratch directory for one agent."""
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir

    def _run_single_agent(self, agent_id: int) -> AgentResult:
        """Run one ``opencode run`` task and classify its outcome.

        Success means the sentinel PARALLEL_TEST_OK appeared in the combined
        output. Timeouts and other exceptions are captured in the result,
        never raised.
        """
        workdir = self._create_test_workdir(agent_id)
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"

        try:
            result = subprocess.run(
                ['opencode', 'run', task, '--workdir', workdir],
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            success = 'PARALLEL_TEST_OK' in output

            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status='success' if success else 'failed',
                return_code=result.returncode,
                output=output[:500]
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status='timeout',
                return_code=-1
            )
        except Exception as e:
            # BUG FIX: the original passed error=str(e), but AgentResult
            # defines no 'error' field, so this handler itself raised
            # TypeError instead of recording the failure. Store the message
            # in 'output' (truncated like the success path).
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status='failed',
                return_code=-1,
                output=str(e)[:500]
            )

    def _run_parallel_agents(self, num_agents: int) -> TestRun:
        """Run one round with *num_agents* concurrent agents; return its stats."""
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
        self.monitor.start(num_agents)

        threads = []
        results = []
        results_lock = threading.Lock()

        def run_and_record(agent_id: int):
            # Worker: run one agent and append its result under the lock.
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()

        for i in range(1, num_agents + 1):
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)

        # Watchdog: poll until all workers finish or the round times out.
        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)

        # Best-effort cleanup of any opencode processes still running.
        subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)

        for t in threads:
            t.join(timeout=5)

        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time

        success_count = sum(1 for r in results if r.status == 'success')
        failed_count = sum(1 for r in results if r.status == 'failed')
        timeout_count = sum(1 for r in results if r.status == 'timeout')

        # Response-time stats include failed/timed-out agents by design.
        durations = [r.duration for r in results]
        avg_duration = statistics.mean(durations) if durations else 0
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem = max(s.memory_percent for s in resource_samples)
            avg_mem = statistics.mean(s.memory_percent for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0

        print(f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout")

        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_percent=peak_mem,
            avg_memory_percent=avg_mem,
            peak_opencode_procs=peak_procs
        )

    def run_capacity_test(self, max_agents: int = 10, step: int = 1,
                          quick: bool = False) -> List[TestRun]:
        """Run a series of rounds at increasing agent counts.

        Args:
            max_agents: highest agent count tested (ignored when quick=True).
            step: increment between tested counts.
            quick: use the short preset series [1, 2, 3, 5, 8].

        Returns the list of per-round TestRun results (also kept on self).
        """
        if quick:
            agent_counts = [1, 2, 3, 5, 8]
        else:
            agent_counts = list(range(1, max_agents + 1, step))

        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")

        self.results = []

        for count in agent_counts:
            # Kill leftovers from the previous round so rounds are independent.
            subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
            time.sleep(2)
            result = self._run_parallel_agents(count)
            self.results.append(result)

        return self.results

    def save_results(self, output_dir: str):
        """Write JSON, CSV, and Markdown reports for self.results.

        Returns a (json_path, csv_path, report_path) tuple of strings.
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Full structured dump of every run.
        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, 'w') as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")

        # Flat per-round summary for spreadsheets.
        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, 'w') as f:
            f.write("agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n")
            for run in self.results:
                f.write(f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                        f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                        f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                        f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                        f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f},"
                        f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n")
        print(f"[INFO] Summary saved to: {csv_file}")

        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")

        return str(json_file), str(csv_file), str(report_file)

    def _generate_markdown_report(self, output_file: Path):
        """Render a human-readable Markdown report of self.results."""
        with open(output_file, 'w') as f:
            f.write("# Parallel Capacity Test Report\n\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("## Summary\n\n")
            f.write("| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n")
            f.write("|--------|----------|---------|--------|---------|--------------|----------|----------|\n")
            for run in self.results:
                f.write(f"| {run.agent_count} | {run.total_duration:.1f}s | "
                        f"{run.success_count} | {run.failed_count} | "
                        f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
                        f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n")
            f.write("\n## Key Findings\n\n")
            # "Optimal" = largest round where every agent succeeded.
            successful_runs = [r for r in self.results if r.success_count == r.agent_count]
            optimal = max(successful_runs, key=lambda r: r.agent_count, default=None)
            if optimal:
                f.write(f"### Optimal Configuration\n")
                f.write(f"- **{optimal.agent_count} agents** achieved perfect success rate\n")
                f.write(f"  - Average response time: {optimal.avg_response_time:.1f}s\n")
                f.write(f"  - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
                f.write(f"  - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n")
            f.write("## Recommendations\n\n")
            if optimal:
                f.write(f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n")
            f.write("2. **Monitor closely:** 5+ agents\n")
            f.write("3. **Implement circuit breaker** when failure rate exceeds threshold\n")
|
|
|
|
def main():
    """CLI entry point: parse arguments, run the capacity test, save reports.

    Exits with status 1 on Ctrl-C.
    """
    parser = argparse.ArgumentParser(description='Parallel Capacity Test Tool')
    # Help strings added: the original options produced an uninformative --help.
    parser.add_argument('--agents', '-n', type=int, default=10,
                        help='maximum number of concurrent agents to test (default: 10)')
    parser.add_argument('--timeout', '-t', type=int, default=120,
                        help='per-agent timeout in seconds (default: 120)')
    parser.add_argument('--step', '-s', type=int, default=1,
                        help='increment between tested agent counts (default: 1)')
    parser.add_argument('--quick', '-q', action='store_true',
                        help='run the short preset series of agent counts (1, 2, 3, 5, 8)')
    parser.add_argument('--output', '-o', type=str, default=None,
                        help='output directory for reports (default: <script dir>/results)')
    args = parser.parse_args()

    script_dir = Path(__file__).parent
    output_dir = args.output or str(script_dir / 'results')

    print("=" * 60)
    print("Parallel Capacity Test Tool for Hermes/OpenCode")
    print("=" * 60)
    print(f"Max agents: {args.agents}")
    print(f"Timeout: {args.timeout}s")
    print()

    tester = ParallelCapacityTester(timeout=args.timeout)

    try:
        tester.run_capacity_test(max_agents=args.agents, step=args.step, quick=args.quick)
        json_file, csv_file, report_file = tester.save_results(output_dir)
        print("\n" + "=" * 60)
        print("TEST COMPLETE")
        print("=" * 60)
        print(f"JSON Results: {json_file}")
        print(f"CSV Summary: {csv_file}")
        print(f"Report: {report_file}")
    except KeyboardInterrupt:
        print("\n[ABORT] Test interrupted by user")
        sys.exit(1)
|
|
|
|
# Entry point when executed as a script.
if __name__ == '__main__':
    main()
|