# Queue System Research

## Overview

Research on different queue system designs for managing concurrent agent execution.

---

## Queue Types

### 1. Simple FIFO Queue

**Description**: First-in, first-out. Tasks are processed in the order they arrive.

```typescript
class FifoQueue<T> {
  private queue: T[] = [];

  enqueue(item: T) {
    this.queue.push(item);
  }

  // Array.shift() is O(n); fine for small queues.
  dequeue(): T | undefined {
    return this.queue.shift();
  }
}
```

| Pros | Cons |
|------|------|
| Simple to implement | Doesn't prioritize urgent tasks |
| Fair (order preserved) | Long-running tasks block others |
| Predictable | No concurrency control |

---

### 2. Priority Queue

**Description**: Tasks have priority levels. Higher-priority tasks are processed first.

```typescript
interface PrioritizedTask {
  id: string;
  priority: number; // Higher = more urgent
  payload: unknown;
}

class PriorityQueue {
  private queue: PrioritizedTask[] = [];

  enqueue(task: PrioritizedTask) {
    this.queue.push(task);
    // Re-sorting on every enqueue is O(n log n); a binary heap would give O(log n).
    this.queue.sort((a, b) => b.priority - a.priority);
  }

  dequeue(): PrioritizedTask | undefined {
    return this.queue.shift();
  }
}
```

| Pros | Cons |
|------|------|
| Urgent tasks first | More complex |
| Flexible priorities | Starvation possible (low priority never runs) |
| Fairer for different task types | Requires priority assignment logic |

---

### 3. Rate-Limited Queue

**Description**: Limits how many tasks can run at once (and, combined with a token bucket, per time window).

```typescript
type Task = () => Promise<void>;

class RateLimitedQueue {
  private running = 0;
  private waiters: Array<() => void> = [];

  constructor(private maxConcurrent: number) {}

  async enqueue(task: Task) {
    // Re-check after waking: another caller may have taken the slot.
    while (this.running >= this.maxConcurrent) {
      await this.waitForSlot();
    }
    this.running++;
    try {
      await task();
    } finally {
      this.running--;
      this.waiters.shift()?.(); // Wake the next waiter, if any
    }
  }

  private waitForSlot(): Promise<void> {
    return new Promise((resolve) => this.waiters.push(resolve));
  }
}
```

A per-second rate (on top of the concurrency cap) can be enforced by layering the token bucket from section 5 onto this.

| Pros | Cons |
|------|------|
| Prevents API rate limits | Complex timing logic |
| Controls resource usage | Hard to tune rate limits |
| Predictable throughput | May waste idle time |

---

### 4. Backpressure Queue

**Description**: Rejects new tasks when the system is overloaded, instead of queuing forever.

```typescript
type Task = () => Promise<void>;

class BackpressureQueue {
  private queue: Task[] = [];
  private running = 0;

  constructor(
    private maxQueueSize: number,
    private maxConcurrent: number
  ) {}

  enqueue(task: Task) {
    if (this.queue.length >= this.maxQueueSize) {
      throw new Error("Queue full - backpressure");
    }
    if (this.running >= this.maxConcurrent) {
      // This sketch rejects outright; a variant could buffer instead
      throw new Error("System overloaded");
    }
    // Accept task
    this.queue.push(task);
  }
}
```

| Pros | Cons |
|------|------|
| Never OOM | Tasks rejected under load |
| Clear failure mode | Requires client retry logic |
| Simple bounds | Less efficient utilization |

---

### 5. Token Bucket Queue

**Description**: Uses "tokens" that accumulate over time; each task consumes tokens.

```typescript
class TokenBucket {
  private tokens = 0;
  private lastRefill = Date.now();

  constructor(
    private capacity: number,   // Max tokens
    private refillRate: number  // Tokens per second
  ) {}

  tryConsume(tokens: number = 1): boolean {
    this.refill();
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    return false;
  }

  private refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
```

| Pros | Cons |
|------|------|
| Handles burst traffic | Complex tuning |
| Smooth rate limiting | Token calculation overhead |
| Flexible | May be overkill for simple cases |

---

### 6. Job Queue with Workers (Worker Pool)

**Description**: A fixed number of workers pull tasks from a shared queue.
```typescript
type Task = () => Promise<void>;

class WorkerPool {
  private queue: Task[] = [];
  private waiters: Array<() => void> = [];

  constructor(workerCount: number) {
    for (let i = 0; i < workerCount; i++) {
      this.workerLoop();
    }
  }

  enqueue(task: Task) {
    this.queue.push(task);
    this.waiters.shift()?.(); // Wake an idle worker, if any
  }

  private async workerLoop() {
    for (;;) {
      const task = this.queue.shift();
      if (!task) {
        // Sleep until enqueue() wakes this worker
        await new Promise<void>((resolve) => this.waiters.push(resolve));
        continue;
      }
      await task();
    }
  }
}
```

| Pros | Cons |
|------|------|
| True parallelism | More complex |
| Efficient resource use | Worker lifecycle management |
| Handles many tasks | Debugging harder |

---

## Queue Libraries Comparison

| Library | Type | Language | Pros | Cons |
|---------|------|----------|------|------|
| **Bull** | Redis-based | Node.js | Mature, persistence, retries | Redis dependency |
| **bee-queue** | Redis-based | Node.js | Simpler than Bull | Fewer features |
| **p-queue** | In-memory | Node.js | No deps, priority support | Not distributed |
| **async.queue** | In-memory | Node.js | Simple, part of the popular `async` package | No persistence |
| **Celery** | Broker-based | Python | Very mature | Python only |
| **RQ** | Redis-based | Python | Simple | Fewer features |

---

## Recommendations for Kugetsu

### Current State

- Kugetsu has a basic concurrency check (max concurrent)
- The queue system is "broken" (i.e., only a basic implementation)

### Recommended Approach

**Phase 1: Enhanced Simple Queue**
- Add priority support to the current queue
- Add rate limiting (per-agent, per-API)
- Apply backpressure when too many tasks are queued

**Phase 2: If Needed**
- Add persistence (Redis) for crash recovery
- Add distributed support (multiple machines)

### Why Not a Full Queue System?
- The current workload is relatively simple
- The Pi uses less memory, so concurrency limits work
- A full queue system would over-engineer a simple problem

---

## Implementation Ideas

### Simple Priority Queue for Kugetsu

```typescript
interface QueuedTask {
  id: string;
  priority: "high" | "normal" | "low";
  payload: unknown;
  createdAt: Date;
}

class SimplePriorityQueue {
  private queues = {
    high: [] as QueuedTask[],
    normal: [] as QueuedTask[],
    low: [] as QueuedTask[],
  };

  enqueue(task: QueuedTask) {
    this.queues[task.priority].push(task);
  }

  dequeue(): QueuedTask | undefined {
    // Try high, then normal, then low
    for (const priority of ["high", "normal", "low"] as const) {
      const task = this.queues[priority].shift();
      if (task) return task;
    }
    return undefined;
  }
}
```

---

## Summary

| Use Case | Recommended Queue |
|----------|------------------|
| Simple, few tasks | Simple FIFO |
| Different priorities | Priority Queue |
| API rate limits | Rate-Limited |
| Prevent OOM | Backpressure |
| High volume | Worker Pool |
| Distributed | Redis-based (Bull) |

For Kugetsu: **Priority Queue + Rate Limiting** is likely sufficient.
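As a rough sketch of how that combination could fit together (the `RateLimitedPriorityQueue` name and its API are illustrative, not existing Kugetsu code): a three-level priority queue drained through a token bucket, so high-priority tasks dequeue first while overall throughput stays within the refill rate.

```typescript
type Priority = "high" | "normal" | "low";

interface QueuedTask {
  id: string;
  priority: Priority;
  run: () => Promise<void>;
}

// Hypothetical sketch: priority ordering + token-bucket rate limiting.
class RateLimitedPriorityQueue {
  private queues: Record<Priority, QueuedTask[]> = { high: [], normal: [], low: [] };
  private tokens: number;
  private lastRefill = Date.now();

  constructor(private capacity: number, private refillRate: number) {
    this.tokens = capacity; // Start with a full bucket
  }

  enqueue(task: QueuedTask) {
    this.queues[task.priority].push(task);
  }

  // Returns the next task if a token is available, else undefined (rate-limited).
  next(): QueuedTask | undefined {
    this.refill();
    if (this.tokens < 1) return undefined;
    for (const p of ["high", "normal", "low"] as const) {
      const task = this.queues[p].shift();
      if (task) {
        this.tokens -= 1;
        return task;
      }
    }
    return undefined;
  }

  private refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
```

A caller would poll `next()` (or wrap it in a small drain loop) and run whatever it returns; when it yields `undefined`, the caller backs off until the bucket refills.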