Add parallel capacity test tool for Hermes/OpenCode #5

Merged
shoko merged 7 commits from fix/issue-3-parallel-test into main 2026-03-31 06:28:58 +02:00
Showing only changes of commit 5bc70dd515 - Show all commits

View File

@@ -1,7 +1,11 @@
#!/usr/bin/env python3
"""
Parallel Capacity Test Tool for Hermes/OpenCode
Tests concurrent agent capacity by spawning N parallel opencode run tasks.
Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu
Tests concurrent agent capacity by spawning N parallel tasks.
Supports two modes:
- opencode: Direct opencode run (legacy)
- kugetsu: Via kugetsu CLI (tests full orchestration stack)
"""
import argparse
@@ -12,11 +16,13 @@ import sys
import time
import threading
import statistics
import uuid
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Optional
try:
import psutil
@@ -26,71 +32,6 @@ except ImportError:
print("[WARN] psutil not available - resource monitoring will be limited")
def get_memory_percent() -> float:
    """Return system memory usage as a percentage (0-100).

    Parses /proc/meminfo (Linux only). Returns 0.0 on any platform
    where /proc is unavailable or the file cannot be parsed, so callers
    can treat the value as best-effort.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        total = 0
        available = 0
        for line in meminfo.splitlines():
            if line.startswith("MemTotal:"):
                total = int(line.split()[1])
            elif line.startswith("MemAvailable:"):
                available = int(line.split()[1])
            if total and available:
                # Stop only once BOTH fields were parsed. The original
                # broke as soon as MemAvailable was seen, returning 0.0
                # whenever it preceded MemTotal in the file.
                break
        if total > 0:
            used = total - available
            return (used / total) * 100
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        # IndexError added: a malformed line makes line.split()[1] raise,
        # which the original except tuple did not catch.
        pass
    return 0.0
def count_opencode_processes() -> int:
    """Count running ``opencode`` processes.

    Tries ``pgrep -c -x opencode`` first, then falls back to scanning
    ``/proc/<pid>/comm``. Returns 0 when neither source is usable
    (e.g. non-Linux system without pgrep).
    """
    # Fast path: exact-name process count via pgrep.
    try:
        result = subprocess.run(
            ["pgrep", "-c", "-x", "opencode"], capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            return int(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, subprocess.SubprocessError, OSError):
        # OSError covers a missing pgrep binary (FileNotFoundError), which
        # the original except tuple did not catch and would have crashed on.
        pass
    # Fallback: scan /proc for matching command names.
    try:
        count = 0
        for pid_dir in os.listdir("/proc"):
            if not pid_dir.isdigit():
                continue
            try:
                with open(f"/proc/{pid_dir}/comm", "r") as f:
                    if "opencode" in f.read().lower():
                        count += 1
            except (PermissionError, FileNotFoundError):
                # Process may have exited between listdir() and open().
                continue
        return count
    except FileNotFoundError:
        # No /proc at all (non-Linux).
        return 0
def get_cpu_percent() -> float:
    """Return aggregate CPU utilization in percent, parsed from /proc/stat.

    NOTE(review): /proc/stat counters are cumulative since boot, so this
    is the boot-to-now average rather than instantaneous load — a true
    instantaneous figure would need two samples spaced apart. Returns 0.0
    when /proc is unavailable or unparsable (non-Linux, permissions).
    """
    try:
        with open("/proc/stat", "r") as stat_file:
            fields = stat_file.readline().split()
        if fields and fields[0] == "cpu":
            # First seven jiffy counters: user, nice, system, idle,
            # iowait, irq, softirq.
            counters = [int(value) for value in fields[1:8]]
            idle_jiffies = counters[3]
            total_jiffies = sum(counters)
            if total_jiffies > 0:
                return 100.0 * (total_jiffies - idle_jiffies) / total_jiffies
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        pass
    return 0.0
@dataclass
class AgentResult:
agent_id: int
@@ -104,6 +45,7 @@ class AgentResult:
class ResourceSample:
timestamp: float
cpu_percent: float
memory_mb: float
memory_percent: float
opencode_processes: int
agent_count: int
@@ -122,9 +64,14 @@ class TestRun:
max_response_time: float
peak_cpu_percent: float
avg_cpu_percent: float
peak_memory_mb: float
avg_memory_mb: float
peak_memory_percent: float
avg_memory_percent: float
peak_opencode_procs: int
baseline_memory_mb: float = 0.0
memory_per_agent_mb: float = 0.0
total_cost_score: float = 0.0
class ResourceMonitor:
@@ -173,26 +120,65 @@ class ResourceMonitor:
if HAS_PSUTIL:
cpu_percent = psutil.cpu_percent(interval=0.1)
memory_percent = psutil.virtual_memory().percent
virt_mem = psutil.virtual_memory()
memory_percent = virt_mem.percent
memory_mb = virt_mem.used / (1024 * 1024)
else:
cpu_percent = 0.0
memory_percent = 0.0
memory_mb = get_memory_mb_stdlib()
return ResourceSample(
timestamp=timestamp,
cpu_percent=cpu_percent,
memory_mb=memory_mb,
memory_percent=memory_percent,
opencode_processes=opencode_procs,
agent_count=self._current_agent_count,
)
def get_memory_mb_stdlib() -> float:
    """Return used system memory in MB using only the standard library.

    Computes MemTotal - MemAvailable from /proc/meminfo (Linux). Returns
    0.0 on non-Linux systems or when the file cannot be read/parsed, so
    callers get best-effort semantics without psutil.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        total_kb = 0
        avail_kb = 0
        for line in meminfo.splitlines():
            if line.startswith("MemTotal:"):
                total_kb = int(line.split()[1])
            elif line.startswith("MemAvailable:"):
                avail_kb = int(line.split()[1])
        if total_kb > 0:
            used_kb = total_kb - avail_kb
            # /proc/meminfo reports kB; convert to MB.
            return used_kb / 1024
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        # Narrowed from a bare `except Exception` so genuine programming
        # errors are no longer silently swallowed.
        pass
    return 0.0
