/**
 * Level 3c: Queue System with Worker Pool
 *
 * Features:
 * 1. Task queue - register many tasks
 * 2. Worker pool - max concurrent agents
 * 3. Auto-pull - workers take next task when free
 * 4. Priority support - high/normal/low
 * 5. Backpressure - reject when queue full
 */

import { Agent, type AgentTool, type AgentMessage, type AgentEvent } from "@mariozechner/pi-agent-core";
import { registerBuiltInApiProviders } from "@mariozechner/pi-ai";
import type { Model } from "@mariozechner/pi-ai";
import { exec } from "child_process";

// ============== CONFIG ==============

// The API key must come from the environment; credentials are never embedded
// in source. main() refuses to run when it is missing.
const OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY;

const model: Model<"openai-responses"> = {
  id: "stepfun/step-3.5-flash:free",
  name: "Step-3.5 Flash (Free)",
  api: "openai-responses",
  provider: "openrouter",
  baseUrl: "https://openrouter.ai/api/v1",
  reasoning: false,
  input: ["text"],
  output: ["text"],
  cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
  contextWindow: 128000,
  maxTokens: 8192,
};

// ============== TASK ==============

type TaskPriority = "high" | "normal" | "low";

type TaskStatus = "queued" | "running" | "completed" | "failed";

interface QueuedTask {
  id: string;
  message: string;
  priority: TaskPriority;
  createdAt: number;
  status: TaskStatus;
}

// Lower rank is served first.
const PRIORITY_RANK: Record<TaskPriority, number> = { high: 0, normal: 1, low: 2 };

// ============== QUEUE ==============

/**
 * Bounded, priority-ordered task queue.
 *
 * Tasks are kept sorted by priority (high, normal, low); tasks of equal
 * priority keep their insertion order.
 */
class TaskQueue {
  private queue: QueuedTask[] = [];
  private readonly maxSize: number;

  /** @param maxSize Maximum number of tasks held at once. */
  constructor(maxSize: number = 100) {
    this.maxSize = maxSize;
  }

  /**
   * Adds a task at its priority position.
   *
   * @returns `false` when the queue is already at `maxSize` (backpressure),
   *   `true` when the task was stored.
   */
  enqueue(task: QueuedTask): boolean {
    if (this.queue.length >= this.maxSize) {
      return false; // Queue full - backpressure
    }

    // Insert before the first task with a strictly lower priority so that
    // equal-priority tasks stay in FIFO order.
    const firstLower = this.queue.findIndex(
      (queued) => PRIORITY_RANK[task.priority] < PRIORITY_RANK[queued.priority],
    );
    const insertIndex = firstLower === -1 ? this.queue.length : firstLower;
    this.queue.splice(insertIndex, 0, task);
    return true;
  }

  /**
   * Removes and returns the next task (highest priority first), marking it
   * "running". Returns `undefined` when the queue is empty.
   */
  dequeue(): QueuedTask | undefined {
    const task = this.queue.shift();
    if (task) {
      task.status = "running";
    }
    return task;
  }

  /** Returns the next task without removing it. */
  peek(): QueuedTask | undefined {
    return this.queue[0];
  }

  /** Number of tasks currently waiting. */
  size(): number {
    return this.queue.length;
  }

  /** Shallow copy of the waiting tasks, in dispatch order. */
  getAll(): QueuedTask[] {
    return [...this.queue];
  }

  /**
   * Sets the status of a task that is still in the queue.
   *
   * Tasks leave the queue on dequeue(), so this is a no-op for ids that have
   * already been handed to a worker.
   */
  updateStatus(id: string, status: "completed" | "failed") {
    const task = this.queue.find((t) => t.id === id);
    if (task) {
      task.status = status;
    }
  }
}

// ============== WORKER (SHADOW) ==============

/** Wraps one Agent instance and tracks whether it is currently processing a task. */
class Worker {
  public id: string;
  public status: "idle" | "busy" = "idle";
  private agent: Agent;

  /**
   * @param id Identifier used in logs and for removal.
   * @param worktreePath Accepted for callers; not used by this implementation.
   */
  constructor(id: string, worktreePath: string) {
    this.id = id;
    this.agent = new Agent({
      initialState: {
        systemPrompt: "You are a helpful coding assistant.",
        model: model,
        tools: [],
        messages: [],
      },
      convertToLlm: (messages: AgentMessage[]) => {
        return messages
          .filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult")
          .map((m) => ({ role: m.role, content: m.content }));
      },
    });

    this.agent.subscribe((event) => {
      if (event.type === "agent_start") this.status = "busy";
      if (event.type === "agent_end") this.status = "idle";
    });
  }

