Files
kage-research/level3c.ts
2026-04-09 00:39:52 +00:00

387 lines
10 KiB
TypeScript

/**
* Level 3c: Queue System with Worker Pool
*
* Features:
* 1. Task queue - register many tasks
* 2. Worker pool - max concurrent agents
* 3. Auto-pull - workers take next task when free
* 4. Priority support - high/normal/low
* 5. Backpressure - reject when queue full
*/
import { Agent, type AgentTool, type AgentMessage, type AgentEvent } from "@mariozechner/pi-agent-core";
import { registerBuiltInApiProviders } from "@mariozechner/pi-ai";
import type { Model } from "@mariozechner/pi-ai";
import { exec } from "child_process";
// ============== CONFIG ==============
/**
 * OpenRouter API key, taken exclusively from the OPENROUTER_API_KEY
 * environment variable.
 *
 * SECURITY: credentials must never be embedded in source. Any literal key that
 * was ever committed in this file has to be treated as leaked and revoked.
 *
 * When the variable is unset the constant is the empty string and a warning is
 * printed, so the misconfiguration is visible up front. Nothing is written
 * back to `process.env`: the value only ever originates from there.
 */
const OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY ?? "";
if (OPENROUTER_API_KEY === "") {
  console.warn("⚠️ OPENROUTER_API_KEY is not set; requests to OpenRouter will not be authenticated.");
}

/** Model descriptor shared by every worker's agent. */
const model: Model<"openai-responses"> = {
  id: "stepfun/step-3.5-flash:free",
  name: "Step-3.5 Flash (Free)",
  api: "openai-responses",
  provider: "openrouter",
  baseUrl: "https://openrouter.ai/api/v1",
  reasoning: false,
  // Text-only in both directions.
  input: ["text"],
  output: ["text"],
  // All cost components are zero — presumably because of the ":free" variant; confirm.
  cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
  contextWindow: 128000,
  maxTokens: 8192,
};
// ============== TASK ==============
/** Scheduling class of a task; the queue serves "high" before "normal" before "low". */
type TaskPriority = "high" | "normal" | "low";
/** A unit of work tracked by the queue system. */
interface QueuedTask {
/** Identifier generated at submission time (`task-<timestamp>-<random suffix>`). */
id: string;
/** Prompt text handed to the worker's agent when the task runs. */
message: string;
/** Decides where the task is inserted into the queue. */
priority: TaskPriority;
/** `Date.now()` value captured when the task was submitted. */
createdAt: number;
/**
* Lifecycle state: "queued" on submission, "running" once dequeued,
* then "completed" or "failed" depending on how the worker run settles.
*/
status: "queued" | "running" | "completed" | "failed";
}
// ============== QUEUE ==============
/**
 * Bounded queue of pending tasks, kept sorted by priority.
 *
 * Tasks of equal priority keep their submission order. Once the queue holds
 * `maxSize` tasks, further submissions are refused (backpressure).
 */
class TaskQueue {
  /** Rank of each priority; a lower rank is served earlier. */
  private static readonly RANK = { high: 0, normal: 1, low: 2 };

  private queue: QueuedTask[] = [];

  /**
   * @param maxSize Maximum number of tasks that may wait at the same time.
   */
  constructor(private maxSize: number = 100) {}

  /**
   * Inserts a task behind every waiting task of the same or higher priority.
   *
   * @returns `false` when the queue is full and the task was refused, `true` otherwise.
   */
  enqueue(task: QueuedTask): boolean {
    if (this.queue.length >= this.maxSize) {
      return false; // Queue full - backpressure
    }
    const rank = TaskQueue.RANK[task.priority];
    // First waiting task that is strictly less urgent than the newcomer.
    const slot = this.queue.findIndex((waiting) => rank < TaskQueue.RANK[waiting.priority]);
    const position = slot === -1 ? this.queue.length : slot;
    this.queue.splice(position, 0, task);
    return true;
  }

  /**
   * Removes the task at the head of the queue and marks it "running".
   *
   * @returns The removed task, or `undefined` when nothing is waiting.
   */
  dequeue(): QueuedTask | undefined {
    const head = this.queue.shift();
    if (head) {
      head.status = "running";
    }
    return head;
  }

