// Listing metadata from the source viewer: 387 lines, 10 KiB, TypeScript
/**
 * Level 3c: Queue System with Worker Pool
 *
 * Features:
 * 1. Task queue - register many tasks
 * 2. Worker pool - max concurrent agents
 * 3. Auto-pull - workers take next task when free
 * 4. Priority support - high/normal/low
 * 5. Backpressure - reject when queue full
 */
|
|
|
|
import { Agent, type AgentTool, type AgentMessage, type AgentEvent } from "@mariozechner/pi-agent-core";
|
|
import { registerBuiltInApiProviders } from "@mariozechner/pi-ai";
|
|
import type { Model } from "@mariozechner/pi-ai";
|
|
import { exec } from "child_process";
|
|
|
|
// ============== CONFIG ==============
|
|
// SECURITY: an OpenRouter API key used to be hard-coded here as a fallback.
// A secret that has been committed to source must be treated as leaked:
// revoke it and supply the key exclusively through the environment.
//
// `||` rather than `??` is deliberate: an empty OPENROUTER_API_KEY counts as
// "not set", exactly as it did before.
const OPENROUTER_API_KEY = process.env.OPENROUTER_API_KEY || "";

// The previous code copied the resolved key back into process.env so that the
// hard-coded fallback became visible to other readers of the variable. With the
// fallback gone the value can only originate from process.env, so the write-back
// would be a no-op and is omitted.
if (!OPENROUTER_API_KEY) {
  console.warn(
    "⚠️ OPENROUTER_API_KEY is not set; requests to OpenRouter will fail to authenticate."
  );
}
|
|
|
|
/**
 * Model descriptor handed to every worker's Agent.
 *
 * All cost fields are zero and only text is declared for input and output.
 * NOTE(review): confirm that OpenRouter serves this model through the
 * "openai-responses" API shape selected below.
 */
const model: Model<"openai-responses"> = {
  // Identity
  id: "stepfun/step-3.5-flash:free",
  name: "Step-3.5 Flash (Free)",

  // Transport
  api: "openai-responses",
  provider: "openrouter",
  baseUrl: "https://openrouter.ai/api/v1",

  // Capabilities
  reasoning: false,
  input: ["text"],
  output: ["text"],

  // Pricing
  cost: {
    input: 0,
    output: 0,
    cacheRead: 0,
    cacheWrite: 0,
  },

  // Limits (tokens)
  contextWindow: 128000,
  maxTokens: 8192,
};
|
|
|
|
// ============== TASK ==============
|
|
/** Scheduling priority of a task: "high" is served before "normal", which is served before "low". */
type TaskPriority =
  | "high"
  | "normal"
  | "low";
|
|
|
|
/** A unit of work tracked by the queue system. */
interface QueuedTask {
  /** Identifier assigned when the task is submitted. */
  id: string;

  /** Prompt text that is passed to the worker's agent. */
  message: string;

  /** Determines where the task is inserted in the queue. */
  priority: TaskPriority;

  /** Submission time as returned by `Date.now()`. */
  createdAt: number;

  /** Lifecycle state; starts as "queued" and is mutated in place as the task progresses. */
  status:
    | "queued"
    | "running"
    | "completed"
    | "failed";
}
|
|
|
|
// ============== QUEUE ==============
|
|
/**
 * Bounded list of waiting tasks, kept sorted by priority.
 *
 * Tasks of equal priority keep their submission order. Once `maxSize` tasks
 * are waiting, further tasks are refused.
 */
class TaskQueue {
  private queue: QueuedTask[] = [];
  private maxSize: number;

  /** @param maxSize Maximum number of tasks that may wait at once. */
  constructor(maxSize: number = 100) {
    this.maxSize = maxSize;
  }

  /**
   * Inserts `task` behind every waiting task of the same or higher priority.
   *
   * @returns `false` when the queue is full and the task was not stored.
   */
  enqueue(task: QueuedTask): boolean {
    const isFull = this.queue.length >= this.maxSize;
    if (isFull) {
      return false; // backpressure: caller must retry or drop
    }

    const rank = { high: 0, normal: 1, low: 2 };
    const incomingRank = rank[task.priority];

    // First waiting task that is strictly less urgent than the new one.
    const firstLessUrgent = this.queue.findIndex(
      (waiting) => incomingRank < rank[waiting.priority]
    );
    const position = firstLessUrgent === -1 ? this.queue.length : firstLessUrgent;

    this.queue.splice(position, 0, task);
    return true;
  }