  /**
   * Sends the task message to the agent and resolves with "completed" once
   * the prompt finishes. Rejects with whatever the agent rejects with.
   */
  async run(task: QueuedTask): Promise<string> {
    // Set synchronously (before the first await) so the dispatcher never
    // hands this worker a second task.
    this.status = "busy";
    try {
      await this.agent.prompt(task.message);
      return "completed";
    } finally {
      // Free the worker on failure too; otherwise it would stay "busy" forever.
      this.status = "idle";
    }
  }

  /** Aborts the agent's current activity. */
  abort() {
    this.agent.abort();
  }
}

// ============== QUEUE SYSTEM ==============

/**
 * Connects a TaskQueue to a pool of Workers.
 *
 * Tasks are dispatched only between start() and stop(). At most `maxWorkers`
 * tasks run concurrently, regardless of how many workers were added.
 */
class QueueSystem {
  private queue: TaskQueue;
  private workers: Worker[] = [];
  private maxWorkers: number;
  private maxQueueSize: number;
  private running = false;

  /**
   * @param maxWorkers Upper bound on concurrently running tasks.
   * @param maxQueueSize Upper bound on waiting tasks before submit() rejects.
   */
  constructor(maxWorkers: number = 2, maxQueueSize: number = 100) {
    this.maxWorkers = maxWorkers;
    this.maxQueueSize = maxQueueSize;
    this.queue = new TaskQueue(maxQueueSize);
  }

  /**
   * Queues a new task and tries to dispatch it right away.
   *
   * @returns `false` when the queue is full and the task was rejected.
   */
  submit(message: string, priority: TaskPriority = "normal"): boolean {
    const task: QueuedTask = {
      id: `task-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`,
      message,
      priority,
      createdAt: Date.now(),
      status: "queued",
    };

    const success = this.queue.enqueue(task);
    if (success) {
      console.log(`📥 Queued: ${task.id} (priority: ${priority}, queue: ${this.queue.size()})`);
      this.dispatch(); // Try to dispatch immediately
    } else {
      console.log(`❌ Queue full! Rejected: ${task.id}`);
    }
    return success;
  }

  /**
   * Hands queued tasks to idle workers until the queue is empty, no worker is
   * idle, or `maxWorkers` tasks are running. Does nothing while stopped.
   */
  private dispatch(): void {
    if (!this.running) {
      return; // Not started, or stopped: leave tasks in the queue
    }

    for (const worker of this.workers) {
      if (this.queue.size() === 0) {
        return;
      }
      const busyCount = this.workers.filter((w) => w.status === "busy").length;
      if (busyCount >= this.maxWorkers) {
        return; // Concurrency cap reached
      }
      if (worker.status !== "idle") {
        continue;
      }

      const task = this.queue.dequeue();
      if (!task) {
        return;
      }
      console.log(`▶️ Dispatching ${task.id} to worker ${worker.id}`);

      // Fire-and-forget: both outcomes are handled below and each one pulls
      // the next task.
      void worker
        .run(task)
        .then(() => {
          task.status = "completed";
          console.log(`✅ Completed: ${task.id}`);
          this.dispatch();
        })
        .catch((error: unknown) => {
          task.status = "failed";
          const reason = error instanceof Error ? error.message : String(error);
          console.log(`❌ Failed: ${task.id} - ${reason}`);
          this.dispatch();
        });
    }
  }

  /** Creates a worker, adds it to the pool and tries to give it work. */
  addWorker(id: string, worktreePath: string): Worker {
    const worker = new Worker(id, worktreePath);
    this.workers.push(worker);
    console.log(`👷 Added worker: ${id} (total: ${this.workers.length})`);
    this.dispatch();
    return worker;
  }

  /** Aborts and removes the worker with the given id, if present. */
  removeWorker(id: string) {
    const worker = this.workers.find((w) => w.id === id);
    if (worker) {
      worker.abort();
      this.workers = this.workers.filter((w) => w.id !== id);
      console.log(`👷 Removed worker: ${id}`);
    }
  }

  /** Snapshot of queue and pool state. */
  getStats() {
    return {
      queueSize: this.queue.size(),
      maxQueueSize: this.maxQueueSize,
      workers: this.workers.length,
      idleWorkers: this.workers.filter((w) => w.status === "idle").length,
      busyWorkers: this.workers.filter((w) => w.status === "busy").length,
      maxWorkers: this.maxWorkers,
      queuedTasks: this.queue.getAll().map((t) => ({
        id: t.id,
        priority: t.priority,
        status: t.status,
      })),
    };
  }