  /** Returns the task at the head of the queue without removing it. */
  peek(): QueuedTask | undefined {
    return this.queue[0];
  }

  /** Number of tasks currently waiting. */
  size(): number {
    return this.queue.length;
  }

  /** Shallow copy of the waiting tasks, in service order. */
  getAll(): QueuedTask[] {
    return this.queue.slice();
  }

  /**
   * Sets the status of the first waiting task with the given id.
   *
   * Only tasks still inside the queue are searched; a task that has already
   * been dequeued is not found and the call does nothing.
   */
  updateStatus(id: string, status: "completed" | "failed") {
    for (const waiting of this.queue) {
      if (waiting.id === id) {
        waiting.status = status;
        return;
      }
    }
  }
}
// ============== WORKER (SHADOW) ==============
/**
 * A pool member that owns one Agent and runs one task at a time.
 *
 * `status` is what the dispatcher inspects to decide whether the worker can
 * take a task; it is maintained both by `run()` and by the agent's
 * `agent_start` / `agent_end` events.
 */
class Worker {
  public id: string;
  public status: "idle" | "busy" = "idle";
  private agent: Agent;

  /**
   * @param id Name used in logs and for removal from the pool.
   * @param worktreePath Accepted for the caller's convenience; not used by this class yet.
   */
  constructor(id: string, worktreePath: string) {
    this.id = id;
    this.agent = new Agent({
      initialState: {
        systemPrompt: "You are a helpful coding assistant.",
        model: model,
        tools: [] as any,
        messages: [],
      },
      // Forward only user, assistant and tool-result messages, reduced to role + content.
      convertToLlm: (messages: AgentMessage[]) => {
        return messages
          .filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult")
          .map((m) => ({ role: m.role, content: m.content }));
      },
    });
    // Mirror the agent's own lifecycle events onto the worker status.
    this.agent.subscribe((event) => {
      if (event.type === "agent_start") this.status = "busy";
      if (event.type === "agent_end") this.status = "idle";
    });
  }

  /**
   * Prompts the agent with the task's message and waits for it to finish.
   *
   * The worker is marked busy synchronously (before the first await) and is
   * always returned to idle afterwards — including when the prompt rejects.
   * Without that guarantee a single failed task would leave the worker busy
   * forever and the dispatcher could never hand it another task.
   *
   * @returns The string "completed" once the prompt resolves.
   * @throws Whatever `agent.prompt` rejects with.
   */
  async run(task: QueuedTask): Promise<string> {
    this.status = "busy";
    try {
      await this.agent.prompt(task.message);
      return "completed";
    } finally {
      this.status = "idle";
    }
  }

  /** Asks the underlying agent to abort whatever it is doing. */
  abort() {
    this.agent.abort();
  }
}
// ============== QUEUE SYSTEM ==============
/**
 * Ties a bounded priority queue to a pool of workers.
 *
 * Tasks are accepted at any time (subject to the queue bound) but are only
 * handed to workers while the system is running, i.e. between `start()` and
 * `stop()`. At most `maxWorkers` tasks run concurrently, no matter how many
 * workers have been added.
 */
class QueueSystem {
  private queue: TaskQueue;
  private workers: Worker[] = [];
  private maxWorkers: number;
  private maxQueueSize: number;
  private running = false;

  /**
   * @param maxWorkers Upper bound on concurrently running tasks.
   * @param maxQueueSize Upper bound on waiting tasks; further submissions are rejected.
   */
  constructor(maxWorkers: number = 2, maxQueueSize: number = 100) {
    this.maxWorkers = maxWorkers;
    this.maxQueueSize = maxQueueSize;
    this.queue = new TaskQueue(maxQueueSize);
  }

  /**
   * Queues a new task and tries to dispatch it right away.
   *
   * @returns `true` when the task was queued, `false` when the queue was full.
   */
  submit(message: string, priority: TaskPriority = "normal"): boolean {
    const task: QueuedTask = {
      // slice(2, 7) yields the same five characters as the deprecated substr(2, 5).
      id: `task-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`,
      message,
      priority,
      createdAt: Date.now(),
      status: "queued",
    };
    const success = this.queue.enqueue(task);
    if (success) {
      console.log(`📥 Queued: ${task.id} (priority: ${priority}, queue: ${this.queue.size()})`);
      this.dispatch(); // Try to dispatch immediately
    } else {
      console.log(`❌ Queue full! Rejected: ${task.id}`);
    }
    return success;
  }

