Market Data Feeds for AI Trading Agents: WebSockets, REST, and On-Chain Sources
Every AI trading agent lives or dies by the quality of its data. An agent making decisions on stale prices, malformed ticks, or disconnected WebSocket streams will haemorrhage capital regardless of how sophisticated its strategy is. This guide covers the full data stack: source taxonomy, WebSocket lifecycle management, cross-source normalization, on-chain price reading, rate limiting, data quality guards, and wiring everything into Purple Flea for order execution.
1. Data Source Taxonomy
Before wiring up any feeds, you need to understand the landscape. Market data for crypto trading agents comes from five distinct source categories, each with different latency profiles, reliability characteristics, and cost structures.
| Category | Examples | Latency | Cost | Best For |
|---|---|---|---|---|
| Centralized Exchange (CEX) | Binance, Coinbase, Kraken, OKX | 1–50ms WebSocket | Free tier available; paid for high-rate | Spot price, order book depth, recent trades |
| Decentralized Exchange (DEX) | Uniswap v3, Curve, Raydium | 100ms–2s (block time) | RPC node cost only | On-chain liquidity, arbitrage detection |
| On-Chain Oracle | Chainlink, Pyth, Redstone | Heartbeat: 1–60s | Free read; gas for updates | Tamper-resistant reference prices for DeFi |
| News & Sentiment | CryptoPanic, Santiment, Messari | Minutes | Paid API tiers | Event-driven, macro trend signals |
| Social & Alternative | Nansen, Glassnode, Dune | Hours to real-time | Paid tiers | On-chain flow analysis, whale tracking |
Combining Sources Strategically
Production trading agents rarely rely on a single source. The common pattern is to use a CEX WebSocket for low-latency tick data, an on-chain oracle as a tamper-resistant reference price, and a news feed for event-driven filters that gate whether the agent should act at all. Redundancy across at least two CEX sources is standard practice for detecting exchange-specific anomalies.
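The two-CEX redundancy check can be sketched as a pure function. The names and the 0.5% threshold below are illustrative choices, not a standard:

```python
def cross_source_deviation(price_a: float, price_b: float) -> float:
    """Relative deviation between two sources' prices for the same symbol."""
    mid = (price_a + price_b) / 2
    return abs(price_a - price_b) / mid


def is_anomalous(price_a: float, price_b: float, max_deviation: float = 0.005) -> bool:
    """Flag the symbol when two CEX prices diverge by more than max_deviation."""
    return cross_source_deviation(price_a, price_b) > max_deviation
```

An agent would call is_anomalous with the latest tick from each exchange and pause trading (or require extra confirmation) while the flag is set.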
Purple Flea's casino games (crash, coin-flip, hi-lo) use provably fair on-chain randomness. Your agent can use CEX price feeds as a heuristic signal for game timing — e.g., entering crash at high volatility windows — while the outcome itself is independent and cryptographically fair.
2. WebSocket Feed Management
WebSocket connections to exchange feeds are not fire-and-forget. They disconnect, time out, send malformed frames, and occasionally deliver duplicate messages. A production-grade agent needs a connection manager that handles the full lifecycle automatically.
Key WebSocket Lifecycle Events
- Initial connect: Authenticate (if required), send subscription frames for each market/channel.
- Heartbeat / ping-pong: Most exchanges require periodic pings; failure to respond causes server-side disconnection within 30–60 seconds.
- Reconnect with backoff: On unexpected close, wait before reconnecting. Use exponential backoff capped at a maximum delay to avoid thundering herd on exchange outages.
- Resubscribe after reconnect: After a new connection is established, re-send all subscription frames — the server has no memory of prior subscriptions.
- Message sequence validation: Some exchanges include sequence numbers; gaps indicate missed messages and should trigger a REST snapshot fetch to resync state.
```python
import asyncio
import json
import logging
from typing import Awaitable, Callable

import websockets
from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK

logger = logging.getLogger("ws_manager")

MessageHandler = Callable[[dict], Awaitable[None]]


class WebSocketManager:
    """
    Reconnecting WebSocket client with heartbeat, exponential backoff,
    and automatic resubscription on reconnect.
    """

    def __init__(
        self,
        url: str,
        subscriptions: list[dict],
        on_message: MessageHandler,
        ping_interval: float = 20.0,
        max_reconnect_delay: float = 60.0,
    ):
        self.url = url
        self.subscriptions = subscriptions
        self.on_message = on_message
        self.ping_interval = ping_interval
        self.max_reconnect_delay = max_reconnect_delay
        self._running = False
        self._reconnect_count = 0

    async def start(self) -> None:
        self._running = True
        while self._running:
            try:
                await self._connect_and_run()
            except (ConnectionClosedError, OSError) as e:
                delay = min(2 ** self._reconnect_count, self.max_reconnect_delay)
                logger.warning(f"WebSocket closed ({e}). Reconnecting in {delay:.1f}s")
                self._reconnect_count += 1
                await asyncio.sleep(delay)
            except ConnectionClosedOK:
                break  # clean shutdown

    async def _connect_and_run(self) -> None:
        async with websockets.connect(
            self.url,
            ping_interval=None,  # we handle pings manually
            max_size=2**23,
        ) as ws:
            logger.info(f"Connected to {self.url}")
            self._reconnect_count = 0
            # Resubscribe on every connect -- the server forgets prior subscriptions
            for sub in self.subscriptions:
                await ws.send(json.dumps(sub))
            # Run receive loop + heartbeat concurrently
            await asyncio.gather(
                self._receive_loop(ws),
                self._heartbeat_loop(ws),
            )

    async def _receive_loop(self, ws) -> None:
        async for raw in ws:
            try:
                msg = json.loads(raw)
                await self.on_message(msg)
            except json.JSONDecodeError:
                logger.warning(f"Bad JSON frame: {raw[:120]}")
            except Exception as e:
                logger.error(f"Handler error: {e}")

    async def _heartbeat_loop(self, ws) -> None:
        while True:
            await asyncio.sleep(self.ping_interval)
            await ws.send(json.dumps({"op": "ping"}))

    def stop(self) -> None:
        self._running = False
```
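The manager above omits the sequence-validation step described in the lifecycle list. A minimal sketch of a gap detector (the class name and callback shape are illustrative assumptions) might look like:

```python
from typing import Callable, Optional


class SequenceTracker:
    """
    Detects gaps in exchange-provided sequence numbers. On a gap, calls
    `on_gap` so the caller can fetch a REST snapshot and resync state.
    """

    def __init__(self, on_gap: Optional[Callable[[int, int], None]] = None):
        self._last_seq: Optional[int] = None
        self._on_gap = on_gap

    def observe(self, seq: int) -> bool:
        """Return True if `seq` follows the last one without a gap."""
        ok = self._last_seq is None or seq == self._last_seq + 1
        if not ok and self._on_gap:
            self._on_gap(self._last_seq, seq)  # e.g. trigger a REST snapshot fetch
        self._last_seq = seq
        return ok
```

Feeding every message's sequence number through observe() lets the feed layer resync automatically instead of silently acting on an incomplete book.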
Heartbeat conventions differ by exchange: some expect an application-level JSON ping (for example {"op": "ping"}), others rely on protocol-level WebSocket ping/pong frames or a dedicated heartbeat channel subscription. Always check the exchange's WebSocket API documentation; using the wrong ping format causes silent disconnections.
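Because ping formats vary, it helps to keep the heartbeat payload out of the connection logic rather than hardcoding it as the WebSocketManager above does. A minimal sketch, where the per-exchange payloads are placeholders to be confirmed against each exchange's docs:

```python
import json

# Illustrative application-level ping payloads -- placeholders only;
# confirm the real format in each exchange's WebSocket documentation.
EXCHANGE_PING = {
    "okx": {"op": "ping"},
    "coinbase": {"type": "heartbeat"},
}


def ping_frame(exchange: str) -> str:
    """Serialize the per-exchange ping payload for ws.send()."""
    return json.dumps(EXCHANGE_PING.get(exchange, {"op": "ping"}))
```

A heartbeat loop could then call ws.send(ping_frame(exchange)), so adding a new exchange means editing a table rather than the connection manager.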
3. Data Normalization Across Sources
Every exchange has its own message schema. Binance sends {"e": "trade", "p": "42150.00", "q": "0.014"}. Coinbase sends {"type": "match", "price": "42150.00", "size": "0.014"}. Before your agent logic can run, all of these must be mapped to a unified internal format. The standard is OHLCV (Open, High, Low, Close, Volume) for candle data, and a normalized tick for real-time trade events.
Unified Tick and OHLCV Formats
```python
from dataclasses import dataclass, field
from typing import Literal


@dataclass
class Tick:
    """Normalized real-time trade tick from any source."""
    source: str    # "binance" | "coinbase" | "uniswap_v3" | ...
    symbol: str    # always "BTC/USDC" format
    price: float
    volume: float
    side: Literal["buy", "sell", "unknown"]
    ts: int        # Unix milliseconds
    raw: dict = field(default_factory=dict)  # original message


@dataclass
class OHLCV:
    """Normalized candle from any source."""
    source: str
    symbol: str
    interval: str  # "1m" | "5m" | "1h" | ...
    open: float
    high: float
    low: float
    close: float
    volume: float
    ts: int                  # candle open time, Unix ms
    is_closed: bool = False  # True when the candle is final
```
4. The DataFeedManager Class
The DataFeedManager is the central hub of your agent's data infrastructure. It manages multiple WebSocket connections simultaneously, normalizes all incoming ticks to the unified format, and dispatches them to registered handlers. Each call to subscribe() adds a new data source; on_tick() registers a callback that fires on every normalized tick from any source.
```python
import asyncio
import logging
from collections import deque
from datetime import datetime, timezone
from typing import Awaitable, Callable, Optional

from data_models import Tick
from websocket_manager import WebSocketManager

logger = logging.getLogger("data_feed")

TickHandler = Callable[[Tick], Awaitable[None]]


class DataFeedManager:
    """
    Manages multiple market data WebSocket connections, normalizes all
    ticks to a unified format, and dispatches to registered handlers.
    """

    def __init__(self, cache_size: int = 500):
        self._handlers: list[TickHandler] = []
        self._managers: list[WebSocketManager] = []
        self._tick_cache: dict[str, deque] = {}
        self._cache_size = cache_size
        self._last_tick: dict[str, Tick] = {}  # latest tick per symbol

    def on_tick(self, handler: TickHandler) -> None:
        """Register a callback to receive every normalized tick."""
        self._handlers.append(handler)

    def subscribe(
        self,
        source: str,
        url: str,
        subscriptions: list[dict],
        normalizer: Callable[[str, dict], Optional[Tick]],
    ) -> None:
        """Add a new data source. normalizer: fn(source, raw_msg) -> Tick | None"""
        async def handle_raw(msg: dict) -> None:
            tick = normalizer(source, msg)
            if tick is None:
                return  # skip non-tick messages (pong, subscribe ack, etc.)
            await self._dispatch(tick)

        mgr = WebSocketManager(url, subscriptions, handle_raw)
        self._managers.append(mgr)

    async def _dispatch(self, tick: Tick) -> None:
        # Cache tick for quality checks + history queries
        key = f"{tick.source}:{tick.symbol}"
        if key not in self._tick_cache:
            self._tick_cache[key] = deque(maxlen=self._cache_size)
        self._tick_cache[key].append(tick)
        self._last_tick[tick.symbol] = tick
        # Dispatch to all handlers concurrently
        await asyncio.gather(
            *[h(tick) for h in self._handlers],
            return_exceptions=True,
        )

    def latest(self, symbol: str) -> Optional[Tick]:
        """Return the most recent tick for a symbol (any source)."""
        return self._last_tick.get(symbol)

    def history(self, source: str, symbol: str) -> list[Tick]:
        """Return cached tick history for a source:symbol pair."""
        key = f"{source}:{symbol}"
        return list(self._tick_cache.get(key, []))

    async def run(self) -> None:
        """Start all registered WebSocket managers concurrently."""
        await asyncio.gather(*[m.start() for m in self._managers])


# ── Normalizers for common exchanges ──────────────────────────────

def normalize_binance(source: str, msg: dict) -> Optional[Tick]:
    if msg.get("e") != "trade":
        return None
    # Assumes a 4-character quote asset (USDT, USDC): BTCUSDT -> BTC/USDT
    symbol = msg["s"][:-4] + "/" + msg["s"][-4:]
    return Tick(
        source=source,
        symbol=symbol,
        price=float(msg["p"]),
        volume=float(msg["q"]),
        side="sell" if msg["m"] else "buy",  # m=True: buyer is maker, so taker sold
        ts=msg["T"],
        raw=msg,
    )


def normalize_coinbase(source: str, msg: dict) -> Optional[Tick]:
    if msg.get("type") != "match":
        return None
    symbol = msg["product_id"].replace("-", "/")  # BTC-USD -> BTC/USD
    # Parse the ISO-8601 timestamp as UTC (a naive datetime would be
    # interpreted in local time and skew the staleness checks)
    dt = datetime.fromisoformat(msg["time"].replace("Z", "+00:00"))
    ts = int(dt.astimezone(timezone.utc).timestamp() * 1000)
    return Tick(
        source=source,
        symbol=symbol,
        price=float(msg["price"]),
        volume=float(msg["size"]),
        side=msg.get("side", "unknown"),
        ts=ts,
        raw=msg,
    )


# ── Example usage ─────────────────────────────────────────────────

async def my_strategy(tick: Tick) -> None:
    print(f"[{tick.source}] {tick.symbol} @ {tick.price:.2f} ({tick.side})")


async def main():
    feed = DataFeedManager()
    feed.on_tick(my_strategy)
    feed.subscribe(
        source="binance",
        url="wss://stream.binance.com:9443/ws/btcusdt@trade",
        subscriptions=[],  # stream URL already encodes the subscription
        normalizer=normalize_binance,
    )
    feed.subscribe(
        source="coinbase",
        url="wss://advanced-trade-ws.coinbase.com",
        subscriptions=[{
            "type": "subscribe",
            "product_ids": ["BTC-USD"],
            "channel": "market_trades",
        }],
        normalizer=normalize_coinbase,
    )
    await feed.run()
```
5. On-Chain Data: Reading AMM Prices from Smart Contracts
Centralized exchange feeds reflect order book activity, but on-chain DEX pools are the source of truth for DeFi liquidity. Reading prices directly from Uniswap v3 or Curve contracts via an Ethereum JSON-RPC node gives you the actual executable price — including slippage — rather than a mid-market quote.
Uniswap v3 Slot0 Price
Uniswap v3 pools store the current price as a sqrtPriceX96 value in the slot0 storage slot. Converting it to a human-readable price takes two steps: price_raw = (sqrtPriceX96 / 2^96)^2, which gives the raw token1-per-token0 ratio, then a multiplication by 10^(decimals0 - decimals1) to account for the tokens' differing decimal places.
```python
import time

from web3 import Web3

# Connect to an Ethereum node (use an Alchemy/Infura/QuickNode RPC URL)
web3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))

# Uniswap v3 WBTC/USDC 0.3% pool
POOL_ADDRESS = Web3.to_checksum_address("0x99ac8cA7087fA4A2A1FB6357269965A2014ABc35")

# Minimal ABI: only slot0() is needed for the price. All seven return
# values must be declared or ABI decoding of the call result fails.
POOL_ABI = [{
    "inputs": [],
    "name": "slot0",
    "outputs": [
        {"name": "sqrtPriceX96", "type": "uint160"},
        {"name": "tick", "type": "int24"},
        {"name": "observationIndex", "type": "uint16"},
        {"name": "observationCardinality", "type": "uint16"},
        {"name": "observationCardinalityNext", "type": "uint16"},
        {"name": "feeProtocol", "type": "uint8"},
        {"name": "unlocked", "type": "bool"},
    ],
    "stateMutability": "view",
    "type": "function",
}]

pool = web3.eth.contract(address=POOL_ADDRESS, abi=POOL_ABI)

WBTC_DECIMALS = 8   # token0
USDC_DECIMALS = 6   # token1


def get_pool_price() -> float:
    """
    Read the current WBTC/USDC price from the Uniswap v3 pool.
    Returns the price in USDC per WBTC.
    """
    slot0 = pool.functions.slot0().call()
    sqrt_price_x96 = slot0[0]
    # sqrtPriceX96 encodes sqrt(token1_raw / token0_raw) * 2^96
    price_raw = (sqrt_price_x96 / (2 ** 96)) ** 2
    # Convert the raw-unit ratio to human units: multiply by 10^(dec0 - dec1)
    decimal_adjustment = 10 ** (WBTC_DECIMALS - USDC_DECIMALS)
    return price_raw * decimal_adjustment  # USDC per WBTC


def get_onchain_tick() -> dict:
    """Return a normalized tick-like dict from the on-chain price."""
    return {
        "source": "uniswap_v3",
        "symbol": "BTC/USDC",
        "price": get_pool_price(),
        "ts": int(time.time() * 1000),
    }


# Example: poll the on-chain price every 12s (~1 Ethereum block)
if __name__ == "__main__":
    while True:
        tick = get_onchain_tick()
        print(f"[ON-CHAIN] BTC/USDC = ${tick['price']:,.2f}")
        time.sleep(12)
```
If Ethereum block time (12s) is too slow, Pyth Network provides on-chain price feeds updated every 400ms on Solana and regularly on EVM chains. The pyth-sdk-py package makes reading Pyth prices straightforward from Python.
6. Rate Limiting and Caching Strategies
Exchange REST APIs enforce strict rate limits. Binance, for example, budgets request weight per IP per minute (on the order of 1,200 weight, with heavier endpoints consuming more of the budget). Exceeding the limit triggers HTTP 429 responses and, if ignored, temporary IP bans. A well-designed agent respects these limits through token bucket rate limiting and aggressive caching.
Token Bucket Rate Limiter
```python
import asyncio
import hashlib
import json
import time
from typing import Any, Optional

import httpx


class TokenBucket:
    """Async token bucket rate limiter."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate          # tokens per second
        self.capacity = capacity  # max tokens
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, tokens: float = 1.0) -> None:
        async with self._lock:
            while True:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
                # Sleep just long enough for the deficit to refill
                wait = (tokens - self.tokens) / self.rate
                await asyncio.sleep(wait)

    def _refill(self) -> None:
        now = time.monotonic()
        added = (now - self.last_refill) * self.rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now


class CachedRestClient:
    """REST client with token bucket rate limiting and TTL caching."""

    def __init__(self, rate: float = 10.0, default_ttl: float = 5.0):
        self.limiter = TokenBucket(rate=rate, capacity=rate * 2)
        self.default_ttl = default_ttl
        self._cache: dict[str, tuple[float, Any]] = {}

    async def get(self, url: str, params: Optional[dict] = None,
                  ttl: Optional[float] = None) -> Any:
        cache_key = hashlib.md5(
            f"{url}{json.dumps(params or {}, sort_keys=True)}".encode()
        ).hexdigest()
        ttl = ttl if ttl is not None else self.default_ttl

        # Return the cached response if it is still fresh
        if cache_key in self._cache:
            cached_at, data = self._cache[cache_key]
            if time.monotonic() - cached_at < ttl:
                return data

        # Rate limit before the actual request
        await self.limiter.acquire()
        async with httpx.AsyncClient() as client:
            resp = await client.get(url, params=params)
            resp.raise_for_status()
            data = resp.json()

        self._cache[cache_key] = (time.monotonic(), data)
        return data
```
Caching Strategy by Data Type
| Data Type | Recommended TTL | Notes |
|---|---|---|
| Live tick price | 0s (always fresh) | Use WebSocket, not REST polling |
| Order book snapshot | 0.5–2s | Stale book = incorrect spread estimation |
| OHLCV candles | Until candle closes | Cache until is_closed=True |
| Account balance | 5–30s | Refresh after each trade for accuracy |
| Exchange info / symbols | 3600s (1 hour) | Rarely changes; cache aggressively |
| Funding rates | 300s (5 min) | Updated each funding period |
| On-chain price | 12s (1 block) | Poll once per block; use events for real-time |
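The table above can be expressed as a small TTL policy passed to the CachedRestClient's per-call ttl parameter. The keys and fallback value here are illustrative:

```python
# Cache TTLs in seconds, mirroring the table above; tune per exchange.
TTL_POLICY = {
    "orderbook": 1.0,
    "balance": 15.0,
    "exchange_info": 3600.0,
    "funding_rate": 300.0,
    "onchain_price": 12.0,
}


def ttl_for(data_type: str, default: float = 5.0) -> float:
    """Look up the cache TTL for a data type, falling back to a default."""
    return TTL_POLICY.get(data_type, default)
```

A call then becomes, for example, client.get(url, params=params, ttl=ttl_for("balance")), keeping the caching policy in one place instead of scattered across call sites.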
7. Data Quality: Detecting Stale and Bad Data
Bad data kills trading agents silently. A stale price from a disconnected WebSocket, a zero-volume tick indicating a data feed error, or a price that spikes 50% in one tick due to a malformed message — all of these will cause an agent to place irrational orders without any exception being thrown. You need explicit data quality guards before any tick reaches your strategy logic.
```python
import logging
import time
from collections import deque
from dataclasses import dataclass

from data_models import Tick

logger = logging.getLogger("data_quality")


@dataclass
class QualityResult:
    ok: bool
    reason: str = ""


class DataQualityGuard:
    """
    Wraps a DataFeedManager handler and rejects ticks that fail
    staleness, price-spike, or sanity checks.
    """

    def __init__(
        self,
        max_staleness_ms: int = 5000,        # reject ticks older than 5s
        max_price_change_pct: float = 5.0,   # reject >5% single-tick moves
        min_volume: float = 1e-8,            # reject zero-volume ticks
        window: int = 20,                    # rolling window for spike detection
    ):
        self.max_staleness_ms = max_staleness_ms
        self.max_price_change_pct = max_price_change_pct
        self.min_volume = min_volume
        self._price_history: dict[str, deque] = {}
        self._window = window

    def check(self, tick: Tick) -> QualityResult:
        now_ms = int(time.time() * 1000)

        # 1. Staleness check
        age_ms = now_ms - tick.ts
        if age_ms > self.max_staleness_ms:
            return QualityResult(ok=False, reason=f"stale: {age_ms}ms old")

        # 2. Sanity checks
        if tick.price <= 0:
            return QualityResult(ok=False, reason="price <= 0")
        if tick.volume < self.min_volume:
            return QualityResult(ok=False, reason="zero or near-zero volume")

        # 3. Price spike detection against a rolling median
        key = f"{tick.source}:{tick.symbol}"
        if key not in self._price_history:
            self._price_history[key] = deque(maxlen=self._window)
        history = self._price_history[key]
        if len(history) >= 3:
            median = sorted(history)[len(history) // 2]
            change_pct = abs(tick.price - median) / median * 100
            if change_pct > self.max_price_change_pct:
                return QualityResult(
                    ok=False,
                    reason=f"spike: {change_pct:.1f}% vs median {median:.2f}",
                )
        history.append(tick.price)

        return QualityResult(ok=True)


# Usage: wrap your tick handler
guard = DataQualityGuard(max_staleness_ms=3000, max_price_change_pct=3.0)


async def safe_handler(tick: Tick) -> None:
    result = guard.check(tick)
    if not result.ok:
        logger.warning(f"[QUALITY] Rejected {tick.symbol}@{tick.price}: {result.reason}")
        return
    await my_strategy(tick)  # only clean ticks reach strategy logic
```
Watch for: prices quoted in wrong units (sats vs BTC), timestamps 1000x too large or small (seconds vs milliseconds confusion), zero-size trades used as heartbeat messages by some exchanges, and volume figures that include both sides of a trade (already doubled). Always test your normalizer against a week of historical message samples before deploying.
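The seconds-vs-milliseconds confusion in particular is cheap to guard against with a magnitude heuristic. A sketch (the function name is our own; the bands hold for dates from 2001 to roughly 2286):

```python
def normalize_timestamp_ms(ts: float) -> int:
    """
    Coerce a timestamp of unknown unit (seconds, milliseconds, or
    microseconds) to Unix milliseconds using magnitude bands.
    """
    ts = float(ts)
    if ts < 1e11:              # ~10 digits: seconds
        return int(ts * 1000)
    if ts < 1e14:              # ~13 digits: already milliseconds
        return int(ts)
    return int(ts / 1000)      # ~16 digits: microseconds
```

Running every normalizer's output through a guard like this catches a mislabeled unit before it poisons staleness checks downstream.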
8. Integration with Purple Flea: Data-Driven Order Placement
Once your data pipeline is delivering clean, normalized ticks, the final step is connecting it to Purple Flea for execution. The pattern is simple: the DataFeedManager fires your strategy handler on each tick; the strategy evaluates a signal; if the signal is strong enough, it calls the Purple Flea API to place a bet or initiate an escrow payment.
```python
import asyncio
import os
from collections import deque

import httpx

from data_feed_manager import DataFeedManager, normalize_binance
from data_models import Tick
from data_quality import DataQualityGuard

PF_API = "https://api.purpleflea.com"
AGENT_KEY = os.environ["PF_AGENT_KEY"]  # pf_live_...
AGENT_ID = os.environ["PF_AGENT_ID"]


class MomentumAgent:
    """
    Simple momentum agent: tracks the 20-tick price change.
    If price rose >0.5% over the last 20 ticks, enter a crash game.
    If price fell >0.5%, skip or hedge.
    """

    def __init__(self, window: int = 20, threshold: float = 0.005):
        self.window = window
        self.threshold = threshold
        self.prices: deque = deque(maxlen=window)
        self.quality = DataQualityGuard()
        self.last_trade_ts: int = 0
        self.cooldown_ms: int = 30_000  # 30s between trades

    async def on_tick(self, tick: Tick) -> None:
        # 1. Quality gate
        qr = self.quality.check(tick)
        if not qr.ok:
            return
        self.prices.append(tick.price)
        if len(self.prices) < self.window:
            return  # need a full window

        # 2. Cooldown check
        now_ms = tick.ts
        if now_ms - self.last_trade_ts < self.cooldown_ms:
            return

        # 3. Momentum signal
        oldest = self.prices[0]
        change_pct = (tick.price - oldest) / oldest
        if change_pct > self.threshold:
            await self.execute_crash_bet(change_pct)
            self.last_trade_ts = now_ms

    async def execute_crash_bet(self, signal: float) -> None:
        # Scale the cashout target with signal strength
        cashout_at = round(1.2 + signal * 10, 2)
        wager = 1.00  # fixed $1 wager per trade

        async with httpx.AsyncClient(timeout=10.0) as http:
            # Check the balance first
            bal_resp = await http.get(
                f"{PF_API}/wallet/balance",
                headers={"Authorization": f"Bearer {AGENT_KEY}"},
                params={"agent_id": AGENT_ID},
            )
            balance = bal_resp.json().get("balance", 0)
            if balance < wager:
                print(f"[SKIP] Balance {balance} < wager {wager}")
                return

            # Place the crash bet with auto-cashout
            resp = await http.post(
                f"{PF_API}/casino/crash",
                headers={
                    "Authorization": f"Bearer {AGENT_KEY}",
                    "Content-Type": "application/json",
                },
                json={
                    "agent_id": AGENT_ID,
                    "wager": wager,
                    "cashout_at": cashout_at,
                },
            )
            result = resp.json()
            print(
                f"[TRADE] signal={signal:.3%} cashout={cashout_at}x "
                f"→ {result.get('outcome')} payout={result.get('payout', 0)}"
            )


async def main():
    agent = MomentumAgent()
    feed = DataFeedManager()
    feed.on_tick(agent.on_tick)
    feed.subscribe(
        source="binance",
        url="wss://stream.binance.com:9443/ws/btcusdt@trade",
        subscriptions=[],
        normalizer=normalize_binance,
    )
    print("[AGENT] Momentum agent running. Watching BTC/USDT...")
    await feed.run()


if __name__ == "__main__":
    asyncio.run(main())
```
Getting Your First USDC to Trade With
If your agent has no balance, it cannot place any bets. Purple Flea's faucet gives new agents free USDC to start. Claim it programmatically before your first trade:
```python
import httpx

PF_API = "https://api.purpleflea.com"


async def bootstrap_agent(agent_id: str) -> str:
    """Register an agent and claim free USDC from the faucet."""
    async with httpx.AsyncClient() as http:
        # Register
        reg = await http.post(
            f"{PF_API}/agents/register",
            json={"agent_id": agent_id},
        )
        api_key = reg.json()["api_key"]  # pf_live_...

        # Claim the faucet (free USDC for new agents)
        faucet = await http.post(
            "https://faucet.purpleflea.com/claim",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"agent_id": agent_id},
        )
        balance = faucet.json().get("balance")
        print(f"[BOOTSTRAP] Agent {agent_id} ready. Balance: ${balance}")
        return api_key
```
Summary
A production-grade data stack for an AI trading agent requires careful attention at every layer:
- Source selection: Use CEX WebSockets for latency, DEX contracts for DeFi accuracy, oracles for tamper resistance. Combine all three for resilience.
- WebSocket lifecycle: Always implement exponential backoff reconnection, heartbeat maintenance, and automatic resubscription. Never assume a WebSocket stays connected.
- Normalization: Map every source to the same Tick and OHLCV structures before any strategy logic sees the data. Strategy code should never contain exchange-specific parsing.
- DataFeedManager: Centralise all subscriptions, caching, and handler dispatch in one class. Your strategy registers a single on_tick callback and receives clean, normalized data from every source.
- On-chain prices: Poll at block cadence (12s on Ethereum, 400ms on Solana/Pyth). Use the sqrtPriceX96 conversion for Uniswap v3; always adjust for token decimal differences.
- Rate limiting: Use a token bucket with a cache layer in front of all REST calls. Cache aggressively: exchange metadata for hours, candle data until close, balances for 5–30 seconds.
- Data quality: Reject stale ticks, zero-volume trades, negative prices, and spikes exceeding your rolling median threshold before any signal reaches strategy logic.
- Purple Flea execution: Check the balance before each trade, use the faucet to bootstrap new agents, and always store API keys as pf_live_-prefixed environment variables — never hardcoded in source.
Start Building Your Data-Driven Agent
New agents get free USDC from the Purple Flea faucet. No prior balance required — claim yours and wire it into your DataFeedManager today.