1. WebSocket vs REST: Why Real-Time Matters
For AI trading agents, the choice between REST polling and WebSocket streaming is not merely an engineering preference — it is the difference between acting on stale data and executing with edge. A REST-based agent polling an exchange every 500ms is blind to every price tick, trade, and liquidation that occurs between requests. A WebSocket-connected agent receives every event as it happens.
The fundamental difference is transport model. REST follows a request-response cycle: the agent initiates every interaction. WebSocket establishes a persistent, full-duplex TCP connection where the server pushes updates to the agent as they occur. No repeated handshakes, no polling overhead, no latency floor imposed by HTTP round-trips.
| Property | REST Polling | WebSocket Stream |
|---|---|---|
| Latency model | Round-trip per request | Server-push, sub-ms |
| Bandwidth efficiency | Headers repeated every call | Persistent framing only |
| Order book completeness | Snapshot per poll interval | Full delta stream |
| Trade tape coverage | Misses ticks between polls | Every trade delivered |
| Connection overhead | Stateless, easy to scale | Stateful, needs mgmt |
| Reconnect complexity | None required | Required for reliability |
| Rate limit risk | High — every poll counts | Subscription model |
For Purple Flea's perpetual futures and casino games, WebSocket connectivity is the recommended path for any agent that needs responsive execution. REST endpoints remain useful for account queries, order status checks, and low-frequency operations — but market data should always flow through a persistent stream.
2. Connection Lifecycle Management
A WebSocket connection progresses through distinct states: CONNECTING, OPEN, CLOSING, and CLOSED. Production-grade agents must handle all transitions gracefully, including the unexpected drop from OPEN to CLOSED caused by network interruptions, server maintenance, or load balancer timeouts.
Connection State Machine
```python
from enum import Enum, auto
import asyncio
import websockets
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Callable, Awaitable

logger = logging.getLogger("ws.lifecycle")

class ConnectionState(Enum):
    DISCONNECTED = auto()
    CONNECTING = auto()
    CONNECTED = auto()
    RECONNECTING = auto()
    CLOSING = auto()

@dataclass
class ConnectionStats:
    connect_count: int = 0
    disconnect_count: int = 0
    messages_rx: int = 0
    bytes_rx: int = 0
    last_message_at: Optional[datetime] = None
    last_error: Optional[str] = None

MessageHandler = Callable[[str], Awaitable[None]]

class ManagedWebSocket:
    """Lifecycle-managed WebSocket with automatic state tracking."""

    def __init__(
        self,
        url: str,
        on_message: MessageHandler,
        on_connect: Optional[Callable] = None,
        on_disconnect: Optional[Callable] = None,
    ):
        self.url = url
        self.on_message = on_message
        self.on_connect = on_connect
        self.on_disconnect = on_disconnect
        self.state = ConnectionState.DISCONNECTED
        self.stats = ConnectionStats()
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._stop_event = asyncio.Event()

    @property
    def is_connected(self) -> bool:
        return self.state == ConnectionState.CONNECTED

    async def connect(self) -> None:
        self.state = ConnectionState.CONNECTING
        self._ws = await websockets.connect(
            self.url,
            ping_interval=20,
            ping_timeout=10,
            close_timeout=5,
            max_size=2**23,  # 8 MB for large order book snapshots
        )
        self.state = ConnectionState.CONNECTED
        self.stats.connect_count += 1
        logger.info(f"Connected to {self.url}")
        if self.on_connect:
            await self.on_connect()

    async def disconnect(self) -> None:
        self._stop_event.set()
        self.state = ConnectionState.CLOSING
        if self._ws:
            await self._ws.close()
        self.state = ConnectionState.DISCONNECTED
        self.stats.disconnect_count += 1
        if self.on_disconnect:
            await self.on_disconnect()

    async def listen(self) -> None:
        async for raw in self._ws:
            if self._stop_event.is_set():
                break
            self.stats.messages_rx += 1
            self.stats.bytes_rx += len(raw)
            self.stats.last_message_at = datetime.now(timezone.utc)
            await self.on_message(raw)
```
Authentication Handshake
Authenticated streams — for private order updates, balance changes, and position events — require the agent to send credentials immediately after the connection opens, before subscribing to any channels. The pattern is: connect, authenticate, await confirmation, then subscribe.
```python
import hmac, hashlib, time, json, logging

logger = logging.getLogger("ws.auth")

async def authenticate(ws, api_key: str, api_secret: str) -> bool:
    timestamp = str(int(time.time() * 1000))
    sign_payload = timestamp + "GET" + "/realtime"
    signature = hmac.new(
        api_secret.encode(), sign_payload.encode(), hashlib.sha256
    ).hexdigest()
    auth_msg = {
        "op": "auth",
        "args": [api_key, timestamp, signature],
    }
    await ws.send(json.dumps(auth_msg))
    response = json.loads(await ws.recv())
    success = response.get("success", False)
    if not success:
        logger.error(f"Auth failed: {response.get('ret_msg')}")
    return success

async def subscribe_private(ws) -> None:
    sub = {
        "op": "subscribe",
        "args": ["order", "position", "wallet"],
    }
    await ws.send(json.dumps(sub))
    logger.info("Subscribed to private channels")
```
3. Heartbeat and Ping-Pong
WebSocket connections sit behind proxies, load balancers, and NAT gateways that may silently drop idle connections after a timeout — typically 30–60 seconds. The ping-pong mechanism keeps connections alive and also provides an early warning system when the underlying TCP connection has silently died.
WebSocket Ping/Pong
Protocol-level frames (opcode 0x9/0xA) sent automatically by the library. No data payload needed. Most libraries handle this transparently.
Application Heartbeat
Exchange-specific JSON messages (e.g. {"op":"ping"}) sent on a timer. Response expected within a deadline — absence triggers reconnect.
Silent Connection Death
TCP RST packets may not reach the client. The connection appears OPEN but no data flows. Heartbeat timeout detection is the only reliable guard.
```python
import asyncio, json, time, logging
from typing import Callable, Optional

logger = logging.getLogger("ws.heartbeat")

class HeartbeatMonitor:
    """Application-level heartbeat with dead-connection detection."""

    def __init__(
        self,
        ws,
        interval: float = 20.0,
        timeout: float = 10.0,
        on_dead: Optional[Callable] = None,
    ):
        self.ws = ws
        self.interval = interval
        self.timeout = timeout
        self.on_dead = on_dead
        self._last_pong = time.monotonic()
        self._task: Optional[asyncio.Task] = None

    def record_pong(self) -> None:
        """Call this when any message is received (acts as implicit pong)."""
        self._last_pong = time.monotonic()

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            ping_sent_at = time.monotonic()
            try:
                await self.ws.send(json.dumps({"op": "ping"}))
            except Exception:
                break  # connection already broken
            # Wait for a pong that arrives *after* this ping was sent
            # (any message counts)
            deadline = ping_sent_at + self.timeout
            while time.monotonic() < deadline:
                if self._last_pong >= ping_sent_at:
                    break
                await asyncio.sleep(0.5)
            else:
                logger.warning("Heartbeat timeout — connection dead")
                if self.on_dead:
                    await self.on_dead()
                break

    def start(self) -> None:
        self._task = asyncio.create_task(self._loop())

    def stop(self) -> None:
        if self._task:
            self._task.cancel()
```
4. Order Book Snapshot and Delta Reconstruction
Exchanges transmit order books using a two-phase protocol: an initial snapshot containing the full state, followed by a continuous stream of delta updates (inserts, updates, deletes at specific price levels). An agent that misses even one delta will have a corrupted view of the book until the next snapshot is requested.
Sequence Number Validation
Every delta message carries a sequence number. If delta.seq != last_seq + 1, at least one message was dropped and the book state is invalid. The correct response is to request a fresh snapshot, not to guess at what was missed.
```python
from sortedcontainers import SortedDict
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
import logging

logger = logging.getLogger("orderbook")

@dataclass
class OrderBookLevel:
    price: Decimal
    quantity: Decimal

class OrderBook:
    """L2 order book with snapshot+delta reconstruction and sequence validation."""

    def __init__(self, symbol: str, depth: int = 50):
        self.symbol = symbol
        self.depth = depth
        self._bids = SortedDict(lambda x: -x)  # descending
        self._asks = SortedDict()              # ascending
        self._seq = -1
        self._synced = False

    def apply_snapshot(self, data: Dict) -> None:
        self._bids.clear()
        self._asks.clear()
        for level in data["bids"]:
            p, q = Decimal(level[0]), Decimal(level[1])
            if q > 0:
                self._bids[p] = q
        for level in data["asks"]:
            p, q = Decimal(level[0]), Decimal(level[1])
            if q > 0:
                self._asks[p] = q
        self._seq = data["seq"]
        self._synced = True
        logger.info(f"{self.symbol}: snapshot at seq={self._seq}, "
                    f"bids={len(self._bids)}, asks={len(self._asks)}")

    def apply_delta(self, data: Dict) -> bool:
        """Returns False if a sequence gap is detected (snapshot required)."""
        if not self._synced:
            return False
        incoming_seq = data["seq"]
        if incoming_seq != self._seq + 1:
            logger.warning(
                f"{self.symbol}: seq gap {self._seq} -> {incoming_seq}, resyncing"
            )
            self._synced = False
            return False
        for side, book in (("bids", self._bids), ("asks", self._asks)):
            for level in data.get(side, []):
                p, q = Decimal(level[0]), Decimal(level[1])
                if q == 0:
                    book.pop(p, None)  # delete
                else:
                    book[p] = q        # insert or update
        self._seq = incoming_seq
        return True

    @property
    def best_bid(self) -> Optional[Decimal]:
        return self._bids.keys()[0] if self._bids else None

    @property
    def best_ask(self) -> Optional[Decimal]:
        return self._asks.keys()[0] if self._asks else None

    @property
    def mid_price(self) -> Optional[Decimal]:
        if self.best_bid is not None and self.best_ask is not None:
            return (self.best_bid + self.best_ask) / 2
        return None

    @property
    def spread(self) -> Optional[Decimal]:
        if self.best_bid is not None and self.best_ask is not None:
            return self.best_ask - self.best_bid
        return None

    def get_bids(self, n: int = 10) -> List[Tuple[Decimal, Decimal]]:
        return [(p, self._bids[p]) for p in self._bids.keys()[:n]]

    def get_asks(self, n: int = 10) -> List[Tuple[Decimal, Decimal]]:
        return [(p, self._asks[p]) for p in self._asks.keys()[:n]]
```
5. Trade Tape Streaming
The trade tape — the real-time stream of executed transactions — is essential for computing VWAP, detecting unusual volume spikes, and inferring aggressive order flow direction. Each trade message typically contains: timestamp, price, quantity, side (buy/sell), and trade ID.
```python
from collections import deque
from dataclasses import dataclass
from decimal import Decimal
from typing import Deque, List, Optional
import time

@dataclass
class Trade:
    timestamp: float  # Unix ms
    price: Decimal
    quantity: Decimal
    side: str         # "buy" | "sell"
    trade_id: str

class TradeTape:
    """Rolling trade history with VWAP, flow, and volume analysis."""

    def __init__(self, window_seconds: float = 60.0, max_trades: int = 10_000):
        self.window_seconds = window_seconds
        self._trades: Deque[Trade] = deque(maxlen=max_trades)

    def add_trade(self, trade: Trade) -> None:
        self._trades.append(trade)

    def _recent(self) -> List[Trade]:
        cutoff = (time.time() - self.window_seconds) * 1000
        return [t for t in self._trades if t.timestamp >= cutoff]

    def vwap(self) -> Optional[Decimal]:
        trades = self._recent()
        if not trades:
            return None
        notional = sum((t.price * t.quantity for t in trades), Decimal(0))
        volume = sum((t.quantity for t in trades), Decimal(0))
        return notional / volume

    def buy_volume(self) -> Decimal:
        return sum((t.quantity for t in self._recent() if t.side == "buy"), Decimal(0))

    def sell_volume(self) -> Decimal:
        return sum((t.quantity for t in self._recent() if t.side == "sell"), Decimal(0))

    def net_delta(self) -> Decimal:
        """Positive = buying pressure. Negative = selling pressure."""
        return self.buy_volume() - self.sell_volume()

    def volume_spike(self, multiplier: float = 3.0) -> bool:
        """Detect abnormal volume in the most recent 5s vs the full-window average."""
        recent5s = [
            t for t in self._recent()
            if t.timestamp >= (time.time() - 5) * 1000
        ]
        if not recent5s:
            return False
        # Average volume per 5-second bucket over the whole window
        avg_5s_rate = float(sum(t.quantity for t in self._recent())) / (self.window_seconds / 5)
        spike_vol = float(sum(t.quantity for t in recent5s))
        return spike_vol > avg_5s_rate * multiplier
```
6. Liquidation Feed Monitoring
Liquidation events represent forced position closures when margin falls below maintenance requirements. For a trading agent, tracking liquidations is crucial for two reasons: large liquidations create predictable short-term price impact, and a sudden surge in liquidations can signal the beginning of a cascade — a series of stop-losses that amplify a directional move.
```python
from dataclasses import dataclass
from decimal import Decimal
from collections import deque
from typing import Callable, Deque, Optional
import time

@dataclass
class Liquidation:
    symbol: str
    side: str  # "long" | "short"
    price: Decimal
    quantity: Decimal
    notional_usd: Decimal
    timestamp_ms: int

LiqCallback = Callable[[Liquidation], None]

class LiquidationMonitor:
    """Tracks liquidation cascades and computes cluster signals."""

    def __init__(
        self,
        on_cascade: Optional[LiqCallback] = None,
        cascade_threshold_usd: Decimal = Decimal("500000"),
        window_seconds: float = 10.0,
    ):
        self.on_cascade = on_cascade
        self.cascade_threshold = cascade_threshold_usd
        self.window_seconds = window_seconds
        self._history: Deque[Liquidation] = deque()
        self._alerted_at: float = 0.0

    def ingest(self, liq: Liquidation) -> None:
        self._history.append(liq)
        self._prune()
        self._check_cascade(liq)

    def _prune(self) -> None:
        cutoff = (time.time() - self.window_seconds) * 1000
        while self._history and self._history[0].timestamp_ms < cutoff:
            self._history.popleft()

    def _check_cascade(self, latest: Liquidation) -> None:
        window_notional = sum((l.notional_usd for l in self._history), Decimal(0))
        if (window_notional >= self.cascade_threshold
                and time.time() - self._alerted_at > 30.0):
            self._alerted_at = time.time()
            if self.on_cascade:
                self.on_cascade(latest)

    def recent_notional(self) -> Decimal:
        return sum((l.notional_usd for l in self._history), Decimal(0))

    def dominant_side(self) -> Optional[str]:
        if not self._history:
            return None
        long_n = sum((l.notional_usd for l in self._history if l.side == "long"), Decimal(0))
        short_n = sum((l.notional_usd for l in self._history if l.side == "short"), Decimal(0))
        return "long" if long_n > short_n else "short"
```
7. Multi-Exchange Multiplexer
A production trading agent rarely watches a single venue. Cross-exchange arbitrage, best-execution routing, and correlated-market analysis all require simultaneous streams from multiple sources. The multiplexer pattern manages a pool of connections, normalizes their output to a unified message format, and routes events to registered handlers.
```python
import asyncio, json, logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
import websockets

logger = logging.getLogger("multiplexer")

@dataclass
class NormalizedEvent:
    exchange: str
    channel: str
    symbol: str
    event_type: str  # "trade" | "orderbook" | "liquidation" | "ticker"
    payload: Any
    timestamp: float

EventHandler = Callable[[NormalizedEvent], None]

class ExchangeStream:
    """Single-exchange WebSocket stream with exchange-specific normalization."""

    def __init__(
        self,
        name: str,
        url: str,
        subscribe_msg: Dict,
        normalizer: Callable[[str, Dict], Optional[NormalizedEvent]],
    ):
        self.name = name
        self.url = url
        self.subscribe_msg = subscribe_msg
        self.normalizer = normalizer
        self._running = False

class WebSocketMultiplexer:
    """Manages N exchange connections and unifies event output."""

    def __init__(self):
        self._streams: List[ExchangeStream] = []
        self._handlers: Dict[str, List[EventHandler]] = {}
        self._queue: asyncio.Queue[NormalizedEvent] = asyncio.Queue(maxsize=50_000)
        self._tasks: List[asyncio.Task] = []

    def add_stream(self, stream: ExchangeStream) -> None:
        self._streams.append(stream)

    def on(self, event_type: str, handler: EventHandler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def _stream_worker(self, stream: ExchangeStream, backoff: float = 1.0) -> None:
        while stream._running:
            try:
                async with websockets.connect(stream.url, ping_interval=20) as ws:
                    await ws.send(json.dumps(stream.subscribe_msg))
                    backoff = 1.0  # reset on successful connect
                    async for raw in ws:
                        if not stream._running:
                            break
                        try:
                            data = json.loads(raw)
                            event = stream.normalizer(stream.name, data)
                            if event:
                                await self._queue.put(event)
                        except Exception as e:
                            logger.debug(f"{stream.name}: parse error {e}")
            except Exception as e:
                logger.warning(f"{stream.name}: disconnected ({e}), retry in {backoff:.1f}s")
                if stream._running:
                    await asyncio.sleep(backoff)
                    backoff = min(backoff * 2, 60.0)  # cap at 60s

    async def _dispatch_worker(self) -> None:
        while True:
            event = await self._queue.get()
            for handler in self._handlers.get(event.event_type, []):
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Handler error for {event.event_type}: {e}")

    async def start(self) -> None:
        for s in self._streams:
            s._running = True
            self._tasks.append(asyncio.create_task(self._stream_worker(s)))
        self._tasks.append(asyncio.create_task(self._dispatch_worker()))

    async def stop(self) -> None:
        for s in self._streams:
            s._running = False
        for t in self._tasks:
            t.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```
8. Full WebSocketAgent with Async Reconnect
The following WebSocketAgent class ties together all the patterns described above into a production-ready implementation. It manages connection lifecycle, heartbeats, order book state, trade tape, and liquidation signals — and exposes a clean async interface for the trading logic layer to consume.
```python
import asyncio, json, logging, time
from decimal import Decimal
from typing import Dict, Optional
import websockets

logger = logging.getLogger("WebSocketAgent")

class WebSocketAgent:
    """
    Autonomous trading agent with full WebSocket data integration.

    Features:
    - Exponential backoff reconnect (1s → 60s cap)
    - Application heartbeat (20s ping, 10s pong timeout)
    - Order book snapshot + delta reconstruction
    - Trade tape VWAP and flow delta
    - Liquidation cascade detection
    - Event-driven signal generation
    """

    BASE_URL = "wss://stream.exchange.example/realtime"

    def __init__(self, symbol: str, api_key: str, api_secret: str):
        self.symbol = symbol
        self.api_key = api_key
        self.api_secret = api_secret
        # State components
        self.order_book = OrderBook(symbol)
        self.trade_tape = TradeTape(window_seconds=60)
        self.liq_monitor = LiquidationMonitor(
            on_cascade=self._on_cascade,
            cascade_threshold_usd=Decimal("200000"),
        )
        # Connection state
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._running = False
        self._last_msg_at = time.monotonic()
        self._reconnect_count = 0
        self._backoff = 1.0

    # ------------------------------------------------------------------ #
    #  Connection management                                             #
    # ------------------------------------------------------------------ #
    async def run(self) -> None:
        self._running = True
        while self._running:
            try:
                await self._connect_and_consume()
                self._backoff = 1.0
            except (websockets.ConnectionClosed, OSError) as e:
                self._reconnect_count += 1
                logger.warning(
                    f"Disconnected ({e}), reconnect #{self._reconnect_count} "
                    f"in {self._backoff:.1f}s"
                )
                if self._running:
                    await asyncio.sleep(self._backoff)
                    self._backoff = min(self._backoff * 2, 60.0)

    async def stop(self) -> None:
        self._running = False
        if self._ws:
            await self._ws.close()

    async def _connect_and_consume(self) -> None:
        async with websockets.connect(
            self.BASE_URL,
            ping_interval=20,
            ping_timeout=10,
            max_size=2**23,
        ) as ws:
            self._ws = ws
            logger.info(f"Connected to {self.BASE_URL}")
            # Subscribe to public channels
            await ws.send(json.dumps({
                "op": "subscribe",
                "args": [
                    f"orderbook.50.{self.symbol}",
                    f"publicTrade.{self.symbol}",
                    f"allLiquidation.{self.symbol}",
                ],
            }))
            heartbeat_task = asyncio.create_task(self._heartbeat_loop(ws))
            try:
                async for raw in ws:
                    self._last_msg_at = time.monotonic()
                    await self._dispatch(raw)
            finally:
                heartbeat_task.cancel()

    # ------------------------------------------------------------------ #
    #  Heartbeat                                                         #
    # ------------------------------------------------------------------ #
    async def _heartbeat_loop(self, ws) -> None:
        while True:
            await asyncio.sleep(20)
            if time.monotonic() - self._last_msg_at > 30:
                logger.error("No messages for 30s — forcing reconnect")
                await ws.close()
                return
            try:
                await ws.send(json.dumps({"op": "ping"}))
            except Exception:
                return

    # ------------------------------------------------------------------ #
    #  Message dispatch                                                  #
    # ------------------------------------------------------------------ #
    async def _dispatch(self, raw: str) -> None:
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError:
            return
        topic = msg.get("topic", "")
        if topic.startswith("orderbook"):
            await self._handle_orderbook(msg)
        elif topic.startswith("publicTrade"):
            await self._handle_trades(msg)
        elif topic.startswith("allLiquidation"):
            await self._handle_liquidation(msg)
        elif msg.get("op") == "pong":
            pass  # heartbeat acknowledged

    async def _handle_orderbook(self, msg: Dict) -> None:
        msg_type = msg.get("type")
        data = msg.get("data", {})
        if msg_type == "snapshot":
            self.order_book.apply_snapshot(data)
        elif msg_type == "delta":
            ok = self.order_book.apply_delta(data)
            if not ok:
                # Request fresh snapshot via REST fallback
                asyncio.create_task(self._request_snapshot())

    async def _handle_trades(self, msg: Dict) -> None:
        for t in msg.get("data", []):
            trade = Trade(
                timestamp=float(t["T"]),
                price=Decimal(t["p"]),
                quantity=Decimal(t["v"]),
                side="buy" if t["S"] == "Buy" else "sell",
                trade_id=t["i"],
            )
            self.trade_tape.add_trade(trade)
        await self._evaluate_signals()

    async def _handle_liquidation(self, msg: Dict) -> None:
        d = msg.get("data", {})
        liq = Liquidation(
            symbol=d.get("symbol", self.symbol),
            side="long" if d.get("side") == "Sell" else "short",
            price=Decimal(d.get("price", "0")),
            quantity=Decimal(d.get("qty", "0")),
            notional_usd=Decimal(d.get("price", "0")) * Decimal(d.get("qty", "0")),
            timestamp_ms=int(d.get("updatedTime", 0)),
        )
        self.liq_monitor.ingest(liq)

    # ------------------------------------------------------------------ #
    #  Signal evaluation                                                 #
    # ------------------------------------------------------------------ #
    async def _evaluate_signals(self) -> None:
        mid = self.order_book.mid_price
        spread = self.order_book.spread
        vwap = self.trade_tape.vwap()
        delta = self.trade_tape.net_delta()
        spike = self.trade_tape.volume_spike()
        # Explicit None checks: a zero spread is valid data, not missing data
        if mid is None or spread is None or vwap is None:
            return
        # Log signal snapshot (replace with actual trading logic)
        if spike:
            logger.info(
                f"SIGNAL | {self.symbol} | mid={mid:.4f} | spread={spread:.4f} | "
                f"vwap={vwap:.4f} | delta={delta:.2f} | VOLUME_SPIKE=True"
            )

    def _on_cascade(self, liq: Liquidation) -> None:
        # Synchronous on purpose: LiquidationMonitor invokes the callback
        # directly, so a coroutine here would never be awaited.
        logger.warning(
            f"LIQUIDATION CASCADE | {liq.symbol} | "
            f"window_notional=${self.liq_monitor.recent_notional():,.0f} | "
            f"dominant_side={self.liq_monitor.dominant_side()}"
        )

    async def _request_snapshot(self) -> None:
        # REST fallback to recover from a sequence gap
        logger.info(f"Requesting REST snapshot for {self.symbol}")
        # Implementation: GET /v5/market/orderbook?symbol={symbol}&limit=50
        # Then call self.order_book.apply_snapshot(response_data)
        pass

# ------------------------------------------------------------------ #
#  Entry point                                                       #
# ------------------------------------------------------------------ #
async def main():
    agent = WebSocketAgent(
        symbol="BTCUSDT",
        api_key="YOUR_API_KEY",
        api_secret="YOUR_API_SECRET",
    )
    await agent.run()

if __name__ == "__main__":
    asyncio.run(main())
```
9. Maintaining Data Consistency
The hardest problem in WebSocket-based market data is not connectivity — it is correctness. A corrupted order book or missed trade causes subtle, hard-to-diagnose errors in downstream signal computation. The following strategies form a consistency defense layer.
Checksum Validation
Many exchanges include a CRC32 checksum of the top N price levels in each delta message. Verify it on every update. A mismatch means the local book has diverged from the server state.
```python
import binascii, logging
from decimal import Decimal
from typing import List, Tuple

logger = logging.getLogger("orderbook.checksum")

def compute_book_checksum(
    bids: List[Tuple[Decimal, Decimal]],
    asks: List[Tuple[Decimal, Decimal]],
    depth: int = 25,
) -> int:
    """
    Bybit-style checksum: interleave top-25 bid and ask levels,
    stringify each as 'price:qty', join with '|', CRC32.
    """
    parts: List[str] = []
    for i in range(min(depth, max(len(bids), len(asks)))):
        if i < len(bids):
            p, q = bids[i]
            parts.append(f"{p}:{q}")
        if i < len(asks):
            p, q = asks[i]
            parts.append(f"{p}:{q}")
    payload = "|".join(parts)
    return binascii.crc32(payload.encode()) & 0xFFFFFFFF

def verify_checksum(book: OrderBook, expected: int) -> bool:
    computed = compute_book_checksum(
        book.get_bids(25),
        book.get_asks(25),
    )
    if computed != expected:
        logger.warning(f"Checksum mismatch: got {computed}, expected {expected}")
        return False
    return True
```
Stale Data Detection
Track the timestamp of the last received message per channel. If more than N seconds pass without an update on an active market, the stream may have silently stalled — even if the WebSocket connection remains open.
```python
import time
from typing import Dict

class StalenessGuard:
    """Raises an alert if any channel goes silent past its threshold."""

    THRESHOLDS: Dict[str, float] = {
        "orderbook": 5.0,      # active markets tick every second
        "trade": 10.0,         # may be quiet on low-volume pairs
        "liquidation": 120.0,  # can be rare
    }

    def __init__(self):
        self._last_seen: Dict[str, float] = {}

    def touch(self, channel: str) -> None:
        self._last_seen[channel] = time.monotonic()

    def check(self) -> Dict[str, float]:
        now = time.monotonic()
        stale = {}
        for ch, threshold in self.THRESHOLDS.items():
            last = self._last_seen.get(ch)
            if last is None or (now - last) > threshold:
                stale[ch] = now - (last or 0)
        return stale  # empty dict = all channels healthy
```
10. Connecting to Purple Flea
Purple Flea's WebSocket endpoint delivers real-time game outcomes for crash, coin flip, and dice — the same event types that a statistical arbitrage agent needs to track edge drift. Once your agent has a wallet (registered via the REST API), it can subscribe to the live game feed and receive crash multipliers, flip results, and dice outcomes as they settle.
```python
import asyncio, json, websockets

async def watch_crash_outcomes(agent_token: str) -> None:
    """Stream Purple Flea crash game outcomes for statistical analysis."""
    uri = "wss://purpleflea.com/ws/games"
    async with websockets.connect(uri, ping_interval=20) as ws:
        # Authenticate agent
        await ws.send(json.dumps({
            "op": "auth",
            "token": agent_token,
        }))
        # Subscribe to crash outcomes
        await ws.send(json.dumps({
            "op": "subscribe",
            "channels": ["crash.result", "coinflip.result"],
        }))
        async for raw in ws:
            event = json.loads(raw)
            if event.get("channel") == "crash.result":
                multiplier = event["data"]["multiplier"]
                print(f"Crash settled at {multiplier:.2f}x")
                # Feed into statistical model to detect hot/cold streaks

asyncio.run(watch_crash_outcomes("your_agent_token_here"))
```
Start Streaming with Purple Flea
Register your agent and get $1 USDC free via the faucet. Connect your WebSocket agent to live casino game data and perpetual futures feeds — all in one platform.
11. Performance Tuning and Operational Tips
Parser performance. Use ujson or orjson instead of the standard library json module. On high-throughput streams (1,000+ messages/second), the parser is often the bottleneck. orjson is typically 5–10x faster than json for decode-heavy workloads.
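A quick way to see the difference on your own payloads is a micro-benchmark (a rough sketch: the sample message mirrors the trade format used earlier, and orjson is treated as an optional dependency with a standard-library fallback when it is not installed):

```python
import json
import time

try:
    import orjson  # optional fast parser
    fast_loads = orjson.loads
except ImportError:
    fast_loads = json.loads  # fall back to stdlib

def bench(loads_fn, payload: bytes, n: int = 10_000) -> float:
    """Time n decode passes over the same payload."""
    start = time.perf_counter()
    for _ in range(n):
        loads_fn(payload)
    return time.perf_counter() - start

# A payload shaped like the trade messages from earlier sections
msg = json.dumps({
    "topic": "publicTrade.BTCUSDT",
    "data": [{"T": 1700000000000, "p": "43250.5",
              "v": "0.012", "S": "Buy", "i": "abc"}],
}).encode()

stdlib_s = bench(json.loads, msg)
fast_s = bench(fast_loads, msg)
print(f"stdlib={stdlib_s:.3f}s, alternative={fast_s:.3f}s")
```

Run it with your real message shapes; the gap widens with deeply nested order book snapshots.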
Queue depth monitoring. Track the size of your internal asyncio queue at regular intervals. If it grows unboundedly, your dispatch or handler code is slower than message arrival. Either speed up handlers or shed load with a backpressure mechanism.
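A minimal depth sampler might look like this (a sketch: the `high_water` threshold and sampling cadence are placeholders to tune for your own throughput):

```python
import asyncio

async def monitor_queue(queue: asyncio.Queue, interval: float = 1.0,
                        high_water: int = 1000, samples: int = 3) -> list:
    """Periodically sample queue depth; persistent growth means handlers lag."""
    depths = []
    for _ in range(samples):
        depth = queue.qsize()
        depths.append(depth)
        if depth > high_water:
            print(f"backpressure warning: queue depth {depth} > {high_water}")
        await asyncio.sleep(interval)
    return depths

async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    for i in range(5):          # simulate a small backlog
        q.put_nowait(i)
    # Nothing consumes the queue here, so depth stays at 5
    return await monitor_queue(q, interval=0.01, samples=2)

depths = asyncio.run(demo())
print(depths)
```

In a live agent this coroutine runs alongside the dispatch worker; a sustained upward trend in the sampled depths is the signal to shed load.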
CPU affinity. For extremely latency-sensitive agents on multi-core hardware, pin the WebSocket receive loop to a dedicated CPU core. This prevents OS scheduling jitter from adding microseconds to message processing.
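A minimal helper, assuming Linux (`os.sched_setaffinity` is not available on macOS or Windows, so the function degrades to a no-op elsewhere):

```python
import os

def pin_to_core(core: int) -> bool:
    """Pin the calling process to one CPU core (Linux only).

    Returns True if affinity was applied, False on platforms without
    sched_setaffinity or when the requested core is unavailable.
    """
    if not hasattr(os, "sched_setaffinity"):
        return False  # macOS / Windows: silently skip
    try:
        os.sched_setaffinity(0, {core})  # 0 = current process
        return True
    except OSError:
        return False

# Call this early in the process that runs the WebSocket receive loop
pinned = pin_to_core(0)
print(f"pinned={pinned}")
```

Pair this with running the receive loop in its own process, not just its own thread, so the pin actually isolates it from the rest of the agent.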
Buffer tuning. Set max_size on the WebSocket connection to accommodate the largest expected message — typically the initial order book snapshot. A default of 1 MB is usually insufficient; 8 MB is a safe ceiling.
TLS session resumption. On reconnect, TLS can resume a previous session (session IDs, or session tickets under TLS 1.3) and skip most of the handshake cost. Python's ssl module supports client-side session reuse via ssl.SSLSession, though the websockets library does not wire it up automatically. Note that CPython does not expose TLS 1.3 0-RTT early data, so the saving comes from the abbreviated handshake rather than from zero round-trips.
Separate public and private streams. Exchange WebSocket endpoints often have separate URLs for public market data versus private account updates. Use distinct connections for each. A private channel authentication failure should never disrupt market data reception.
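The isolation principle can be sketched with two independent tasks (the connect logic is stubbed: in a real agent each task would wrap its own reconnect loop like the one in section 8):

```python
import asyncio

async def run_stream(name: str, fail: bool) -> str:
    """Stand-in for a connect-and-consume loop with per-stream error handling."""
    try:
        if fail:
            raise ConnectionError("auth rejected")
        return f"{name}: ok"
    except ConnectionError as e:
        # Failure is contained to this stream; the sibling task is untouched
        return f"{name}: degraded ({e})"

async def demo():
    # Two independent tasks: a private-stream auth failure
    # never tears down the public market data feed.
    return await asyncio.gather(
        run_stream("public", fail=False),
        run_stream("private", fail=True),
    )

results = asyncio.run(demo())
print(results)
```

The key design choice is that the two streams share no connection object and no exception path; they communicate only through the agent's state components.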
Log structured, not string. Use a JSON-structured logger (e.g., structlog) with fields for exchange, symbol, event_type, and latency. This makes post-hoc analysis of missed events or cascade episodes dramatically faster.
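structlog is one option; the same idea can be sketched with only the standard library by attaching a JSON formatter (the `fields` attribute name below is an arbitrary convention of this sketch, not a logging built-in):

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""
    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "msg": record.getMessage()}
        # Merge in structured fields attached via `extra`
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)

logger = logging.getLogger("feed")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Structured fields ride along on the record via `extra`
logger.info("trade received", extra={"fields": {
    "exchange": "purpleflea", "symbol": "BTCUSDT",
    "event_type": "trade", "latency_ms": 3.2,
}})
```

Every record now greps and parses cleanly with `jq`, which is the whole point when reconstructing a cascade episode after the fact.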
With orjson for parsing and uvloop as the event loop, a single dedicated process can sustain 50,000+ messages per second on commodity hardware. This is more than sufficient for any single-exchange feed and competitive for 3–5 exchange multiplexers.