/** * Level 3: Checkpoint/Recovery + Task Tracking * * Features: * 1. Task status (pending/running/completed/failed) * 2. Error tracking (why it failed) * 3. Retry mechanism with backoff * 4. Checkpoint/recovery */ import { Agent, type AgentTool, type AgentMessage, type AgentEvent } from "@mariozechner/pi-agent-core"; import { registerBuiltInApiProviders } from "@mariozechner/pi-ai"; import type { Model } from "@mariozechner/pi-ai"; import * as fs from "fs"; import * as path from "path"; import { exec } from "child_process"; // ============== CONFIG ============== const OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY || "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"; process.env.OPENROUTER_API_KEY = OPENROUTER_API_KEY; const model: Model<"openai-responses"> = { id: "stepfun/step-3.5-flash:free", name: "Step-3.5 Flash (Free)", api: "openai-responses", provider: "openrouter", baseUrl: "https://openrouter.ai/api/v1", reasoning: false, input: ["text"], output: ["text"], cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, contextWindow: 128000, maxTokens: 8192, }; const WORKSPACE = "/tmp/shadows-level3"; // ============== TASK STATUS ============== type TaskStatus = "pending" | "running" | "completed" | "failed" | "retrying"; interface TaskError { message: string; tool?: string; timestamp: number; attempt: number; } interface Task { id: string; message: string; status: TaskStatus; createdAt: number; startedAt?: number; completedAt?: number; error?: TaskError; attempts: number; maxRetries: number; retryDelay: number; // ms result?: string; } interface Checkpoint { tasks: Task[]; shadows: { id: string; taskId: string; state: any }[]; savedAt: number; } // ============== TASK MANAGER ============== class TaskManager { private tasks: Map = new Map(); private maxRetries = 3; private retryDelay = 5000; // 5 seconds base private checkpointDir: string; constructor(checkpointDir: string) { this.checkpointDir = checkpointDir; if (!fs.existsSync(checkpointDir)) { fs.mkdirSync(checkpointDir, { recursive: true }); } } // Create a new task createTask(id: string, message: string): Task { const task: Task = { id, message, status: "pending", createdAt: Date.now(), attempts: 0, maxRetries: this.maxRetries, retryDelay: this.retryDelay, }; this.tasks.set(id, task); this.saveCheckpoint(); return task; } // Get next pending task getNextPending(): Task | undefined { for (const task of this.tasks.values()) { if (task.status === "pending" || task.status === "retrying") { return task; } } return undefined; } // Start a task startTask(id: string): Task | undefined { const task = this.tasks.get(id); if (!task) return undefined; task.status = "running"; task.startedAt = Date.now(); task.attempts++; this.saveCheckpoint(); return task; } // Complete a task completeTask(id: string, result: string): Task | undefined { const task = this.tasks.get(id); if (!task) return undefined; task.status = "completed"; task.completedAt = Date.now(); task.result = result; this.saveCheckpoint(); return task; } // Fail a task failTask(id: string, error: string, tool?: string): Task | undefined { const task = this.tasks.get(id); if (!task) return undefined; task.error = { message: error, tool, timestamp: Date.now(), attempt: task.attempts, }; // Check if we can retry if (task.attempts < task.maxRetries) { task.status = "retrying"; // Exponential backoff: 5s, 10s, 20s... task.retryDelay = task.retryDelay * 2; } else { task.status = "failed"; } this.saveCheckpoint(); return task; } // Get task by ID getTask(id: string): Task | undefined { return this.tasks.get(id); } // List all tasks listTasks(): Task[] { return Array.from(this.tasks.values()); } // Save checkpoint to disk saveCheckpoint() { const checkpoint: Checkpoint = { tasks: this.listTasks(), shadows: [], savedAt: Date.now(), }; fs.writeFileSync( path.join(this.checkpointDir, "checkpoint.json"), JSON.stringify(checkpoint, null, 2) ); } // Load checkpoint from disk loadCheckpoint(): boolean { const checkpointPath = path.join(this.checkpointDir, "checkpoint.json"); if (!fs.existsSync(checkpointPath)) return false; try { const data = fs.readFileSync(checkpointPath, "utf-8"); const checkpoint: Checkpoint = JSON.parse(data); // Restore tasks for (const task of checkpoint.tasks) { this.tasks.set(task.id, task); } return true; } catch (e) { console.error("Failed to load checkpoint:", e); return false; } } // Get stats getStats() { const tasks = this.listTasks(); return { total: tasks.length, pending: tasks.filter(t => t.status === "pending").length, running: tasks.filter(t => t.status === "running").length, completed: tasks.filter(t => t.status === "completed").length, failed: tasks.filter(t => t.status === "failed").length, retrying: tasks.filter(t => t.status === "retrying").length, }; } } // ============== SHADOW ============== class Shadow { public id: string; public status: "idle" | "running" = "idle"; private agent: Agent; constructor(id: string, worktreePath: string, systemPrompt: string, tools: AgentTool[]) { this.id = id; this.agent = new Agent({ initialState: { systemPrompt, model, tools: tools as any, messages: [], }, convertToLlm: (messages: AgentMessage[]) => { return messages .filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult") .map((m) => ({ role: m.role, content: m.content })); }, }); this.agent.subscribe((event) => { if (event.type === "agent_start") this.status = "running"; if (event.type === "agent_end") this.status = "idle"; }); } async run(message: string): Promise { const events: string[] = []; this.agent.subscribe((event) => { events.push(event.type); // Log tool errors if (event.type === "tool_execution_end" && (event as any).isError) { console.log(` โš ๏ธ Tool error in ${event.toolName}`); } }); await this.agent.prompt(message); // Get last assistant message const lastMsg = this.agent.state.messages.filter(m => m.role === "assistant").pop(); return lastMsg ? JSON.stringify(lastMsg.content) : "No response"; } abort() { this.agent.abort(); } } // ============== EXECUTOR ============== class Executor { private shadow: Shadow; private taskManager: TaskManager; private isRunning = false; constructor(taskManager: TaskManager, worktreePath: string) { this.taskManager = taskManager; this.shadow = new Shadow( "executor-1", worktreePath, "You are a helpful coding assistant. Use the bash tool to run commands.", [] ); } async run(): Promise { this.isRunning = true; while (this.isRunning) { // Get next pending task const task = this.taskManager.getNextPending(); if (!task) { console.log("๐Ÿ˜ด No pending tasks, waiting..."); await this.sleep(3000); continue; } // Start the task this.taskManager.startTask(task.id); console.log(`\nโ–ถ๏ธ Running task ${task.id}: "${task.message.substring(0, 50)}..."`); console.log(` Attempt ${task.attempts}/${task.maxRetries}`); try { // Run the task const result = await this.shadow.run(task.message); // Success this.taskManager.completeTask(task.id, result); console.log(`โœ… Task ${task.id} completed!`); } catch (error: any) { // Failed this.taskManager.failTask(task.id, error.message); console.log(`โŒ Task ${task.id} failed: ${error.message}`); // Check if will retry const updatedTask = this.taskManager.getTask(task.id); if (updatedTask?.status === "retrying") { console.log(` ๐Ÿ”„ Will retry in ${updatedTask.retryDelay}ms...`); await this.sleep(updatedTask.retryDelay); } } // Show stats console.log(`\n๐Ÿ“Š Stats:`, this.taskManager.getStats()); } } stop() { this.isRunning = false; this.shadow.abort(); } private sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)); } } // ============== MAIN ============== async function main() { console.log("๐Ÿงช Level 3: Checkpoint/Recovery + Task Tracking\n"); registerBuiltInApiProviders(); // Create task manager with checkpoint directory const taskManager = new TaskManager(WORKSPACE); // Check for existing checkpoint const loaded = taskManager.loadCheckpoint(); if (loaded) { console.log("๐Ÿ“‚ Loaded checkpoint, existing tasks:", taskManager.getStats()); } // Create some test tasks console.log("๐Ÿ“ Creating test tasks..."); taskManager.createTask("task-1", "Say hello and run 'echo Hello from Task 1'"); taskManager.createTask("task-2", "Say hi and run 'echo Hello from Task 2'"); taskManager.createTask("task-3", "Run 'date' to get current time"); console.log("๐Ÿ“Š Initial stats:", taskManager.getStats()); // Create executor and run const executor = new Executor(taskManager, "/tmp"); // Run for a bit then stop (for demo) const runPromise = executor.run(); // Let it run for 60 seconds then stop await new Promise(resolve => setTimeout(resolve, 60000)); executor.stop(); await runPromise; console.log("\nโœ… Demo complete!"); console.log("๐Ÿ“Š Final stats:", taskManager.getStats()); // Show failed tasks with error details const tasks = taskManager.listTasks(); const failed = tasks.filter(t => t.status === "failed"); if (failed.length > 0) { console.log("\nโŒ Failed tasks:"); failed.forEach(t => { console.log(` - ${t.id}: ${t.error?.message} (attempt ${t.error?.attempt})`); }); } } main().catch(console.error);