  /** Enables dispatching and immediately drains anything already queued. */
  start() {
    this.running = true;
    console.log(`🚀 Queue system started (max ${this.maxWorkers} workers)`);
    this.dispatch();
  }

  /**
   * Disables dispatching and aborts all workers. Tasks still in the queue
   * stay there and are not started.
   */
  stop() {
    this.running = false;
    this.workers.forEach((w) => w.abort());
    console.log("🛑 Queue system stopped");
  }
}

// ============== TESTS ==============

async function testSequential() {
  console.log("\n" + "=".repeat(50));
  console.log("TEST 1: Sequential (1 worker, multiple tasks)");
  console.log("=".repeat(50));

  const queue = new QueueSystem(1, 50);
  queue.start();
  queue.addWorker("worker-1", "/tmp");

  // Submit 3 tasks
  queue.submit("Say 'Task 1'", "normal");
  queue.submit("Say 'Task 2'", "normal");
  queue.submit("Say 'Task 3'", "normal");

  // Wait for tasks to complete
  await new Promise((resolve) => setTimeout(resolve, 30000));

  console.log("\n📊 Stats:", queue.getStats());
  queue.stop();
}

async function testParallel() {
  console.log("\n" + "=".repeat(50));
  console.log("TEST 2: Parallel (2 workers, multiple tasks)");
  console.log("=".repeat(50));

  const queue = new QueueSystem(2, 50);
  queue.start();
  queue.addWorker("worker-1", "/tmp");
  queue.addWorker("worker-2", "/tmp");

  // Submit 4 tasks
  queue.submit("Say 'Task A'", "normal");
  queue.submit("Say 'Task B'", "normal");
  queue.submit("Say 'Task C'", "normal");
  queue.submit("Say 'Task D'", "normal");

  // Wait for tasks to complete
  await new Promise((resolve) => setTimeout(resolve, 30000));

  console.log("\n📊 Stats:", queue.getStats());
  queue.stop();
}

async function testPriority() {
  console.log("\n" + "=".repeat(50));
  console.log("TEST 3: Priority (high priority first)");
  console.log("=".repeat(50));

  const queue = new QueueSystem(1, 50);
  queue.start();
  queue.addWorker("worker-1", "/tmp");

  // Submit in mixed order with different priorities. The first task goes
  // straight to the idle worker; the rest are ordered by priority in the queue.
  queue.submit("Say 'Normal 1'", "normal");
  queue.submit("Say 'Low'", "low");
  queue.submit("Say 'High 1'", "high");
  queue.submit("Say 'Normal 2'", "normal");
  queue.submit("Say 'High 2'", "high");

  console.log("\n📊 Queue order:", queue.getStats().queuedTasks.map((t) => `${t.priority}:${t.id.slice(-3)}`));

  // Wait for tasks to complete
  await new Promise((resolve) => setTimeout(resolve, 40000));

  console.log("\n📊 Stats:", queue.getStats());
  queue.stop();
}

async function testBackpressure() {
  console.log("\n" + "=".repeat(50));
  console.log("TEST 4: Backpressure (queue full)");
  console.log("=".repeat(50));

  // Very small queue (3 max)
  const queue = new QueueSystem(1, 3);
  queue.start();
  queue.addWorker("worker-1", "/tmp");

  // Submit 5 tasks. Task 1 is pulled by the idle worker immediately, so
  // tasks 2-4 fill the queue and only task 5 is rejected.
  const results: boolean[] = [];
  results.push(queue.submit("Task 1", "normal"));
  results.push(queue.submit("Task 2", "normal"));
  results.push(queue.submit("Task 3", "normal"));
  results.push(queue.submit("Task 4", "normal"));
  results.push(queue.submit("Task 5", "normal")); // Should fail

  console.log("\n📊 Submit results:", results.map((r, i) => `Task ${i + 1}: ${r ? "✅" : "❌"}`).join(", "));
  console.log("\n📊 Stats:", queue.getStats());

  // Wait a bit then cleanup
  await new Promise((resolve) => setTimeout(resolve, 5000));
  queue.stop();
}

// ============== MAIN ==============

async function main() {
  console.log("🧪 Level 3c: Queue System with Worker Pool\n");

  if (!OPENROUTER_API_KEY) {
    throw new Error("OPENROUTER_API_KEY environment variable is not set");
  }

  registerBuiltInApiProviders();

  await testSequential();
  await new Promise((resolve) => setTimeout(resolve, 3000));

  await testParallel();
  await new Promise((resolve) => setTimeout(resolve, 3000));

  await testPriority();
  await new Promise((resolve) => setTimeout(resolve, 3000));

  await testBackpressure();

  console.log("\n✅ All tests complete!");
}

main().catch(console.error);