  /**
   * Hands waiting tasks to idle workers until the queue, the idle workers or
   * the concurrency budget is exhausted.
   *
   * Does nothing while the system is not running. This matters after `stop()`:
   * aborted runs still settle and call back into this method, and must not
   * start the next queued task.
   */
  private dispatch(): void {
    if (!this.running) {
      return;
    }
    // Snapshot the idle workers so each one receives at most one task per pass.
    const idleWorkers = this.workers.filter((w) => w.status === "idle");
    const busyCount = this.workers.length - idleWorkers.length;
    let freeSlots = this.maxWorkers - busyCount;
    for (const worker of idleWorkers) {
      if (freeSlots <= 0) {
        break; // Concurrency limit reached
      }
      const task = this.queue.dequeue();
      if (!task) {
        break; // Nothing left to run
      }
      freeSlots--;
      console.log(`▶️ Dispatching ${task.id} to worker ${worker.id}`);
      void this.execute(worker, task);
    }
  }

  /** Runs one task on one worker, records the outcome, then looks for more work. */
  private async execute(worker: Worker, task: QueuedTask): Promise<void> {
    try {
      await worker.run(task);
      task.status = "completed";
      console.log(`✅ Completed: ${task.id}`);
    } catch (error: unknown) {
      task.status = "failed";
      // A rejection is not guaranteed to be an Error instance.
      const reason = error instanceof Error ? error.message : String(error);
      console.log(`❌ Failed: ${task.id} - ${reason}`);
    } finally {
      // Check for more tasks
      this.dispatch();
    }
  }

  /**
   * Creates a worker, adds it to the pool and tries to give it work.
   *
   * Workers beyond `maxWorkers` may be added, but they do not raise the
   * number of tasks that run at the same time.
   */
  addWorker(id: string, worktreePath: string): Worker {
    const worker = new Worker(id, worktreePath);
    this.workers.push(worker);
    console.log(`👷 Added worker: ${id} (total: ${this.workers.length})`);
    // Try to dispatch
    this.dispatch();
    return worker;
  }

  /** Aborts the worker with the given id and drops it from the pool; unknown ids are ignored. */
  removeWorker(id: string) {
    const worker = this.workers.find((w) => w.id === id);
    if (worker) {
      worker.abort();
      this.workers = this.workers.filter((w) => w.id !== id);
      console.log(`👷 Removed worker: ${id}`);
    }
  }

  /** Snapshot of queue occupancy, worker states and the waiting tasks. */
  getStats() {
    return {
      queueSize: this.queue.size(),
      maxQueueSize: this.maxQueueSize,
      workers: this.workers.length,
      idleWorkers: this.workers.filter((w) => w.status === "idle").length,
      busyWorkers: this.workers.filter((w) => w.status === "busy").length,
      maxWorkers: this.maxWorkers,
      queuedTasks: this.queue.getAll().map((t) => ({
        id: t.id,
        priority: t.priority,
        status: t.status,
      })),
    };
  }

  /** Starts processing and immediately dispatches anything queued beforehand. */
  start() {
    this.running = true;
    console.log(`🚀 Queue system started (max ${this.maxWorkers} workers)`);
    this.dispatch();
  }

  /** Stops processing: no further tasks are dispatched and every worker is aborted. */
  stop() {
    this.running = false;
    this.workers.forEach((w) => w.abort());
    console.log("🛑 Queue system stopped");
  }
}
// ============== TESTS ==============
/**
 * TEST 1: a single worker works through three equally prioritised tasks.
 * Waits a fixed 30 seconds before printing the stats and stopping.
 */
async function testSequential() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 1: Sequential (1 worker, multiple tasks)");
  console.log(rule);

  const system = new QueueSystem(1, 50);
  system.start();
  system.addWorker("worker-1", "/tmp");

  // Three tasks for one worker
  const prompts = ["Say 'Task 1'", "Say 'Task 2'", "Say 'Task 3'"];
  for (const prompt of prompts) {
    system.submit(prompt, "normal");
  }

  // Give the tasks time to finish
  await new Promise((done) => setTimeout(done, 30000));
  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
