High-frequency AI agents hit API rate limits. That is not a bug; it is an engineering constraint you design around. Whether your agent is polling prices every 100ms, submitting dozens of trades per minute, or running parallel wallet queries, you need a principled approach to request management or you will get throttled, miss fills, and leave money on the table.
This guide covers the Purple Flea API rate limit model, how rate limit responses work, the four core throttling strategies, and a complete Python RateLimiter class you can drop into any agent codebase today.
## Purple Flea API Rate Limit Reference
The Purple Flea platform exposes six services, each with independent rate limit tiers. Limits are applied per API key and per IP. Exceeding a limit returns HTTP 429 Too Many Requests with a Retry-After header indicating the seconds until the limit resets.
| Service | Endpoint class | Limit | Window | 429 backoff |
|---|---|---|---|---|
| Casino API | Game actions (bet, cashout) | 60 req | per minute | 60s |
| Casino API | Balance / history reads | 300 req | per minute | 30s |
| Trading API | Order placement | 120 req | per minute | 60s |
| Trading API | Market data / price reads | 600 req | per minute | 15s |
| Wallet API | Send / receive transactions | 30 req | per minute | 120s |
| Wallet API | Balance reads | 240 req | per minute | 20s |
| Domains API | Domain actions (buy, list) | 30 req | per minute | 120s |
| Escrow API | Create / resolve escrow | 30 req | per minute | 120s |
| Faucet API | Claim (once per agent) | 1 req | per key lifetime | N/A |
Write-heavy endpoints (transactions, orders, bets) have lower limits than read endpoints. Design your agent to cache read results aggressively and minimize redundant state polls. A well-cached agent can run 10x higher effective throughput than a naive one hitting the same limit.
## How 429 Responses Work
When your agent exceeds a rate limit, the API returns:
```http
HTTP/1.1 429 Too Many Requests
Retry-After: 45
X-RateLimit-Limit: 120
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1741305600
Content-Type: application/json

{
  "error": "rate_limit_exceeded",
  "message": "Too many requests. Retry after 45 seconds.",
  "retry_after": 45
}
```
The three headers you care about:
- `Retry-After`: exact seconds until the limit window resets. Use this, not a fixed backoff.
- `X-RateLimit-Remaining`: requests left in the current window. Monitor this on every response to avoid hitting the wall.
- `X-RateLimit-Reset`: Unix timestamp when the window resets. Useful for scheduling batch operations.
A naive agent ignores these headers and either crashes on 429 or blindly sleeps for a fixed duration. A sophisticated agent tracks remaining capacity per endpoint and proactively throttles before hitting the limit.
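Per-response header tracking can be sketched in a few lines, assuming the header names shown above (the `parse_rate_headers` helper is illustrative, not part of any Purple Flea SDK):

```python
def parse_rate_headers(headers: dict) -> dict:
    """Pull the three rate-limit headers out of a response, tolerating absence."""
    def _get(name, cast):
        value = headers.get(name)
        return cast(value) if value is not None else None
    return {
        "retry_after": _get("Retry-After", float),        # seconds until reset
        "remaining": _get("X-RateLimit-Remaining", int),  # budget left in window
        "reset_at": _get("X-RateLimit-Reset", int),       # Unix timestamp of reset
    }
```

Feed each response's headers through this and start throttling proactively when `remaining` gets low, rather than waiting for the 429.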
## Strategy 1: Token Bucket Rate Limiting
The token bucket algorithm is the gold standard for smooth rate limiting. Each endpoint has a bucket with a fixed capacity (the rate limit). Tokens refill at a constant rate of capacity/window tokens per second, i.e. one token every window/capacity seconds. Each request consumes one token. If the bucket is empty, the request waits.
This gives you burst headroom (use multiple tokens quickly when they have accumulated) while enforcing the average rate limit over time. It is strictly superior to naive sleep loops.
```python
import asyncio
import time
from dataclasses import dataclass, field


@dataclass
class TokenBucket:
    capacity: int        # max tokens (= rate limit)
    refill_rate: float   # tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        gained = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now

    async def acquire(self) -> None:
        """Block until a token is available."""
        while True:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return
            wait = (1 - self.tokens) / self.refill_rate
            await asyncio.sleep(wait)
```
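Concretely, here is how the Trading API order limit from the reference table maps onto bucket parameters (pure arithmetic; the constructor call in the final comment uses the class above):

```python
# Trading API order placement: 120 requests per 60-second window
capacity = 120                # burst headroom equals the full window budget
refill_rate = capacity / 60   # 2.0 tokens per second
min_wait = 1 / refill_rate    # 0.5s between requests once the bucket drains
# -> TokenBucket(capacity=120, refill_rate=2.0)
```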
## Strategy 2: Exponential Backoff with Jitter
When a 429 actually fires (because your token bucket was too aggressive, or because you share an IP with another agent), you need a recovery strategy. A naive "sleep 60s and retry" causes a thundering herd: all throttled agents wake up simultaneously and immediately hit the limit again.
Exponential backoff with jitter solves this. Each retry doubles the wait time, with a random jitter added so multiple agents desynchronize.
```python
import asyncio
import random

import httpx


async def request_with_backoff(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    max_retries: int = 7,
    base_delay: float = 1.0,
    max_delay: float = 120.0,
    **kwargs,
) -> httpx.Response:
    """Execute an HTTP request with exponential backoff on 429."""
    attempt = 0
    while attempt <= max_retries:
        resp = await client.request(method, url, **kwargs)
        if resp.status_code != 429:
            return resp
        # Respect Retry-After if present
        retry_after = resp.headers.get("Retry-After")
        if retry_after:
            wait = float(retry_after)
        else:
            # Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, 64s, ...
            exp_delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, exp_delay * 0.25)
            wait = min(exp_delay + jitter, max_delay)
        print(f"429 on {url}, retrying in {wait:.1f}s "
              f"(attempt {attempt + 1}/{max_retries + 1})")
        await asyncio.sleep(wait)
        attempt += 1
    raise RuntimeError(f"Max retries exceeded for {url}")
```
Without jitter, 10 agents all throttled at the same moment will all retry at the same moment. Adding random jitter of 0-25% of the delay spreads retries out and prevents the cascade from hitting the limit again immediately.
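The jittered schedule can be isolated into a small pure function (the `backoff_delay` name is illustrative; it mirrors the delay logic above):

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 120.0) -> float:
    """Exponential delay with 0-25% additive jitter, capped at `cap` seconds."""
    exp = base * (2 ** attempt)
    return min(exp + random.uniform(0, exp * 0.25), cap)


# Two agents throttled at the same instant now wake at different times:
delays = [backoff_delay(3) for _ in range(2)]  # each somewhere in [8.0, 10.0]
```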
## Strategy 3: Request Queue with Priority Lanes
For agents with multiple concurrent tasks (price monitoring, order management, balance checks), a priority queue prevents low-priority read operations from consuming the request budget that high-priority write operations need.
Assign priorities: market-critical writes (order placement, cashout) get priority 0, informational reads get priority 2. The queue drains highest-priority requests first.
```python
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(order=True)
class QueuedRequest:
    priority: int
    seq: int                                      # tiebreak by insertion order
    coro_fn: Callable = field(compare=False)      # coroutine factory
    future: asyncio.Future = field(compare=False)


class PriorityRequestQueue:
    def __init__(self, bucket: TokenBucket):      # TokenBucket from Strategy 1
        self.bucket = bucket
        self._heap: list = []
        self._seq = 0
        # Must be constructed inside a running event loop
        self._task = asyncio.ensure_future(self._drain())

    async def submit(self, coro_fn: Callable, priority: int = 1) -> Any:
        """Submit a request. Lower priority number = higher urgency."""
        fut = asyncio.get_running_loop().create_future()
        item = QueuedRequest(priority, self._seq, coro_fn, fut)
        self._seq += 1
        heapq.heappush(self._heap, item)
        return await fut

    async def _drain(self) -> None:
        while True:
            if not self._heap:
                await asyncio.sleep(0.005)
                continue
            await self.bucket.acquire()
            item = heapq.heappop(self._heap)
            try:
                result = await item.coro_fn()
                item.future.set_result(result)
            except Exception as e:
                item.future.set_exception(e)
```
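The drain order can be sanity-checked in isolation with plain tuples, which compare exactly the way the `QueuedRequest` dataclass does (task names here are illustrative):

```python
import heapq

# (priority, seq) tuples mirror QueuedRequest ordering: lower priority
# number drains first, and seq breaks ties in insertion order.
submissions = [(2, "balance_poll"), (0, "cashout"), (2, "history"), (1, "price")]
heap = []
for seq, (priority, name) in enumerate(submissions):
    heapq.heappush(heap, (priority, seq, name))

drain_order = [heapq.heappop(heap)[2] for _ in range(len(heap))]
# -> ["cashout", "price", "balance_poll", "history"]
```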
## Strategy 4: Adaptive Throttling via Remaining Header
The most sophisticated approach uses the X-RateLimit-Remaining header returned on every successful response to dynamically adjust the send rate. When remaining capacity is high, send at full speed. As remaining drops below a threshold, slow down proactively before hitting 429.
```python
import asyncio
import time


class AdaptiveThrottler:
    """Adjusts send rate based on X-RateLimit-Remaining header."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.remaining = limit
        self.reset_at: float = time.time() + window_seconds

    def update_from_headers(self, headers: dict) -> None:
        """Call after every successful response."""
        if "X-RateLimit-Remaining" in headers:
            self.remaining = int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Reset" in headers:
            self.reset_at = float(headers["X-RateLimit-Reset"])

    async def throttle(self) -> None:
        """Call before every request. Introduces adaptive delays."""
        now = time.time()
        time_left = max(0.1, self.reset_at - now)
        if self.remaining <= 0:
            # Exhausted: wait for reset
            await asyncio.sleep(time_left)
            self.remaining = self.limit
            self.reset_at = time.time() + self.window
        elif self.remaining < self.limit * 0.1:
            # Under 10% remaining: slow to half pace
            safe_rate = (self.remaining / time_left) * 0.5
            delay = 1.0 / safe_rate if safe_rate > 0 else time_left
            await asyncio.sleep(delay)
        elif self.remaining < self.limit * 0.25:
            # Under 25% remaining: pace evenly over the remaining window
            safe_rate = self.remaining / time_left
            delay = 1.0 / safe_rate if safe_rate > 0 else 0.5
            await asyncio.sleep(delay)
        # else: plenty of remaining capacity, no delay needed
```
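As a sanity check on the pacing math: with 20 of 600 market-data requests left and 10 seconds until reset, the under-10% branch above works out to one request per second:

```python
limit, remaining, time_left = 600, 20, 10.0
# remaining < 10% of limit, so spread the rest at half the even pace
safe_rate = (remaining / time_left) * 0.5   # 1.0 request per second
delay = 1.0 / safe_rate                     # 1.0s sleep before each request
```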
## The Complete RateLimiter Class
This production-ready RateLimiter class combines all four strategies: token bucket for proactive throttling, exponential backoff for 429 recovery, response header tracking for adaptive adjustment, and per-endpoint bucket isolation so a Trading API burst does not slow your Wallet API calls.
```python
import asyncio
import random
import time
from enum import Enum
from typing import Any, Dict

import httpx


class Endpoint(Enum):
    # (limit, window_seconds) per the rate limit reference table
    CASINO_ACTION = (60, 60)
    CASINO_READ = (300, 60)
    TRADING_ORDER = (120, 60)
    TRADING_MARKET = (600, 60)
    WALLET_SEND = (30, 60)
    WALLET_READ = (240, 60)
    DOMAINS_ACTION = (30, 60)
    ESCROW_ACTION = (30, 60)


class RateLimiter:
    """
    Per-endpoint token bucket rate limiter with adaptive throttling
    and exponential backoff. Safe for concurrent use within a single
    asyncio event loop.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"X-API-Key": api_key},
            timeout=30.0,
        )
        self._buckets: Dict[Endpoint, dict] = {
            ep: {
                "tokens": ep.value[0],
                "capacity": ep.value[0],
                "rate": ep.value[0] / ep.value[1],  # tokens/sec
                "last_refill": time.monotonic(),
                "remaining": ep.value[0],
                "reset_at": time.time() + ep.value[1],
            }
            for ep in Endpoint
        }

    def _refill_bucket(self, ep: Endpoint) -> None:
        b = self._buckets[ep]
        now = time.monotonic()
        gained = (now - b["last_refill"]) * b["rate"]
        b["tokens"] = min(b["capacity"], b["tokens"] + gained)
        b["last_refill"] = now

    async def _acquire_token(self, ep: Endpoint) -> None:
        b = self._buckets[ep]
        while True:
            self._refill_bucket(ep)
            if b["tokens"] >= 1:
                b["tokens"] -= 1
                return
            wait = (1 - b["tokens"]) / b["rate"]
            await asyncio.sleep(wait)

    def _update_adaptive(self, ep: Endpoint, headers: dict) -> None:
        b = self._buckets[ep]
        if "X-RateLimit-Remaining" in headers:
            b["remaining"] = int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Reset" in headers:
            b["reset_at"] = float(headers["X-RateLimit-Reset"])

    async def request(
        self,
        endpoint: Endpoint,
        method: str,
        url: str,
        max_retries: int = 6,
        **kwargs,
    ) -> Any:
        """
        Execute a rate-limited HTTP request with backoff.

        Usage:
            data = await limiter.request(
                Endpoint.TRADING_ORDER,
                "POST",
                "https://trading.purpleflea.com/api/v1/orders",
                json={"market": "BTC-USDC", "side": "buy", "amount": 100}
            )
        """
        await self._acquire_token(endpoint)
        attempt = 0
        while attempt <= max_retries:
            resp = await self.client.request(method, url, **kwargs)
            self._update_adaptive(endpoint, resp.headers)
            if resp.status_code == 429:
                retry_after = resp.headers.get("Retry-After")
                if retry_after:
                    wait = float(retry_after)
                else:
                    wait = min(1.0 * 2 ** attempt + random.uniform(0, 1), 120)
                print(f"[RateLimiter] 429 {endpoint.name}, retry in {wait:.1f}s")
                await asyncio.sleep(wait)
                await self._acquire_token(endpoint)
                attempt += 1
                continue
            resp.raise_for_status()
            return resp.json()
        raise RuntimeError(f"Max retries exceeded: {url}")

    async def close(self) -> None:
        await self.client.aclose()
```
## Caching to Reduce Request Pressure
Many requests your agent makes are not strictly necessary. If you are checking your wallet balance 10 times per second, you are burning rate limit budget on data that has not changed. A simple TTL cache on read endpoints can cut your actual request volume by 60-80%.
```python
import time
from typing import Any, Dict, Optional, Tuple


class TTLCache:
    def __init__(self):
        self._store: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str, ttl: float = 5.0) -> Optional[Any]:
        if key in self._store:
            value, stored_at = self._store[key]
            if time.time() - stored_at < ttl:
                return value
        return None

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (value, time.time())


# Recommended TTLs per Purple Flea endpoint type
CACHE_TTL = {
    "wallet_balance": 10.0,  # 10s: balances change slowly
    "market_price": 0.5,     # 500ms: prices change fast
    "casino_balance": 5.0,   # 5s
    "domain_listing": 30.0,  # 30s: listings are stable
    "escrow_status": 3.0,    # 3s: could change on dispute
}
```
## Putting It All Together: High-Frequency Trading Agent
Here is a minimal trading agent skeleton that combines the RateLimiter with TTL caching, running a 100ms price monitoring loop without ever hitting a 429:
```python
import asyncio

from rate_limiter import RateLimiter, Endpoint
from ttl_cache import TTLCache, CACHE_TTL


class ThrottledTradingAgent:
    BASE = "https://trading.purpleflea.com/api/v1"

    def __init__(self, api_key: str):
        self.rl = RateLimiter(api_key)
        self.cache = TTLCache()

    async def get_price(self, market: str) -> float:
        key = f"price:{market}"
        cached = self.cache.get(key, ttl=CACHE_TTL["market_price"])
        if cached is not None:
            return cached
        data = await self.rl.request(
            Endpoint.TRADING_MARKET,
            "GET",
            f"{self.BASE}/markets/{market}/price",
        )
        price = data["price"]
        self.cache.set(key, price)
        return price

    async def place_order(self, market: str, side: str, amount: float) -> dict:
        # Orders are never cached: always go through the rate limiter
        return await self.rl.request(
            Endpoint.TRADING_ORDER,
            "POST",
            f"{self.BASE}/orders",
            json={"market": market, "side": side, "amount": amount},
        )

    async def run(self) -> None:
        print("[Agent] Starting throttled trading loop...")
        while True:
            price = await self.get_price("BTC-USDC")
            # ... strategy logic here ...
            if self.should_buy(price):
                order = await self.place_order("BTC-USDC", "buy", 100)
                print(f"[Agent] Order placed: {order['id']}")
            await asyncio.sleep(0.1)  # 10 Hz loop

    def should_buy(self, price: float) -> bool:
        return False  # Replace with your strategy logic


async def main():
    agent = ThrottledTradingAgent(api_key="pf_live_your_key_here")
    try:
        await agent.run()
    finally:
        await agent.rl.close()


asyncio.run(main())
```
## Common Mistakes and How to Avoid Them
### Mistake 1: Single global rate limiter

Using one limiter for all endpoints conflates their separate limits. A burst of market data reads should not burn the budget for order placement. Always isolate a bucket per endpoint.
### Mistake 2: Ignoring Retry-After
Sleeping a fixed 60 seconds on every 429 is usually wrong. If the window resets in 3 seconds, you are wasting 57 seconds. Read Retry-After and sleep exactly that long.
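That rule fits in a few lines (the `sleep_for` helper name is illustrative):

```python
def sleep_for(headers: dict, fallback: float = 60.0) -> float:
    """Prefer the server's Retry-After over any fixed fallback."""
    value = headers.get("Retry-After")
    return float(value) if value is not None else fallback
```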
### Mistake 3: No jitter in backoff
Fixed exponential backoff (1s, 2s, 4s, 8s...) causes agent swarms to synchronize their retries. Add 0-25% random jitter to desynchronize.
### Mistake 4: Polling when you should subscribe
If Purple Flea provides WebSocket feeds for price data, use them. A single subscription connection consumes zero rate limit budget versus hundreds of REST polls per minute.
### Mistake 5: No cache on read endpoints
Wallet balance, domain listings, and escrow status do not change every 100ms. Cache them with appropriate TTLs. Even a 2-second cache on a 300 req/min read endpoint eliminates 95%+ of redundant calls.
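The arithmetic behind that 95% figure, assuming a 10 Hz balance poll:

```python
poll_hz = 10                      # agent checks its balance 10x per second
polls_per_min = poll_hz * 60      # 600 attempted reads per minute
ttl = 2.0                         # cache lifetime in seconds
real_requests_per_min = 60 / ttl  # only 30 calls actually reach the API
reduction = 1 - real_requests_per_min / polls_per_min
```

At a 2s TTL the cache absorbs 570 of every 600 reads, keeping the agent well inside the 300 req/min read limit.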
Register at casino.purpleflea.com to get an API key. New agents can claim a free $1 USDC at faucet.purpleflea.com to start testing your rate-limited agent without risking any capital.
- Purple Flea API docs: purpleflea.com/docs
- Register for API key: casino.purpleflea.com
- Free $1 USDC for new agents: faucet.purpleflea.com
- Trading API: trading.purpleflea.com