class ParallelCapacityTester:
def __init__(self, timeout: int = 120, workdir: Optional[str] = None):
def __init__(
self,
timeout: int = 120,
workdir: Optional[str] = None,
use_kugetsu: bool = False,
memory_limit_mb: int = 1024,
test_repo: str = "git.example.com/test/kugetsu",
):
self.timeout = timeout
self.workdir = workdir or "/tmp/parallel_test"
self.use_kugetsu = use_kugetsu
self.memory_limit_mb = memory_limit_mb
self.test_repo = test_repo
self.monitor = ResourceMonitor(sample_interval=1.0)
self.results: List[TestRun] = []
self.baseline_memory_mb = 0.0
def _measure_baseline_memory(self) -> float:
    """Snapshot current system memory usage in MB.

    Prefers psutil when available; otherwise falls back to the
    stdlib /proc/meminfo parser.
    """
    if not HAS_PSUTIL:
        return get_memory_mb_stdlib()
    return psutil.virtual_memory().used / (1024 * 1024)
def _create_test_workdir(self, agent_id: int) -> str:
agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
@@ -205,6 +191,16 @@ class ParallelCapacityTester:
task = "Respond with exactly: PARALLEL_TEST_OK"
try:
if self.use_kugetsu:
unique_id = uuid.uuid4().hex[:8]
issue_ref = f"{self.test_repo}#{agent_id}-{unique_id}"
result = subprocess.run(
["kugetsu", "start", issue_ref, task],
capture_output=True,
text=True,
timeout=self.timeout,
)
else:
result = subprocess.run(
["opencode", "run", task, "--dir", workdir],
capture_output=True,
@@ -213,7 +209,7 @@ class ParallelCapacityTester:
)
duration = time.time() - start_time
output = result.stdout + result.stderr
success = "PARALLEL_TEST_OK" in output
success = "PARALLEL_TEST_OK" in output or result.returncode == 0
return AgentResult(
agent_id=agent_id,
@@ -239,13 +235,27 @@ class ParallelCapacityTester:
def _run_parallel_agents(self, num_agents: int) -> TestRun:
print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
self.baseline_memory_mb = self._measure_baseline_memory()
print(f"[INFO] Baseline memory: {self.baseline_memory_mb:.1f} MB")
self.monitor.start(num_agents)
threads = []
results = []
results_lock = threading.Lock()
memory_exceeded = False
def run_and_record(agent_id: int):
nonlocal memory_exceeded
if not memory_exceeded:
current_mem = self._measure_baseline_memory()
if current_mem > self.baseline_memory_mb + self.memory_limit_mb:
memory_exceeded = True
print(
f"[WARN] Memory limit ({self.memory_limit_mb}MB) approached, not spawning more agents"
)
return
result = self._run_single_agent(agent_id)
with results_lock:
results.append(result)
@@ -253,6 +263,13 @@ class ParallelCapacityTester:
start_time = time.time()
for i in range(1, num_agents + 1):
current_mem = self._measure_baseline_memory()
if current_mem > self.baseline_memory_mb + self.memory_limit_mb:
print(
f"[WARN] Memory limit ({self.memory_limit_mb}MB) would be exceeded, stopping spawn at {i - 1} agents"
)
memory_exceeded = True
break
t = threading.Thread(target=run_and_record, args=(i,))
t.start()
threads.append(t)
@@ -285,15 +302,34 @@ class ParallelCapacityTester:
if resource_samples:
peak_cpu = max(s.cpu_percent for s in resource_samples)
avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
peak_mem = max(s.memory_percent for s in resource_samples)
avg_mem = statistics.mean(s.memory_percent for s in resource_samples)
peak_mem_pct = max(s.memory_percent for s in resource_samples)
avg_mem_pct = statistics.mean(s.memory_percent for s in resource_samples)
peak_mem_mb = max(s.memory_mb for s in resource_samples)
avg_mem_mb = statistics.mean(s.memory_mb for s in resource_samples)
peak_procs = max(s.opencode_processes for s in resource_samples)
else:
peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0
peak_cpu = avg_cpu = peak_mem_pct = avg_mem_pct = peak_mem_mb = (
avg_mem_mb
) = peak_procs = 0
actual_agents = len(results) if results else num_agents
memory_per_agent = (
(peak_mem_mb - self.baseline_memory_mb) / actual_agents
if actual_agents > 0
else 0
)
total_cost = (
(peak_mem_mb - self.baseline_memory_mb) * total_duration / 1000
if peak_mem_mb > self.baseline_memory_mb
else 0
)
print(
f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout"
)
print(
f"[COST] Memory per agent: {memory_per_agent:.1f} MB, Total cost score: {total_cost:.2f}"
)
return TestRun(
agent_count=num_agents,
@@ -307,9 +343,14 @@ class ParallelCapacityTester:
max_response_time=max_duration,
peak_cpu_percent=peak_cpu,
avg_cpu_percent=avg_cpu,
peak_memory_percent=peak_mem,
avg_memory_percent=avg_mem,
peak_memory_mb=peak_mem_mb,
avg_memory_mb=avg_mem_mb,
peak_memory_percent=peak_mem_pct,
avg_memory_percent=avg_mem_pct,
peak_opencode_procs=peak_procs,
baseline_memory_mb=self.baseline_memory_mb,
memory_per_agent_mb=memory_per_agent,
total_cost_score=total_cost,
)
def run_capacity_test(
@@ -347,7 +388,7 @@ class ParallelCapacityTester:
csv_file = output_path / f"summary_{timestamp}.csv"
with open(csv_file, "w") as f:
f.write(
"agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n"
"agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem_mb,avg_mem_mb,peak_mem_pct,avg_mem_pct,peak_procs,baseline_mem,mem_per_agent,cost_score\n"
)
for run in self.results:
f.write(
@@ -355,8 +396,10 @@ class ParallelCapacityTester:
f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f},"
f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n"
f"{run.avg_cpu_percent:.1f},{run.peak_memory_mb:.1f},"
f"{run.avg_memory_mb:.1f},{run.peak_memory_percent:.1f},"
f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs},"
f"{run.baseline_memory_mb:.1f},{run.memory_per_agent_mb:.1f},{run.total_cost_score:.2f}\n"
)
print(f"[INFO] Summary saved to: {csv_file}")
@@ -374,18 +417,33 @@ class ParallelCapacityTester:
)
f.write("## Summary\n\n")
f.write(
"| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n"
"| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak Mem (MB) | Mem/Agent | Cost Score |\n"
)
f.write(
"|--------|----------|---------|--------|---------|--------------|----------|----------|\n"
"|--------|----------|---------|--------|---------|--------------|---------------|-----------|------------|\n"
)
for run in self.results:
f.write(
f"| {run.agent_count} | {run.total_duration:.1f}s | "
f"{run.success_count} | {run.failed_count} | "
f"{run.timeout_count} | {run.avg_response_time:.1f}s | "
f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n"
f"{run.peak_memory_mb:.0f}MB | {run.memory_per_agent_mb:.1f}MB | {run.total_cost_score:.2f} |\n"
)
f.write("\n## Cost Analysis\n\n")
f.write("| Metric | Value |\n")
f.write("|--------|-------|\n")
if self.results:
baseline = self.results[0].baseline_memory_mb
f.write(f"| Baseline Memory | {baseline:.1f} MB |\n")
avg_mem_per = sum(r.memory_per_agent_mb for r in self.results) / len(
self.results
)
f.write(f"| Avg Memory per Agent | {avg_mem_per:.1f} MB |\n")
f.write(f"| Memory Limit | {self.memory_limit_mb} MB |\n")
max_capacity = (
int(self.memory_limit_mb / avg_mem_per) if avg_mem_per > 0 else 0
)
f.write(f"| Estimated Max Capacity | {max_capacity} agents |\n")
f.write("\n## Key Findings\n\n")
successful_runs = [
r for r in self.results if r.success_count == r.agent_count
@@ -400,7 +458,11 @@ class ParallelCapacityTester:
f" - Average response time: {optimal.avg_response_time:.1f}s\n"
)
f.write(f" - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n")
f.write(f" - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n")
f.write(
f" - Peak Memory: {optimal.peak_memory_mb:.1f}MB ({optimal.peak_memory_percent:.1f}%)\n"
)
f.write(f" - Memory per agent: {optimal.memory_per_agent_mb:.1f}MB\n")
f.write(f" - Cost score: {optimal.total_cost_score:.2f}\n\n")
f.write("## Recommendations\n\n")
if optimal:
f.write(
@@ -413,25 +475,56 @@ class ParallelCapacityTester:
def main():
parser = argparse.ArgumentParser(description="Parallel Capacity Test Tool")
parser = argparse.ArgumentParser(
description="Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu"
)
parser.add_argument("--agents", "-n", type=int, default=10)
parser.add_argument("--timeout", "-t", type=int, default=120)
parser.add_argument("--step", "-s", type=int, default=1)
parser.add_argument("--quick", "-q", action="store_true")
parser.add_argument("--output", "-o", type=str, default=None)
parser.add_argument(
"--use-kugetsu",
"-k",
action="store_true",
help="Use kugetsu CLI instead of raw opencode (tests full orchestration)",
)
parser.add_argument(
"--memory-limit",
"-m",
type=int,
default=1024,
help="Memory limit per agent in MB (default: 1024 = 1GB)",
)
parser.add_argument(
"--test-repo",
"-r",
type=str,
default="git.example.com/test/kugetsu",
help="Repository for kugetsu issue refs (default: git.example.com/test/kugetsu)",
)
args = parser.parse_args()
script_dir = Path(__file__).parent
output_dir = args.output or str(script_dir / "results")
mode = "kugetsu" if args.use_kugetsu else "opencode"
print("=" * 60)
print("Parallel Capacity Test Tool for Hermes/OpenCode")
print(f"Parallel Capacity Test Tool ({mode} mode)")
print("=" * 60)
print(f"Max agents: {args.agents}")
print(f"Timeout: {args.timeout}s")
print(f"Memory limit: {args.memory_limit}MB")
if args.use_kugetsu:
print(f"Test repo: {args.test_repo}")
print()
tester = ParallelCapacityTester(timeout=args.timeout)
tester = ParallelCapacityTester(
timeout=args.timeout,
use_kugetsu=args.use_kugetsu,
memory_limit_mb=args.memory_limit,
test_repo=args.test_repo,
)
try:
tester.run_capacity_test(