From 5bc70dd515616e2e180e5eff95d707b6cbb6f3e9 Mon Sep 17 00:00:00 2001 From: shokollm <270575765+shokollm@users.noreply.github.com> Date: Tue, 31 Mar 2026 03:47:38 +0000 Subject: [PATCH] feat(parallel-test): add kugetsu mode, memory limits, and cost tracking --- .../parallel_capacity_test.py | 281 ++++++++++++------ 1 file changed, 187 insertions(+), 94 deletions(-) diff --git a/tools/parallel-capacity-test/parallel_capacity_test.py b/tools/parallel-capacity-test/parallel_capacity_test.py index 5bb3d43..3b8cbed 100755 --- a/tools/parallel-capacity-test/parallel_capacity_test.py +++ b/tools/parallel-capacity-test/parallel_capacity_test.py @@ -1,7 +1,11 @@ #!/usr/bin/env python3 """ -Parallel Capacity Test Tool for Hermes/OpenCode -Tests concurrent agent capacity by spawning N parallel opencode run tasks. +Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu +Tests concurrent agent capacity by spawning N parallel tasks. + +Supports two modes: +- opencode: Direct opencode run (legacy) +- kugetsu: Via kugetsu CLI (tests full orchestration stack) """ import argparse @@ -12,11 +16,13 @@ import sys import time import threading import statistics +import uuid from dataclasses import dataclass, asdict from datetime import datetime from pathlib import Path from typing import List, Optional + try: import psutil @@ -26,71 +32,6 @@ except ImportError: print("[WARN] psutil not available - resource monitoring will be limited") -def get_memory_percent() -> float: - """Get memory usage percent by reading /proc/meminfo (Linux)""" - try: - with open("/proc/meminfo", "r") as f: - meminfo = f.read() - total = 0 - available = 0 - for line in meminfo.splitlines(): - if line.startswith("MemTotal:"): - total = int(line.split()[1]) - elif line.startswith("MemAvailable:"): - available = int(line.split()[1]) - break - if total > 0: - used = total - available - return (used / total) * 100 - except (FileNotFoundError, PermissionError, ValueError): - pass - return 0.0 - - -def count_opencode_processes() -> int: - """Count opencode processes using pgrep or /proc scanning""" - try: - result = subprocess.run( - ["pgrep", "-c", "-x", "opencode"], capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0: - return int(result.stdout.strip()) - except (subprocess.TimeoutExpired, ValueError, subprocess.SubprocessError): - pass - try: - count = 0 - for pid_dir in os.listdir("/proc"): - if not pid_dir.isdigit(): - continue - try: - with open(f"/proc/{pid_dir}/comm", "r") as f: - if "opencode" in f.read().lower(): - count += 1 - except (PermissionError, FileNotFoundError): - continue - return count - except FileNotFoundError: - return 0 - return 0 - - -def get_cpu_percent() -> float: - """Get CPU usage by reading /proc/stat""" - try: - with open("/proc/stat", "r") as f: - line = f.readline() - parts = line.split() - if parts[0] == "cpu": - values = [int(x) for x in parts[1:8]] - idle = values[3] - total = sum(values) - if total > 0: - return ((total - idle) / total) * 100 - except (FileNotFoundError, PermissionError, ValueError, IndexError): - pass - return 0.0 - - @dataclass class AgentResult: agent_id: int @@ -104,6 +45,7 @@ class AgentResult: class ResourceSample: timestamp: float cpu_percent: float + memory_mb: float memory_percent: float opencode_processes: int agent_count: int @@ -122,9 +64,14 @@ class TestRun: max_response_time: float peak_cpu_percent: float avg_cpu_percent: float + peak_memory_mb: float + avg_memory_mb: float peak_memory_percent: float avg_memory_percent: float peak_opencode_procs: int + baseline_memory_mb: float = 0.0 + memory_per_agent_mb: float = 0.0 + total_cost_score: float = 0.0 class ResourceMonitor: @@ -173,26 +120,65 @@ class ResourceMonitor: if HAS_PSUTIL: cpu_percent = psutil.cpu_percent(interval=0.1) - memory_percent = psutil.virtual_memory().percent + virt_mem = psutil.virtual_memory() + memory_percent = virt_mem.percent + memory_mb = virt_mem.used / (1024 * 1024) else: cpu_percent = 0.0 memory_percent = 0.0 + memory_mb = get_memory_mb_stdlib() return ResourceSample( timestamp=timestamp, cpu_percent=cpu_percent, + memory_mb=memory_mb, memory_percent=memory_percent, opencode_processes=opencode_procs, agent_count=self._current_agent_count, ) +def get_memory_mb_stdlib() -> float: + try: + with open("/proc/meminfo", "r") as f: + meminfo = f.read() + total_kb = 0 + avail_kb = 0 + for line in meminfo.splitlines(): + if line.startswith("MemTotal:"): + total_kb = int(line.split()[1]) + elif line.startswith("MemAvailable:"): + avail_kb = int(line.split()[1]) + if total_kb > 0: + used_kb = total_kb - avail_kb + return used_kb / 1024 + except Exception: + pass + return 0.0 + + class ParallelCapacityTester: - def __init__(self, timeout: int = 120, workdir: Optional[str] = None): + def __init__( + self, + timeout: int = 120, + workdir: Optional[str] = None, + use_kugetsu: bool = False, + memory_limit_mb: int = 1024, + test_repo: str = "git.example.com/test/kugetsu", + ): self.timeout = timeout self.workdir = workdir or "/tmp/parallel_test" + self.use_kugetsu = use_kugetsu + self.memory_limit_mb = memory_limit_mb + self.test_repo = test_repo self.monitor = ResourceMonitor(sample_interval=1.0) self.results: List[TestRun] = [] + self.baseline_memory_mb = 0.0 + + def _measure_baseline_memory(self) -> float: + if HAS_PSUTIL: + return psutil.virtual_memory().used / (1024 * 1024) + return get_memory_mb_stdlib() def _create_test_workdir(self, agent_id: int) -> str: agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}") @@ -205,15 +191,25 @@ class ParallelCapacityTester: task = "Respond with exactly: PARALLEL_TEST_OK" try: - result = subprocess.run( - ["opencode", "run", task, "--dir", workdir], - capture_output=True, - text=True, - timeout=self.timeout, - ) + if self.use_kugetsu: + unique_id = uuid.uuid4().hex[:8] + issue_ref = f"{self.test_repo}#{agent_id}-{unique_id}" + result = subprocess.run( + ["kugetsu", "start", issue_ref, task], + capture_output=True, + text=True, + timeout=self.timeout, + ) + else: + result = subprocess.run( + ["opencode", "run", task, "--dir", workdir], + capture_output=True, + text=True, + timeout=self.timeout, + ) duration = time.time() - start_time output = result.stdout + result.stderr - success = "PARALLEL_TEST_OK" in output + success = "PARALLEL_TEST_OK" in output or result.returncode == 0 return AgentResult( agent_id=agent_id, @@ -239,20 +235,41 @@ class ParallelCapacityTester: def _run_parallel_agents(self, num_agents: int) -> TestRun: print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...") + + self.baseline_memory_mb = self._measure_baseline_memory() + print(f"[INFO] Baseline memory: {self.baseline_memory_mb:.1f} MB") + self.monitor.start(num_agents) threads = [] results = [] results_lock = threading.Lock() + memory_exceeded = False def run_and_record(agent_id: int): - result = self._run_single_agent(agent_id) - with results_lock: - results.append(result) + nonlocal memory_exceeded + if not memory_exceeded: + current_mem = self._measure_baseline_memory() + if current_mem > self.baseline_memory_mb + self.memory_limit_mb: + memory_exceeded = True + print( + f"[WARN] Memory limit ({self.memory_limit_mb}MB) approached, not spawning more agents" + ) + return + result = self._run_single_agent(agent_id) + with results_lock: + results.append(result) start_time = time.time() for i in range(1, num_agents + 1): + current_mem = self._measure_baseline_memory() + if current_mem > self.baseline_memory_mb + self.memory_limit_mb: + print( + f"[WARN] Memory limit ({self.memory_limit_mb}MB) would be exceeded, stopping spawn at {i - 1} agents" + ) + memory_exceeded = True + break t = threading.Thread(target=run_and_record, args=(i,)) t.start() threads.append(t) @@ -285,15 +302,34 @@ class ParallelCapacityTester: if resource_samples: peak_cpu = max(s.cpu_percent for s in resource_samples) avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples) - peak_mem = max(s.memory_percent for s in resource_samples) - avg_mem = statistics.mean(s.memory_percent for s in resource_samples) + peak_mem_pct = max(s.memory_percent for s in resource_samples) + avg_mem_pct = statistics.mean(s.memory_percent for s in resource_samples) + peak_mem_mb = max(s.memory_mb for s in resource_samples) + avg_mem_mb = statistics.mean(s.memory_mb for s in resource_samples) peak_procs = max(s.opencode_processes for s in resource_samples) else: - peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0 + peak_cpu = avg_cpu = peak_mem_pct = avg_mem_pct = peak_mem_mb = ( + avg_mem_mb + ) = peak_procs = 0 + + actual_agents = len(results) if results else num_agents + memory_per_agent = ( + (peak_mem_mb - self.baseline_memory_mb) / actual_agents + if actual_agents > 0 + else 0 + ) + total_cost = ( + (peak_mem_mb - self.baseline_memory_mb) * total_duration / 1000 + if peak_mem_mb > self.baseline_memory_mb + else 0 + ) print( f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout" ) + print( + f"[COST] Memory per agent: {memory_per_agent:.1f} MB, Total cost score: {total_cost:.2f}" + ) return TestRun( agent_count=num_agents, @@ -307,9 +343,14 @@ class ParallelCapacityTester: max_response_time=max_duration, peak_cpu_percent=peak_cpu, avg_cpu_percent=avg_cpu, - peak_memory_percent=peak_mem, - avg_memory_percent=avg_mem, + peak_memory_mb=peak_mem_mb, + avg_memory_mb=avg_mem_mb, + peak_memory_percent=peak_mem_pct, + avg_memory_percent=avg_mem_pct, peak_opencode_procs=peak_procs, + baseline_memory_mb=self.baseline_memory_mb, + memory_per_agent_mb=memory_per_agent, + total_cost_score=total_cost, ) def run_capacity_test( @@ -347,7 +388,7 @@ class ParallelCapacityTester: csv_file = output_path / f"summary_{timestamp}.csv" with open(csv_file, "w") as f: f.write( - "agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n" + "agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem_mb,avg_mem_mb,peak_mem_pct,avg_mem_pct,peak_procs,baseline_mem,mem_per_agent,cost_score\n" ) for run in self.results: f.write( @@ -355,8 +396,10 @@ class ParallelCapacityTester: f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f}," f"{run.stddev_response_time:.2f},{run.min_response_time:.2f}," f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f}," - f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f}," - f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n" + f"{run.avg_cpu_percent:.1f},{run.peak_memory_mb:.1f}," + f"{run.avg_memory_mb:.1f},{run.peak_memory_percent:.1f}," + f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}," + f"{run.baseline_memory_mb:.1f},{run.memory_per_agent_mb:.1f},{run.total_cost_score:.2f}\n" ) print(f"[INFO] Summary saved to: {csv_file}") @@ -374,18 +417,33 @@ class ParallelCapacityTester: ) f.write("## Summary\n\n") f.write( - "| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n" + "| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak Mem (MB) | Mem/Agent | Cost Score |\n" ) f.write( - "|--------|----------|---------|--------|---------|--------------|----------|----------|\n" + "|--------|----------|---------|--------|---------|--------------|---------------|-----------|------------|\n" ) for run in self.results: f.write( f"| {run.agent_count} | {run.total_duration:.1f}s | " f"{run.success_count} | {run.failed_count} | " f"{run.timeout_count} | {run.avg_response_time:.1f}s | " - f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n" + f"{run.peak_memory_mb:.0f}MB | {run.memory_per_agent_mb:.1f}MB | {run.total_cost_score:.2f} |\n" ) + f.write("\n## Cost Analysis\n\n") + f.write("| Metric | Value |\n") + f.write("|--------|-------|\n") + if self.results: + baseline = self.results[0].baseline_memory_mb + f.write(f"| Baseline Memory | {baseline:.1f} MB |\n") + avg_mem_per = sum(r.memory_per_agent_mb for r in self.results) / len( + self.results + ) + f.write(f"| Avg Memory per Agent | {avg_mem_per:.1f} MB |\n") + f.write(f"| Memory Limit | {self.memory_limit_mb} MB |\n") + max_capacity = ( + int(self.memory_limit_mb / avg_mem_per) if avg_mem_per > 0 else 0 + ) + f.write(f"| Estimated Max Capacity | {max_capacity} agents |\n") f.write("\n## Key Findings\n\n") successful_runs = [ r for r in self.results if r.success_count == r.agent_count @@ -400,7 +458,11 @@ class ParallelCapacityTester: f" - Average response time: {optimal.avg_response_time:.1f}s\n" ) f.write(f" - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n") - f.write(f" - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n") + f.write( + f" - Peak Memory: {optimal.peak_memory_mb:.1f}MB ({optimal.peak_memory_percent:.1f}%)\n" + ) + f.write(f" - Memory per agent: {optimal.memory_per_agent_mb:.1f}MB\n") + f.write(f" - Cost score: {optimal.total_cost_score:.2f}\n\n") f.write("## Recommendations\n\n") if optimal: f.write( @@ -413,25 +475,56 @@ class ParallelCapacityTester: def main(): - parser = argparse.ArgumentParser(description="Parallel Capacity Test Tool") + parser = argparse.ArgumentParser( + description="Parallel Capacity Test Tool for Hermes/OpenCode/Kugetsu" + ) parser.add_argument("--agents", "-n", type=int, default=10) parser.add_argument("--timeout", "-t", type=int, default=120) parser.add_argument("--step", "-s", type=int, default=1) parser.add_argument("--quick", "-q", action="store_true") parser.add_argument("--output", "-o", type=str, default=None) + parser.add_argument( + "--use-kugetsu", + "-k", + action="store_true", + help="Use kugetsu CLI instead of raw opencode (tests full orchestration)", + ) + parser.add_argument( + "--memory-limit", + "-m", + type=int, + default=1024, + help="Memory limit per agent in MB (default: 1024 = 1GB)", + ) + parser.add_argument( + "--test-repo", + "-r", + type=str, + default="git.example.com/test/kugetsu", + help="Repository for kugetsu issue refs (default: git.example.com/test/kugetsu)", + ) args = parser.parse_args() script_dir = Path(__file__).parent output_dir = args.output or str(script_dir / "results") + mode = "kugetsu" if args.use_kugetsu else "opencode" print("=" * 60) - print("Parallel Capacity Test Tool for Hermes/OpenCode") + print(f"Parallel Capacity Test Tool ({mode} mode)") print("=" * 60) print(f"Max agents: {args.agents}") print(f"Timeout: {args.timeout}s") + print(f"Memory limit: {args.memory_limit}MB") + if args.use_kugetsu: + print(f"Test repo: {args.test_repo}") print() - tester = ParallelCapacityTester(timeout=args.timeout) + tester = ParallelCapacityTester( + timeout=args.timeout, + use_kugetsu=args.use_kugetsu, + memory_limit_mb=args.memory_limit, + test_repo=args.test_repo, + ) try: tester.run_capacity_test(