kage-research/queue-research.md
2026-04-09 00:39:52 +00:00

Queue System Research

Overview

Research on different queue system designs for managing concurrent agent execution.


Queue Types

1. Simple FIFO Queue

Description: First-in, first-out. Tasks are processed in the order they arrive.

class FifoQueue<T> {
  private queue: T[] = [];
  
  enqueue(item: T) {
    this.queue.push(item);
  }
  
  dequeue(): T | undefined {
    return this.queue.shift();
  }
}
| Pros | Cons |
| --- | --- |
| Simple to implement | Doesn't prioritize urgent tasks |
| Fair (order preserved) | Long-running tasks block others |
| Predictable | No concurrency control |
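One caveat with the array-backed sketch above: `Array.prototype.shift()` is O(n), so dequeue cost grows with queue length. A hypothetical index-based variant (name `IndexedFifoQueue` is illustrative) keeps dequeue amortized O(1):

```typescript
// Index-based FIFO: tracks a head pointer instead of shifting the array.
class IndexedFifoQueue<T> {
  private items: T[] = [];
  private head = 0;

  enqueue(item: T) {
    this.items.push(item);
  }

  dequeue(): T | undefined {
    if (this.head >= this.items.length) return undefined;
    const item = this.items[this.head++];
    // Compact occasionally so consumed slots are reclaimed.
    if (this.head > 32 && this.head * 2 >= this.items.length) {
      this.items = this.items.slice(this.head);
      this.head = 0;
    }
    return item;
  }

  get length(): number {
    return this.items.length - this.head;
  }
}
```

For small queues the difference is negligible, which is why the simple version above is usually fine.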

2. Priority Queue

Description: Tasks have priority levels. Higher priority tasks are processed first.

interface PrioritizedTask {
  id: string;
  priority: number; // Higher = more urgent
  payload: unknown;
}

class PriorityQueue {
  private queue: PrioritizedTask[] = [];
  
  enqueue(task: PrioritizedTask) {
    // O(n log n) per enqueue; fine for small queues, use a heap at scale.
    this.queue.push(task);
    this.queue.sort((a, b) => b.priority - a.priority);
  }
  
  dequeue(): PrioritizedTask | undefined {
    return this.queue.shift();
  }
}
| Pros | Cons |
| --- | --- |
| Urgent tasks first | More complex |
| Flexible priorities | Starvation possible (low priority never runs) |
| Fairer for different task types | Requires priority assignment logic |
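The starvation risk can be mitigated with aging: a task's effective priority rises the longer it waits. A minimal sketch, assuming a tunable `agingRate` (a hypothetical parameter, not part of the queue above):

```typescript
interface AgedTask {
  id: string;
  basePriority: number;
  enqueuedAt: number; // ms timestamp
}

// Effective priority grows linearly with waiting time, so a long-waiting
// low-priority task eventually outranks a fresh high-priority one.
function effectivePriority(
  task: AgedTask,
  now: number,
  agingRate = 0.001 // priority points gained per ms of waiting
): number {
  return task.basePriority + (now - task.enqueuedAt) * agingRate;
}
```

Sorting by `effectivePriority` instead of raw `priority` (at enqueue or dequeue time) bounds how long a low-priority task can be starved.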

3. Rate-Limited Queue

Description: Limits how many tasks can run per time window.

type Task = () => Promise<void>;

class RateLimitedQueue {
  private running = 0;
  private lastStart = 0;
  
  constructor(
    private maxConcurrent: number,
    private ratePerSecond: number
  ) {}
  
  // Waits for a free concurrency slot and the rate window, then runs the task.
  async run(task: Task) {
    const minIntervalMs = 1000 / this.ratePerSecond;
    while (
      this.running >= this.maxConcurrent ||
      Date.now() - this.lastStart < minIntervalMs
    ) {
      await new Promise((resolve) => setTimeout(resolve, minIntervalMs));
    }
    this.lastStart = Date.now();
    this.running++;
    try {
      await task();
    } finally {
      this.running--;
    }
  }
}
| Pros | Cons |
| --- | --- |
| Prevents API rate limits | Complex timing logic |
| Controls resource usage | Hard to tune rate limits |
| Predictable throughput | May waste idle time |

4. Backpressure Queue

Description: Rejects new tasks when system is overloaded instead of queuing forever.

type Task = () => Promise<void>;

class BackpressureQueue {
  private queue: Task[] = [];
  private running = 0;
  
  constructor(
    private maxQueueSize: number,
    private maxConcurrent: number
  ) {}
  
  enqueue(task: Task) {
    if (this.queue.length >= this.maxQueueSize) {
      throw new Error("Queue full - backpressure");
    }
    // Accept the task; it waits in the bounded queue until a slot frees up.
    this.queue.push(task);
    this.drain();
  }
  
  private drain() {
    while (this.running < this.maxConcurrent && this.queue.length > 0) {
      const next = this.queue.shift()!;
      this.running++;
      next().finally(() => {
        this.running--;
        this.drain();
      });
    }
  }
}
| Pros | Cons |
| --- | --- |
| Never OOM | Tasks rejected under load |
| Clear failure mode | Requires client retry logic |
| Simple bounds | Less efficient utilization |
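Since rejection pushes responsibility to the caller, an exponential-backoff helper often pairs with a backpressure queue. A sketch (the helper name `withRetry` and its defaults are illustrative, not from the original):

```typescript
// Retries a rejected operation with exponential backoff:
// delays of base, 2*base, 4*base, ... between attempts.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseDelayMs = 100
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= maxRetries) throw err; // give up after maxRetries retries
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
}
```

Callers wrap `enqueue` in `withRetry` so transient "queue full" rejections are absorbed rather than surfaced immediately.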

5. Token Bucket Queue

Description: Uses "tokens" that accumulate over time. Each task consumes tokens.

class TokenBucket {
  private tokens = 0;
  private lastRefill = Date.now();
  
  constructor(
    private capacity: number,    // Max tokens
    private refillRate: number   // Tokens per second
  ) {}
  
  tryConsume(tokens: number = 1): boolean {
    this.refill();
    if (this.tokens >= tokens) {
      this.tokens -= tokens;
      return true;
    }
    return false;
  }
  
  private refill() {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.refillRate);
    this.lastRefill = now;
  }
}
| Pros | Cons |
| --- | --- |
| Handles burst traffic | Complex tuning |
| Smooth rate limiting | Token calculation overhead |
| Flexible | May be overkill for simple cases |
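The refill arithmetic in `TokenBucket.refill()` can be checked in isolation. A pure-function restatement of the same formula (a hypothetical helper, not part of the class):

```typescript
// Tokens available after `elapsedMs` of refilling, capped at capacity.
// Same formula as TokenBucket.refill(): tokens + (elapsed seconds) * rate.
function refillTokens(
  current: number,
  capacity: number,
  refillRate: number, // tokens per second
  elapsedMs: number
): number {
  return Math.min(capacity, current + (elapsedMs / 1000) * refillRate);
}
```

For example, with capacity 10 and a refill rate of 2 tokens/second, an empty bucket holds 1 token after 500 ms and is full again after 5 seconds.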

6. Job Queue with Workers (Worker Pool)

Description: Fixed number of workers pull tasks from a queue.

type Task = () => Promise<void>;

class WorkerPool {
  private queue: Task[] = [];
  private active = 0;
  
  constructor(private workerCount: number) {}
  
  enqueue(task: Task) {
    this.queue.push(task);
    this.drain();
  }
  
  // Each "worker" is a slot that pulls the next task when it finishes one.
  private drain() {
    while (this.active < this.workerCount && this.queue.length > 0) {
      const task = this.queue.shift()!;
      this.active++;
      task().finally(() => {
        this.active--;
        this.drain();
      });
    }
  }
}
| Pros | Cons |
| --- | --- |
| True parallelism | More complex |
| Efficient resource use | Worker lifecycle management |
| Handles many tasks | Debugging harder |

Queue Libraries Comparison

| Library | Type | Language | Pros | Cons |
| --- | --- | --- | --- | --- |
| Bull | Redis-based | Node.js | Mature, persistence, retries | Redis dependency |
| Bee Queue | Redis-based | Node.js | Simpler than Bull | Fewer features |
| P Queue | In-memory | Node.js | No deps, priority support | Not distributed |
| Async.Queue | In-memory | Node.js | Built-in, simple | No persistence |
| Celery | Broker-based | Python | Very mature | Python only |
| RQ | Redis-based | Python | Simple | Fewer features |

Recommendations for Kugetsu

Current State

  • Kugetsu has a basic concurrency check (a max-concurrent cap)
  • The queue system itself is rudimentary (described as "broken")

Phase 1: Enhanced Simple Queue

  • Add priority support to the current queue
  • Add rate limiting (per-agent, per-API)
  • Add backpressure when too many tasks pile up
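The three Phase 1 items above could combine into one small class. A sketch under stated assumptions (the name `Phase1Queue` and its parameters are illustrative; per-agent/per-API rate limiting is omitted for brevity):

```typescript
type Priority = "high" | "normal" | "low";

interface KugetsuTask {
  id: string;
  priority: Priority;
  run: () => Promise<void>;
}

class Phase1Queue {
  private queues: Record<Priority, KugetsuTask[]> = { high: [], normal: [], low: [] };
  private running = 0;
  private waiting = 0;

  constructor(
    private maxConcurrent: number,
    private maxQueueSize: number
  ) {}

  // Returns false (backpressure) instead of queuing without bound.
  enqueue(task: KugetsuTask): boolean {
    if (this.waiting >= this.maxQueueSize) return false;
    this.queues[task.priority].push(task);
    this.waiting++;
    this.drain();
    return true;
  }

  private drain() {
    while (this.running < this.maxConcurrent) {
      const task = this.dequeue();
      if (!task) return;
      this.running++;
      task.run().finally(() => {
        this.running--;
        this.drain();
      });
    }
  }

  private dequeue(): KugetsuTask | undefined {
    for (const p of ["high", "normal", "low"] as const) {
      const task = this.queues[p].shift();
      if (task) {
        this.waiting--;
        return task;
      }
    }
    return undefined;
  }
}
```

With `maxConcurrent` set to the existing concurrency cap, this behaves like today's check plus priorities and a bounded queue.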

Phase 2: If Needed

  • Add persistence (Redis) for crash recovery
  • Add distributed support (multiple machines)

Why Not Full Queue System?

  • The current workload is relatively simple
  • Pi uses less memory, so concurrency limits work well
  • A full queue system would over-engineer a simple problem

Implementation Ideas

Simple Priority Queue for Kugetsu

interface QueuedTask {
  id: string;
  priority: "high" | "normal" | "low";
  payload: unknown;
  createdAt: Date;
}

class SimplePriorityQueue {
  private queues = {
    high: [] as QueuedTask[],
    normal: [] as QueuedTask[],
    low: [] as QueuedTask[],
  };
  
  enqueue(task: QueuedTask) {
    this.queues[task.priority].push(task);
  }
  
  dequeue(): QueuedTask | undefined {
    // Try high, then normal, then low
    for (const priority of ["high", "normal", "low"] as const) {
      const task = this.queues[priority].shift();
      if (task) return task;
    }
    return undefined;
  }
}

Summary

| Use Case | Recommended Queue |
| --- | --- |
| Simple, few tasks | Simple FIFO |
| Different priorities | Priority Queue |
| API rate limits | Rate-Limited Queue |
| Prevent OOM | Backpressure Queue |
| High volume | Worker Pool |
| Distributed | Redis-based (Bull) |

For Kugetsu: Priority Queue + Rate Limiting is likely sufficient.