  /** Removes the task at the front, marks it "running" and returns it. */
  dequeue(): QueuedTask | undefined {
    const next = this.queue.shift();
    if (!next) {
      return next;
    }
    next.status = "running";
    return next;
  }

  /** Returns the task at the front without removing it. */
  peek(): QueuedTask | undefined {
    const [head] = this.queue;
    return head;
  }

  /** Number of tasks currently waiting. */
  size(): number {
    return this.queue.length;
  }

  /** Shallow copy of the waiting tasks, front first. */
  getAll(): QueuedTask[] {
    return this.queue.slice();
  }

  /** Sets the status of the first waiting task whose id matches; does nothing when none matches. */
  updateStatus(id: string, status: "completed" | "failed") {
    for (const waiting of this.queue) {
      if (waiting.id === id) {
        waiting.status = status;
        return;
      }
    }
  }
}
|
|
|
|
// ============== WORKER (SHADOW) ==============
|
|
/**
 * One pool member: wraps a single Agent and exposes whether it is free.
 *
 * The Agent is created once in the constructor and reused for every task.
 * NOTE(review): presumably the agent keeps its message history between
 * prompts, so later tasks may see earlier conversations - confirm.
 */
class Worker {
  public id: string;
  public status: "idle" | "busy" = "idle";
  private agent: Agent;

  /**
   * @param id Name used in logs and for removal from the pool.
   * @param worktreePath Accepted for callers but not used by this class yet.
   */
  constructor(id: string, worktreePath: string) {
    this.id = id;
    this.agent = new Agent({
      initialState: {
        systemPrompt: "You are a helpful coding assistant.",
        model: model,
        tools: [],
        messages: [],
      },
      // Forward only conversational messages, reduced to role + content.
      convertToLlm: (messages: AgentMessage[]) => {
        return messages
          .filter((m) => m.role === "user" || m.role === "assistant" || m.role === "toolResult")
          .map((m) => ({ role: m.role, content: m.content }));
      },
    });

    // Mirror the agent's own lifecycle events onto the public status flag.
    this.agent.subscribe((event) => {
      if (event.type === "agent_start") this.status = "busy";
      if (event.type === "agent_end") this.status = "idle";
    });
  }

  /**
   * Sends the task's message to the agent and waits for the prompt to settle.
   *
   * The worker is marked busy synchronously, before the first await, so a
   * caller can rely on `status` immediately after calling `run`.
   *
   * @returns The string "completed" when the prompt resolves.
   * @throws Whatever `agent.prompt` rejects with; the worker is idle again by then.
   */
  async run(task: QueuedTask): Promise<string> {
    this.status = "busy";
    try {
      await this.agent.prompt(task.message);
      return "completed";
    } finally {
      // Release the worker on failure too; otherwise a rejected prompt would
      // leave it "busy" and it would never receive another task.
      this.status = "idle";
    }
  }

  /** Asks the agent to abort whatever it is currently doing. */
  abort() {
    this.agent.abort();
  }
}
|
|
|
|
// ============== QUEUE SYSTEM ==============
|
|
/**
 * Coordinates a bounded, priority-ordered task queue with a pool of workers.
 *
 * Tasks are handed to idle workers whenever something changes (a task is
 * submitted, a worker is added, a task settles, or the system is started).
 * Nothing is dispatched unless the system is running, i.e. between `start()`
 * and `stop()`; tasks submitted outside that window simply wait in the queue.
 * At most `maxWorkers` tasks run at the same time, regardless of how many
 * workers have been added.
 */
class QueueSystem {
  private queue: TaskQueue;
  private workers: Worker[] = [];
  private maxWorkers: number;
  private maxQueueSize: number;
  private running = false;

  /**
   * @param maxWorkers Upper bound on concurrently running tasks.
   * @param maxQueueSize Upper bound on waiting tasks; submissions beyond it are rejected.
   */
  constructor(maxWorkers: number = 2, maxQueueSize: number = 100) {
    this.maxWorkers = maxWorkers;
    this.maxQueueSize = maxQueueSize;
    this.queue = new TaskQueue(maxQueueSize);
  }