/**
 * TEST 2: two workers share four equally prioritised tasks.
 * Waits a fixed 30 seconds before printing the stats and stopping.
 */
async function testParallel() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 2: Parallel (2 workers, multiple tasks)");
  console.log(rule);

  const system = new QueueSystem(2, 50);
  system.start();
  for (const workerId of ["worker-1", "worker-2"]) {
    system.addWorker(workerId, "/tmp");
  }

  // Four tasks for two workers
  const prompts = ["Say 'Task A'", "Say 'Task B'", "Say 'Task C'", "Say 'Task D'"];
  for (const prompt of prompts) {
    system.submit(prompt, "normal");
  }

  // Give the tasks time to finish
  await new Promise((done) => setTimeout(done, 30000));
  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
/**
 * TEST 3: one worker, five tasks of mixed priority submitted out of order.
 * Prints the waiting tasks right after submission, waits a fixed 40 seconds,
 * then prints the stats and stops.
 */
async function testPriority() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 3: Priority (high priority first)");
  console.log(rule);

  const system = new QueueSystem(1, 50);
  system.start();
  system.addWorker("worker-1", "/tmp");

  // Mixed priorities, deliberately not submitted in priority order
  const submissions: Array<["normal" | "low" | "high", string]> = [
    ["normal", "Say 'Normal 1'"],
    ["low", "Say 'Low'"],
    ["high", "Say 'High 1'"],
    ["normal", "Say 'Normal 2'"],
    ["high", "Say 'High 2'"],
  ];
  for (const [priority, prompt] of submissions) {
    system.submit(prompt, priority);
  }

  const waiting = system.getStats().queuedTasks;
  const labels = waiting.map((t) => `${t.priority}:${t.id.slice(-3)}`);
  console.log("\n📊 Queue order:", labels);

  // Give the tasks time to finish
  await new Promise((done) => setTimeout(done, 40000));
  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
/**
 * TEST 4: demonstrates backpressure with a queue bounded at three tasks.
 *
 * All five submissions happen before any worker exists. With an idle worker
 * already attached, the first task would be pulled off the queue immediately,
 * a slot would free up, and only one submission would be rejected instead of
 * the two this scenario is meant to show.
 */
async function testBackpressure() {
  console.log("\n" + "=".repeat(50));
  console.log("TEST 4: Backpressure (queue full)");
  console.log("=".repeat(50));
  // Very small queue (3 max)
  const queue = new QueueSystem(1, 3);
  queue.start();
  // Submit 5 tasks while nothing can drain the queue: 3 fit, 2 are rejected
  const results: boolean[] = [
    queue.submit("Task 1", "normal"),
    queue.submit("Task 2", "normal"),
    queue.submit("Task 3", "normal"),
    queue.submit("Task 4", "normal"), // Rejected: queue already holds 3
    queue.submit("Task 5", "normal"), // Rejected: queue already holds 3
  ];
  console.log("\n📊 Submit results:", results.map((r, i) => `Task ${i + 1}: ${r ? '✅' : '❌'}`).join(", "));
  console.log("\n📊 Stats:", queue.getStats());
  // Attach the worker only now so the accepted tasks start draining
  queue.addWorker("worker-1", "/tmp");
  // Wait a bit then cleanup
  await new Promise(resolve => setTimeout(resolve, 5000));
  queue.stop();
}
// ============== MAIN ==============
/**
 * Registers the built-in API providers, then runs the four scenarios one
 * after another with a three second pause between consecutive scenarios.
 */
async function main() {
  console.log("🧪 Level 3c: Queue System with Worker Pool\n");
  registerBuiltInApiProviders();
  const scenarios = [testSequential, testParallel, testPriority, testBackpressure];
  for (const [index, scenario] of scenarios.entries()) {
    if (index > 0) {
      // Pause between scenarios
      await new Promise(resolve => setTimeout(resolve, 3000));
    }
    await scenario();
  }
  console.log("\n✅ All tests complete!");
}
main().catch((error: unknown) => {
  console.error(error);
  // Report the failure to the calling shell instead of exiting with status 0.
  process.exitCode = 1;
});