# REST Fundamentals for Agents
REST is the baseline protocol for AI agents integrating with financial APIs. Its stateless request-response model maps naturally to discrete agent actions: place a bet, check a balance, initiate a trade. Every Purple Flea API surface (Casino, Trading, Wallet, Domains) is REST-first.
For agents, REST differs from human-driven use in throughput requirements, authentication patterns, and error-handling expectations. A human clicks a button once every few seconds; an agent might fire 50 requests per second across multiple services simultaneously.
## Authentication: API Keys
Purple Flea APIs use API key authentication via the `X-API-Key` header. Agents should store keys in environment variables and rotate them on a schedule. Never hardcode credentials in agent source code.
```python
import asyncio
import os
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

import httpx


@dataclass
class PurpleFleatConfig:
    api_key: str = field(default_factory=lambda: os.environ["PURPLEFLEA_API_KEY"])
    base_url: str = "https://purpleflea.com/api/v1"
    timeout: float = 30.0
    max_retries: int = 3


class PurpleFleatClient:
    """Async REST client for Purple Flea APIs with automatic retry."""

    def __init__(self, config: Optional[PurpleFleatConfig] = None):
        self.config = config or PurpleFleatConfig()
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> "PurpleFleatClient":
        self._client = httpx.AsyncClient(
            base_url=self.config.base_url,
            headers={
                "X-API-Key": self.config.api_key,
                "Content-Type": "application/json",
                "Accept": "application/json",
                "User-Agent": "PurpleFleatAgent/1.0",
            },
            timeout=httpx.Timeout(self.config.timeout),
        )
        return self

    async def __aexit__(self, *args) -> None:
        if self._client:
            await self._client.aclose()

    async def get(self, path: str, **kwargs) -> Dict[str, Any]:
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, json: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        return await self._request("POST", path, json=json, **kwargs)

    async def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
        for attempt in range(self.config.max_retries):
            try:
                resp = await self._client.request(method, path, **kwargs)
                resp.raise_for_status()
                return resp.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (429, 503) and attempt < self.config.max_retries - 1:
                    # Rate-limited or server overloaded — back off exponentially
                    wait = (2 ** attempt) * 0.5
                    await asyncio.sleep(wait)
                    continue
                raise
            except httpx.TransportError:
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
        raise RuntimeError("Max retries exceeded")


# Usage
async def main():
    async with PurpleFleatClient() as client:
        balance = await client.get("/wallet/balance")
        print(f"Balance: {balance['usdc']} USDC")
        result = await client.post("/casino/bet", json={
            "game": "dice",
            "amount": "10.00",
            "prediction": "over",
            "target": 50,
        })
        print(f"Bet result: {result['outcome']} — {result['payout']} USDC")


asyncio.run(main())
```
Purple Flea APIs return `X-RateLimit-Remaining` and `X-RateLimit-Reset` headers. Agents should read these proactively rather than waiting for a 429 response. Inspect the headers after every request and throttle before the limit is hit.
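A minimal sketch of that proactive pattern, assuming `X-RateLimit-Reset` carries a Unix timestamp (verify the exact semantics against the API docs; the `min_remaining` threshold is an arbitrary illustration):

```python
import asyncio
import time

async def throttled_get(client, path: str, min_remaining: int = 5):
    """GET via a shared async HTTP client, pausing until the rate-limit
    window resets whenever X-RateLimit-Remaining drops too low."""
    resp = await client.get(path)
    remaining = int(resp.headers.get("X-RateLimit-Remaining", min_remaining + 1))
    if remaining <= min_remaining:
        # Assumption: X-RateLimit-Reset is a Unix timestamp in seconds
        reset_at = float(resp.headers.get("X-RateLimit-Reset", 0))
        await asyncio.sleep(max(0.0, reset_at - time.time()))
    resp.raise_for_status()
    return resp.json()
```

Because the sleep happens after a successful response, the agent slows itself down before the server ever has to answer with a 429.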
## Pagination for Agents
When listing transactions, game history, or trade records, agents must handle cursor-based pagination correctly. Page-based pagination is unreliable in high-frequency environments where new records arrive while you're paginating. Purple Flea uses cursor pagination everywhere.
```python
from decimal import Decimal
from typing import AsyncIterator, Optional


async def paginate_transactions(
    client: PurpleFleatClient,
    limit: int = 100,
) -> AsyncIterator[dict]:
    """Yield all transactions using cursor pagination."""
    cursor: Optional[str] = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        page = await client.get("/wallet/transactions", params=params)
        for tx in page["data"]:
            yield tx
        cursor = page.get("next_cursor")
        if not cursor:
            break  # No more pages


# Collect all transactions for reconciliation
async def reconcile_balance(client: PurpleFleatClient) -> Decimal:
    # Decimal avoids binary floating-point rounding errors on money amounts
    total = Decimal("0")
    async for tx in paginate_transactions(client):
        if tx["type"] == "credit":
            total += Decimal(tx["amount"])
        else:
            total -= Decimal(tx["amount"])
    return total
```
## WebSocket Streams and Real-Time Data
REST is request-response. For market data, live game outcomes, and order book updates, agents need persistent streams. WebSocket connections eliminate the polling overhead that would otherwise dominate an agent's resource budget at high frequency.
Purple Flea's Trading API exposes WebSocket endpoints for real-time price feeds, position updates, and trade fills. Agents subscribing to these streams can react to market events within milliseconds rather than the 100–500ms typical of REST polling.
### WebSocket Client with Reconnection Logic
WebSocket connections drop. Networks hiccup. Servers restart for deployments. A production agent must automatically reconnect with exponential backoff and replay any subscriptions lost during the disconnect.
```python
import asyncio
import json
import logging
import os
from typing import Callable, Set

import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException

logger = logging.getLogger(__name__)


class TradingStream:
    """Robust WebSocket stream for Purple Flea Trading API."""

    WS_URL = "wss://purpleflea.com/trading-api/ws"
    MAX_BACKOFF = 60  # seconds

    def __init__(self, api_key: str, on_message: Callable[[dict], None]):
        self.api_key = api_key
        self.on_message = on_message
        self._subscriptions: Set[str] = set()
        self._running = False
        self._ws = None

    def subscribe(self, channel: str):
        """Register a channel; will be auto-resubscribed after reconnection."""
        self._subscriptions.add(channel)

    async def _resubscribe(self, ws):
        # Replay all subscriptions on fresh connection
        for channel in self._subscriptions:
            await ws.send(json.dumps({
                "action": "subscribe",
                "channel": channel,
            }))
            logger.debug(f"Resubscribed to {channel}")

    async def run(self):
        """Main loop with exponential backoff reconnection."""
        self._running = True
        backoff = 1
        while self._running:
            try:
                headers = {"X-API-Key": self.api_key}
                # NOTE: the keyword is `extra_headers` in the legacy websockets
                # client; on websockets >= 14 it is `additional_headers`.
                async with websockets.connect(
                    self.WS_URL,
                    extra_headers=headers,
                    ping_interval=20,
                    ping_timeout=10,
                ) as ws:
                    self._ws = ws
                    backoff = 1  # Reset on successful connect
                    logger.info("WebSocket connected")
                    await self._resubscribe(ws)
                    async for raw in ws:
                        try:
                            msg = json.loads(raw)
                            # Run the synchronous handler off the event loop
                            await asyncio.to_thread(self.on_message, msg)
                        except Exception as e:
                            logger.error(f"Handler error: {e}", exc_info=True)
            except ConnectionClosed as e:
                logger.warning(f"Connection closed ({e.code}): {e.reason}")
            except WebSocketException as e:
                logger.error(f"WebSocket error: {e}")
            except Exception as e:
                logger.error(f"Unexpected error: {e}", exc_info=True)
            if self._running:
                logger.info(f"Reconnecting in {backoff}s...")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, self.MAX_BACKOFF)

    async def send(self, msg: dict):
        if self._ws and not self._ws.closed:
            await self._ws.send(json.dumps(msg))
        else:
            raise RuntimeError("WebSocket not connected")

    def stop(self):
        self._running = False


# Usage
def handle_price_update(msg: dict):
    if msg["type"] == "price":
        print(f"BTC/USDC: {msg['bid']} / {msg['ask']}")
    elif msg["type"] == "fill":
        print(f"Order filled: {msg['order_id']} @ {msg['price']}")


async def run_agent():
    stream = TradingStream(
        api_key=os.environ["PURPLEFLEA_API_KEY"],
        on_message=handle_price_update,
    )
    stream.subscribe("prices.BTC-USDC")
    stream.subscribe("orders.fills")
    await stream.run()
```
WebSocket messages are ordered per connection but not globally. If your agent opens multiple connections for different channels, messages from different channels may arrive out of order relative to server-side time. Use the `server_ts` field in every message to reconcile ordering across channels.
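One way to apply that advice, sketched under the assumption that each channel's received messages are collected into per-channel batches (each batch is already sorted by `server_ts`, since ordering holds per connection):

```python
import heapq
from typing import Iterable, Iterator

def merge_by_server_ts(channels: Iterable[Iterable[dict]]) -> Iterator[dict]:
    """Merge per-channel message batches into one stream ordered by server_ts.

    Each input must already be sorted by server_ts (guaranteed per connection);
    heapq.merge then interleaves the channels lazily without re-sorting.
    """
    return heapq.merge(*channels, key=lambda msg: msg["server_ts"])
```

For a live agent the same idea usually takes the form of a small buffer that holds messages for a short grace period before emitting them in `server_ts` order; the batch version above shows only the merge step.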
## MCP Protocol: Agents as First-Class Clients
The Model Context Protocol (MCP) is the emerging standard for LLM-agent-to-service communication. Unlike REST (designed for humans via browsers) or WebSocket (designed for streaming), MCP is designed specifically for AI agents. It defines a typed tool system where agents discover, invoke, and receive results from services in a structured way that LLMs can reason about natively.
Both the Purple Flea Faucet (`faucet.purpleflea.com/mcp`) and Escrow (`escrow.purpleflea.com/mcp`) expose MCP endpoints using StreamableHTTP transport. This makes them directly compatible with any MCP-aware agent framework.

**Why MCP matters:** an LLM agent using REST needs hardcoded knowledge of every API endpoint. An LLM agent using MCP can discover available tools at runtime via the `tools/list` method and call them with structured arguments. The service describes itself to the agent dynamically.
### Connecting to Purple Flea MCP Endpoints
```python
import os
from typing import Any, Dict, List, Optional

import httpx


class MCPClient:
    """StreamableHTTP MCP client for Purple Flea endpoints."""

    def __init__(self, endpoint: str, api_key: str):
        self.endpoint = endpoint.rstrip("/")
        self.headers = {
            "X-API-Key": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        }
        self._session_id: Optional[str] = None
        self._request_id = 0

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def initialize(self) -> Dict[str, Any]:
        """Perform MCP initialization handshake."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"roots": {"listChanged": True}},
                "clientInfo": {"name": "PurpleFleatAgent", "version": "1.0.0"},
            },
        }
        async with httpx.AsyncClient() as c:
            resp = await c.post(self.endpoint, json=payload, headers=self.headers)
            resp.raise_for_status()
            result = resp.json()
        # Capture session ID from response headers if present
        self._session_id = resp.headers.get("Mcp-Session-Id")
        return result["result"]

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Discover all tools exposed by this MCP server."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/list",
        }
        async with httpx.AsyncClient() as c:
            headers = {**self.headers}
            if self._session_id:
                headers["Mcp-Session-Id"] = self._session_id
            resp = await c.post(self.endpoint, json=payload, headers=headers)
            resp.raise_for_status()
            return resp.json()["result"]["tools"]

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Invoke a tool by name with typed arguments."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
        }
        async with httpx.AsyncClient(timeout=60) as c:
            headers = {**self.headers}
            if self._session_id:
                headers["Mcp-Session-Id"] = self._session_id
            resp = await c.post(self.endpoint, json=payload, headers=headers)
            resp.raise_for_status()
            data = resp.json()
        if "error" in data:
            raise RuntimeError(f"MCP error {data['error']['code']}: {data['error']['message']}")
        return data["result"]


# Connect to faucet and claim free USDC
async def claim_faucet(agent_wallet: str) -> dict:
    client = MCPClient(
        endpoint="https://faucet.purpleflea.com/mcp",
        api_key=os.environ["PURPLEFLEA_API_KEY"],
    )
    await client.initialize()
    tools = await client.list_tools()
    print(f"Available faucet tools: {[t['name'] for t in tools]}")
    result = await client.call_tool("claim_faucet", {
        "wallet_address": agent_wallet,
    })
    return result
```
Purple Flea's faucet and escrow are registered on Smithery under `purpleflea/faucet` and `purpleflea/escrow`. Agents using any Smithery-compatible framework get these tools automatically via the registry — no manual endpoint configuration required.
## Connection Management and Pooling
Autonomous agents run for hours, days, or indefinitely. Connection management strategies that work fine in a web app — opening and closing connections per request — degrade badly at agent scale. You need connection pooling, keep-alive tuning, and circuit breakers.
### HTTP Connection Pooling
HTTPX's `AsyncClient` maintains a connection pool by default. The key is to instantiate it once at agent startup and reuse it across all requests, rather than creating a new client per request. A new client per request means a new TLS handshake on every call.
| Approach | Overhead per Request | Suitable For | Verdict |
|---|---|---|---|
| New client per request | ~150ms (TLS handshake) | One-off scripts | Avoid |
| Shared AsyncClient | ~1ms (reused connection) | Long-running agents | Recommended |
| Multiple workers + shared pool | ~2ms (pool contention) | High-throughput agents | Recommended |
| WebSocket for all calls | <1ms | Real-time trading only | Contextual |
### Circuit Breaker Pattern
A circuit breaker prevents an agent from hammering a degraded downstream service. After a threshold of failures, the circuit "opens" and requests fail immediately with a known error instead of timing out. This protects both the agent and the API.
```python
import time
from enum import Enum
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class State(Enum):
    CLOSED = "closed"        # Normal: requests flow through
    OPEN = "open"            # Failing: requests fail fast
    HALF_OPEN = "half_open"  # Testing: one probe request allowed


class CircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._state = State.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = 0.0

    @property
    def state(self) -> State:
        if self._state == State.OPEN:
            if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                self._state = State.HALF_OPEN
                self._success_count = 0
        return self._state

    async def call(self, fn: Callable[..., Awaitable[T]], *args, **kwargs) -> T:
        if self.state == State.OPEN:
            raise RuntimeError(f"Circuit '{self.name}' is OPEN — failing fast")
        try:
            result = await fn(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self._failure_count = 0
        if self._state == State.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._state = State.CLOSED

    def _on_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self.failure_threshold:
            self._state = State.OPEN


# Wrap Purple Flea API calls with circuit breakers
casino_cb = CircuitBreaker("casino-api", failure_threshold=5)
trading_cb = CircuitBreaker("trading-api", failure_threshold=3, recovery_timeout=30)


async def safe_place_bet(client, bet_params: dict) -> dict:
    return await casino_cb.call(client.post, "/casino/bet", json=bet_params)
```
## Production Code Patterns
The patterns above combine into a production agent architecture. Here is a full example (this one in JavaScript) integrating REST calls, a WebSocket stream, and an MCP faucet claim, demonstrating how each protocol serves a distinct purpose in the same agent.
```javascript
const BASE = 'https://purpleflea.com/api/v1';
const WS_BASE = 'wss://purpleflea.com/trading-api/ws';

// REST helper — shares a single fetch session via keep-alive
async function apiFetch(path, opts = {}) {
  const res = await fetch(`${BASE}${path}`, {
    ...opts,
    headers: {
      'X-API-Key': process.env.PURPLEFLEA_API_KEY,
      'Content-Type': 'application/json',
      ...(opts.headers || {}),
    },
  });
  if (!res.ok) {
    const err = await res.json().catch(() => ({}));
    throw new Error(`API ${res.status}: ${err.message || res.statusText}`);
  }
  return res.json();
}

// WebSocket market data subscription (the global WebSocket requires
// Node 22+ or a browser runtime; on older Node, use the `ws` package)
function openPriceStream(onPrice) {
  let ws, backoff = 1000;
  function connect() {
    ws = new WebSocket(WS_BASE);
    ws.addEventListener('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({
        action: 'subscribe',
        channel: 'prices.BTC-USDC',
        apiKey: process.env.PURPLEFLEA_API_KEY,
      }));
    });
    ws.addEventListener('message', ({ data }) => {
      try { onPrice(JSON.parse(data)); } catch {}
    });
    ws.addEventListener('close', () => {
      setTimeout(connect, backoff);
      backoff = Math.min(backoff * 2, 60000);
    });
  }
  connect();
  return () => ws?.close();
}

// Agent main loop — combines REST + WebSocket
async function runAgent() {
  let lastPrice = null;
  const closeStream = openPriceStream(msg => {
    if (msg.type === 'price') lastPrice = msg;
  });

  // Poll wallet balance every 5 minutes
  const balanceInterval = setInterval(async () => {
    const bal = await apiFetch('/wallet/balance');
    console.log(`[Balance] USDC: ${bal.usdc}`);
    // If balance low, claim from faucet via MCP
    // (claimFaucet is assumed here: a JS port of the MCP claim flow shown earlier)
    if (Number(bal.usdc) < 10) {
      await claimFaucet(bal.wallet_address);
    }
  }, 5 * 60000);

  // React to price movements — place trades via REST
  const tradeInterval = setInterval(async () => {
    if (!lastPrice) return;
    const spread = lastPrice.ask - lastPrice.bid;
    if (spread < 5) { // Tight spread — favorable entry
      await apiFetch('/trading/order', {
        method: 'POST',
        body: JSON.stringify({
          pair: 'BTC-USDC',
          side: 'buy',
          type: 'limit',
          price: lastPrice.bid.toFixed(2),
          amount: '0.001',
        }),
      });
    }
  }, 30000);

  // Clean shutdown
  process.on('SIGTERM', () => {
    clearInterval(balanceInterval);
    clearInterval(tradeInterval);
    closeStream();
    process.exit(0);
  });
}

runAgent().catch(console.error);
```
## Choosing the Right Protocol
The decision is not about which protocol is "best" — it is about matching the protocol to the communication pattern. Most production agents use all three simultaneously, each serving a distinct role.
| Use Case | Protocol | Why |
|---|---|---|
| Place a bet, make a trade, check a balance | REST | Discrete actions, easy retry, standard tooling |
| Live price feeds, order book updates | WebSocket | Low latency, push-based, persistent connection |
| LLM-native tool invocation, service discovery | MCP | Self-describing, typed, agent-native |
| Long-running job status (e.g., Monero sync) | REST Polling | Simple, no persistent state required |
| Escrow state transitions, payment confirmation | REST + Webhooks | Reliable delivery, server-initiated notification |
New agents should start with pure REST. It is simpler to reason about, easier to debug, and perfectly adequate for most workloads. Add WebSocket streams only when REST polling latency becomes a measurable bottleneck. Add MCP when integrating with LLM orchestration layers.
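Before adding a stream, it helps to confirm that polling latency is actually a measurable bottleneck. A small helper for that; the `fetch` callable and sample count are placeholders for whatever REST call your agent polls:

```python
import asyncio
import statistics
import time

async def measure_poll_latency(fetch, samples: int = 20) -> float:
    """Return the median wall-clock latency in seconds of a REST polling call.

    `fetch` is any zero-argument coroutine function, e.g. a balance check.
    The median resists one-off outliers better than the mean.
    """
    timings = []
    for _ in range(samples):
        start = time.perf_counter()
        await fetch()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)
```

If the median sits comfortably inside your agent's reaction budget, REST polling is fine; only when it does not is the extra complexity of a WebSocket stream justified.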
## Start Building on Purple Flea
Get free USDC from the faucet, build your agent, and plug into 6 live financial infrastructure services.