Market Data Feeds for AI Trading Agents: WebSockets, REST, and On-Chain Sources

Every AI trading agent lives or dies by the quality of its data. An agent making decisions on stale prices, malformed ticks, or disconnected WebSocket streams will haemorrhage capital regardless of how sophisticated its strategy is. This guide covers the full data stack: source taxonomy, WebSocket lifecycle management, cross-source normalization, on-chain price reading, rate limiting, data quality guards, and wiring everything into Purple Flea for order execution.

1. Data Source Taxonomy

Before wiring up any feeds, you need to understand the landscape. Market data for crypto trading agents comes from five distinct source categories, each with different latency profiles, reliability characteristics, and cost structures.

| Category | Examples | Latency | Cost | Best For |
| --- | --- | --- | --- | --- |
| Centralized Exchange (CEX) | Binance, Coinbase, Kraken, OKX | 1–50 ms (WebSocket) | Free tier; paid for high-rate | Spot price, order book depth, recent trades |
| Decentralized Exchange (DEX) | Uniswap v3, Curve, Raydium | 100 ms–2 s (block time) | RPC node cost only | On-chain liquidity, arbitrage detection |
| On-Chain Oracle | Chainlink, Pyth, Redstone | Heartbeat: 1–60 s | Free to read; gas to push updates | Tamper-resistant reference prices for DeFi |
| News & Sentiment | CryptoPanic, Santiment, Messari | Minutes | Paid API tiers | Event-driven, macro trend signals |
| Social & Alternative | Nansen, Glassnode, Dune | Hours to real-time | Paid tiers | On-chain flow analysis, whale tracking |

Combining Sources Strategically

Production trading agents rarely rely on a single source. The common pattern is to use a CEX WebSocket for low-latency tick data, an on-chain oracle as a tamper-resistant reference price, and a news feed for event-driven filters that gate whether the agent should act at all. Redundancy across at least two CEX sources is standard practice for detecting exchange-specific anomalies.
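As a concrete illustration of the redundancy pattern, the sketch below cross-checks prices from multiple sources and rejects a reading when any feed strays too far from the group mid. The function name and the 0.5% threshold are illustrative choices, not a standard value.

```python
# Sketch: flag divergence between redundant feeds before trusting a price.
# The 0.5% default threshold is an illustrative, tunable choice.

def cross_source_ok(prices: dict[str, float], max_deviation_pct: float = 0.5) -> bool:
    """Return True if every source is within max_deviation_pct of the mid."""
    if len(prices) < 2:
        return True  # nothing to cross-check against
    values = list(prices.values())
    mid = sum(values) / len(values)
    worst = max(abs(p - mid) / mid * 100 for p in values)
    return worst <= max_deviation_pct

# Two feeds a few dollars apart agree; a ~2% gap signals an anomaly
print(cross_source_ok({"binance": 42150.0, "coinbase": 42160.0}))  # True
print(cross_source_ok({"binance": 42150.0, "coinbase": 43000.0}))  # False
```

When the check fails, the safe default is to stand down rather than guess which feed is right.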

Purple Flea Context

Purple Flea's casino games (crash, coin-flip, hi-lo) use provably fair on-chain randomness. Your agent can use CEX price feeds as a heuristic signal for game timing — e.g., entering crash at high volatility windows — while the outcome itself is independent and cryptographically fair.
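If you do use CEX volatility as a timing heuristic, a small gate like the one below can decide when the agent is allowed to act. This is a hypothetical sketch: the window length and threshold are placeholder values, and the signal has no bearing on the provably fair game outcome itself.

```python
import statistics
from collections import deque


class VolatilityGate:
    """Heuristic gate: act only when recent tick-to-tick volatility is high.

    The window and threshold are illustrative defaults, not tuned values.
    """

    def __init__(self, window: int = 30, min_stdev_pct: float = 0.05):
        self.returns: deque = deque(maxlen=window)
        self.last_price: float | None = None
        self.min_stdev_pct = min_stdev_pct

    def update(self, price: float) -> bool:
        """Feed one price; return True when volatility exceeds the threshold."""
        if self.last_price is not None:
            self.returns.append((price - self.last_price) / self.last_price * 100)
        self.last_price = price
        if len(self.returns) < 10:
            return False  # not enough samples yet
        return statistics.stdev(self.returns) >= self.min_stdev_pct
```

A flat price series keeps the gate closed; a choppy one opens it.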

2. WebSocket Feed Management

WebSocket connections to exchange feeds are not fire-and-forget. They disconnect, timeout, send malformed frames, and occasionally deliver duplicate messages. A production-grade agent needs a connection manager that handles the full lifecycle automatically.

Key WebSocket Lifecycle Events

Python websocket_manager.py — Reconnecting WebSocket with heartbeat
import asyncio
import json
import time
import logging
from typing import Callable, Awaitable
import websockets
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK

logger = logging.getLogger("ws_manager")

MessageHandler = Callable[[dict], Awaitable[None]]


class WebSocketManager:
    """
    Reconnecting WebSocket client with heartbeat, exponential backoff,
    and automatic resubscription on reconnect.
    """

    def __init__(
        self,
        url: str,
        subscriptions: list[dict],
        on_message: MessageHandler,
        ping_interval: float = 20.0,
        max_reconnect_delay: float = 60.0
    ):
        self.url = url
        self.subscriptions = subscriptions
        self.on_message = on_message
        self.ping_interval = ping_interval
        self.max_reconnect_delay = max_reconnect_delay
        self._running = False
        self._reconnect_count = 0

    async def start(self) -> None:
        self._running = True
        while self._running:
            try:
                await self._connect_and_run()
            except (ConnectionClosedError, OSError) as e:
                delay = min(2 ** self._reconnect_count, self.max_reconnect_delay)
                logger.warning(f"WebSocket closed ({e}). Reconnecting in {delay:.1f}s")
                self._reconnect_count += 1
                await asyncio.sleep(delay)
            except ConnectionClosedOK:
                break  # clean shutdown

    async def _connect_and_run(self) -> None:
        async with websockets.connect(
            self.url,
            ping_interval=None,   # we handle pings manually
            max_size=2**23
        ) as ws:
            logger.info(f"Connected to {self.url}")
            self._reconnect_count = 0

            # Resubscribe on every connect
            for sub in self.subscriptions:
                await ws.send(json.dumps(sub))

            # Run the heartbeat as a background task so it is cancelled
            # cleanly when the receive loop exits or the connection drops
            heartbeat = asyncio.create_task(self._heartbeat_loop(ws))
            try:
                await self._receive_loop(ws)
            finally:
                heartbeat.cancel()

    async def _receive_loop(self, ws) -> None:
        async for raw in ws:
            try:
                msg = json.loads(raw)
                await self.on_message(msg)
            except json.JSONDecodeError:
                logger.warning(f"Bad JSON frame: {raw[:120]}")
            except Exception as e:
                logger.error(f"Handler error: {e}")

    async def _heartbeat_loop(self, ws) -> None:
        while True:
            await asyncio.sleep(self.ping_interval)
            await ws.send(json.dumps({"op": "ping"}))

    def stop(self) -> None:
        self._running = False

Exchange-Specific Ping Formats

Keep-alive conventions differ by venue: Binance market streams use protocol-level WebSocket ping/pong frames (the server pings; your client must answer with a pong frame or be disconnected), OKX expects the literal text frame ping, and Bybit expects {"op": "ping"}. Coinbase Advanced Trade instead offers a heartbeats channel you subscribe to. Always check the exchange's WebSocket API documentation — using the wrong keep-alive convention causes silent disconnections.

3. Data Normalization Across Sources

Every exchange has its own message schema. Binance sends {"e": "trade", "p": "42150.00", "q": "0.014"}. Coinbase sends {"type": "match", "price": "42150.00", "size": "0.014"}. Before your agent logic can run, all of these must be mapped to a unified internal format. The standard is OHLCV (Open, High, Low, Close, Volume) for candle data, and a normalized tick for real-time trade events.

Unified Tick and OHLCV Formats

Python data_models.py — Unified data structures
from dataclasses import dataclass, field
from typing import Literal


@dataclass
class Tick:
    """Normalized real-time trade tick from any source."""
    source: str              # "binance" | "coinbase" | "uniswap_v3" | ...
    symbol: str              # always "BTC/USDC" format
    price: float
    volume: float
    side: Literal["buy", "sell", "unknown"]
    ts: int                  # Unix milliseconds
    raw: dict = field(default_factory=dict)  # original message


@dataclass
class OHLCV:
    """Normalized candle from any source."""
    source: str
    symbol: str
    interval: str            # "1m" | "5m" | "1h" | ...
    open: float
    high: float
    low: float
    close: float
    volume: float
    ts: int                  # candle open time, Unix ms
    is_closed: bool = False  # True when candle is final

4. The DataFeedManager Class

The DataFeedManager is the central hub of your agent's data infrastructure. It manages multiple WebSocket connections simultaneously, normalizes all incoming ticks to the unified format, and dispatches them to registered handlers. Each call to subscribe() adds a new data source; on_tick() registers a callback that fires on every normalized tick from any source.

Python data_feed_manager.py — Full DataFeedManager implementation
import asyncio
import time
import logging
from collections import deque
from typing import Callable, Awaitable, Optional
from data_models import Tick, OHLCV
from websocket_manager import WebSocketManager

logger = logging.getLogger("data_feed")
TickHandler = Callable[[Tick], Awaitable[None]]


class DataFeedManager:
    """
    Manages multiple market data WebSocket connections, normalizes
    all ticks to a unified format, and dispatches to registered handlers.
    """

    def __init__(self, cache_size: int = 500):
        self._handlers: list[TickHandler] = []
        self._managers: list[WebSocketManager] = []
        self._tick_cache: dict[str, deque] = {}
        self._cache_size = cache_size
        self._last_tick: dict[str, Tick] = {}  # latest tick per symbol

    def on_tick(self, handler: TickHandler) -> None:
        """Register a callback to receive every normalized tick."""
        self._handlers.append(handler)

    def subscribe(
        self,
        source: str,
        url: str,
        subscriptions: list[dict],
        normalizer: Callable[[str, dict], Optional[Tick]]
    ) -> None:
        """
        Add a new data source.
        normalizer: fn(source, raw_msg) -> Tick | None
        """
        async def handle_raw(msg: dict) -> None:
            tick = normalizer(source, msg)
            if tick is None:
                return  # skip non-tick messages (pong, subscribe ack, etc)
            await self._dispatch(tick)

        mgr = WebSocketManager(url, subscriptions, handle_raw)
        self._managers.append(mgr)

    async def _dispatch(self, tick: Tick) -> None:
        # Cache tick for quality checks + history queries
        key = f"{tick.source}:{tick.symbol}"
        if key not in self._tick_cache:
            self._tick_cache[key] = deque(maxlen=self._cache_size)
        self._tick_cache[key].append(tick)
        self._last_tick[tick.symbol] = tick

        # Dispatch to all handlers concurrently
        await asyncio.gather(
            *[h(tick) for h in self._handlers],
            return_exceptions=True
        )

    def latest(self, symbol: str) -> Optional[Tick]:
        """Return the most recent tick for a symbol (any source)."""
        return self._last_tick.get(symbol)

    def history(self, source: str, symbol: str) -> list[Tick]:
        """Return cached tick history for a source:symbol pair."""
        key = f"{source}:{symbol}"
        return list(self._tick_cache.get(key, []))

    async def run(self) -> None:
        """Start all registered WebSocket managers concurrently."""
        await asyncio.gather(*[m.start() for m in self._managers])


# ── Normalizers for common exchanges ──────────────────────────────

def normalize_binance(source: str, msg: dict) -> Optional[Tick]:
    if msg.get("e") != "trade":
        return None
    symbol = msg["s"][:-4] + "/" + msg["s"][-4:]  # BTCUSDT → BTC/USDT
    return Tick(
        source=source, symbol=symbol,
        price=float(msg["p"]), volume=float(msg["q"]),
        side="sell" if msg["m"] else "buy",
        ts=msg["T"], raw=msg
    )


def normalize_coinbase(source: str, msg: dict) -> Optional[Tick]:
    if msg.get("type") != "match":
        return None
    symbol = msg["product_id"].replace("-", "/")  # BTC-USD → BTC/USD
    # Coinbase timestamps are ISO 8601 UTC ("...Z"); convert to an aware
    # datetime so .timestamp() is not interpreted in the local timezone
    from datetime import datetime
    ts = int(datetime.fromisoformat(msg["time"].replace("Z", "+00:00")).timestamp() * 1000)
    return Tick(
        source=source, symbol=symbol,
        price=float(msg["price"]), volume=float(msg["size"]),
        side=msg.get("side", "unknown"),
        ts=ts, raw=msg
    )


# ── Example usage ─────────────────────────────────────────────────

async def my_strategy(tick: Tick) -> None:
    print(f"[{tick.source}] {tick.symbol} @ {tick.price:.2f} ({tick.side})")

async def main():
    feed = DataFeedManager()
    feed.on_tick(my_strategy)

    feed.subscribe(
        source="binance",
        url="wss://stream.binance.com:9443/ws/btcusdt@trade",
        subscriptions=[],  # stream URL already encodes subscription
        normalizer=normalize_binance
    )
    feed.subscribe(
        source="coinbase",
        url="wss://advanced-trade-ws.coinbase.com",
        subscriptions=[{
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channel": "market_trades"
        }],
        normalizer=normalize_coinbase
    )
    await feed.run()

5. On-Chain Data: Reading AMM Prices from Smart Contracts

Centralized exchange feeds reflect order book activity, but on-chain DEX pools are the source of truth for DeFi liquidity. Reading prices directly from Uniswap v3 or Curve contracts via an Ethereum JSON-RPC node gives you the actual executable price — including slippage — rather than a mid-market quote.
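To see why the executable price differs from a mid-market quote, here is a deliberately simplified constant-product (Uniswap v2-style) illustration. Uniswap v3's concentrated liquidity changes the exact math, but the direction of the effect — larger orders get worse average prices — is the same. The pool reserves here are made-up numbers.

```python
def amm_execution_price(reserve_in: float, reserve_out: float,
                        amount_in: float, fee: float = 0.003) -> float:
    """Average execution price of a swap against an x*y=k pool (v2-style)."""
    amount_in_with_fee = amount_in * (1 - fee)
    # Constant-product output: dy = y * dx / (x + dx)
    amount_out = reserve_out * amount_in_with_fee / (reserve_in + amount_in_with_fee)
    return amount_in / amount_out   # price paid per unit received

# Hypothetical pool: 1,000 WBTC vs 42,000,000 USDC → spot 42,000 USDC/WBTC.
spot = 42_000_000 / 1_000
exec_price = amm_execution_price(42_000_000, 1_000, 420_000)  # spend 420k USDC
print(f"spot={spot:,.0f} USDC, executable={exec_price:,.0f} USDC")
```

Even a 1%-of-reserves order pays measurably more than spot, which is exactly the gap an agent must price in before acting on a mid quote.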

Uniswap v3 Slot0 Price

Uniswap v3 pools store the current price as a sqrtPriceX96 value in the slot0 storage slot. Squaring it and removing the fixed-point scaling gives the raw token1-per-token0 price: price = (sqrtPriceX96 / 2^96)^2, which must then be multiplied by 10^(decimals0 − decimals1) to account for the tokens' differing decimals.

Python onchain_price.py — Read Uniswap v3 pool price via web3.py
from web3 import Web3
import time

# Connect to Ethereum node (use Alchemy/Infura/QuickNode RPC)
w3 = Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY")
web3 = Web3(w3)

# Uniswap v3 WBTC/USDC 0.3% pool
POOL_ADDRESS = Web3.to_checksum_address("0x99ac8cA7087fA4A2A1FB6357269965A2014ABc35")

# Minimal ABI: only slot0() needed for price
POOL_ABI = [
    {
        "inputs": [],
        "name": "slot0",
        "outputs": [
            {"name": "sqrtPriceX96", "type": "uint160"},
            {"name": "tick", "type": "int24"}
        ],
        "stateMutability": "view",
        "type": "function"
    }
]

pool = web3.eth.contract(address=POOL_ADDRESS, abi=POOL_ABI)

WBTC_DECIMALS = 8
USDC_DECIMALS = 6


def get_pool_price() -> float:
    """
    Read current WBTC/USDC price from Uniswap v3 pool.
    Returns price in USDC per WBTC.
    """
    slot0 = pool.functions.slot0().call()
    sqrt_price_x96 = slot0[0]

    # Convert sqrtPriceX96 to price
    price_raw = (sqrt_price_x96 / (2 ** 96)) ** 2

    # price_raw is token1/token0 in raw units, so multiply by
    # 10^(decimals0 - decimals1) = 10^(8-6) for token0=WBTC, token1=USDC
    decimal_adjustment = 10 ** (WBTC_DECIMALS - USDC_DECIMALS)
    price = price_raw * decimal_adjustment

    return price  # USDC per WBTC


def get_onchain_tick() -> dict:
    """Return a normalized tick-like dict from on-chain price."""
    price = get_pool_price()
    return {
        "source": "uniswap_v3",
        "symbol": "BTC/USDC",
        "price": price,
        "ts": int(time.time() * 1000)
    }

# Example: poll on-chain price every 12s (1 Ethereum block)
if __name__ == "__main__":
    import time
    while True:
        tick = get_onchain_tick()
        print(f"[ON-CHAIN] BTC/USDC = ${tick['price']:,.2f}")
        time.sleep(12)
Pyth Network for Sub-Second On-Chain Prices

If Ethereum block time (12s) is too slow, Pyth Network provides on-chain price feeds updated every 400ms on Solana and regularly on EVM chains. The pyth-sdk-py package makes reading Pyth prices straightforward from Python.

6. Rate Limiting and Caching Strategies

Exchange REST APIs enforce strict rate limits — Binance's spot API, for example, budgets request weight per minute per IP (historically 1,200, with heavier endpoints consuming more weight per call). Exceeding the budget returns HTTP 429 errors and, if ignored, temporary IP bans. A well-designed agent respects these limits through token bucket rate limiting and aggressive caching.

Token Bucket Rate Limiter

Python rate_limiter.py — Async token bucket + request cache
import asyncio
import time
import hashlib
import json
from typing import Any, Optional


class TokenBucket:
    """Async token bucket rate limiter."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate        # tokens per second
        self.capacity = capacity  # max tokens
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        async with self._lock:
            while True:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                wait = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait)

    def _refill(self) -> None:
        now = time.monotonic()
        added = (now - self.last_refill) * self.rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now


class CachedRestClient:
    """REST client with token bucket rate limiting and TTL caching."""

    def __init__(self, rate: float = 10.0, default_ttl: float = 5.0):
        self.limiter = TokenBucket(rate=rate, capacity=rate * 2)
        self.default_ttl = default_ttl
        self._cache: dict[str, tuple[float, Any]] = {}

    async def get(
        self,
        url: str,
        params: Optional[dict] = None,
        ttl: Optional[float] = None
    ) -> Any:
        import httpx  # local import keeps httpx an optional dependency
        cache_key = hashlib.md5(
            f"{url}{json.dumps(params or {}, sort_keys=True)}".encode()
        ).hexdigest()
        ttl = ttl if ttl is not None else self.default_ttl

        # Return cached response if fresh
        if cache_key in self._cache:
            cached_at, data = self._cache[cache_key]
            if time.monotonic() - cached_at < ttl:
                return data

        # Rate limit before the actual request. For high request rates,
        # reuse a single AsyncClient rather than opening a pool per call.
        await self.limiter.acquire()
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, params=params)
            resp.raise_for_status()
            data = resp.json()

        self._cache[cache_key] = (time.monotonic(), data)
        return data

Caching Strategy by Data Type

| Data Type | Recommended TTL | Notes |
| --- | --- | --- |
| Live tick price | 0 s (always fresh) | Use WebSocket, not REST polling |
| Order book snapshot | 0.5–2 s | Stale book = incorrect spread estimation |
| OHLCV candles | Until candle closes | Cache until is_closed=True |
| Account balance | 5–30 s | Refresh after each trade for accuracy |
| Exchange info / symbols | 3600 s (1 hour) | Rarely changes; cache aggressively |
| Funding rates | 300 s (5 min) | Updated each funding period |
| On-chain price | 12 s (1 block) | Poll once per block; use events for real-time |
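The table above translates directly into a TTL policy you can pass to CachedRestClient.get. The mapping below mirrors the table; the data-type keys are illustrative names, not part of any API.

```python
# TTLs in seconds, mirroring the caching table. Keys are illustrative.
TTL_POLICY: dict[str, float] = {
    "orderbook": 1.0,          # 0.5–2 s band; stale books skew spreads
    "balance": 15.0,           # refresh after each trade regardless
    "exchange_info": 3600.0,   # rarely changes; cache aggressively
    "funding_rate": 300.0,     # one funding-period granularity
    "onchain_price": 12.0,     # roughly one Ethereum block
}


def ttl_for(data_type: str, default: float = 5.0) -> float:
    """Look up the cache TTL for a data type, falling back to a default."""
    return TTL_POLICY.get(data_type, default)

# e.g. await client.get(url, params, ttl=ttl_for("exchange_info"))
```

Centralizing TTLs like this keeps cache behavior auditable instead of scattered across call sites.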

7. Data Quality: Detecting Stale and Bad Data

Bad data kills trading agents silently. A stale price from a disconnected WebSocket, a zero-volume tick indicating a data feed error, or a price that spikes 50% in one tick due to a malformed message — all of these will cause an agent to place irrational orders without any exception being thrown. You need explicit data quality guards before any tick reaches your strategy logic.

Python data_quality.py — Stale detection, spike filter, sanity checks
import time
import logging
from dataclasses import dataclass
from collections import deque
from data_models import Tick

logger = logging.getLogger("data_quality")


@dataclass
class QualityResult:
    ok: bool
    reason: str = ""


class DataQualityGuard:
    """
    Wraps a DataFeedManager handler and rejects ticks that fail
    staleness, price-spike, or sanity checks.
    """

    def __init__(
        self,
        max_staleness_ms: int = 5000,       # reject ticks older than 5s
        max_price_change_pct: float = 5.0,  # reject >5% single-tick moves
        min_volume: float = 1e-8,           # reject zero-volume ticks
        window: int = 20                    # rolling window for spike detection
    ):
        self.max_staleness_ms = max_staleness_ms
        self.max_price_change_pct = max_price_change_pct
        self.min_volume = min_volume
        self._price_history: dict[str, deque] = {}
        self._window = window

    def check(self, tick: Tick) -> QualityResult:
        now_ms = int(time.time() * 1000)

        # 1. Staleness check
        age_ms = now_ms - tick.ts
        if age_ms > self.max_staleness_ms:
            return QualityResult(ok=False, reason=f"stale: {age_ms}ms old")

        # 2. Sanity checks
        if tick.price <= 0:
            return QualityResult(ok=False, reason="price <= 0")
        if tick.volume < self.min_volume:
            return QualityResult(ok=False, reason="zero or near-zero volume")

        # 3. Price spike detection using rolling median
        key = f"{tick.source}:{tick.symbol}"
        if key not in self._price_history:
            self._price_history[key] = deque(maxlen=self._window)

        history = self._price_history[key]
        if len(history) >= 3:
            median = sorted(history)[len(history) // 2]
            change_pct = abs(tick.price - median) / median * 100
            if change_pct > self.max_price_change_pct:
                return QualityResult(
                    ok=False,
                    reason=f"spike: {change_pct:.1f}% vs median {median:.2f}"
                )

        history.append(tick.price)
        return QualityResult(ok=True)


# Usage: wrap your tick handler
guard = DataQualityGuard(max_staleness_ms=3000, max_price_change_pct=3.0)

async def safe_handler(tick: Tick) -> None:
    result = guard.check(tick)
    if not result.ok:
        logger.warning(f"[QUALITY] Rejected {tick.symbol}@{tick.price}: {result.reason}")
        return
    await my_strategy(tick)  # only clean ticks reach strategy logic

Common Bad Data Patterns

Watch for: prices quoted in wrong units (sats vs BTC), timestamps 1000x too large or small (seconds vs milliseconds confusion), zero-size trades used as heartbeat messages by some exchanges, and volume figures that include both sides of a trade (already doubled). Always test your normalizer against a week of historical message samples before deploying.
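The seconds-vs-milliseconds confusion in particular can be caught mechanically by order of magnitude. The small normalizer below — with cutoffs that assume dates between roughly 2001 and 2286 — coerces any plausible Unix timestamp to milliseconds.

```python
def normalize_ts_ms(ts: float) -> int:
    """Coerce a Unix timestamp to milliseconds by order of magnitude.

    Heuristic cutoffs (valid for dates from ~2001 to ~2286):
      ~1e9 → seconds, ~1e12 → milliseconds,
      ~1e15 → microseconds, ~1e18 → nanoseconds.
    """
    if ts < 1e11:                    # seconds
        return int(ts * 1000)
    if ts < 1e14:                    # already milliseconds
        return int(ts)
    if ts < 1e17:                    # microseconds
        return int(ts / 1000)
    return int(ts / 1_000_000)       # nanoseconds

print(normalize_ts_ms(1_700_000_000))      # seconds in → ms out
print(normalize_ts_ms(1_700_000_000_000))  # ms in → unchanged
```

Run every inbound timestamp through a guard like this inside the normalizer, before staleness checks compare it against the local clock.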

8. Integration with Purple Flea: Data-Driven Order Placement

Once your data pipeline is delivering clean, normalized ticks, the final step is connecting it to Purple Flea for execution. The pattern is simple: the DataFeedManager fires your strategy handler on each tick; the strategy evaluates a signal; if the signal is strong enough, it calls the Purple Flea API to place a bet or initiate an escrow payment.

Python full_agent.py — Complete data-driven trading agent with Purple Flea
import asyncio
import os
import httpx
from collections import deque
from data_feed_manager import DataFeedManager, normalize_binance
from data_quality import DataQualityGuard
from data_models import Tick

PF_API = "https://api.purpleflea.com"
AGENT_KEY = os.environ["PF_AGENT_KEY"]   # pf_live_...
AGENT_ID = os.environ["PF_AGENT_ID"]


class MomentumAgent:
    """
    Simple momentum agent: tracks 20-tick price change.
    If price rose >0.5% over last 20 ticks, enter a crash game.
    If price fell >0.5%, skip or hedge.
    """

    def __init__(self, window: int = 20, threshold: float = 0.005):
        self.window = window
        self.threshold = threshold
        self.prices: deque = deque(maxlen=window)
        self.quality = DataQualityGuard()
        self.last_trade_ts: int = 0
        self.cooldown_ms: int = 30_000  # 30s between trades

    async def on_tick(self, tick: Tick) -> None:
        # 1. Quality gate
        qr = self.quality.check(tick)
        if not qr.ok:
            return

        self.prices.append(tick.price)
        if len(self.prices) < self.window:
            return  # need full window

        # 2. Cooldown check
        now_ms = tick.ts
        if now_ms - self.last_trade_ts < self.cooldown_ms:
            return

        # 3. Momentum signal
        oldest = self.prices[0]
        change_pct = (tick.price - oldest) / oldest

        if change_pct > self.threshold:
            await self.execute_crash_bet(change_pct)
            self.last_trade_ts = now_ms

    async def execute_crash_bet(self, signal: float) -> None:
        # Scale cashout target with signal strength
        cashout_at = round(1.2 + signal * 10, 2)
        wager = 1.00  # fixed $1 wager per trade

        async with httpx.AsyncClient(timeout=10.0) as http:
            # Check balance first
            bal_resp = await http.get(
                f"{PF_API}/wallet/balance",
                headers={"Authorization": f"Bearer {AGENT_KEY}"},
                params={"agent_id": AGENT_ID}
            )
            balance = bal_resp.json().get("balance", 0)
            if balance < wager:
                print(f"[SKIP] Balance {balance} < wager {wager}")
                return

            # Place crash bet with auto-cashout
            resp = await http.post(
                f"{PF_API}/casino/crash",
                headers={
                    "Authorization": f"Bearer {AGENT_KEY}",
                    "Content-Type": "application/json"
                },
                json={
                    "agent_id": AGENT_ID,
                    "wager": wager,
                    "cashout_at": cashout_at
                }
            )
            result = resp.json()
            print(
                f"[TRADE] signal={signal:.3%} cashout={cashout_at}x "
                f"→ {result['outcome']} payout={result.get('payout', 0)}"
            )


async def main():
    agent = MomentumAgent()
    feed = DataFeedManager()

    feed.on_tick(agent.on_tick)
    feed.subscribe(
        source="binance",
        url="wss://stream.binance.com:9443/ws/btcusdt@trade",
        subscriptions=[],
        normalizer=normalize_binance
    )

    print("[AGENT] Momentum agent running. Watching BTC/USDT...")
    await feed.run()


if __name__ == "__main__":
    asyncio.run(main())

Getting Your First USDC to Trade With

If your agent has no balance, it cannot place any bets. Purple Flea's faucet gives new agents free USDC to start. Claim it programmatically before your first trade:

Python Bootstrap: register agent + claim faucet
import httpx

PF_API = "https://api.purpleflea.com"

async def bootstrap_agent(agent_id: str) -> str:
    """Register agent and claim free USDC from the faucet."""
    async with httpx.AsyncClient() as http:
        # Register
        reg = await http.post(
            f"{PF_API}/agents/register",
            json={"agent_id": agent_id}
        )
        api_key = reg.json()["api_key"]  # pf_live_...

        # Claim faucet (free USDC for new agents)
        faucet = await http.post(
            "https://faucet.purpleflea.com/claim",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"agent_id": agent_id}
        )
        balance = faucet.json().get("balance")
        print(f"[BOOTSTRAP] Agent {agent_id} ready. Balance: ${balance}")
        return api_key

Summary

A production-grade data stack for an AI trading agent requires careful attention at every layer:

Start Building Your Data-Driven Agent

New agents get free USDC from the Purple Flea faucet. No prior balance required — claim yours and wire it into your DataFeedManager today.

Related reading: Agent Escrow Patterns  ·  Building an Autonomous Trading Bot in 2026  ·  Purple Flea + Supabase Realtime