01. Rate Limit Taxonomy: Know What You're Fighting
Not all rate limits work the same way, and the correct defensive strategy depends on understanding which type you are hitting. Applying the wrong pattern — for instance, using simple backoff against a sliding window limiter — results in wasted retries and prolonged outages.
The Five Rate Limit Types
| Type | How It Works | Reset Behavior | Best Defense |
|---|---|---|---|
| Fixed Window | N requests per time window (e.g., 100/min) | Hard reset at window boundary | Sleep until window resets |
| Sliding Window | N requests in any rolling time window | Continuous, per-request expiry | Token bucket |
| Token Bucket | Tokens refill at constant rate; burst allowed | Continuous refill | Matching token bucket client-side |
| Leaky Bucket | Requests processed at constant rate; queue fills | Queue drain rate | Rate-limited queue + backpressure |
| Concurrent | Max N simultaneous in-flight requests | On request completion | Semaphore / connection pool |
Reading Rate Limit Headers
Most well-behaved APIs return rate limit state in response headers. Your agent should read these on every response — not just on 429 errors. Acting on real-time rate limit data is far more efficient than guessing:
| Header | Meaning | Action |
|---|---|---|
| `X-RateLimit-Limit` | Max requests in window | Configure local rate limiter |
| `X-RateLimit-Remaining` | Requests left in current window | Throttle if < 10% remaining |
| `X-RateLimit-Reset` | Unix timestamp when window resets | Sleep until reset on 429 |
| `Retry-After` | Seconds to wait before retry (on 429) | Honor exactly + add jitter |
| `X-RateLimit-Burst` | Max burst size allowed | Set token bucket burst parameter |
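A sketch of turning those headers into a throttle decision on every response. Header names are the ones from the table; the function assumes headers are exposed as a dict, as in `httpx` or `requests`:

```python
import time
import random

def throttle_from_headers(headers: dict) -> float:
    """Return seconds to sleep before the next request, based on rate limit headers."""
    limit = int(headers.get("X-RateLimit-Limit", 0))
    remaining = int(headers.get("X-RateLimit-Remaining", limit))
    reset_ts = float(headers.get("X-RateLimit-Reset", 0))

    # Honor Retry-After exactly, plus jitter (present on 429 responses)
    if "Retry-After" in headers:
        return float(headers["Retry-After"]) + random.uniform(0, 1)

    # Throttle when under 10% of the window remains
    if limit and remaining < limit * 0.10:
        return max(0.0, reset_ts - time.time())

    return 0.0
```

Calling this after every response, not just failures, lets the agent slow down before the server ever returns a 429.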
Thundering Herd: When a rate limit window resets, all waiting agents fire simultaneously — immediately hitting the limit again. This is the thundering herd problem. The solution is jitter: randomize retry timing so agents don't synchronize. A shared rate limit across 100 agents with no jitter produces 100 simultaneous retries at T+0. With jitter, they spread out over T+0 to T+5s.
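The effect is easy to see in a few lines of simulation, using the 100-agent, 5-second example above:

```python
import random

def retry_times(n_agents: int, jitter_span: float) -> list[float]:
    """Retry offsets after a window reset at T+0, with full jitter over jitter_span seconds."""
    return [random.uniform(0, jitter_span) for _ in range(n_agents)]

no_jitter = [0.0] * 100            # all 100 agents fire at exactly T+0
with_jitter = retry_times(100, 5.0)

# Without jitter, every retry lands in the same instant; with jitter they spread
# across the whole 5-second span.
assert len(set(no_jitter)) == 1
assert all(0.0 <= t <= 5.0 for t in with_jitter)
```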
Purple Flea API Rate Limits
Purple Flea uses a token bucket algorithm for all API endpoints. The burst parameter is 2x the per-minute limit, allowing short bursts without penalty. Headers `X-RateLimit-Remaining` and `X-RateLimit-Reset` are present on every response.
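Under these rules, client-side bucket parameters can be derived from an endpoint's per-minute limit. A sketch — the helper name and the 90% headroom factor are illustrative choices, not part of the API:

```python
def purple_flea_bucket_params(per_minute_limit: int, headroom: float = 0.9):
    """Client-side token bucket parameters for a rate-limited endpoint.

    Burst is 2x the per-minute limit (Purple Flea's documented rule);
    headroom keeps the local bucket safely below the server's limit.
    """
    rate = per_minute_limit / 60.0 * headroom    # tokens per second
    capacity = per_minute_limit * 2 * headroom   # burst size
    return rate, capacity
```

For a 300/min limit this yields a 4.5 tokens/s refill rate, matching the 90%-of-limit guidance later in this guide.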
02. Exponential Backoff with Jitter
When a request fails with 429 (Too Many Requests) or 503 (Service Unavailable), the worst thing your agent can do is immediately retry. The second-worst is waiting a fixed time. The correct approach is exponential backoff with jitter: each retry waits exponentially longer, with randomization to prevent synchronization.
The Algorithm
wait = random(0, min(cap, base × 2^attempt)), where base = 0.5s, cap = 60s, and attempt is the retry number (0-indexed)
With these parameters:
- Attempt 0: wait = random(0, 0.5s) — near-instant retry for transient errors
- Attempt 1: wait = random(0, 1s)
- Attempt 2: wait = random(0, 2s)
- Attempt 5: wait = random(0, 16s)
- Attempt 7+: wait = random(0, 60s) — capped
Jitter Variants
| Jitter Type | Formula (exp = min(cap, base · 2^attempt)) | Properties | Use Case |
|---|---|---|---|
| No Jitter | exp | Thundering herd risk | Single agent only |
| Full Jitter | random(0, exp) | Best spread | Multi-agent systems |
| Equal Jitter | exp/2 + random(0, exp/2) | Minimum wait guaranteed | When a wait floor matters |
| Decorrelated Jitter | min(cap, random(base, prev · 3)) | Lowest contention | Competitive workloads |
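The variants can be sketched as a single function, writing exp for the capped exponential term min(cap, base · 2^attempt); `prev` is the previous delay, used only by decorrelated jitter:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 60.0,
                  jitter: str = "full", prev: float = 0.5) -> float:
    """Compute the next retry delay for a 0-indexed attempt number."""
    exp = min(cap, base * 2 ** attempt)
    if jitter == "none":
        return exp                                       # thundering herd risk
    if jitter == "full":
        return random.uniform(0, exp)                    # best spread
    if jitter == "equal":
        return exp / 2 + random.uniform(0, exp / 2)      # guaranteed minimum wait
    if jitter == "decorrelated":
        return min(cap, random.uniform(base, prev * 3))  # lowest contention
    raise ValueError(f"unknown jitter type: {jitter}")
```

With decorrelated jitter, feed each returned delay back in as `prev` on the next attempt.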
For multi-agent scenarios (many agents hitting the same API), Full Jitter and Decorrelated Jitter produce the lowest contention. AWS recommends Decorrelated Jitter for DynamoDB and other highly contended services — the same principle applies to any shared rate-limited API.
03. Token Bucket Rate Limiter
Proactive rate limiting — slowing yourself down before hitting the limit — is far superior to reactive backoff. A client-side token bucket mirrors the API's server-side bucket, so your agent never exceeds the rate in the first place.
A token bucket has a capacity (maximum tokens) and a refill rate (tokens per second). Each request consumes one token. If no tokens are available, the request must wait. The bucket refills continuously at the configured rate.
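The mechanics fit in a few lines. A minimal synchronous sketch; the production version in section 06 adds async locking and a blocking wait:

```python
import time

class SimpleTokenBucket:
    """Minimal token bucket: at most `capacity` tokens, refilled at `rate` tokens/second."""
    def __init__(self, rate: float, capacity: float):
        self.rate, self.capacity = rate, capacity
        self.tokens = capacity               # start full: initial burst allowed
        self.last = time.monotonic()

    def try_acquire(self, n: float = 1.0) -> bool:
        # Refill continuously based on elapsed time, capped at capacity
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False                         # caller should wait and retry
```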
Client-Side Token Bucket Tip: Set your client-side bucket capacity to 80-90% of the server's limit. This leaves headroom for other agents sharing the same API key, network latency spikes, and clock drift between your agent and the server's rate limit clock. Hitting 90% locally means you never hit 100% on the server.
04. Circuit Breaker Pattern
A circuit breaker is an automatic switch that stops request flow to a failing service, allowing it time to recover. Named after electrical circuit breakers, it prevents cascading failures from propagating through your agent's dependency graph.
The Three States
CLOSED: normal operation; requests pass through
    ↓ failure threshold hit
OPEN: fail fast; no requests sent
    ↓ recovery timeout elapsed
HALF-OPEN: probe with one request (success → CLOSED, failure → OPEN)
State transitions:
- Closed → Open: When failure rate exceeds threshold (e.g., 50% of last 10 requests fail). All subsequent requests are immediately rejected with a local error — no network call made.
- Open → Half-Open: After a configured recovery timeout (e.g., 30 seconds), the breaker allows one probe request through.
- Half-Open → Closed: If the probe succeeds, the breaker closes and normal operation resumes.
- Half-Open → Open: If the probe fails, the breaker reopens and the recovery timer resets.
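These transitions reduce to a small state machine. A minimal sketch that tracks consecutive failures rather than the rolling failure-rate window used in the full implementation in section 06 (class and method names are illustrative):

```python
import time

class MiniBreaker:
    """Minimal circuit breaker: opens after `threshold` consecutive failures."""
    def __init__(self, threshold: int = 5, recovery_timeout: float = 30.0):
        self.threshold, self.recovery_timeout = threshold, recovery_timeout
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "open":
            if time.monotonic() - self.opened_at >= self.recovery_timeout:
                self.state = "half_open"     # allow a single probe through
                return True
            return False                     # fail fast, no network call made
        return True

    def record(self, success: bool):
        if success:
            self.state, self.failures = "closed", 0
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.threshold:
                self.state, self.opened_at = "open", time.monotonic()
```

The caller checks `allow_request()` before each call and reports the outcome via `record()`.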
Circuit Breaker Compound Benefit: Beyond protecting your agent from cascading failures, open circuit breakers also protect the downstream service from being hammered by retry storms while it is recovering. Your agent's circuit breaker is a courtesy to every other agent sharing the API.
When to Use Circuit Breakers vs Backoff
These patterns are complementary, not alternatives:
- Backoff: For transient rate limit errors (429). The service is healthy but busy. Retry with patience.
- Circuit breaker: For sustained failures (5xx, connection errors). The service may be down or severely degraded. Stop hammering it.
- Both together: Circuit breaker wraps the retry-with-backoff logic. If retries keep failing, the circuit opens. When the circuit is closed, retries use backoff.
05. Priority Queue Management
When rate limits constrain throughput, not all requests are equally important. A market order to close a losing position should not wait behind a low-priority status check. Priority queues route your rate limit budget toward the requests that matter most.
Priority Tiers
| Priority | Request Type | Max Wait | On Timeout |
|---|---|---|---|
| P0 — Critical | Emergency stop-loss, liquidation prevention | 0ms (skip queue) | Error + alert |
| P1 — High | Order placement, position close | 500ms | Retry with backoff |
| P2 — Normal | Price queries, balance checks | 5s | Return cached value |
| P3 — Low | Historical data, reporting | 60s | Drop + log |
| P4 — Background | Analytics, non-urgent updates | 300s | Drop silently |
Budget Allocation
Assign rate limit token budgets across priority levels. A conservative allocation for trading agents: P0 always passes (no budget constraint). P1: 40% of tokens. P2: 35% of tokens. P3/P4: 25% combined. This ensures critical operations always have headroom even when lower-priority requests are saturating the queue.
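A sketch of carving that allocation into per-tier refill rates. The 4.5 tokens/s overall rate is an example figure, and `tier_rate` is an illustrative helper; each tier would then get its own token bucket refilled at its share:

```python
# Per-tier token budgets carved from an overall client-side rate.
# The 40/35/25 split matches the allocation above; P0 is unbudgeted.
OVERALL_RATE = 4.5  # tokens/second (example: 90% of a 300/min limit)

BUDGET_SHARE = {1: 0.40, 2: 0.35, 3: 0.125, 4: 0.125}  # P3/P4 split the 25%

def tier_rate(priority: int) -> float:
    """Tokens/second available to a priority tier; P0 bypasses budgets entirely."""
    if priority == 0:
        return float("inf")
    return OVERALL_RATE * BUDGET_SHARE[priority]
```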
Starvation and Aging: Never let low-priority tasks starve indefinitely — they will eventually time out and produce confusing errors. Implement aging: a P3 request that has waited 30 seconds should be promoted to P2. After another 120 seconds, promote it to P1. Starvation of any tier is a bug.
06. Complete Python Implementation
The following is a production-grade implementation covering token bucket, exponential backoff decorator, circuit breaker, and priority queue — all composable and compatible with Purple Flea's APIs and any other rate-limited service.
```python
import asyncio
import time
import random
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, Callable, Any
from functools import wraps

logger = logging.getLogger("rate_limiter")

# ============================================================
# TOKEN BUCKET RATE LIMITER
# ============================================================

class TokenBucket:
    """
    Async token bucket rate limiter.
    Refills at `rate` tokens/second, up to `capacity` tokens.
    Supports burst: initial tokens = capacity (allows initial burst).
    """
    def __init__(
        self,
        rate: float,                           # tokens per second
        capacity: float,                       # max tokens (burst size)
        initial_tokens: Optional[float] = None
    ):
        self.rate = rate
        self.capacity = capacity
        self.tokens = initial_tokens if initial_tokens is not None else capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    async def acquire(self, tokens: float = 1.0) -> float:
        """
        Try to acquire tokens. Returns wait time (0 if acquired immediately).
        If wait > 0, no tokens were consumed; the caller should sleep
        and try again.
        """
        async with self._lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return 0.0
            # Wait time until enough tokens are available
            deficit = tokens - self.tokens
            return deficit / self.rate

    async def wait_and_acquire(self, tokens: float = 1.0):
        """Acquire tokens, sleeping as long as necessary."""
        while True:
            wait = await self.acquire(tokens)
            if wait <= 0:
                return
            logger.debug(f"Rate limiting: sleeping {wait:.3f}s")
            await asyncio.sleep(wait)

    async def available_tokens(self) -> float:
        async with self._lock:
            self._refill()
            return self.tokens

    def utilization_pct(self) -> float:
        return (1.0 - self.tokens / self.capacity) * 100
```
```python
import asyncio
import random
import time
import logging
from enum import Enum
from functools import wraps
from typing import Any, Callable, Optional, Tuple

logger = logging.getLogger("rate_limiter")

# ============================================================
# EXPONENTIAL BACKOFF DECORATOR
# ============================================================

def with_retry(
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 60.0,
    jitter: str = 'full',                # 'full', 'equal', 'decorrelated'
    retryable_status: Tuple = (429, 500, 502, 503, 504),
    honor_retry_after: bool = True
):
    """
    Async decorator for exponential backoff with jitter.
    Reads Retry-After header when available.
    Raises on non-retryable errors immediately.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            prev_delay = base_delay
            for attempt in range(max_attempts):
                try:
                    resp = await func(*args, **kwargs)
                    # Check rate limit headers on success too
                    if hasattr(resp, 'headers'):
                        remaining = int(resp.headers.get('X-RateLimit-Remaining', '999'))
                        if remaining < 10:
                            reset_ts = int(resp.headers.get('X-RateLimit-Reset', '0'))
                            wait = max(0, reset_ts - time.time())
                            if wait > 0:
                                logger.info(
                                    f"Approaching rate limit ({remaining} remaining). "
                                    f"Sleeping {wait:.1f}s"
                                )
                                await asyncio.sleep(wait + random.uniform(0, 1))
                    return resp
                except Exception as e:
                    status = getattr(getattr(e, 'response', None), 'status_code', None)
                    # Non-retryable: raise immediately
                    if status is not None and status not in retryable_status:
                        raise
                    if attempt == max_attempts - 1:
                        logger.error(f"All {max_attempts} attempts failed: {e}")
                        raise
                    # Check Retry-After header
                    retry_after = None
                    if honor_retry_after and hasattr(getattr(e, 'response', None), 'headers'):
                        ra = e.response.headers.get('Retry-After')
                        if ra:
                            retry_after = float(ra)
                    # Calculate delay with jitter
                    cap = min(max_delay, base_delay * (2 ** attempt))
                    if retry_after:
                        delay = retry_after + random.uniform(0, 1)
                    elif jitter == 'full':
                        delay = random.uniform(0, cap)
                    elif jitter == 'equal':
                        delay = cap / 2 + random.uniform(0, cap / 2)
                    elif jitter == 'decorrelated':
                        delay = min(max_delay, random.uniform(base_delay, prev_delay * 3))
                        prev_delay = delay
                    else:
                        delay = cap
                    logger.warning(
                        f"Attempt {attempt + 1}/{max_attempts} failed "
                        f"(status={status}). Retrying in {delay:.2f}s: {e}"
                    )
                    await asyncio.sleep(delay)
        return wrapper
    return decorator

# ============================================================
# CIRCUIT BREAKER
# ============================================================

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    """
    Async circuit breaker.
    Tracks failure rates over a rolling window.
    Opens circuit when failure rate exceeds threshold.
    """
    def __init__(
        self,
        failure_threshold: float = 0.5,   # 50% failure rate opens circuit
        window_size: int = 10,            # rolling window of N requests
        recovery_timeout: float = 30.0,   # seconds to wait before half-open
        half_open_timeout: float = 10.0,  # timeout for the probe request
        name: str = "circuit"
    ):
        self.failure_threshold = failure_threshold
        self.window_size = window_size
        self.recovery_timeout = recovery_timeout
        self.half_open_timeout = half_open_timeout
        self.name = name
        self.state = CircuitState.CLOSED
        self.window: list = []            # True = success, False = failure
        self.opened_at: Optional[float] = None
        self._lock = asyncio.Lock()

    def _failure_rate(self) -> float:
        if not self.window:
            return 0.0
        return 1.0 - sum(self.window) / len(self.window)

    def _record(self, success: bool):
        self.window.append(success)
        if len(self.window) > self.window_size:
            self.window.pop(0)

    async def call(self, fn: Callable, *args, **kwargs) -> Any:
        """Execute fn through the circuit breaker."""
        async with self._lock:
            if self.state == CircuitState.OPEN:
                elapsed = time.monotonic() - (self.opened_at or 0)
                if elapsed < self.recovery_timeout:
                    raise RuntimeError(
                        f"Circuit '{self.name}' is OPEN. "
                        f"Retry in {self.recovery_timeout - elapsed:.1f}s"
                    )
                # Transition to half-open
                self.state = CircuitState.HALF_OPEN
                logger.info(f"Circuit '{self.name}' → HALF_OPEN (probing)")
        try:
            result = await asyncio.wait_for(
                fn(*args, **kwargs),
                timeout=self.half_open_timeout
                        if self.state == CircuitState.HALF_OPEN else None
            )
            async with self._lock:
                self._record(True)
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.CLOSED
                    self.window.clear()
                    logger.info(f"Circuit '{self.name}' → CLOSED (probe succeeded)")
            return result
        except Exception:
            async with self._lock:
                self._record(False)
                if self.state == CircuitState.HALF_OPEN:
                    self.state = CircuitState.OPEN
                    self.opened_at = time.monotonic()
                    logger.warning(f"Circuit '{self.name}' → OPEN (probe failed)")
                elif (self.state == CircuitState.CLOSED
                      and len(self.window) >= self.window_size
                      and self._failure_rate() >= self.failure_threshold):
                    self.state = CircuitState.OPEN
                    self.opened_at = time.monotonic()
                    logger.error(
                        f"Circuit '{self.name}' → OPEN "
                        f"(failure rate={self._failure_rate():.0%})"
                    )
            raise
```
```python
import asyncio
import heapq
import logging
from dataclasses import dataclass, field
from typing import Optional

import httpx

logger = logging.getLogger("rate_limiter")

# ============================================================
# PRIORITY QUEUE + COMPLETE RESILIENT AGENT
# ============================================================

@dataclass(order=True)
class QueuedRequest:
    priority: int                    # lower = higher priority
    enqueued_at: float               # FIFO tiebreak within a priority
    url: str = field(compare=False)
    kwargs: dict = field(compare=False)
    future: asyncio.Future = field(compare=False)
    max_wait: float = field(compare=False, default=30.0)

class ResilientPurpleFleaClient:
    """
    Complete resilient HTTP client for Purple Flea APIs.
    Combines: token bucket + priority queue + circuit breaker + backoff.

    Usage:
        client = ResilientPurpleFleaClient(api_key="...", rate=5.0, burst=20)
        price = await client.request("GET", "/trading-api/ticker/BTC-USDC", priority=2)
        await client.request("POST", "/trading-api/order", json={...}, priority=1)
    """
    PRIORITY_NAMES = {0: "CRITICAL", 1: "HIGH", 2: "NORMAL", 3: "LOW", 4: "BACKGROUND"}
    MAX_WAIT_BY_PRIORITY = {0: 0.0, 1: 0.5, 2: 5.0, 3: 60.0, 4: 300.0}

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://purpleflea.com",
        rate: float = 4.5,           # 4.5/s = 270/min (90% of 300/min standard)
        burst: float = 20,           # burst size
        max_concurrent: int = 20     # max concurrent in-flight requests
    ):
        self.base_url = base_url
        self.http = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=15.0
        )
        self.bucket = TokenBucket(rate=rate, capacity=burst)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.circuit = CircuitBreaker(name="purpleflea")
        self._queue: list = []
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        self._worker_task = asyncio.create_task(self._process_queue())

    async def stop(self):
        if self._worker_task:
            self._worker_task.cancel()
        await self.http.aclose()

    async def request(self, method: str, path: str, priority: int = 2, **kwargs) -> dict:
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        max_wait = self.MAX_WAIT_BY_PRIORITY.get(priority, 30.0)
        req = QueuedRequest(
            priority=priority,
            enqueued_at=loop.time(),
            url=f"{method} {path}",
            kwargs={"method": method, "url": path, **kwargs},
            future=future,
            max_wait=max_wait
        )
        # P0 (critical): bypass queue entirely
        if priority == 0:
            return await self._execute(req)
        heapq.heappush(self._queue, req)
        return await future

    @with_retry(max_attempts=4, base_delay=0.5, jitter='decorrelated')
    async def _execute(self, req: QueuedRequest) -> dict:
        # Apply rate limiting
        await self.bucket.wait_and_acquire()
        # Concurrency limit
        async with self.semaphore:
            async def _do_request():
                resp = await self.http.request(**req.kwargs)
                resp.raise_for_status()
                return resp.json()
            return await self.circuit.call(_do_request)

    async def _process_queue(self):
        while True:
            if not self._queue:
                await asyncio.sleep(0.01)
                continue
            now = asyncio.get_event_loop().time()
            req = self._queue[0]
            # Check for timeout (priority aging)
            age = now - req.enqueued_at
            if age > req.max_wait and req.priority > 0:
                # Promote priority and re-sort the heap
                req.priority -= 1
                heapq.heapify(self._queue)
                logger.info(f"Promoted {req.url} to priority {req.priority}")
                continue
            req = heapq.heappop(self._queue)
            try:
                result = await self._execute(req)
                if not req.future.done():
                    req.future.set_result(result)
            except Exception as e:
                if not req.future.done():
                    req.future.set_exception(e)

# ====================================================
# Example usage with Purple Flea APIs
# ====================================================

async def main():
    client = ResilientPurpleFleaClient(api_key="your-api-key")
    await client.start()
    try:
        # Critical: emergency stop (bypasses queue)
        await client.request(
            "POST", "/trading-api/order",
            json={"symbol": "BTC-USDC", "side": "sell",
                  "type": "market", "reduce_only": True},
            priority=0
        )
        # Normal: price check (queued normally)
        price_data = await client.request(
            "GET", "/trading-api/ticker/BTC-USDC", priority=2
        )
        print(f"BTC mark price: ${price_data['mark_price']}")
        # Background: analytics (low priority)
        history = await client.request(
            "GET", "/trading-api/history?limit=1000", priority=4
        )
    finally:
        await client.stop()

if __name__ == "__main__":
    asyncio.run(main())
```
Production Checklist: Before deploying a rate-limit-aware agent, verify: (1) client-side bucket rate is set to 80-90% of API limit, (2) circuit breaker thresholds are calibrated to observed failure rates, (3) P0 requests bypass all queueing, (4) Retry-After headers are honored, (5) jitter is applied on all retries, and (6) metrics are exported for bucket utilization and circuit state transitions.
Build Resilient Agents on Purple Flea
All Purple Flea APIs include rate limit headers, backoff-friendly 429 responses, and generous burst budgets. Start with free USDC from the faucet.