# Deep Research: Pi (agent-core) Integration for Kugetsu

## Executive Summary

This document outlines the research and implementation plan for replacing OpenCode with Pi (agent-core) in the Kugetsu orchestration system. The goals are to reduce memory usage, eliminate session poisoning (context leakage), and improve reliability while maintaining the parallel execution workflow.

---

## 1. Current System Analysis

### 1.1 Current Architecture

```
User (Telegram) ──► Hermes (gateway) ──► Kugetsu (orchestrator)
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │  OpenCode   │  (Agent, ~340MB each)
                                        └─────────────┘
                                               │
                                 ┌─────────────┴─────────────┐
                                 ▼                           ▼
                          ┌────────────┐              ┌────────────┐
                          │  Shadow 1  │              │  Shadow 2  │
                          │ (Worktree) │              │ (Worktree) │
                          └────────────┘              └────────────┘
```

### 1.2 Identified Problems

| Problem | Cause | Impact |
|---------|-------|--------|
| **Session Poisoning** | Context from Agent A bleeds into Agent B | Wrong task execution, confused agents |
| **High Memory** | ~340MB per OpenCode instance | Max 5 concurrent agents on 4GB RAM |
| **Silent Crashes** | Process dies without PR/commit | Lost work, no recovery |
| **No Structured Output** | OpenCode lacks JSON output | Hard to integrate with Hermes |

---

## 2. Pi (agent-core) Deep Dive

### 2.1 Overview

- **Repository**: https://github.com/badlogic/pi-mono
- **Package**: `@mariozechner/pi-agent-core`
- **Language**: TypeScript
- **Memory Footprint**: ~50-100MB (core only)

### 2.2 Architecture

Pi is designed as a **minimal, extensible agent runtime**.
Unlike OpenCode or Hermes, it doesn't include:

- Built-in sub-agent spawning
- TUI (terminal UI)
- Session persistence (you control this)
- MCP support (intentionally)

This is actually **beneficial** for Kugetsu because:

- You control exactly how shadows are managed
- There is no opinionated session isolation to fight against
- You get full control over context management

### 2.3 Core API

```typescript
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";

const agent = new Agent({
  initialState: {
    systemPrompt: "You are a coding agent.",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
    tools: [myTool],
    messages: [],
  },
  convertToLlm: (msgs) =>
    msgs.filter((m) => ["user", "assistant", "toolResult"].includes(m.role)),
});

// Stream events
agent.subscribe((event) => {
  console.log(event.type);
});

await agent.prompt("Fix the bug in auth.py");
```

### 2.4 Key Features for Kugetsu

#### Event-Driven Architecture

Pi emits rich events for UI integration:

- `agent_start` / `agent_end`
- `turn_start` / `turn_end`
- `message_start` / `message_update` / `message_end`
- `tool_execution_start` / `tool_execution_update` / `tool_execution_end`

This is **critical** for headless UX: you can reconstruct TUI-like behavior by subscribing to these events.
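Since these are plain events, TUI-like output can be reconstructed by folding the stream into a transcript. A minimal self-contained sketch: the event names come from the list above, but the payload shapes here are simplified assumptions, not Pi's actual types.

```typescript
// Simplified event shapes; Pi's real payloads are richer (assumption for illustration).
type ShadowEvent =
  | { type: "agent_start" }
  | { type: "message_update"; delta: string }
  | { type: "tool_execution_start"; toolName: string }
  | { type: "tool_execution_end"; toolName: string }
  | { type: "agent_end" };

// Fold a stream of events into a single displayable transcript.
function renderTranscript(events: ShadowEvent[]): string {
  const lines: string[] = [];
  let text = "";
  for (const e of events) {
    switch (e.type) {
      case "message_update":
        text += e.delta; // accumulate streamed assistant text
        break;
      case "tool_execution_start":
        if (text) { lines.push(text); text = ""; } // flush pending text first
        lines.push(`[tool] ${e.toolName} started`);
        break;
      case "tool_execution_end":
        lines.push(`[tool] ${e.toolName} finished`);
        break;
    }
  }
  if (text) lines.push(text);
  return lines.join("\n");
}
```

The same folding approach drives the Telegram streaming shown in section 5.3; only the sink changes.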
#### Tool Execution Control

```typescript
// Block dangerous tools
beforeToolCall: async ({ toolCall, args }) => {
  if (toolCall.name === "bash" && args.command.includes("rm -rf")) {
    return { block: true, reason: "Dangerous command blocked" };
  }
},

// Audit tool results
afterToolCall: async ({ toolCall, result }) => {
  console.log(`Tool ${toolCall.name} executed:`, result);
  return { details: { ...result.details, audited: true } };
},
```

#### Context Management

```typescript
transformContext: async (messages, signal) => {
  // Prune old messages
  if (estimateTokens(messages) > MAX_TOKENS) {
    return pruneOldMessages(messages);
  }
  // Inject external context
  return injectContext(messages);
},
```

#### Steering & Follow-up

```typescript
// Interrupt the agent while it is running
agent.steer({
  role: "user",
  content: "Stop! Do this instead.",
});

// Queue work for after the agent finishes
agent.followUp({
  role: "user",
  content: "Also summarize the result.",
});
```

---

## 3. Integration Design

### 3.1 Proposed Architecture

```
User (Telegram) ──► Hermes (gateway) ──► Kugetsu-Pi (orchestrator)
                                               │
                                               ▼
                                    ┌─────────────────────┐
                                    │   Shadow Manager    │
                                    │   (New Component)   │
                                    └─────────────────────┘
                                               │
                     ┌─────────────────────────┼─────────────────────────┐
                     ▼                         ▼                         ▼
              ┌────────────┐            ┌────────────┐            ┌────────────┐
              │  Shadow 1  │            │  Shadow 2  │            │  Shadow N  │
              │ (Pi Agent) │            │ (Pi Agent) │            │ (Pi Agent) │
              │   ~80MB    │            │   ~80MB    │            │   ~80MB    │
              └────────────┘            └────────────┘            └────────────┘
                     │                         │                         │
                     ▼                         ▼                         ▼
              ┌────────────┐            ┌────────────┐            ┌────────────┐
              │ Worktree 1 │            │ Worktree 2 │            │ Worktree N │
              └────────────┘            └────────────┘            └────────────┘
```

### 3.2 Shadow Manager Component

The Shadow Manager replaces Kugetsu's OpenCode wrapper with Pi-native logic:
```typescript
interface ShadowManager {
  // Create a new shadow (sub-agent)
  spawnShadow(config: ShadowConfig): Promise<Shadow>;

  // Get an existing shadow
  getShadow(id: string): Shadow | undefined;

  // List all active shadows
  listShadows(): Shadow[];

  // Terminate a shadow
  terminateShadow(id: string): Promise<void>;

  // Resource management
  getResourceUsage(): ResourceStats;
}

interface Shadow {
  id: string;
  agent: Agent;
  worktree: Worktree;
  state: ShadowState;
  createdAt: Date;

  prompt(message: string): Promise<void>;
  continue(): Promise<void>;
  abort(): void;
}
```

### 3.3 Session Isolation (Fixing Context Poisoning)

The key to preventing session poisoning is **strict context boundaries**:

```typescript
class Shadow {
  private agent: Agent;
  private isolatedMessages: AgentMessage[] = [];

  constructor(config: ShadowConfig) {
    this.agent = new Agent({
      initialState: {
        systemPrompt: config.systemPrompt,
        model: config.model,
        tools: config.tools,
        messages: [], // Start empty
      },
      convertToLlm: (msgs) => this.filterAndConvert(msgs),
    });
  }

  private filterAndConvert(messages: AgentMessage[]): Message[] {
    // STRICT: only this shadow's messages
    const myMessages = messages.filter(
      (m) => m._shadowId === this.id // Each message is tagged with a shadow ID
    );
    return myMessages.map((m) => ({
      role: m.role,
      content: m.content,
    }));
  }

  async prompt(message: string): Promise<void> {
    // Tag the message with this shadow's ID
    const myMessage: AgentMessage = {
      role: "user",
      content: message,
      timestamp: Date.now(),
      _shadowId: this.id,
    };
    return this.agent.prompt(myMessage);
  }
}
```

**Why This Works:**

- Each message is tagged with its shadow ID
- `convertToLlm` filters to only that shadow's messages
- No cross-contamination is possible
- Even if agent state is shared, the LLM only sees isolated context
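The isolation invariant above can be unit-tested without running the Pi agent at all. A minimal sketch, assuming the simplified tagged-message shape from the snippet above; `TaggedMessage` and `isolateContext` are hypothetical names mirroring `filterAndConvert`:

```typescript
// Simplified message shape mirroring the tagged messages above (assumption).
interface TaggedMessage {
  role: "user" | "assistant" | "toolResult";
  content: string;
  _shadowId: string;
}

// What convertToLlm would hand the LLM for one shadow:
// only messages tagged with that shadow's ID survive the filter.
function isolateContext(messages: TaggedMessage[], shadowId: string) {
  return messages
    .filter((m) => m._shadowId === shadowId)
    .map((m) => ({ role: m.role, content: m.content }));
}
```

Because the filter runs on every LLM call, a message mistakenly written into a shared pool still never reaches another shadow's context.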
---

## 4. Resource Benchmarks

### 4.1 Estimated Memory Usage

| Component | OpenCode (Current) | Pi (Proposed) | Savings |
|-----------|--------------------|---------------|---------|
| Agent Core | ~340MB | ~80MB | 76% |
| Node.js Runtime | (included) | ~100MB | - |
| Tools/Extensions | Varies | Minimal | - |
| **Per Shadow** | **~340MB** | **~80-100MB** | **~70%** |

### 4.2 Capacity Planning

Based on 4GB RAM and 2 CPU cores:

| Scenario | OpenCode | Pi | Improvement |
|----------|----------|-----|-------------|
| Max Concurrent | 5 | 15-20 | 3-4x |
| CPU Bound | 5 (contention) | 8-10 | 60-100% |
| Memory Bound | 5 | 40+ | 8x |

**Conservative estimate**: 10-15 concurrent shadows with Pi vs 5 with OpenCode.

### 4.3 Scaling Model

```
Memory budget:  4GB
Reserve:        512MB (system)
Available:      3.5GB

Pi shadow:      ~80MB base + ~20MB tools/context
Safe limit:     3.5GB / 100MB = 35 shadows
Recommended:    15-20 shadows (leaves headroom)
```

### 4.4 Scaling Beyond 4GB

| RAM | Recommended Shadows | Notes |
|-----|---------------------|-------|
| 4GB | 15-20 | Target |
| 8GB | 35-45 | Smooth scaling |
| 16GB | 80-100 | High concurrency |
| 32GB | 180-200 | Dedicated workload |

---

## 5. Headless UX Patterns

### 5.1 The TUI Gap

You mentioned that headless lacks "TUI qualities", specifically:

> "TUI handles prompt better... if it ends right away with question or any blocker, it just feels not right"

Pi addresses this through its **event-driven architecture**.
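For example, the quoted "ends right away with a question" case can be caught with a cheap heuristic over the agent's final message before escalating to the user. A sketch; the function name and phrase list are illustrative assumptions, not part of Pi:

```typescript
// Illustrative heuristic: decide whether the agent's final message
// looks like a blocker (a question or a request for confirmation).
function looksLikeBlocker(finalAssistantText: string): boolean {
  const text = finalAssistantText.trim().toLowerCase();
  if (text.endsWith("?")) return true; // direct question
  const blockerPhrases = [
    "let me know",
    "should i",
    "which option",
    "need your confirmation",
  ];
  return blockerPhrases.some((p) => text.includes(p));
}
```

A heuristic like this is cheap enough to run on every `turn_end`; section 5.2 shows where it would plug in.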
### 5.2 Prompt Handling in Headless

**TUI pattern**: Agent stops → User sees prompt → User responds → Agent continues

**Pi headless pattern**:

```typescript
class HeadlessUX {
  private pendingPrompts: Map<string, unknown> = new Map();

  subscribeToAgent(agent: Agent) {
    agent.subscribe(async (event) => {
      switch (event.type) {
        case "turn_end": {
          // Check if the agent is waiting for input
          const isWaiting = await this.checkForPendingPrompt(event);
          if (isWaiting) {
            // Queue for a user response via Hermes
            await this.escalateToUser(event);
          }
          break;
        }
        case "tool_execution_start":
          // Log what's happening
          this.log(`${event.toolName} starting...`);
          break;
        case "tool_execution_end":
          this.log(`${event.toolName} completed`);
          break;
      }
    });
  }

  private async checkForPendingPrompt(event: TurnEndEvent): Promise<boolean> {
    // Analyze whether the agent is blocked waiting for:
    // - Clarification
    // - Confirmation
    // - Missing information
    // This can be inferred from:
    // - Tool results asking questions
    // - Assistant message content patterns
    // - Custom "prompt" tool results
    return false; // Implement based on your needs
  }

  private async escalateToUser(event: TurnEndEvent) {
    // Send to Hermes/Telegram
    await hermes.sendMessage({
      chat_id: this.userId,
      text: `Agent needs input: ${extractQuestion(event)}`,
      keyboard: generateKeyboard(event),
    });
  }
}
```

### 5.3 Rich Event Streaming

Reconstruct TUI-like output:

```typescript
async function streamToTelegram(agent: Agent, chatId: string) {
  const messageBuilder = new TelegramMessageBuilder(chatId);

  agent.subscribe(async (event) => {
    switch (event.type) {
      case "turn_start":
        messageBuilder.startTyping();
        break;
      case "message_update":
        if (event.assistantMessageEvent.type === "text_delta") {
          messageBuilder.append(event.assistantMessageEvent.delta);
        }
        if (event.assistantMessageEvent.type === "thinking_delta") {
          messageBuilder.setThinking(event.assistantMessageEvent.thinking);
        }
        break;
      case "tool_execution_start":
        messageBuilder.appendCode(`🔧 Running ${event.toolName}...`);
        break;
      case
"tool_execution_end": if (event.isError) { messageBuilder.append(`❌ Error: ${event.result}`); } else { messageBuilder.append(`✅ ${event.toolName} done`); } break; case "agent_end": await messageBuilder.send(); break; } }); await agent.prompt(userMessage); } ``` ### 5.4 Thinking Time Pi supports configurable thinking levels: ```typescript thinkingBudgets: { minimal: 128, low: 512, medium: 1024, high: 2048, } ``` In headless, you can expose this as a parameter: ``` /think high /solve complex-problem ``` --- ## 6. Error Handling & Recovery ### 6.1 Crash Recovery OpenCode "suddenly dies" → Pi has better observability: ```typescript class Shadow { private checkpointInterval: NodeJS.Timeout; constructor(config: ShadowConfig) { // Save state every 30 seconds this.checkpointInterval = setInterval(() => { this.saveCheckpoint(); }, 30_000); this.agent.subscribe(async (event) => { if (event.type === "agent_end") { // Successful completion - clean up checkpoint this.clearCheckpoint(); } }); } private saveCheckpoint() { const state = { messages: this.agent.state.messages, id: this.id, timestamp: Date.now(), }; fs.writeFileSync( `checkpoints/${this.id}.json`, JSON.stringify(state) ); } static async recover(checkpointId: string): Promise { const state = JSON.parse( fs.readFileSync(`checkpoints/${checkpointId}.json`) ); const shadow = new Shadow({ /* config */ }); shadow.agent.state.messages = state.messages; return shadow; } } ``` ### 6.2 Tool Execution Safety ```typescript const safeTools: AgentTool[] = [ { name: "read", label: "Read File", description: "Read file contents", parameters: Type.Object({ path: Type.String() }), execute: async (id, params) => { // Path validation if (!isSafePath(params.path, this.worktree.path)) { throw new Error("Path outside worktree"); } return { content: [{ text: await fs.readFile(params.path) }] }; }, }, { name: "bash", label: "Run Command", description: "Run shell command", parameters: Type.Object({ command: Type.String() }), execute: async 
(id, params, signal) => { // Command allowlist const allowed = ["git", "npm", "npx", "pnpm", "make"]; if (!allowed.some(cmd => params.command.startsWith(cmd))) { throw new Error("Command not allowed"); } // Execute in worktree return execInWorktree(params.command, this.worktree, signal); }, }, ]; ``` --- ## 7. Implementation Roadmap ### Phase 1: Core Integration (Week 1-2) - [ ] Install `@mariozechner/pi-agent-core` and `@mariozechner/pi-ai` - [ ] Create basic `Shadow` class with isolated context - [ ] Implement tool registry (read, write, edit, bash) - [ ] Connect Hermes message format to Pi prompt ### Phase 2: Session Management (Week 2-3) - [ ] Implement Shadow Manager - [ ] Worktree creation/cleanup per shadow - [ ] Checkpoint/save state logic - [ ] Graceful shutdown handling ### Phase 3: Parallel Orchestration (Week 3-4) - [ ] Task queue with concurrency limits - [ ] Resource monitoring (memory, CPU) - [ ] Auto-scale based on load - [ ] Shadow pool for reuse ### Phase 4: UX Enhancement (Week 4-5) - [ ] Event streaming to Telegram - [ ] Thinking time configuration - [ ] Prompt escalation flow - [ ] Progress indicators ### Phase 5: Production Hardening (Week 5-6) - [ ] Error recovery patterns - [ ] Logging and observability - [ ] Rate limiting - [ ] Security hardening --- ## 8. Open Questions | Question | Notes | |----------|-------| | **PM Agent location** | Run as separate Pi instance or part of Shadow Manager? | | **Message history** | Store in Hermes context or Shadow Manager state? | | **Cross-shadow communication** | How should PM Agent talk to Coding Agents? | | **Memory monitoring** | Use cgroup stats or Node.js process.memoryUsage()? | | **Checkpoint storage** | File-based, Redis, or database? | --- ## 9. Recommendations 1. **Start with Pi + Kugetsu** (keep Kugetsu, swap OpenCode) - Lower risk, proven orchestration layer - Focus on Shadow isolation first 2. 
2. **Implement strict context tagging** to prevent session poisoning
   - Each message carries a shadow ID
   - `convertToLlm` filters by shadow ID
3. **Target 10-15 concurrent shadows** on 4GB RAM
   - Conservative estimate: 10
   - Monitor and adjust
4. **Expose thinking levels** in headless mode for complex tasks
   - `/think high` prefix for deep reasoning
5. **Build checkpointing early** for crash recovery

---

## Sources

- Pi agent-core: https://github.com/badlogic/pi-mono/tree/main/packages/agent
- Pi coding-agent: https://github.com/badlogic/pi-mono/tree/main/packages/coding-agent
- Pi npm packages: https://www.npmjs.com/package/@mariozechner/pi-agent-core
- Kugetsu: https://git.fbrns.co/shoko/kugetsu

---

## Appendix: Code Examples

### A.1 Minimal Shadow Implementation

```typescript
import { Agent } from "@mariozechner/pi-agent-core";
import { getModel } from "@mariozechner/pi-ai";

interface ShadowConfig {
  id: string;
  systemPrompt: string;
  model: string;
  worktreePath: string;
  tools: AgentTool[];
}

class Shadow {
  public readonly agent: Agent;
  public readonly id: string;
  public readonly worktreePath: string;

  constructor(config: ShadowConfig) {
    this.id = config.id;
    this.worktreePath = config.worktreePath;

    this.agent = new Agent({
      initialState: {
        systemPrompt: config.systemPrompt,
        model: getModel("anthropic", config.model),
        tools: config.tools,
        messages: [],
      },
      convertToLlm: (msgs) =>
        // Strict: only user, assistant, toolResult roles
        msgs
          .filter((m) => ["user", "assistant", "toolResult"].includes(m.role))
          .map((m) => ({ role: m.role, content: m.content })),
    });
  }

  async prompt(message: string) {
    return this.agent.prompt(message);
  }

  abort() {
    this.agent.abort();
  }
}
```

### A.2 Shadow Manager with Queue

```typescript
class ShadowManager {
  private shadows: Map<string, Shadow> = new Map();
  private queue: AsyncQueue;
  private maxConcurrent: number;
  private activeCount = 0;

  constructor(maxConcurrent = 10) {
    this.maxConcurrent = maxConcurrent;
    this.queue = new AsyncQueue({
      concurrency: maxConcurrent,
      processor: (req) => this.processRequest(req),
    });
  }

  async submitRequest(request: PromptRequest) {
    return this.queue.enqueue(request);
  }

  private async processRequest(req: PromptRequest): Promise<void> {
    // Check whether the shadow already exists
    let shadow = this.shadows.get(req.shadowId);
    if (!shadow) {
      // Create a new shadow
      shadow = new Shadow({
        id: req.shadowId,
        systemPrompt: req.systemPrompt,
        model: req.model,
        worktreePath: req.worktreePath,
        tools: req.tools,
      });
      this.shadows.set(req.shadowId, shadow);
    }

    this.activeCount++;
    try {
      return await shadow.prompt(req.message);
    } finally {
      this.activeCount--;
    }
  }

  getStats() {
    return {
      active: this.activeCount,
      queued: this.queue.size,
      totalShadows: this.shadows.size,
      maxConcurrent: this.maxConcurrent,
    };
  }
}
```
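### A.3 Minimal AsyncQueue Sketch

A.2 relies on an `AsyncQueue` that the document never defines. Below is a minimal self-contained sketch of one possible implementation, assuming only the `concurrency`/`processor` options and the `enqueue`/`size` members used above; it is illustrative, not a production queue.

```typescript
interface AsyncQueueOptions<T, R> {
  concurrency: number;
  processor: (item: T) => Promise<R>;
}

// Minimal concurrency-limited queue matching the usage in A.2.
class AsyncQueue<T, R> {
  private waiting: Array<{
    item: T;
    resolve: (r: R) => void;
    reject: (e: unknown) => void;
  }> = [];
  private active = 0;

  constructor(private opts: AsyncQueueOptions<T, R>) {}

  // Number of items still waiting to be processed.
  get size(): number {
    return this.waiting.length;
  }

  // Enqueue an item; resolves with the processor's result.
  enqueue(item: T): Promise<R> {
    return new Promise<R>((resolve, reject) => {
      this.waiting.push({ item, resolve, reject });
      this.drain();
    });
  }

  // Start jobs while below the concurrency limit.
  private drain() {
    while (this.active < this.opts.concurrency && this.waiting.length > 0) {
      const job = this.waiting.shift()!;
      this.active++;
      this.opts
        .processor(job.item)
        .then(job.resolve, job.reject)
        .finally(() => {
          this.active--;
          this.drain(); // pull the next waiting job, if any
        });
    }
  }
}
```

For durability across crashes (section 6.1), the waiting list could additionally be persisted alongside shadow checkpoints.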