Engineering

API Design Patterns for AI Agents:
RESTful, WebSocket, and MCP

March 4, 2026
22 min read
Purple Flea Engineering

AI agents interact with external services differently than human users do. They run continuously, operate autonomously, handle errors programmatically, and may fire thousands of requests per minute. Choosing the right API protocol — REST, WebSocket, or MCP — is foundational to building reliable agent infrastructure. This guide walks through each with production-ready code.

Table of Contents
  1. REST Fundamentals for Agents
  2. WebSocket Streams and Real-Time Data
  3. MCP Protocol: Agents as First-Class Clients
  4. Connection Management and Pooling
  5. Production Code Patterns
  6. Choosing the Right Protocol

REST Fundamentals for Agents

REST is the baseline protocol for AI agents integrating with financial APIs. Its stateless request-response model maps naturally to discrete agent actions: place a bet, check a balance, initiate a trade. Every Purple Flea API surface (Casino, Trading, Wallet, Domains) is REST-first.

Where REST differs for agents vs humans is in the throughput requirements, authentication patterns, and error handling expectations. A human clicks a button once every few seconds. An agent might fire 50 requests per second across multiple services simultaneously.

Authentication: API Keys and Bearer Tokens

Purple Flea APIs use API key authentication via the X-API-Key header. Agents should store keys in environment variables and rotate them on a schedule. Never hardcode credentials in agent source code.

Python purpleflea_client.py
import os
import httpx
from typing import Optional, Dict, Any
from dataclasses import dataclass, field
import asyncio

@dataclass
class PurpleFleatConfig:
    api_key: str = field(default_factory=lambda: os.environ["PURPLEFLEA_API_KEY"])
    base_url: str = "https://purpleflea.com/api/v1"
    timeout: float = 30.0
    max_retries: int = 3

class PurpleFleatClient:
    """Async REST client for Purple Flea APIs with automatic retry."""

    def __init__(self, config: Optional[PurpleFleatConfig] = None):
        self.config = config or PurpleFleatConfig()
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self) -> "PurpleFleatClient":
        self._client = httpx.AsyncClient(
            base_url=self.config.base_url,
            headers={
                "X-API-Key": self.config.api_key,
                "Content-Type": "application/json",
                "Accept": "application/json",
                "User-Agent": "PurpleFleatAgent/1.0",
            },
            timeout=httpx.Timeout(self.config.timeout),
        )
        return self

    async def __aexit__(self, *args) -> None:
        if self._client:
            await self._client.aclose()

    async def get(self, path: str, **kwargs) -> Dict[str, Any]:
        return await self._request("GET", path, **kwargs)

    async def post(self, path: str, json: Optional[Dict] = None, **kwargs) -> Dict[str, Any]:
        return await self._request("POST", path, json=json, **kwargs)

    async def _request(self, method: str, path: str, **kwargs) -> Dict[str, Any]:
        for attempt in range(self.config.max_retries):
            try:
                resp = await self._client.request(method, path, **kwargs)
                resp.raise_for_status()
                return resp.json()
            except httpx.HTTPStatusError as e:
                if e.response.status_code in (429, 503) and attempt < self.config.max_retries - 1:
                    # Rate-limited or server overloaded — back off exponentially
                    wait = (2 ** attempt) * 0.5
                    await asyncio.sleep(wait)
                    continue
                raise
            except httpx.TransportError:
                if attempt < self.config.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
        raise RuntimeError("Max retries exceeded")

# Usage
async def main():
    async with PurpleFleatClient() as client:
        balance = await client.get("/wallet/balance")
        print(f"Balance: {balance['usdc']} USDC")

        result = await client.post("/casino/bet", json={
            "game": "dice",
            "amount": "10.00",
            "prediction": "over",
            "target": 50,
        })
        print(f"Bet result: {result['outcome']} — {result['payout']} USDC")

if __name__ == "__main__":
    asyncio.run(main())

Rate Limiting Strategy

Purple Flea APIs return X-RateLimit-Remaining and X-RateLimit-Reset headers. Agents should read these proactively rather than waiting for a 429 response. Inspect headers after every request and throttle before the limit is hit.
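
The header-driven throttle can be sketched as a small gate that records the headers after each response and pauses before the window is exhausted. A sketch only: the `reserve` margin is an arbitrary choice, and it assumes `X-RateLimit-Reset` carries a unix timestamp (adjust if the API returns seconds-until-reset).

```python
import asyncio
import time

class RateLimitGate:
    """Throttle requests using X-RateLimit-* response headers."""

    def __init__(self, reserve: int = 5):
        self.reserve = reserve     # stop this many requests short of the limit
        self.remaining = None      # requests left in the current window
        self.reset_at = 0.0        # assumed: unix timestamp of the window reset

    def update(self, headers) -> None:
        """Record rate-limit headers after every response."""
        if "X-RateLimit-Remaining" in headers:
            self.remaining = int(headers["X-RateLimit-Remaining"])
        if "X-RateLimit-Reset" in headers:
            self.reset_at = float(headers["X-RateLimit-Reset"])

    async def wait(self) -> None:
        """Call before each request; sleep out the window when near the limit."""
        if self.remaining is not None and self.remaining <= self.reserve:
            await asyncio.sleep(max(0.0, self.reset_at - time.time()))
            self.remaining = None  # window rolled over; state is stale
```

Wire it into the retry loop above by calling `await gate.wait()` before each request and `gate.update(resp.headers)` after each response.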

Pagination for Agents

When listing transactions, game history, or trade records, agents must handle cursor-based pagination correctly. Page-based pagination is unreliable in high-frequency environments where new records arrive while you're paginating. Purple Flea uses cursor pagination everywhere.

Python pagination.py
from typing import AsyncIterator, Optional

async def paginate_transactions(
    client: PurpleFleatClient,
    limit: int = 100,
) -> AsyncIterator[dict]:
    """Yield all transactions using cursor pagination."""
    cursor: Optional[str] = None

    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor

        page = await client.get("/wallet/transactions", params=params)

        for tx in page["data"]:
            yield tx

        cursor = page.get("next_cursor")
        if not cursor:
            break  # No more pages

# Collect all transactions for reconciliation.
# Use Decimal for money: float accumulates rounding error over many entries.
from decimal import Decimal

async def reconcile_balance(client: PurpleFleatClient) -> Decimal:
    total = Decimal("0")
    async for tx in paginate_transactions(client):
        if tx["type"] == "credit":
            total += Decimal(tx["amount"])
        else:
            total -= Decimal(tx["amount"])
    return total

WebSocket Streams and Real-Time Data

REST is request-response. For market data, live game outcomes, and order book updates, agents need persistent streams. WebSocket connections eliminate the polling overhead that would otherwise dominate an agent's resource budget at high frequency.

Purple Flea's Trading API exposes WebSocket endpoints for real-time price feeds, position updates, and trade fills. Agents subscribing to these streams can react to market events within milliseconds rather than the 100–500ms typical of REST polling.

Low Latency: Millisecond-scale server-to-client delivery. No polling overhead. Events arrive as they happen.

Persistent Connection: One TCP connection for continuous data. Eliminates TLS handshake overhead on every request.

Bidirectional: Send and receive on the same connection. Place orders and receive confirmations on one socket.

WebSocket Client with Reconnection Logic

WebSocket connections drop. Networks hiccup. Servers restart for deployments. A production agent must automatically reconnect with exponential backoff and replay any subscriptions lost during the disconnect.

Python ws_stream.py
import asyncio
import json
import logging
import os
from typing import Callable, Set
import websockets
from websockets.exceptions import ConnectionClosed, WebSocketException

logger = logging.getLogger(__name__)

class TradingStream:
    """Robust WebSocket stream for Purple Flea Trading API."""

    WS_URL = "wss://purpleflea.com/trading-api/ws"
    MAX_BACKOFF = 60  # seconds

    def __init__(self, api_key: str, on_message: Callable[[dict], None]):
        self.api_key = api_key
        self.on_message = on_message
        self._subscriptions: Set[str] = set()
        self._running = False
        self._ws = None

    def subscribe(self, channel: str):
        """Register a channel; will be auto-resubscribed after reconnection."""
        self._subscriptions.add(channel)

    async def _resubscribe(self, ws):
        # Replay all subscriptions on fresh connection
        for channel in self._subscriptions:
            await ws.send(json.dumps({
                "action": "subscribe",
                "channel": channel,
            }))
            logger.debug(f"Resubscribed to {channel}")

    async def run(self):
        """Main loop with exponential backoff reconnection."""
        self._running = True
        backoff = 1

        while self._running:
            try:
                headers = {"X-API-Key": self.api_key}
                async with websockets.connect(
                    self.WS_URL,
                    extra_headers=headers,  # renamed additional_headers in websockets >= 14
                    ping_interval=20,
                    ping_timeout=10,
                ) as ws:
                    self._ws = ws
                    backoff = 1  # Reset on successful connect
                    logger.info("WebSocket connected")
                    await self._resubscribe(ws)

                    async for raw in ws:
                        try:
                            msg = json.loads(raw)
                            await asyncio.to_thread(self.on_message, msg)
                        except Exception as e:
                            logger.error(f"Handler error: {e}", exc_info=True)

            except ConnectionClosed as e:
                logger.warning(f"Connection closed ({e.code}): {e.reason}")
            except WebSocketException as e:
                logger.error(f"WebSocket error: {e}")
            except Exception as e:
                logger.error(f"Unexpected error: {e}", exc_info=True)

            if self._running:
                logger.info(f"Reconnecting in {backoff}s...")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, self.MAX_BACKOFF)

    async def send(self, msg: dict):
        if self._ws and not self._ws.closed:
            await self._ws.send(json.dumps(msg))
        else:
            raise RuntimeError("WebSocket not connected")

    def stop(self):
        self._running = False

# Usage
def handle_price_update(msg: dict):
    if msg["type"] == "price":
        print(f"BTC/USDC: {msg['bid']} / {msg['ask']}")
    elif msg["type"] == "fill":
        print(f"Order filled: {msg['order_id']} @ {msg['price']}")

async def run_agent():
    stream = TradingStream(
        api_key=os.environ["PURPLEFLEA_API_KEY"],
        on_message=handle_price_update,
    )
    stream.subscribe("prices.BTC-USDC")
    stream.subscribe("orders.fills")
    await stream.run()
Message Ordering Guarantee

WebSocket messages are ordered per connection but not globally. If your agent opens multiple connections for different channels, messages from different channels may arrive out of order relative to server-side time. Use the server_ts field in every message to reconcile ordering across channels.
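
Cross-channel reconciliation can be sketched as a small hold-back buffer that re-emits messages in server_ts order (a sketch: the hold window is an illustrative assumption; size it above your worst observed cross-channel skew, and server_ts is treated here as integer milliseconds).

```python
import heapq

class ReorderBuffer:
    """Re-emit messages from multiple connections in server_ts order.

    Messages are held until newer traffic shows the stream has advanced
    past them by `hold_ms`, so late arrivals on another connection can
    still be slotted into place.
    """

    def __init__(self, hold_ms: int = 250):
        self.hold_ms = hold_ms
        self._heap = []   # (server_ts, seq, msg) min-heap
        self._seq = 0     # tie-breaker so equal timestamps never compare dicts

    def push(self, msg: dict) -> list:
        """Add a message; return any messages now safe to emit, oldest first."""
        heapq.heappush(self._heap, (msg["server_ts"], self._seq, msg))
        self._seq += 1
        cutoff = msg["server_ts"] - self.hold_ms
        ready = []
        while self._heap and self._heap[0][0] <= cutoff:
            ready.append(heapq.heappop(self._heap)[2])
        return ready
```

Feed every connection's messages through one shared buffer; the trade-off is `hold_ms` of added latency in exchange for cross-channel ordering.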


MCP Protocol: Agents as First-Class Clients

The Model Context Protocol (MCP) is the emerging standard for LLM-agent-to-service communication. Where REST and WebSocket are general-purpose transports, MCP is designed specifically for AI agents: it defines a typed tool system where agents discover, invoke, and receive results from services in a structured way that LLMs can reason about natively.

Both the Purple Flea Faucet (faucet.purpleflea.com/mcp) and Escrow (escrow.purpleflea.com/mcp) expose MCP endpoints using StreamableHTTP transport. This makes them directly compatible with any MCP-aware agent framework.

Why MCP matters: An LLM agent using REST needs hardcoded knowledge of every API endpoint. An LLM agent using MCP can discover available tools at runtime via the tools/list method and call them with structured arguments. The service describes itself to the agent dynamically.

Connecting to Purple Flea MCP Endpoints

Python mcp_client.py
import os
import httpx
from typing import Any, Dict, List, Optional

class MCPClient:
    """StreamableHTTP MCP client for Purple Flea endpoints."""

    def __init__(self, endpoint: str, api_key: str):
        self.endpoint = endpoint.rstrip("/")
        self.headers = {
            "X-API-Key": api_key,
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
        }
        self._session_id: Optional[str] = None
        self._request_id = 0

    def _next_id(self) -> int:
        self._request_id += 1
        return self._request_id

    async def initialize(self) -> Dict[str, Any]:
        """Perform MCP initialization handshake."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"roots": {"listChanged": True}},
                "clientInfo": {"name": "PurpleFleatAgent", "version": "1.0.0"},
            },
        }
        async with httpx.AsyncClient() as c:
            resp = await c.post(self.endpoint, json=payload, headers=self.headers)
            resp.raise_for_status()
            result = resp.json()

        # Capture session ID from response headers if present
        self._session_id = resp.headers.get("Mcp-Session-Id")
        return result["result"]

    async def list_tools(self) -> List[Dict[str, Any]]:
        """Discover all tools exposed by this MCP server."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/list",
        }
        async with httpx.AsyncClient() as c:
            headers = {**self.headers}
            if self._session_id:
                headers["Mcp-Session-Id"] = self._session_id
            resp = await c.post(self.endpoint, json=payload, headers=headers)
            resp.raise_for_status()
        return resp.json()["result"]["tools"]

    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Invoke a tool by name with typed arguments."""
        payload = {
            "jsonrpc": "2.0",
            "id": self._next_id(),
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
        }
        async with httpx.AsyncClient(timeout=60) as c:
            headers = {**self.headers}
            if self._session_id:
                headers["Mcp-Session-Id"] = self._session_id
            resp = await c.post(self.endpoint, json=payload, headers=headers)
            resp.raise_for_status()
        data = resp.json()
        if "error" in data:
            raise RuntimeError(f"MCP error {data['error']['code']}: {data['error']['message']}")
        return data["result"]

# Connect to faucet and claim free USDC
async def claim_faucet(agent_wallet: str) -> dict:
    client = MCPClient(
        endpoint="https://faucet.purpleflea.com/mcp",
        api_key=os.environ["PURPLEFLEA_API_KEY"],
    )
    await client.initialize()
    tools = await client.list_tools()
    print(f"Available faucet tools: {[t['name'] for t in tools]}")
    result = await client.call_tool("claim_faucet", {
        "wallet_address": agent_wallet,
    })
    return result
MCP on Smithery

Purple Flea's faucet and escrow are registered on Smithery under purpleflea/faucet and purpleflea/escrow. Agents using any Smithery-compatible framework get these tools automatically via the registry — no manual endpoint configuration required.


Connection Management and Pooling

Autonomous agents run for hours, days, or indefinitely. Connection management strategies that work fine in a web app — opening and closing connections per request — degrade badly at agent scale. You need connection pooling, keep-alive tuning, and circuit breakers.

HTTP Connection Pooling

HTTPX's AsyncClient is a connection pool by default. The key is to instantiate it once at agent startup and reuse it across all requests, rather than creating a new client per request. A new client per request means a new TLS handshake on every call.

Approach | Overhead per Request | Suitable For | Verdict
New client per request | ~150ms (TLS handshake) | One-off scripts | Avoid
Shared AsyncClient | ~1ms (reused connection) | Long-running agents | Recommended
Multiple workers + shared pool | ~2ms (pool contention) | High-throughput agents | Recommended
WebSocket for all calls | <1ms | Real-time trading only | Contextual

Circuit Breaker Pattern

A circuit breaker prevents an agent from hammering a degraded downstream service. After a threshold of failures, the circuit "opens" and requests fail immediately with a known error instead of timing out. This protects both the agent and the API.

Python circuit_breaker.py
import time
from enum import Enum
from typing import Callable

class State(Enum):
    CLOSED = "closed"     # Normal: requests flow through
    OPEN = "open"         # Failing: requests fail fast
    HALF_OPEN = "half_open"  # Testing: one probe request allowed

class CircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self._state = State.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = 0.0

    @property
    def state(self) -> State:
        if self._state == State.OPEN:
            if time.monotonic() - self._last_failure_time > self.recovery_timeout:
                self._state = State.HALF_OPEN
                self._success_count = 0
        return self._state

    async def call(self, fn: Callable, *args, **kwargs):
        if self.state == State.OPEN:
            raise RuntimeError(f"Circuit '{self.name}' is OPEN — failing fast")
        try:
            result = await fn(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        self._failure_count = 0
        if self._state == State.HALF_OPEN:
            self._success_count += 1
            if self._success_count >= self.success_threshold:
                self._state = State.CLOSED

    def _on_failure(self):
        self._failure_count += 1
        self._last_failure_time = time.monotonic()
        if self._failure_count >= self.failure_threshold:
            self._state = State.OPEN

# Wrap Purple Flea API calls with circuit breakers
casino_cb = CircuitBreaker("casino-api", failure_threshold=5)
trading_cb = CircuitBreaker("trading-api", failure_threshold=3, recovery_timeout=30)

async def safe_place_bet(client, bet_params: dict) -> dict:
    return await casino_cb.call(client.post, "/casino/bet", json=bet_params)

Production Code Patterns

The patterns above combine into a production agent architecture. Here is a full example integrating REST, WebSocket streams, and MCP together — demonstrating how each protocol serves a different purpose in the same agent.

JavaScript agent.mjs

const BASE = 'https://purpleflea.com/api/v1';
const WS_BASE = 'wss://purpleflea.com/trading-api/ws';

// REST helper — shares a single fetch session via keep-alive
async function apiFetch(path, opts = {}) {
  const res = await fetch(`${BASE}${path}`, {
    ...opts,
    headers: {
      'X-API-Key': process.env.PURPLEFLEA_API_KEY,
      'Content-Type': 'application/json',
      ...(opts.headers || {}),
    },
  });
  if (!res.ok) {
    const err = await res.json().catch(() => ({}));
    throw new Error(`API ${res.status}: ${err.message || res.statusText}`);
  }
  return res.json();
}

// WebSocket market data subscription
function openPriceStream(onPrice) {
  let ws, backoff = 1000;

  function connect() {
    // Global WebSocket requires Node 22+; older runtimes can use the "ws" package
    ws = new WebSocket(WS_BASE);

    ws.addEventListener('open', () => {
      backoff = 1000;
      ws.send(JSON.stringify({
        action: 'subscribe',
        channel: 'prices.BTC-USDC',
        apiKey: process.env.PURPLEFLEA_API_KEY,
      }));
    });

    ws.addEventListener('message', ({ data }) => {
      try { onPrice(JSON.parse(data)); } catch {}
    });

    ws.addEventListener('close', () => {
      setTimeout(connect, backoff);
      backoff = Math.min(backoff * 2, 60000);
    });
  }

  connect();
  return () => ws?.close();
}

// Agent main loop — combines REST + WebSocket
async function runAgent() {
  let lastPrice = null;

  const closeStream = openPriceStream(msg => {
    if (msg.type === 'price') lastPrice = msg;
  });

  // Poll wallet balance every 5 minutes
  const balanceInterval = setInterval(async () => {
    const bal = await apiFetch('/wallet/balance');
    console.log(`[Balance] USDC: ${bal.usdc}`);

    // If balance low, claim from faucet via MCP
    // (claimFaucet here stands for the MCP tools/call flow shown earlier)
    if (Number(bal.usdc) < 10) {
      await claimFaucet(bal.wallet_address);
    }
  }, 5 * 60000);

  // React to price movements — place trades via REST
  const tradeInterval = setInterval(async () => {
    if (!lastPrice) return;
    const spread = lastPrice.ask - lastPrice.bid;
    if (spread < 5) {  // Tight spread — favorable entry
      await apiFetch('/trading/order', {
        method: 'POST',
        body: JSON.stringify({
          pair: 'BTC-USDC',
          side: 'buy',
          type: 'limit',
          price: lastPrice.bid.toFixed(2),
          amount: '0.001',
        }),
      });
    }
  }, 30000);

  // Clean shutdown
  process.on('SIGTERM', () => {
    clearInterval(balanceInterval);
    clearInterval(tradeInterval);
    closeStream();
    process.exit(0);
  });
}

runAgent().catch(console.error);

Choosing the Right Protocol

The decision is not about which protocol is "best" — it is about matching the protocol to the communication pattern. Most production agents use all three simultaneously, each serving a distinct role.

Use Case | Protocol | Why
Place a bet, make a trade, check a balance | REST | Discrete actions, easy retry, standard tooling
Live price feeds, order book updates | WebSocket | Low latency, push-based, persistent connection
LLM-native tool invocation, service discovery | MCP | Self-describing, typed, agent-native
Long-running job status (e.g., Monero sync) | REST polling | Simple, no persistent state required
Escrow state transitions, payment confirmation | REST + webhooks | Reliable delivery, server-initiated notification
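
For the webhook row, the agent side must verify that an incoming notification really came from the service. A sketch using HMAC-SHA256 over the raw body; the header name and signing scheme here are hypothetical placeholders, so check the escrow docs for the real ones.

```python
import hashlib
import hmac

def verify_webhook(secret: str, body: bytes, signature: str) -> bool:
    """Check a webhook signature (hex HMAC-SHA256 of the raw request body).

    The signature is assumed to arrive in a header (hypothetically
    X-PF-Signature); always compare digests in constant time.
    """
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)
```

Verify before parsing the JSON and reject anything that fails; replay protection (a timestamp inside the signed payload) is a common addition.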
💡 Start with REST, Add WebSocket When Needed

New agents should start with pure REST. It is simpler to reason about, easier to debug, and perfectly adequate for most workloads. Add WebSocket streams only when REST polling latency becomes a measurable bottleneck. Add MCP when integrating with LLM orchestration layers.

Start Building on Purple Flea

Get free USDC from the faucet, build your agent, and plug into 6 live financial infrastructure services.