# Queue System Research

## Overview

Research on different queue system designs for managing concurrent agent execution.

## Queue Types

### 1. Simple FIFO Queue

**Description:** First-in, first-out. Tasks are processed in the order they arrive.
```typescript
class FifoQueue<T> {
  private queue: T[] = [];

  enqueue(item: T) {
    this.queue.push(item);
  }

  dequeue(): T | undefined {
    return this.queue.shift();
  }
}
```
| Pros | Cons |
|---|---|
| Simple to implement | Doesn't prioritize urgent tasks |
| Fair (order preserved) | Long-running tasks block others |
| Predictable | No concurrency control |
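A quick usage sketch showing the arrival-order guarantee (the class is re-declared compactly here so the snippet runs on its own):

```typescript
class FifoQueue<T> {
  private queue: T[] = [];
  enqueue(item: T) { this.queue.push(item); }
  dequeue(): T | undefined { return this.queue.shift(); }
}

const q = new FifoQueue<string>();
q.enqueue("task-a");
q.enqueue("task-b");
// Strict arrival order; dequeue() on an empty queue returns undefined
// rather than throwing.
const order = [q.dequeue(), q.dequeue(), q.dequeue()];
// order is ["task-a", "task-b", undefined]
```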
### 2. Priority Queue

**Description:** Tasks have priority levels. Higher-priority tasks are processed first.
```typescript
interface PrioritizedTask {
  id: string;
  priority: number; // Higher = more urgent
  payload: any;
}

class PriorityQueue {
  private queue: PrioritizedTask[] = [];

  enqueue(task: PrioritizedTask) {
    this.queue.push(task);
    // Re-sorting on every enqueue is O(n log n); a binary heap would
    // bring this down to O(log n) if the queue grows large.
    this.queue.sort((a, b) => b.priority - a.priority);
  }

  dequeue(): PrioritizedTask | undefined {
    return this.queue.shift();
  }
}
```
| Pros | Cons |
|---|---|
| Urgent tasks first | More complex |
| Flexible priorities | Starvation possible (low priority never runs) |
| Fairer for different task types | Requires priority assignment logic |
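The starvation risk noted above is commonly mitigated by "aging": boosting the effective priority of tasks the longer they wait, so low-priority work is eventually scheduled. A minimal sketch; the `boostPerSecond` knob and function names are illustrative, not from the source:

```typescript
interface PrioritizedTask {
  id: string;
  priority: number;
  enqueuedAt: number; // ms timestamp
}

// Effective priority grows with waiting time, so old low-priority tasks
// eventually outrank fresh high-priority ones.
function effectivePriority(task: PrioritizedTask, now: number, boostPerSecond = 0.1): number {
  const waitedSeconds = (now - task.enqueuedAt) / 1000;
  return task.priority + waitedSeconds * boostPerSecond;
}

// Sort by effective priority at dequeue time instead of static priority.
function pickNext(queue: PrioritizedTask[], now: number): PrioritizedTask | undefined {
  if (queue.length === 0) return undefined;
  queue.sort((a, b) => effectivePriority(b, now) - effectivePriority(a, now));
  return queue.shift();
}
```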
### 3. Rate-Limited Queue

**Description:** Limits how many tasks can run per time window.
```typescript
class RateLimitedQueue {
  private running = 0;
  private waiters: Array<() => void> = [];
  private nextStart = 0;

  constructor(private maxConcurrent: number, private ratePerSecond: number) {}

  async enqueue(task: () => Promise<void>) {
    // Wait until a concurrency slot frees up.
    while (this.running >= this.maxConcurrent) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    await this.pace();
    this.running++;
    try {
      await task();
    } finally {
      this.running--;
      this.waiters.shift()?.(); // wake one waiting enqueue, if any
    }
  }

  // Space successive starts at least 1000 / ratePerSecond ms apart.
  private pace(): Promise<void> {
    const now = Date.now();
    const wait = Math.max(0, this.nextStart - now);
    this.nextStart = Math.max(now, this.nextStart) + 1000 / this.ratePerSecond;
    return new Promise((r) => setTimeout(r, wait));
  }
}
```
| Pros | Cons |
|---|---|
| Prevents API rate limits | Complex timing logic |
| Controls resource usage | Hard to tune rate limits |
| Predictable throughput | May waste idle time |
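To see the concurrency cap in action, a self-contained demo (a minimal limiter is declared inline so the snippet runs on its own; the class and helper names are illustrative):

```typescript
// Minimal concurrency limiter that records the peak number of tasks
// running at once.
class ConcurrencyLimiter {
  private running = 0;
  private waiters: Array<() => void> = [];
  peak = 0; // highest observed concurrency

  constructor(private maxConcurrent: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    while (this.running >= this.maxConcurrent) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    this.running++;
    this.peak = Math.max(this.peak, this.running);
    try {
      return await task();
    } finally {
      this.running--;
      this.waiters.shift()?.(); // wake one waiter
    }
  }
}

// Five 10-30ms tasks through a limit of 2: peak concurrency stays at 2.
async function demo(): Promise<number> {
  const limiter = new ConcurrencyLimiter(2);
  await Promise.all(
    [10, 20, 30, 10, 20].map((ms) =>
      limiter.run(() => new Promise<void>((r) => setTimeout(r, ms)))
    )
  );
  return limiter.peak;
}
```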
### 4. Backpressure Queue

**Description:** Rejects new tasks when the system is overloaded instead of queuing forever.
```typescript
class BackpressureQueue {
  private queue: Array<() => Promise<void>> = [];
  private running = 0;

  constructor(
    private maxQueueSize: number,
    private maxConcurrent: number
  ) {}

  enqueue(task: () => Promise<void>) {
    if (this.queue.length >= this.maxQueueSize) {
      // Reject instead of letting the queue grow without bound.
      throw new Error("Queue full - backpressure");
    }
    // Accept the task; it waits in the bounded queue until a slot frees.
    this.queue.push(task);
    this.drain();
  }

  private drain() {
    while (this.running < this.maxConcurrent && this.queue.length > 0) {
      const next = this.queue.shift()!;
      this.running++;
      next().finally(() => {
        this.running--;
        this.drain();
      });
    }
  }
}
```
| Pros | Cons |
|---|---|
| Never OOM | Tasks rejected under load |
| Clear failure mode | Requires client retry logic |
| Simple bounds | Less efficient utilization |
### 5. Token Bucket Queue

**Description:** Uses "tokens" that accumulate over time. Each task consumes tokens.
```typescript
class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(
    private capacity: number,  // Max tokens (burst size)
    private refillRate: number // Tokens added per second
  ) {
    this.tokens = capacity; // start full so an initial burst is allowed
  }

  tryConsume(tokens: number = 1): boolean {
    this.refill();
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    return false; // not enough tokens: caller should wait or reject
  }

  private refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
```
| Pros | Cons |
|---|---|
| Handles burst traffic | Complex tuning |
| Smooth rate limiting | Token calculation overhead |
| Flexible | May be overkill for simple cases |
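A usage sketch showing the burst-then-reject behavior (the class is re-declared compactly here, starting with a full bucket, so the snippet is self-contained and deterministic):

```typescript
class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();

  constructor(private capacity: number, private refillRate: number) {
    this.tokens = capacity; // start full so an initial burst is allowed
  }

  tryConsume(n = 1): boolean {
    this.refill();
    if (this.tokens >= n) { this.tokens -= n; return true; }
    return false;
  }

  private refill() {
    const now = Date.now();
    this.tokens = Math.min(this.capacity, this.tokens + ((now - this.lastRefill) / 1000) * this.refillRate);
    this.lastRefill = now;
  }
}

// A burst of `capacity` requests passes immediately; the next request is
// rejected until enough tokens refill (1 per second here).
const bucket = new TokenBucket(3, 1);
const burst = [bucket.tryConsume(), bucket.tryConsume(), bucket.tryConsume()];
const rejected = bucket.tryConsume();
// burst is [true, true, true]; rejected is false
```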
### 6. Job Queue with Workers (Worker Pool)

**Description:** A fixed number of workers pull tasks from a shared queue.
```typescript
type Task = () => Promise<void>;

class WorkerPool {
  private queue: Task[] = [];
  private active = 0;

  constructor(private workerCount: number) {}

  enqueue(task: Task) {
    this.queue.push(task);
    this.dispatch(); // a free worker (if any) picks it up immediately
  }

  private dispatch() {
    // Start tasks until all workers are busy or the queue is empty.
    while (this.active < this.workerCount && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.active++;
      task().finally(() => {
        this.active--;
        this.dispatch(); // a finishing worker pulls the next task
      });
    }
  }
}
```
| Pros | Cons |
|---|---|
| True parallelism | More complex |
| Efficient resource use | Worker lifecycle management |
| Handles many tasks | Debugging harder |
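A self-contained demo of the pull-based pattern (a minimal pool is declared inline; names are illustrative):

```typescript
// Minimal pool that records the peak number of simultaneously running
// tasks, to verify the worker count is respected.
class MiniPool {
  private queue: Array<() => Promise<void>> = [];
  private active = 0;
  peak = 0;

  constructor(private size: number) {}

  enqueue(task: () => Promise<void>) {
    this.queue.push(task);
    this.dispatch();
  }

  private dispatch() {
    while (this.active < this.size && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.active++;
      this.peak = Math.max(this.peak, this.active);
      task().finally(() => {
        this.active--;
        this.dispatch(); // a finishing worker pulls the next task
      });
    }
  }
}

// Four short tasks through a pool of 2: all complete, never more than
// two at once.
const pool = new MiniPool(2);
let completed = 0;
for (let i = 0; i < 4; i++) {
  pool.enqueue(() => new Promise<void>((r) => setTimeout(() => { completed++; r(); }, 5)));
}
```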
## Queue Libraries Comparison
| Library | Type | Language | Pros | Cons |
|---|---|---|---|---|
| Bull | Redis-based | Node.js | Mature, persistence, retries | Redis dependency |
| bee-queue | Redis-based | Node.js | Simpler than Bull | Fewer features |
| p-queue | In-memory | Node.js | No deps, priority support | Not distributed |
| async.queue | In-memory | Node.js | Part of the async library, simple | No persistence |
| Celery | Broker-based | Python | Very mature | Python only |
| RQ | Redis-based | Python | Simple | Fewer features |
## Recommendations for Kugetsu

### Current State

- Kugetsu has a basic concurrency check (max concurrent)
- The queue system is basic and described as "broken"

### Recommended Approach

**Phase 1: Enhanced Simple Queue**
- Add priority support to the current queue
- Add rate limiting (per-agent, per-API)
- Apply backpressure when too many tasks are queued
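The Phase 1 pieces compose naturally in one small class. A sketch under the assumption that tasks are async functions; `KugetsuQueue` and its parameters are hypothetical names, not Kugetsu's actual API:

```typescript
type Priority = "high" | "normal" | "low";
type Job = () => Promise<void>;

// Hypothetical combined queue: three priority lanes, a concurrency cap,
// and backpressure via a bounded total queue size.
class KugetsuQueue {
  private lanes: Record<Priority, Job[]> = { high: [], normal: [], low: [] };
  private running = 0;

  constructor(private maxConcurrent: number, private maxQueued: number) {}

  enqueue(job: Job, priority: Priority = "normal") {
    const queued =
      this.lanes.high.length + this.lanes.normal.length + this.lanes.low.length;
    if (queued >= this.maxQueued) {
      throw new Error("Queue full - backpressure"); // caller must retry later
    }
    this.lanes[priority].push(job);
    this.dispatch();
  }

  private dispatch() {
    while (this.running < this.maxConcurrent) {
      // Drain high first, then normal, then low.
      const job =
        this.lanes.high.shift() ?? this.lanes.normal.shift() ?? this.lanes.low.shift();
      if (!job) return;
      this.running++;
      job().finally(() => {
        this.running--;
        this.dispatch();
      });
    }
  }
}
```

Rate limiting per API could be layered in by giving each lane a token bucket, as in section 5.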
**Phase 2: If Needed**

- Add persistence (Redis) for crash recovery
- Add distributed support (multiple machines)

### Why Not a Full Queue System?

- The current workload is relatively simple
- Pi uses less memory, so concurrency limits work
- It would over-engineer a simple problem
## Implementation Ideas

### Simple Priority Queue for Kugetsu
```typescript
interface QueuedTask {
  id: string;
  priority: "high" | "normal" | "low";
  payload: any;
  createdAt: Date;
}

class SimplePriorityQueue {
  private queues = {
    high: [] as QueuedTask[],
    normal: [] as QueuedTask[],
    low: [] as QueuedTask[],
  };

  enqueue(task: QueuedTask) {
    this.queues[task.priority].push(task);
  }

  dequeue(): QueuedTask | undefined {
    // Try high, then normal, then low
    for (const priority of ["high", "normal", "low"] as const) {
      const task = this.queues[priority].shift();
      if (task) return task;
    }
    return undefined;
  }
}
```
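Dequeue order with mixed priorities, using a compact version of the class above (payloads simplified to string ids for brevity, so the snippet runs on its own):

```typescript
class SimplePriorityQueue {
  private queues = { high: [] as string[], normal: [] as string[], low: [] as string[] };

  enqueue(id: string, priority: "high" | "normal" | "low") {
    this.queues[priority].push(id);
  }

  dequeue(): string | undefined {
    // Try high, then normal, then low.
    for (const p of ["high", "normal", "low"] as const) {
      const t = this.queues[p].shift();
      if (t) return t;
    }
    return undefined;
  }
}

const q = new SimplePriorityQueue();
q.enqueue("t1", "low");
q.enqueue("t2", "high");
q.enqueue("t3", "normal");
// Higher lanes drain first regardless of arrival order.
const order = [q.dequeue(), q.dequeue(), q.dequeue()];
// order is ["t2", "t3", "t1"]
```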
## Summary
| Use Case | Recommended Queue |
|---|---|
| Simple, few tasks | Simple FIFO |
| Different priorities | Priority Queue |
| API rate limits | Rate-Limited |
| Prevent OOM | Backpressure |
| High volume | Worker Pool |
| Distributed | Redis-based (Bull) |
**For Kugetsu:** Priority Queue + Rate Limiting is likely sufficient.