diff --git a/skills/opencode-worktree/opencode-worktree.sh b/skills/opencode-worktree/opencode-worktree.sh
index cb0065c..6a734a2 100644
--- a/skills/opencode-worktree/opencode-worktree.sh
+++ b/skills/opencode-worktree/opencode-worktree.sh
@@ -98,7 +98,9 @@ create_worktree() {
         branch_name="$worktree_name"
     fi
 
-    local worktree_path="$WORKTREE_BASE/$worktree_name"
+    local worktree_path_abs
+    worktree_path_abs="$(cd "$WORKTREE_BASE" && pwd)/$worktree_name"
+    local worktree_path="$worktree_path_abs"
 
     # Cleanup any existing with same name
     if [[ -d "$worktree_path" ]]; then
diff --git a/tools/parallel-capacity-test/parallel_capacity_test.py b/tools/parallel-capacity-test/parallel_capacity_test.py
new file mode 100755
index 0000000..073a2cb
--- /dev/null
+++ b/tools/parallel-capacity-test/parallel_capacity_test.py
@@ -0,0 +1,419 @@
#!/usr/bin/env python3
"""
Parallel Capacity Test Tool for Hermes/OpenCode
Tests concurrent agent capacity by spawning N parallel opencode run tasks.
"""

import argparse
import json
import os
import subprocess
import sys
import time
import threading
import statistics
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import List, Optional

# Stdlib only - no psutil required; resource stats come from /proc and pgrep.


@dataclass
class AgentResult:
    agent_id: int
    duration: float
    status: str
    return_code: int
    output: str = ""
    error: str = ""  # populated when the subprocess raises instead of returning


@dataclass
class ResourceSample:
    timestamp: float
    cpu_percent: float
    memory_percent: float
    opencode_processes: int
    agent_count: int


@dataclass
class TestRun:
    agent_count: int
    total_duration: float
    success_count: int
    failed_count: int
    timeout_count: int
    avg_response_time: float
    stddev_response_time: float
    min_response_time: float
    max_response_time: float
    peak_cpu_percent: float
    avg_cpu_percent: float
    peak_memory_percent: float
    avg_memory_percent: float
    peak_opencode_procs: int


def get_memory_percent() -> float:
    """Get memory usage percent by reading /proc/meminfo (Linux)."""
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        total = 0
        available = 0
        for line in meminfo.splitlines():
            if line.startswith("MemTotal:"):
                total = int(line.split()[1])
            elif line.startswith("MemAvailable:"):
                available = int(line.split()[1])
                break
        if total > 0:
            used = total - available
            return (used / total) * 100
    except (FileNotFoundError, PermissionError, ValueError):
        pass
    return 0.0


def count_opencode_processes() -> int:
    """Count opencode processes using pgrep, falling back to /proc scanning."""
    try:
        result = subprocess.run(
            ["pgrep", "-c", "-x", "opencode"],
            capture_output=True,
            text=True,
            timeout=5
        )
        if result.returncode == 0:
            return int(result.stdout.strip())
    except (subprocess.TimeoutExpired, ValueError, subprocess.SubprocessError):
        pass
    try:
        count = 0
        for pid_dir in os.listdir("/proc"):
            if not pid_dir.isdigit():
                continue
            try:
                with open(f"/proc/{pid_dir}/comm", "r") as f:
                    if "opencode" in f.read().lower():
                        count += 1
            except (PermissionError, FileNotFoundError):
                continue
        return count
    except FileNotFoundError:
        return 0


def get_cpu_percent() -> float:
    """Get cumulative CPU usage since boot by reading /proc/stat."""
    try:
        with open("/proc/stat", "r") as f:
            line = f.readline()
            parts = line.split()
            if parts[0] == "cpu":
                values = [int(x) for x in parts[1:8]]
                idle = values[3]
                total = sum(values)
                if total > 0:
                    return ((total - idle) / total) * 100
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        pass
    return 0.0
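

# Illustrative aside, not exercised by the tool below: a single /proc/stat read
# reflects the cumulative busy fraction since boot, so get_cpu_percent() moves
# only slowly under a new load. A sketch of an interval-based variant under the
# same stdlib-only assumption, diffing two reads taken a short delay apart:
def get_cpu_percent_interval(delay: float = 0.5) -> float:
    """Approximate CPU busy percent over `delay` seconds (Linux; sketch)."""
    def read_counters() -> List[int]:
        with open("/proc/stat", "r") as f:
            return [int(x) for x in f.readline().split()[1:8]]

    try:
        before = read_counters()
        time.sleep(delay)
        after = read_counters()
    except (FileNotFoundError, PermissionError, ValueError, IndexError):
        return 0.0
    total = sum(after) - sum(before)
    idle = after[3] - before[3]  # field 4 of /proc/stat's cpu line is idle time
    return ((total - idle) / total) * 100 if total > 0 else 0.0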


class ResourceMonitor:
    def __init__(self, sample_interval: float = 1.0):
        self.sample_interval = sample_interval
        self.samples: List[ResourceSample] = []
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._current_agent_count = 0

    def start(self, agent_count: int):
        self._current_agent_count = agent_count
        self.samples = []
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop)
        self._thread.daemon = True
        self._thread.start()

    def stop(self) -> List[ResourceSample]:
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        return self.samples

    def _monitor_loop(self):
        while not self._stop_event.is_set():
            try:
                sample = self._collect_sample()
                self.samples.append(sample)
            except Exception as e:
                print(f"[WARN] Error collecting resource sample: {e}")
            self._stop_event.wait(self.sample_interval)

    def _collect_sample(self) -> ResourceSample:
        # Stdlib-only sampling via the /proc and pgrep helpers defined above.
        return ResourceSample(
            timestamp=time.time(),
            cpu_percent=get_cpu_percent(),
            memory_percent=get_memory_percent(),
            opencode_processes=count_opencode_processes(),
            agent_count=self._current_agent_count
        )
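

# Illustrative standalone use of ResourceMonitor (hypothetical workload; the
# tool itself drives the monitor from ParallelCapacityTester below):
#
#   monitor = ResourceMonitor(sample_interval=0.5)
#   monitor.start(agent_count=4)
#   run_workload()  # hypothetical function standing in for real work
#   for sample in monitor.stop():
#       print(sample.timestamp, sample.cpu_percent, sample.opencode_processes)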


class ParallelCapacityTester:
    def __init__(self, timeout: int = 120, workdir: Optional[str] = None):
        self.timeout = timeout
        self.workdir = workdir or "/tmp/parallel_test"
        self.monitor = ResourceMonitor(sample_interval=1.0)
        self.results: List[TestRun] = []

    def _create_test_workdir(self, agent_id: int) -> str:
        agent_dir = os.path.join(self.workdir, f"agent_{agent_id}_{int(time.time())}")
        os.makedirs(agent_dir, exist_ok=True)
        return agent_dir

    def _run_single_agent(self, agent_id: int) -> AgentResult:
        workdir = self._create_test_workdir(agent_id)
        start_time = time.time()
        task = "Respond with exactly: PARALLEL_TEST_OK"

        try:
            result = subprocess.run(
                ['opencode', 'run', task, '--workdir', workdir],
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            duration = time.time() - start_time
            output = result.stdout + result.stderr
            success = 'PARALLEL_TEST_OK' in output

            return AgentResult(
                agent_id=agent_id,
                duration=duration,
                status='success' if success else 'failed',
                return_code=result.returncode,
                output=output[:500]
            )
        except subprocess.TimeoutExpired:
            return AgentResult(
                agent_id=agent_id,
                duration=self.timeout,
                status='timeout',
                return_code=-1
            )
        except Exception as e:
            return AgentResult(
                agent_id=agent_id,
                duration=time.time() - start_time,
                status='failed',
                return_code=-1,
                error=str(e)
            )

    def _run_parallel_agents(self, num_agents: int) -> TestRun:
        print(f"\n[TEST] Running with {num_agents} concurrent agent(s)...")
        self.monitor.start(num_agents)

        threads = []
        results = []
        results_lock = threading.Lock()

        def run_and_record(agent_id: int):
            result = self._run_single_agent(agent_id)
            with results_lock:
                results.append(result)

        start_time = time.time()

        for i in range(1, num_agents + 1):
            t = threading.Thread(target=run_and_record, args=(i,))
            t.start()
            threads.append(t)

        all_done = False
        elapsed = 0
        while elapsed < self.timeout and not all_done:
            time.sleep(1)
            elapsed = int(time.time() - start_time)
            all_done = all(not t.is_alive() for t in threads)

        # Reap any opencode processes still running past the test window
        subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)

        for t in threads:
            t.join(timeout=5)

        resource_samples = self.monitor.stop()
        total_duration = time.time() - start_time

        success_count = sum(1 for r in results if r.status == 'success')
        failed_count = sum(1 for r in results if r.status == 'failed')
        timeout_count = sum(1 for r in results if r.status == 'timeout')

        durations = [r.duration for r in results]
        avg_duration = statistics.mean(durations) if durations else 0
        stddev = statistics.stdev(durations) if len(durations) > 1 else 0
        min_duration = min(durations) if durations else 0
        max_duration = max(durations) if durations else 0

        if resource_samples:
            peak_cpu = max(s.cpu_percent for s in resource_samples)
            avg_cpu = statistics.mean(s.cpu_percent for s in resource_samples)
            peak_mem = max(s.memory_percent for s in resource_samples)
            avg_mem = statistics.mean(s.memory_percent for s in resource_samples)
            peak_procs = max(s.opencode_processes for s in resource_samples)
        else:
            peak_cpu = avg_cpu = peak_mem = avg_mem = peak_procs = 0

        print(f"[RESULT] {num_agents} agents: {success_count} success, {failed_count} failed, {timeout_count} timeout")

        return TestRun(
            agent_count=num_agents,
            total_duration=total_duration,
            success_count=success_count,
            failed_count=failed_count,
            timeout_count=timeout_count,
            avg_response_time=avg_duration,
            stddev_response_time=stddev,
            min_response_time=min_duration,
            max_response_time=max_duration,
            peak_cpu_percent=peak_cpu,
            avg_cpu_percent=avg_cpu,
            peak_memory_percent=peak_mem,
            avg_memory_percent=avg_mem,
            peak_opencode_procs=peak_procs
        )

    def run_capacity_test(self, max_agents: int = 10, step: int = 1,
                          quick: bool = False) -> List[TestRun]:
        if quick:
            # Quick mode uses a fixed ramp and ignores max_agents/step
            agent_counts = [1, 2, 3, 5, 8]
        else:
            agent_counts = list(range(1, max_agents + 1, step))

        print(f"[INFO] Starting capacity test with {len(agent_counts)} configurations")
        print(f"[INFO] Agent counts: {agent_counts}")

        self.results = []

        for count in agent_counts:
            subprocess.run(['pkill', '-f', 'opencode run'], capture_output=True)
            time.sleep(2)
            result = self._run_parallel_agents(count)
            self.results.append(result)

        return self.results

    def save_results(self, output_dir: str):
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        json_file = output_path / f"results_{timestamp}.json"
        with open(json_file, 'w') as f:
            data = [asdict(run) for run in self.results]
            json.dump(data, f, indent=2)
        print(f"[INFO] Results saved to: {json_file}")

        csv_file = output_path / f"summary_{timestamp}.csv"
        with open(csv_file, 'w') as f:
            f.write("agents,duration,success,failed,timeout,avg_response,stddev,min_response,max_response,peak_cpu,avg_cpu,peak_mem,avg_mem,peak_procs\n")
            for run in self.results:
                f.write(f"{run.agent_count},{run.total_duration:.2f},{run.success_count},"
                        f"{run.failed_count},{run.timeout_count},{run.avg_response_time:.2f},"
                        f"{run.stddev_response_time:.2f},{run.min_response_time:.2f},"
                        f"{run.max_response_time:.2f},{run.peak_cpu_percent:.1f},"
                        f"{run.avg_cpu_percent:.1f},{run.peak_memory_percent:.1f},"
                        f"{run.avg_memory_percent:.1f},{run.peak_opencode_procs}\n")
        print(f"[INFO] Summary saved to: {csv_file}")

        report_file = output_path / f"report_{timestamp}.md"
        self._generate_markdown_report(report_file)
        print(f"[INFO] Report saved to: {report_file}")

        return str(json_file), str(csv_file), str(report_file)
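
    # Illustrative round trip (not used by the tool): a saved JSON file can be
    # loaded back into TestRun objects for ad-hoc analysis. The filename below
    # is hypothetical:
    #
    #   with open("results/results_20240101_120000.json") as f:
    #       runs = [TestRun(**d) for d in json.load(f)]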
output_path / f"report_{timestamp}.md" + self._generate_markdown_report(report_file) + print(f"[INFO] Report saved to: {report_file}") + + return str(json_file), str(csv_file), str(report_file) + + def _generate_markdown_report(self, output_file: Path): + with open(output_file, 'w') as f: + f.write("# Parallel Capacity Test Report\n\n") + f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") + f.write("## Summary\n\n") + f.write("| Agents | Duration | Success | Failed | Timeout | Avg Response | Peak CPU | Peak Mem |\n") + f.write("|--------|----------|---------|--------|---------|--------------|----------|----------|\n") + for run in self.results: + f.write(f"| {run.agent_count} | {run.total_duration:.1f}s | " + f"{run.success_count} | {run.failed_count} | " + f"{run.timeout_count} | {run.avg_response_time:.1f}s | " + f"{run.peak_cpu_percent:.1f}% | {run.peak_memory_percent:.1f}% |\n") + f.write("\n## Key Findings\n\n") + successful_runs = [r for r in self.results if r.success_count == r.agent_count] + optimal = max(successful_runs, key=lambda r: r.agent_count, default=None) + if optimal: + f.write(f"### Optimal Configuration\n") + f.write(f"- **{optimal.agent_count} agents** achieved perfect success rate\n") + f.write(f" - Average response time: {optimal.avg_response_time:.1f}s\n") + f.write(f" - Peak CPU: {optimal.peak_cpu_percent:.1f}%\n") + f.write(f" - Peak Memory: {optimal.peak_memory_percent:.1f}%\n\n") + f.write("## Recommendations\n\n") + if optimal: + f.write(f"1. **Recommended max agents:** {optimal.agent_count} for stable operation\n") + f.write("2. **Monitor closely:** 5+ agents\n") + f.write("3. **Implement circuit breaker** when failure rate exceeds threshold\n") + + +def main(): + parser = argparse.ArgumentParser(description='Parallel Capacity Test Tool') + parser.add_argument('--agents', '-n', type=int, default=10) + parser.add_argument('--timeout', '-t', type=int, default=120) + parser.add_argument('--step', '-s', type=int, default=1) + parser.add_argument('--quick', '-q', action='store_true') + parser.add_argument('--output', '-o', type=str, default=None) + args = parser.parse_args() + + script_dir = Path(__file__).parent + output_dir = args.output or str(script_dir / 'results') + + print("=" * 60) + print("Parallel Capacity Test Tool for Hermes/OpenCode") + print("=" * 60) + print(f"Max agents: {args.agents}") + print(f"Timeout: {args.timeout}s") + print() + + tester = ParallelCapacityTester(timeout=args.timeout) + + try: + tester.run_capacity_test(max_agents=args.agents, step=args.step, quick=args.quick) + json_file, csv_file, report_file = tester.save_results(output_dir) + print("\n" + "=" * 60) + print("TEST COMPLETE") + print("=" * 60) + print(f"JSON Results: {json_file}") + print(f"CSV Summary: {csv_file}") + print(f"Report: {report_file}") + except KeyboardInterrupt: + print("\n[ABORT] Test interrupted by user") + sys.exit(1) + + +if __name__ == '__main__': + main()