  /**
   * Creates a task for `message` and places it in the queue.
   *
   * @returns `true` when the task was accepted, `false` when the queue was full.
   */
  submit(message: string, priority: TaskPriority = "normal"): boolean {
    const task: QueuedTask = {
      // Timestamp plus five random base-36 characters.
      id: `task-${Date.now()}-${Math.random().toString(36).slice(2, 7)}`,
      message,
      priority,
      createdAt: Date.now(),
      status: "queued",
    };

    const success = this.queue.enqueue(task);
    if (success) {
      console.log(`📥 Queued: ${task.id} (priority: ${priority}, queue: ${this.queue.size()})`);
      this.dispatch(); // Try to dispatch immediately
    } else {
      console.log(`❌ Queue full! Rejected: ${task.id}`);
    }
    return success;
  }

  /**
   * Hands waiting tasks to idle workers until the queue is empty, no worker
   * is idle, or `maxWorkers` tasks are in flight. Does nothing while stopped.
   */
  private dispatch(): void {
    if (!this.running) {
      return;
    }

    for (const worker of this.workers) {
      if (this.queue.size() === 0) {
        return;
      }
      const busy = this.workers.filter(w => w.status === "busy").length;
      if (busy >= this.maxWorkers) {
        return;
      }
      if (worker.status !== "idle") {
        continue;
      }

      const task = this.queue.dequeue();
      if (!task) {
        return;
      }

      console.log(`▶️ Dispatching ${task.id} to worker ${worker.id}`);

      // Fire and forget: runTask handles its own errors. worker.run() is
      // entered synchronously, so the worker already reports "busy" when the
      // loop inspects the next one.
      void this.runTask(worker, task);
    }
  }

  /** Runs one task on one worker, records the outcome and then looks for more work. */
  private async runTask(worker: Worker, task: QueuedTask): Promise<void> {
    try {
      await worker.run(task);
      task.status = "completed";
      console.log(`✅ Completed: ${task.id}`);
    } catch (error: unknown) {
      task.status = "failed";
      const reason = error instanceof Error ? error.message : String(error);
      console.log(`❌ Failed: ${task.id} - ${reason}`);
    } finally {
      // A slot may have opened up, whatever the outcome.
      this.dispatch();
    }
  }

  /**
   * Creates a worker, adds it to the pool and tries to give it work.
   *
   * @returns The newly created worker.
   */
  addWorker(id: string, worktreePath: string): Worker {
    const worker = new Worker(id, worktreePath);
    this.workers.push(worker);
    console.log(`👷 Added worker: ${id} (total: ${this.workers.length})`);

    this.dispatch();

    return worker;
  }

  /** Aborts the worker with the given id and drops it from the pool; unknown ids are ignored. */
  removeWorker(id: string) {
    const worker = this.workers.find(w => w.id === id);
    if (worker) {
      worker.abort();
      this.workers = this.workers.filter(w => w.id !== id);
      console.log(`👷 Removed worker: ${id}`);
    }
  }

  /** Snapshot of queue depth, worker counts and the waiting tasks. */
  getStats() {
    return {
      queueSize: this.queue.size(),
      maxQueueSize: this.maxQueueSize,
      workers: this.workers.length,
      idleWorkers: this.workers.filter(w => w.status === "idle").length,
      busyWorkers: this.workers.filter(w => w.status === "busy").length,
      maxWorkers: this.maxWorkers,
      queuedTasks: this.queue.getAll().map(t => ({
        id: t.id,
        priority: t.priority,
        status: t.status,
      })),
    };
  }

  /** Enables dispatching and immediately drains anything that was queued beforehand. */
  start() {
    this.running = true;
    console.log(`🚀 Queue system started (max ${this.maxWorkers} workers)`);
    this.dispatch();
  }

  /**
   * Disables dispatching and aborts every worker. Tasks still waiting stay in
   * the queue; they are not started when the aborted tasks settle.
   */
  stop() {
    this.running = false;
    this.workers.forEach(w => w.abort());
    console.log("🛑 Queue system stopped");
  }
}
|
|
|
|
// ============== TESTS ==============
|
|
/**
 * Demo 1: a single worker processes three normal-priority tasks one after
 * another. Waits a fixed 30 seconds, prints the stats and stops the system.
 */
async function testSequential() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 1: Sequential (1 worker, multiple tasks)");
  console.log(rule);

  const system = new QueueSystem(1, 50);
  system.start();
  system.addWorker("worker-1", "/tmp");

  // Three tasks for the one worker.
  const prompts = ["Say 'Task 1'", "Say 'Task 2'", "Say 'Task 3'"];
  for (const prompt of prompts) {
    system.submit(prompt, "normal");
  }

  // Fixed grace period for the tasks to finish.
  await new Promise(resolve => setTimeout(resolve, 30000));

  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
