
Navigating Purple Flea API Rate Limits:
Throttling for High-Frequency Agents

March 6, 2026 · Purple Flea Team · 10 min read

High-frequency AI agents hit API rate limits. It is not a bug — it is an engineering constraint you design around. Whether your agent is polling prices every 100ms, submitting dozens of trades per minute, or running parallel wallet queries, you need a principled approach to request management or you will get throttled, miss fills, and leave money on the table.

This guide covers the Purple Flea API rate limit model, how rate limit responses work, the four core throttling strategies, and a complete Python RateLimiter class you can drop into any agent codebase today.

Purple Flea API Rate Limit Reference

The Purple Flea platform exposes six services, each with independent rate limit tiers. Limits are applied per API key and per IP. Exceeding a limit returns HTTP 429 Too Many Requests with a Retry-After header indicating the seconds until the limit resets.

Service      | Endpoint class                | Limit   | Window       | 429 backoff
Casino API   | Game actions (bet, cashout)   | 60 req  | 1 min        | 60s
Casino API   | Balance / history reads       | 300 req | 1 min        | 30s
Trading API  | Order placement               | 120 req | 1 min        | 60s
Trading API  | Market data / price reads     | 600 req | 1 min        | 15s
Wallet API   | Send / receive transactions   | 30 req  | 1 min        | 120s
Wallet API   | Balance reads                 | 240 req | 1 min        | 20s
Domains API  | Domain actions (buy, list)    | 30 req  | 1 min        | 120s
Escrow API   | Create / resolve escrow       | 30 req  | 1 min        | 120s
Faucet API   | Claim (once per agent)        | 1 req   | key lifetime | N/A
Key Insight

Write-heavy endpoints (transactions, orders, bets) have lower limits than read endpoints. Design your agent to cache read results aggressively and minimize redundant state polls. A well-cached agent can run 10x higher effective throughput than a naive one hitting the same limit.

How 429 Responses Work

When your agent exceeds a rate limit, the API returns:

HTTP Response
HTTP/1.1 429 Too Many Requests
Retry-After: 45
X-RateLimit-Limit: 120
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1741305600
Content-Type: application/json

{
  "error": "rate_limit_exceeded",
  "message": "Too many requests. Retry after 45 seconds.",
  "retry_after": 45
}

The three headers you care about:

- Retry-After: seconds to wait before retrying (sent only with a 429).
- X-RateLimit-Remaining: requests left in the current window.
- X-RateLimit-Reset: Unix timestamp at which the window resets.

A naive agent ignores these headers and either crashes on 429 or blindly sleeps for a fixed duration. A sophisticated agent tracks remaining capacity per endpoint and proactively throttles before hitting the limit.
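
Reading those fields is one line per header. A minimal sketch, using plain dicts to stand in for real response headers (most HTTP clients expose these as case-insensitive mappings):

```python
# Sketch: extract the rate-limit headers from a response headers mapping.
def parse_rate_limit_headers(headers: dict) -> dict:
    """Return remaining capacity, reset timestamp, and retry delay."""
    return {
        # Requests left in the current window (None if absent)
        "remaining": int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Remaining" in headers else None,
        # Unix timestamp when the window resets
        "reset_at": int(headers["X-RateLimit-Reset"])
        if "X-RateLimit-Reset" in headers else None,
        # Seconds to wait, only present on 429 responses
        "retry_after": int(headers["Retry-After"])
        if "Retry-After" in headers else None,
    }

info = parse_rate_limit_headers({
    "Retry-After": "45",
    "X-RateLimit-Remaining": "0",
    "X-RateLimit-Reset": "1741305600",
})
```

Feed every response through a helper like this so your limiter state stays current even on successful requests.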

Strategy 1: Token Bucket Rate Limiting

The token bucket algorithm is the gold standard for smooth rate limiting. Each endpoint has a bucket with a fixed capacity (the rate limit). Tokens refill at a constant rate of capacity/window tokens per second. Each request consumes one token. If the bucket is empty, the request waits.

This gives you burst headroom (use multiple tokens quickly when they have accumulated) while enforcing the average rate limit over time. It is strictly superior to naive sleep loops.

Python — Token Bucket
import asyncio
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    capacity: int          # max tokens (= rate limit)
    refill_rate: float    # tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)

    def __post_init__(self):
        self.tokens = self.capacity
        self.last_refill = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.last_refill
        gained = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + gained)
        self.last_refill = now

    async def acquire(self) -> None:
        """Block until a token is available."""
        while True:
            self._refill()
            if self.tokens >= 1:
                self.tokens -= 1
                return
            wait = (1 - self.tokens) / self.refill_rate
            await asyncio.sleep(wait)
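
To see the pacing this enforces, here is the same arithmetic worked synchronously: an illustrative small 5-token bucket refilling at the Trading order tier's rate (120 req per 60s window, so 2 tokens per second):

```python
# Illustrative sketch of the bucket math above, worked by hand.
capacity = 5
refill_rate = 120 / 60.0           # Trading order tier: 2 tokens per second
tokens = float(capacity)

# Burst: five requests drain the accumulated tokens with zero wait.
for _ in range(5):
    tokens -= 1

# The sixth request finds an empty bucket and must wait for one token.
wait = (1 - tokens) / refill_rate  # (1 - 0.0) / 2.0 = 0.5 seconds
```

Bursts are free while tokens are banked; once the bucket empties, requests settle into the average rate the limit allows.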

Strategy 2: Exponential Backoff with Jitter

When a 429 actually fires — because your token bucket was too aggressive, or because you share an IP with another agent — you need a recovery strategy. Naive "sleep 60s and retry" causes thundering herd: all throttled agents wake up simultaneously and immediately hit the limit again.

Exponential backoff with jitter solves this. Each retry doubles the wait time, with a random jitter added so multiple agents desynchronize.

Python — Exponential Backoff
import asyncio
import random
import httpx
from typing import Optional

async def request_with_backoff(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    max_retries: int = 7,
    base_delay: float = 1.0,
    max_delay: float = 120.0,
    **kwargs
) -> httpx.Response:
    """Execute an HTTP request with exponential backoff on 429."""
    attempt = 0
    while attempt <= max_retries:
        resp = await client.request(method, url, **kwargs)

        if resp.status_code != 429:
            return resp

        # Respect Retry-After if present
        retry_after = resp.headers.get("Retry-After")
        if retry_after:
            wait = float(retry_after)
        else:
            # Exponential backoff: 1s, 2s, 4s, ..., capped at max_delay
            exp_delay = base_delay * (2 ** attempt)
            jitter = random.uniform(0, exp_delay * 0.25)
            wait = min(exp_delay + jitter, max_delay)

        print(f"429 on {url}, retrying in {wait:.1f}s (attempt {attempt + 1}/{max_retries + 1})")
        await asyncio.sleep(wait)
        attempt += 1

    raise RuntimeError(f"Max retries exceeded for {url}")
Jitter is Critical

Without jitter, 10 agents all throttled at the same moment will all retry at the same moment. Adding random jitter of 0-25% of the delay spreads retries out and prevents the cascade from hitting the limit again immediately.
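
To make the effect concrete, here is a quick sketch comparing a fixed schedule against two jittered ones, with seeded `random.Random` instances standing in for two independent agents:

```python
import random

def backoff_schedule(rng: random.Random, base: float = 1.0, attempts: int = 4):
    """Exponential delays with 0-25% jitter, one entry per attempt."""
    return [base * 2**a + rng.uniform(0, base * 2**a * 0.25)
            for a in range(attempts)]

fixed = [1.0 * 2**a for a in range(4)]    # 1, 2, 4, 8: identical for every agent
agent_a = backoff_schedule(random.Random(1))
agent_b = backoff_schedule(random.Random(2))
# agent_a and agent_b differ, so their retries no longer collide.
```

Every jittered delay stays within 25% of the fixed schedule, so the average recovery time barely changes; only the synchronization disappears.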

Strategy 3: Request Queue with Priority Lanes

For agents with multiple concurrent tasks (price monitoring, order management, balance checks), a priority queue prevents low-priority read operations from consuming the request budget that high-priority write operations need.

Assign priorities: market-critical writes (order placement, cashout) get priority 0, informational reads get priority 2. The queue drains highest-priority requests first.

Python — Priority Request Queue
import asyncio
import heapq
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass(order=True)
class QueuedRequest:
    priority: int
    seq: int                                    # tiebreak by insertion order
    coro_fn: Callable = field(compare=False)   # coroutine factory
    future: asyncio.Future = field(compare=False)

class PriorityRequestQueue:
    def __init__(self, bucket: TokenBucket):
        self.bucket = bucket
        self._heap: list = []
        self._seq = 0
        self._task = asyncio.create_task(self._drain())

    async def submit(self, coro_fn: Callable, priority: int = 1) -> Any:
        """Submit a request. Lower priority number = higher urgency."""
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
        item = QueuedRequest(priority, self._seq, coro_fn, fut)
        self._seq += 1
        heapq.heappush(self._heap, item)
        return await fut

    async def _drain(self) -> None:
        while True:
            if not self._heap:
                await asyncio.sleep(0.005)
                continue
            await self.bucket.acquire()
            item = heapq.heappop(self._heap)
            try:
                result = await item.coro_fn()
                item.future.set_result(result)
            except Exception as e:
                item.future.set_exception(e)
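
The drain order this produces can be sketched with bare `(priority, seq)` tuples standing in for QueuedRequest:

```python
import heapq

# Sketch: four queued requests, pushed in arrival order. Lower priority
# number wins; seq breaks ties so equal-priority items keep FIFO order.
heap = []
for priority, seq, name in [(2, 0, "balance read"),
                            (0, 1, "order placement"),
                            (2, 2, "price read"),
                            (0, 3, "cashout")]:
    heapq.heappush(heap, (priority, seq, name))

drain_order = [heapq.heappop(heap)[2] for _ in range(4)]
# Writes drain first, then reads, each group in insertion order.
```

Even if reads arrive first, every pending write jumps ahead of them, which is exactly what you want when the order budget is tight.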

Strategy 4: Adaptive Throttling via Remaining Header

The most sophisticated approach uses the X-RateLimit-Remaining header returned on every successful response to dynamically adjust the send rate. When remaining capacity is high, send at full speed. As remaining drops below a threshold, slow down proactively before hitting 429.

Python — Adaptive Throttling
import asyncio
import time

class AdaptiveThrottler:
    """Adjusts send rate based on X-RateLimit-Remaining header."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.remaining = limit
        self.reset_at: float = time.time() + window_seconds

    def update_from_headers(self, headers: dict) -> None:
        """Call after every successful response."""
        if "X-RateLimit-Remaining" in headers:
            self.remaining = int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Reset" in headers:
            self.reset_at = float(headers["X-RateLimit-Reset"])

    async def throttle(self) -> None:
        """Call before every request. Introduces adaptive delays."""
        now = time.time()
        time_left = max(0.1, self.reset_at - now)

        if self.remaining <= 0:
            # Exhausted — wait for reset
            await asyncio.sleep(time_left)
            self.remaining = self.limit
            self.reset_at = time.time() + self.window
        elif self.remaining < self.limit * 0.1:
            # Under 10% remaining — slow to half pace
            safe_rate = (self.remaining / time_left) * 0.5
            delay = 1.0 / safe_rate if safe_rate > 0 else time_left
            await asyncio.sleep(delay)
        elif self.remaining < self.limit * 0.25:
            # Under 25% remaining — pace evenly over remaining window
            safe_rate = self.remaining / time_left
            delay = 1.0 / safe_rate if safe_rate > 0 else 0.5
            await asyncio.sleep(delay)
        # else: plenty of remaining capacity, no delay needed
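
The delays those branches produce are easy to check by hand. A worked sketch using the Trading market-data tier (600 req/min) with 10 seconds left in the window:

```python
# Worked example of the two pacing branches above.
limit = 600
time_left = 10.0                            # seconds until the window resets

# Under-10% branch: 20 requests left, spread at half the even pace.
remaining = 20
safe_rate = (remaining / time_left) * 0.5   # 1.0 request per second
delay = 1.0 / safe_rate                     # 1.0 s between requests

# Under-25% branch: 120 requests left, paced evenly with no halving.
even_rate = 120 / time_left                 # 12.0 requests per second
even_delay = 1.0 / even_rate                # ~0.083 s between requests
```

The halving in the under-10% branch deliberately undershoots, leaving slack for other tasks sharing the same key.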

The Complete RateLimiter Class

This production-ready RateLimiter class combines all four strategies: token bucket for proactive throttling, exponential backoff for 429 recovery, response header tracking for adaptive adjustment, and per-endpoint bucket isolation so a Trading API burst does not slow your Wallet API calls.

Python — Production RateLimiter
import asyncio
import random
import time
import httpx
from typing import Any, Dict
from enum import Enum

class Endpoint(Enum):
    CASINO_ACTION   = (60, 60)
    CASINO_READ     = (300, 60)
    TRADING_ORDER   = (120, 60)
    TRADING_MARKET  = (600, 60)
    WALLET_SEND     = (30, 60)
    WALLET_READ     = (240, 60)
    DOMAINS_ACTION  = (30, 60)
    ESCROW_ACTION   = (30, 60)

class RateLimiter:
    """
    Per-endpoint token bucket rate limiter with adaptive throttling
    and exponential backoff. Safe for concurrent use by multiple
    tasks on a single asyncio event loop.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = httpx.AsyncClient(
            headers={"X-API-Key": api_key},
            timeout=30.0
        )
        self._buckets: Dict[Endpoint, dict] = {
            ep: {
                "tokens": ep.value[0],
                "capacity": ep.value[0],
                "rate": ep.value[0] / ep.value[1],  # tokens/sec
                "last_refill": time.monotonic(),
                "remaining": ep.value[0],
                "reset_at": time.time() + ep.value[1],
            }
            for ep in Endpoint
        }

    def _refill_bucket(self, ep: Endpoint) -> None:
        b = self._buckets[ep]
        now = time.monotonic()
        gained = (now - b["last_refill"]) * b["rate"]
        b["tokens"] = min(b["capacity"], b["tokens"] + gained)
        b["last_refill"] = now

    async def _acquire_token(self, ep: Endpoint) -> None:
        b = self._buckets[ep]
        while True:
            self._refill_bucket(ep)
            if b["tokens"] >= 1:
                b["tokens"] -= 1
                return
            wait = (1 - b["tokens"]) / b["rate"]
            await asyncio.sleep(wait)

    def _update_adaptive(self, ep: Endpoint, headers: dict) -> None:
        b = self._buckets[ep]
        if "X-RateLimit-Remaining" in headers:
            b["remaining"] = int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Reset" in headers:
            b["reset_at"] = float(headers["X-RateLimit-Reset"])

    async def request(
        self,
        endpoint: Endpoint,
        method: str,
        url: str,
        max_retries: int = 6,
        **kwargs
    ) -> Any:
        """
        Execute a rate-limited HTTP request with backoff.

        Usage:
            data = await limiter.request(
                Endpoint.TRADING_ORDER,
                "POST",
                "https://trading.purpleflea.com/api/v1/orders",
                json={"market": "BTC-USDC", "side": "buy", "amount": 100}
            )
        """
        await self._acquire_token(endpoint)

        # Adaptive slowdown: when the server reports that under 25% of
        # the window's budget remains, pace it evenly over the time left.
        b = self._buckets[endpoint]
        time_left = max(0.1, b["reset_at"] - time.time())
        if 0 < b["remaining"] < b["capacity"] * 0.25:
            await asyncio.sleep(time_left / b["remaining"])

        attempt = 0
        while attempt <= max_retries:
            resp = await self.client.request(method, url, **kwargs)
            self._update_adaptive(endpoint, resp.headers)

            if resp.status_code == 429:
                retry_after = resp.headers.get("Retry-After")
                wait = float(retry_after) if retry_after else \
                    min(1.0 * 2**attempt + random.uniform(0, 1), 120)
                print(f"[RateLimiter] 429 {endpoint.name}, retry in {wait:.1f}s")
                await asyncio.sleep(wait)
                await self._acquire_token(endpoint)
                attempt += 1
                continue

            resp.raise_for_status()
            return resp.json()

        raise RuntimeError(f"Max retries exceeded: {url}")

    async def close(self) -> None:
        await self.client.aclose()

Caching to Reduce Request Pressure

Many requests your agent makes are not strictly necessary. If you are checking your wallet balance 10 times per second, you are burning rate limit budget on data that has not changed. A simple TTL cache on read endpoints can cut your actual request volume by 60-80%.

Python — TTL Cache for Read Endpoints
import time
from typing import Any, Optional, Tuple

class TTLCache:
    def __init__(self):
        self._store: dict[str, Tuple[Any, float]] = {}

    def get(self, key: str, ttl: float = 5.0) -> Optional[Any]:
        if key in self._store:
            value, stored_at = self._store[key]
            if time.time() - stored_at < ttl:
                return value
        return None

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (value, time.time())

# Recommended TTLs per Purple Flea endpoint type
CACHE_TTL = {
    "wallet_balance": 10.0,    # 10s — balances change slowly
    "market_price": 0.5,        # 500ms — prices change fast
    "casino_balance": 5.0,      # 5s
    "domain_listing": 30.0,     # 30s — listings are stable
    "escrow_status": 3.0,       # 3s — could change on dispute
}
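
The savings compound quickly. A back-of-envelope check on the claim above, for an agent polling wallet balance at 10 Hz against even a modest 2-second TTL:

```python
# A 10 Hz balance poll with a 2-second TTL sends one real request per
# TTL window instead of twenty.
poll_hz = 10
ttl_seconds = 2.0

polls_per_window = poll_hz * ttl_seconds         # 20 polls per TTL window
real_requests = 1                                # only the first poll misses
hit_rate = 1 - real_requests / polls_per_window  # 0.95, i.e. 95% eliminated
```

On the 240 req/min wallet read tier, that turns a 600 req/min polling habit into 30 real requests per minute, comfortably under the limit.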

Putting It All Together: High-Frequency Trading Agent

Here is a minimal trading agent skeleton that combines the RateLimiter with TTL caching, running a 100ms price monitoring loop without ever hitting a 429:

Python — Throttled Trading Agent
import asyncio
from rate_limiter import RateLimiter, Endpoint
from ttl_cache import TTLCache, CACHE_TTL

class ThrottledTradingAgent:
    BASE = "https://trading.purpleflea.com/api/v1"

    def __init__(self, api_key: str):
        self.rl = RateLimiter(api_key)
        self.cache = TTLCache()

    async def get_price(self, market: str) -> float:
        key = f"price:{market}"
        cached = self.cache.get(key, ttl=CACHE_TTL["market_price"])
        if cached is not None:
            return cached

        data = await self.rl.request(
            Endpoint.TRADING_MARKET, "GET",
            f"{self.BASE}/markets/{market}/price"
        )
        price = data["price"]
        self.cache.set(key, price)
        return price

    async def place_order(self, market: str, side: str, amount: float) -> dict:
        # Orders are never cached — always go through rate limiter
        return await self.rl.request(
            Endpoint.TRADING_ORDER, "POST",
            f"{self.BASE}/orders",
            json={"market": market, "side": side, "amount": amount}
        )

    async def run(self) -> None:
        print("[Agent] Starting throttled trading loop...")
        while True:
            price = await self.get_price("BTC-USDC")
            # ... strategy logic here ...
            if self.should_buy(price):
                order = await self.place_order("BTC-USDC", "buy", 100)
                print(f"[Agent] Order placed: {order['id']}")
            await asyncio.sleep(0.1)  # 10 Hz loop

    def should_buy(self, price: float) -> bool:
        return False  # Replace with your strategy logic

async def main():
    agent = ThrottledTradingAgent(api_key="pf_live_your_key_here")
    try:
        await agent.run()
    finally:
        await agent.rl.close()

if __name__ == "__main__":
    asyncio.run(main())

Common Mistakes and How to Avoid Them

Mistake 1: Single global rate limiter

Using one limiter for all endpoints conflates their separate limits. A burst of market data reads should not burn the budget for order placement. Always maintain a separate bucket per endpoint class.
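
A toy sketch of why isolation matters, with budgets sized from the reference table (600 market reads/min, 120 orders/min): after 120 market-data reads, a single shared budget is already exhausted for orders, while separate budgets are not.

```python
# Shared counter (wrong) vs per-endpoint budgets (right).
shared = 120                          # one global budget for everything
isolated = {"market": 600, "order": 120}

for _ in range(120):                  # a burst of 120 market-data reads
    shared -= 1
    isolated["market"] -= 1

# The shared budget is now empty, so orders would be blocked; the
# isolated order budget is untouched.
```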

Mistake 2: Ignoring Retry-After

Sleeping a fixed 60 seconds on every 429 is usually wrong. If the window resets in 3 seconds, you are wasting 57 seconds. Read Retry-After and sleep exactly that long.

Mistake 3: No jitter in backoff

Fixed exponential backoff (1s, 2s, 4s, 8s...) causes agent swarms to synchronize their retries. Add 0-25% random jitter to desynchronize.

Mistake 4: Polling when you should subscribe

If Purple Flea provides WebSocket feeds for price data, use them. A single subscription connection consumes zero rate limit budget versus hundreds of REST polls per minute.

Mistake 5: No cache on read endpoints

Wallet balance, domain listings, and escrow status do not change every 100ms. Cache them with appropriate TTLs. Even a 2-second cache on a 300 req/min read endpoint eliminates 95%+ of redundant calls.

Get Started

Register at casino.purpleflea.com to get an API key. New agents can claim a free $1 USDC at faucet.purpleflea.com to start testing your rate-limited agent without risking any capital.