|
|
|
|
/**
 * Demo 2: two workers share four normal-priority tasks. Waits a fixed
 * 30 seconds, prints the stats and stops the system.
 */
async function testParallel() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 2: Parallel (2 workers, multiple tasks)");
  console.log(rule);

  const system = new QueueSystem(2, 50);
  system.start();
  for (const workerId of ["worker-1", "worker-2"]) {
    system.addWorker(workerId, "/tmp");
  }

  // Four tasks for the two workers.
  const prompts = ["Say 'Task A'", "Say 'Task B'", "Say 'Task C'", "Say 'Task D'"];
  for (const prompt of prompts) {
    system.submit(prompt, "normal");
  }

  // Fixed grace period for the tasks to finish.
  await new Promise(resolve => setTimeout(resolve, 30000));

  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
|
|
|
|
/**
 * Demo 3: one worker, five tasks submitted with mixed priorities. Prints the
 * order of the tasks still waiting after submission, waits a fixed
 * 40 seconds, prints the stats and stops the system.
 */
async function testPriority() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 3: Priority (high priority first)");
  console.log(rule);

  const system = new QueueSystem(1, 50);
  system.start();
  system.addWorker("worker-1", "/tmp");

  // Deliberately not submitted in priority order.
  const submissions: Array<[string, TaskPriority]> = [
    ["Say 'Normal 1'", "normal"],
    ["Say 'Low'", "low"],
    ["Say 'High 1'", "high"],
    ["Say 'Normal 2'", "normal"],
    ["Say 'High 2'", "high"],
  ];
  for (const [prompt, priority] of submissions) {
    system.submit(prompt, priority);
  }

  // Show priority plus the last three characters of each waiting task's id.
  const waitingOrder = system
    .getStats()
    .queuedTasks.map(t => `${t.priority}:${t.id.slice(-3)}`);
  console.log("\n📊 Queue order:", waitingOrder);

  // Fixed grace period for the tasks to finish.
  await new Promise(resolve => setTimeout(resolve, 40000));

  console.log("\n📊 Stats:", system.getStats());
  system.stop();
}
|
|
|
|
/**
 * Demo 4: submissions beyond the queue capacity are rejected.
 *
 * The queue holds at most three tasks. Five tasks are submitted while no
 * worker is attached, so nothing is pulled off the queue during the burst
 * and exactly the last two submissions are refused. (Attaching the worker
 * first would let it take Task 1 immediately, freeing a slot so that only
 * one submission would be rejected.) The worker is added afterwards so the
 * accepted tasks start running before the system is stopped.
 */
async function testBackpressure() {
  const rule = "=".repeat(50);
  console.log("\n" + rule);
  console.log("TEST 4: Backpressure (queue full)");
  console.log(rule);

  // Very small queue (3 max)
  const queue = new QueueSystem(1, 3);
  queue.start();

  // Submit 5 tasks (should reject 2)
  const results: boolean[] = [];
  results.push(queue.submit("Task 1", "normal"));
  results.push(queue.submit("Task 2", "normal"));
  results.push(queue.submit("Task 3", "normal"));
  results.push(queue.submit("Task 4", "normal")); // Should fail
  results.push(queue.submit("Task 5", "normal")); // Should fail

  console.log("\n📊 Submit results:", results.map((r, i) => `Task ${i+1}: ${r ? '✅' : '❌'}`).join(", "));
  console.log("\n📊 Stats:", queue.getStats());

  // Only now give the queue a consumer.
  queue.addWorker("worker-1", "/tmp");

  // Wait a bit then cleanup
  await new Promise(resolve => setTimeout(resolve, 5000));
  queue.stop();
}
|
|
|
|
// ============== MAIN ==============
|
|
/**
 * Registers the built-in API providers and runs the four demos in order,
 * pausing three seconds between consecutive demos.
 */
async function main() {
  console.log("🧪 Level 3c: Queue System with Worker Pool\n");

  registerBuiltInApiProviders();

  const demos = [testSequential, testParallel, testPriority, testBackpressure];
  for (const [index, runDemo] of demos.entries()) {
    if (index > 0) {
      // Breathing room between demos.
      await new Promise(resolve => setTimeout(resolve, 3000));
    }
    await runDemo();
  }

  console.log("\n✅ All tests complete!");
}

main().catch((error: unknown) => {
  console.error(error);
  // Report the failure to the caller instead of exiting with status 0.
  process.exitCode = 1;
});
|