
Real-Time Data Feeds for AI Trading Agents: WebSocket Architecture

How autonomous agents consume real-time market data via WebSocket connections — handling reconnects, multiplexing streams, parsing order book updates, and maintaining data consistency.

📅 March 6, 2026 🕐 18 min read 📚 Guide
<1ms typical WebSocket latency · 8x faster than REST polling · 99.9% uptime with reconnect logic

1. WebSocket vs REST: Why Real-Time Matters

For AI trading agents, the choice between REST polling and WebSocket streaming is not merely an engineering preference — it is the difference between acting on stale data and executing with edge. A REST-based agent polling an exchange every 500ms is blind to every price tick, trade, and liquidation that occurs between requests. A WebSocket-connected agent receives every event as it happens.

The fundamental difference is transport model. REST follows a request-response cycle: the agent initiates every interaction. WebSocket establishes a persistent, full-duplex TCP connection where the server pushes updates to the agent as they occur. No repeated handshakes, no polling overhead, no latency floor imposed by HTTP round-trips.

Property                 | REST Polling                 | WebSocket Stream
Latency model            | Round-trip per request       | Server-push, sub-ms
Bandwidth efficiency     | Headers repeated every call  | Persistent framing only
Order book completeness  | Snapshot per poll interval   | Full delta stream
Trade tape coverage      | Misses ticks between polls   | Every trade delivered
Connection overhead      | Stateless, easy to scale     | Stateful, needs management
Reconnect complexity     | None required                | Required for reliability
Rate limit risk          | High (every poll counts)     | Subscription model
Key insight On a busy exchange, 500ms of missed price action can contain dozens of trades and multiple order book level changes. A trading agent that misses these cannot accurately compute VWAP, spread, or liquidation risk.

For Purple Flea's perpetual futures and casino games, WebSocket connectivity is the recommended path for any agent that needs responsive execution. REST endpoints remain useful for account queries, order status checks, and low-frequency operations — but market data should always flow through a persistent stream.
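The bandwidth gap is easy to estimate. The figures below are illustrative assumptions (typical HTTP header sizes and message rates), not measurements from any particular exchange:

```python
# Rough protocol-overhead comparison: REST polling vs. a persistent WebSocket.
# Header and frame sizes are illustrative assumptions, not measured values.
REST_HEADER_BYTES = 700     # assumed HTTP request+response header pair
WS_FRAME_OVERHEAD = 6       # WebSocket frame header (2-14 bytes in practice)
POLLS_PER_SECOND  = 2       # one request every 500 ms
MSGS_PER_SECOND   = 50      # pushed updates on an active market

def hourly_overhead_bytes(per_event: int, events_per_second: float) -> int:
    """Protocol overhead accumulated per hour, excluding payload bytes."""
    return int(per_event * events_per_second * 3600)

rest_overhead = hourly_overhead_bytes(REST_HEADER_BYTES, POLLS_PER_SECOND)
ws_overhead   = hourly_overhead_bytes(WS_FRAME_OVERHEAD, MSGS_PER_SECOND)

print(f"REST polling: {rest_overhead / 1e6:.1f} MB/h of headers")
print(f"WebSocket:    {ws_overhead / 1e6:.1f} MB/h of framing")
```

Under these assumptions the stream delivers 25x more events while spending a fraction of the header bytes.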

2. Connection Lifecycle Management

A WebSocket connection progresses through distinct states: CONNECTING, OPEN, CLOSING, and CLOSED. Production-grade agents must handle all transitions gracefully, including the unexpected drop from OPEN to CLOSED caused by network interruptions, server maintenance, or load balancer timeouts.

Connection State Machine

Python connection_lifecycle.py
from enum import Enum, auto
import asyncio
import websockets
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable, Awaitable

logger = logging.getLogger("ws.lifecycle")


class ConnectionState(Enum):
    DISCONNECTED = auto()
    CONNECTING   = auto()
    CONNECTED    = auto()
    RECONNECTING = auto()
    CLOSING      = auto()


@dataclass
class ConnectionStats:
    connect_count:    int   = 0
    disconnect_count: int   = 0
    messages_rx:      int   = 0
    bytes_rx:         int   = 0
    last_message_at:  Optional[datetime] = None
    last_error:       Optional[str] = None


MessageHandler = Callable[[str], Awaitable[None]]


class ManagedWebSocket:
    """Lifecycle-managed WebSocket with automatic state tracking."""

    def __init__(
        self,
        url: str,
        on_message: MessageHandler,
        on_connect: Optional[Callable] = None,
        on_disconnect: Optional[Callable] = None,
    ):
        self.url           = url
        self.on_message    = on_message
        self.on_connect    = on_connect
        self.on_disconnect = on_disconnect
        self.state         = ConnectionState.DISCONNECTED
        self.stats         = ConnectionStats()
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._stop_event   = asyncio.Event()

    @property
    def is_connected(self) -> bool:
        return self.state == ConnectionState.CONNECTED

    async def connect(self) -> None:
        self.state = ConnectionState.CONNECTING
        self._ws = await websockets.connect(
            self.url,
            ping_interval=20,
            ping_timeout=10,
            close_timeout=5,
            max_size=2**23,   # 8 MB for large order book snapshots
        )
        self.state = ConnectionState.CONNECTED
        self.stats.connect_count += 1
        logger.info(f"Connected to {self.url}")
        if self.on_connect:
            await self.on_connect()

    async def disconnect(self) -> None:
        self._stop_event.set()
        self.state = ConnectionState.CLOSING
        if self._ws:
            await self._ws.close()
        self.state = ConnectionState.DISCONNECTED
        self.stats.disconnect_count += 1
        if self.on_disconnect:
            await self.on_disconnect()

    async def listen(self) -> None:
        if self._ws is None:
            raise RuntimeError("listen() called before connect()")
        async for raw in self._ws:
            if self._stop_event.is_set():
                break
            self.stats.messages_rx += 1
            self.stats.bytes_rx += len(raw)
            self.stats.last_message_at = datetime.utcnow()
            await self.on_message(raw)

Authentication Handshake

Authenticated streams — for private order updates, balance changes, and position events — require the agent to send credentials immediately after the connection opens, before subscribing to any channels. The pattern is: connect, authenticate, await confirmation, then subscribe.

Python auth_handshake.py
import hmac, hashlib, time, json
import logging

logger = logging.getLogger("ws.auth")

async def authenticate(ws, api_key: str, api_secret: str) -> bool:
    timestamp = str(int(time.time() * 1000))
    sign_payload = timestamp + "GET" + "/realtime"
    signature = hmac.new(
        api_secret.encode(), sign_payload.encode(), hashlib.sha256
    ).hexdigest()

    auth_msg = {
        "op": "auth",
        "args": [api_key, timestamp, signature]
    }
    await ws.send(json.dumps(auth_msg))

    response = json.loads(await ws.recv())
    success = response.get("success", False)
    if not success:
        logger.error(f"Auth failed: {response.get('ret_msg')}")
    return success


async def subscribe_private(ws) -> None:
    sub = {
        "op": "subscribe",
        "args": ["order", "position", "wallet"]
    }
    await ws.send(json.dumps(sub))
    logger.info("Subscribed to private channels")

3. Heartbeat and Ping-Pong

WebSocket connections sit behind proxies, load balancers, and NAT gateways that may silently drop idle connections after a timeout — typically 30–60 seconds. The ping-pong mechanism keeps connections alive and also provides an early warning system when the underlying TCP connection has silently died.

WebSocket Ping/Pong

Protocol-level frames (opcode 0x9/0xA) sent automatically by the library. No data payload needed. Most libraries handle this transparently.

Application Heartbeat

Exchange-specific JSON messages (e.g. {"op":"ping"}) sent on a timer. Response expected within a deadline — absence triggers reconnect.

Silent Connection Death

TCP RST packets may not reach the client. The connection appears OPEN but no data flows. Heartbeat timeout detection is the only reliable guard.

Python heartbeat.py
import asyncio, json, logging, time
from typing import Optional

logger = logging.getLogger("ws.heartbeat")


class HeartbeatMonitor:
    """Application-level heartbeat with dead-connection detection."""

    def __init__(
        self,
        ws,
        interval: float = 20.0,
        timeout: float = 10.0,
        on_dead: Optional[callable] = None,
    ):
        self.ws        = ws
        self.interval  = interval
        self.timeout   = timeout
        self.on_dead   = on_dead
        self._last_pong = time.monotonic()
        self._task: Optional[asyncio.Task] = None

    def record_pong(self) -> None:
        """Call this when any message is received (acts as implicit pong)."""
        self._last_pong = time.monotonic()

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self.interval)
            ping_sent = time.monotonic()
            try:
                await self.ws.send(json.dumps({"op": "ping"}))
            except Exception:
                break  # connection already broken

            # Wait for a pong recorded *after* this ping was sent —
            # a message from before the ping proves nothing about liveness
            deadline = ping_sent + self.timeout
            while time.monotonic() < deadline:
                if self._last_pong >= ping_sent:
                    break
                await asyncio.sleep(0.5)
            else:
                logger.warning("Heartbeat timeout — connection dead")
                if self.on_dead:
                    await self.on_dead()
                break

    def start(self) -> None:
        self._task = asyncio.create_task(self._loop())

    def stop(self) -> None:
        if self._task:
            self._task.cancel()
Warning Never rely solely on the OS to detect a dead TCP connection. Cloud load balancers can drop a session without sending a FIN packet. Your heartbeat monitor is the last line of defense against a zombie connection that receives no data but appears open.

4. Order Book Snapshot and Delta Reconstruction

Exchanges transmit order books using a two-phase protocol: an initial snapshot containing the full state, followed by a continuous stream of delta updates (inserts, updates, deletes at specific price levels). An agent that misses even one delta will have a corrupted view of the book until the next snapshot is requested.

Sequence Number Validation

Every delta message carries a sequence number. If delta.seq != last_seq + 1, at least one message was dropped and the book state is invalid. The correct response is to request a fresh snapshot, not to guess at what was missed.

Python order_book.py
from sortedcontainers import SortedDict
from dataclasses import dataclass
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
import logging

logger = logging.getLogger("orderbook")


@dataclass
class OrderBookLevel:
    price:    Decimal
    quantity: Decimal


class OrderBook:
    """L2 order book with snapshot+delta reconstruction and sequence validation."""

    def __init__(self, symbol: str, depth: int = 50):
        self.symbol  = symbol
        self.depth   = depth
        self._bids   = SortedDict(lambda x: -x)  # descending
        self._asks   = SortedDict()                 # ascending
        self._seq    = -1
        self._synced = False

    def apply_snapshot(self, data: Dict) -> None:
        self._bids.clear()
        self._asks.clear()
        for level in data["bids"]:
            p, q = Decimal(level[0]), Decimal(level[1])
            if q > 0:
                self._bids[p] = q
        for level in data["asks"]:
            p, q = Decimal(level[0]), Decimal(level[1])
            if q > 0:
                self._asks[p] = q
        self._seq    = data["seq"]
        self._synced = True
        logger.info(f"{self.symbol}: snapshot at seq={self._seq}, "
                    f"bids={len(self._bids)}, asks={len(self._asks)}")

    def apply_delta(self, data: Dict) -> bool:
        """Returns False if sequence gap detected (snapshot required)."""
        if not self._synced:
            return False

        incoming_seq = data["seq"]
        if incoming_seq != self._seq + 1:
            logger.warning(
                f"{self.symbol}: seq gap {self._seq} -> {incoming_seq}, resyncing"
            )
            self._synced = False
            return False

        for side, book in (("bids", self._bids), ("asks", self._asks)):
            for level in data.get(side, []):
                p, q = Decimal(level[0]), Decimal(level[1])
                if q == 0:
                    book.pop(p, None)   # delete
                else:
                    book[p] = q           # insert or update

        self._seq = incoming_seq
        return True

    @property
    def best_bid(self) -> Optional[Decimal]:
        return self._bids.keys()[0] if self._bids else None

    @property
    def best_ask(self) -> Optional[Decimal]:
        return self._asks.keys()[0] if self._asks else None

    @property
    def mid_price(self) -> Optional[Decimal]:
        if self.best_bid and self.best_ask:
            return (self.best_bid + self.best_ask) / 2
        return None

    @property
    def spread(self) -> Optional[Decimal]:
        if self.best_bid and self.best_ask:
            return self.best_ask - self.best_bid
        return None

    def get_bids(self, n: int = 10) -> List[Tuple[Decimal, Decimal]]:
        return [(p, self._bids[p]) for p in self._bids.keys()[:n]]

    def get_asks(self, n: int = 10) -> List[Tuple[Decimal, Decimal]]:
        return [(p, self._asks[p]) for p in self._asks.keys()[:n]]

5. Trade Tape Streaming

The trade tape — the real-time stream of executed transactions — is essential for computing VWAP, detecting unusual volume spikes, and inferring aggressive order flow direction. Each trade message typically contains: timestamp, price, quantity, side (buy/sell), and trade ID.

Python trade_tape.py
from collections import deque
from dataclasses import dataclass
from decimal import Decimal
from typing import Deque, List, Optional
import time


@dataclass
class Trade:
    timestamp: float   # Unix ms
    price:     Decimal
    quantity:  Decimal
    side:      str     # "buy" | "sell"
    trade_id:  str


class TradeTape:
    """Rolling trade history with VWAP, flow, and volume analysis."""

    def __init__(self, window_seconds: float = 60.0, max_trades: int = 10_000):
        self.window_seconds = window_seconds
        self._trades: Deque[Trade] = deque(maxlen=max_trades)

    def add_trade(self, trade: Trade) -> None:
        self._trades.append(trade)

    def _recent(self) -> List[Trade]:
        cutoff = (time.time() - self.window_seconds) * 1000
        return [t for t in self._trades if t.timestamp >= cutoff]

    def vwap(self) -> Optional[Decimal]:
        trades = self._recent()
        if not trades:
            return None
        notional = sum(t.price * t.quantity for t in trades)
        volume   = sum(t.quantity for t in trades)
        return notional / volume

    def buy_volume(self) -> Decimal:
        return sum((t.quantity for t in self._recent() if t.side == "buy"), Decimal(0))

    def sell_volume(self) -> Decimal:
        return sum((t.quantity for t in self._recent() if t.side == "sell"), Decimal(0))

    def net_delta(self) -> Decimal:
        """Positive = buying pressure. Negative = selling pressure."""
        return self.buy_volume() - self.sell_volume()

    def volume_spike(self, multiplier: float = 3.0) -> bool:
        """Detect abnormal volume in most recent 5s vs full window average."""
        recent5s = [
            t for t in self._recent()
            if t.timestamp >= (time.time() - 5) * 1000
        ]
        if not recent5s:
            return False
        avg_5s_rate = float(sum(t.quantity for t in self._recent())) / (self.window_seconds / 5)
        spike_vol   = float(sum(t.quantity for t in recent5s))
        return spike_vol > avg_5s_rate * multiplier
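
The VWAP formula is worth sanity-checking on a hand-computable trade list. A standalone sketch, independent of the TradeTape class above:

```python
from decimal import Decimal

def vwap(trades: list[tuple[Decimal, Decimal]]) -> Decimal:
    """Volume-weighted average price over (price, quantity) pairs."""
    notional = sum((p * q for p, q in trades), Decimal(0))
    volume   = sum((q for _, q in trades), Decimal(0))
    return notional / volume

trades = [
    (Decimal("100"), Decimal("2")),   # 2 units at 100
    (Decimal("103"), Decimal("1")),   # 1 unit at 103
]
# notional = 303, volume = 3 -> VWAP = 101
print(vwap(trades))  # 101
```

Using Decimal throughout keeps the result exact; the same trades computed in float can drift at the last digit.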

6. Liquidation Feed Monitoring

Liquidation events represent forced position closures when margin falls below maintenance requirements. For a trading agent, tracking liquidations is crucial for two reasons: large liquidations create predictable short-term price impact, and a sudden surge in liquidations can signal the beginning of a cascade — a series of stop-losses that amplify a directional move.

Danger Do not attempt to front-run exchange liquidations. Most venues route liquidations through internal engines and insurance funds, so the underlying orders are never publicly accessible. Treat the feed as a source of informational signals, not an order-insertion opportunity.
Python liquidation_monitor.py
from dataclasses import dataclass
from decimal import Decimal
from collections import deque
from typing import Callable, Deque, Optional
import time


@dataclass
class Liquidation:
    symbol:        str
    side:          str      # "long" | "short"
    price:         Decimal
    quantity:      Decimal
    notional_usd:  Decimal
    timestamp_ms:  int


LiqCallback = Callable[[Liquidation], None]


class LiquidationMonitor:
    """Tracks liquidation cascades and computes cluster signals."""

    def __init__(
        self,
        on_cascade: Optional[LiqCallback] = None,
        cascade_threshold_usd: Decimal = Decimal("500000"),
        window_seconds: float = 10.0,
    ):
        self.on_cascade           = on_cascade
        self.cascade_threshold    = cascade_threshold_usd
        self.window_seconds       = window_seconds
        self._history: Deque[Liquidation] = deque()
        self._alerted_at: float  = 0.0

    def ingest(self, liq: Liquidation) -> None:
        self._history.append(liq)
        self._prune()
        self._check_cascade(liq)

    def _prune(self) -> None:
        cutoff = (time.time() - self.window_seconds) * 1000
        while self._history and self._history[0].timestamp_ms < cutoff:
            self._history.popleft()

    def _check_cascade(self, latest: Liquidation) -> None:
        window_notional = sum(l.notional_usd for l in self._history)
        if (window_notional >= self.cascade_threshold
                and time.time() - self._alerted_at > 30.0):
            self._alerted_at = time.time()
            if self.on_cascade:
                self.on_cascade(latest)

    def recent_notional(self) -> Decimal:
        return sum((l.notional_usd for l in self._history), Decimal(0))

    def dominant_side(self) -> Optional[str]:
        if not self._history:
            return None
        long_n  = sum(l.notional_usd for l in self._history if l.side == "long")
        short_n = sum(l.notional_usd for l in self._history if l.side == "short")
        return "long" if long_n > short_n else "short"

7. Multi-Exchange Multiplexer

A production trading agent rarely watches a single venue. Cross-exchange arbitrage, best-execution routing, and correlated-market analysis all require simultaneous streams from multiple sources. The multiplexer pattern manages a pool of connections, normalizes their output to a unified message format, and routes events to registered handlers.

Python multiplexer.py
import asyncio, json, logging
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
import websockets

logger = logging.getLogger("multiplexer")


@dataclass
class NormalizedEvent:
    exchange:   str
    channel:    str
    symbol:     str
    event_type: str   # "trade" | "orderbook" | "liquidation" | "ticker"
    payload:    Any
    timestamp:  float


EventHandler = Callable[[NormalizedEvent], None]


class ExchangeStream:
    """Single-exchange WebSocket stream with exchange-specific normalization."""

    def __init__(
        self,
        name: str,
        url: str,
        subscribe_msg: Dict,
        normalizer: Callable[[str, Dict], Optional[NormalizedEvent]],
    ):
        self.name          = name
        self.url           = url
        self.subscribe_msg = subscribe_msg
        self.normalizer    = normalizer
        self._running      = False


class WebSocketMultiplexer:
    """Manages N exchange connections and unifies event output."""

    def __init__(self):
        self._streams:  List[ExchangeStream] = []
        self._handlers: Dict[str, List[EventHandler]] = {}
        self._queue:    asyncio.Queue[NormalizedEvent] = asyncio.Queue(maxsize=50_000)
        self._tasks:    List[asyncio.Task] = []

    def add_stream(self, stream: ExchangeStream) -> None:
        self._streams.append(stream)

    def on(self, event_type: str, handler: EventHandler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def _stream_worker(self, stream: ExchangeStream, backoff: float = 1.0) -> None:
        while stream._running:
            try:
                async with websockets.connect(stream.url, ping_interval=20) as ws:
                    await ws.send(json.dumps(stream.subscribe_msg))
                    backoff = 1.0  # reset on successful connect
                    async for raw in ws:
                        if not stream._running:
                            break
                        try:
                            data = json.loads(raw)
                            event = stream.normalizer(stream.name, data)
                            if event:
                                await self._queue.put(event)
                        except Exception as e:
                            logger.debug(f"{stream.name}: parse error {e}")
            except Exception as e:
                logger.warning(f"{stream.name}: disconnected ({e}), retry in {backoff:.1f}s")
                if stream._running:
                    await asyncio.sleep(backoff)
                    backoff = min(backoff * 2, 60.0)  # cap at 60s

    async def _dispatch_worker(self) -> None:
        while True:
            event = await self._queue.get()
            for handler in self._handlers.get(event.event_type, []):
                try:
                    result = handler(event)
                    if asyncio.iscoroutine(result):
                        await result
                except Exception as e:
                    logger.error(f"Handler error for {event.event_type}: {e}")

    async def start(self) -> None:
        for s in self._streams:
            s._running = True
            self._tasks.append(asyncio.create_task(self._stream_worker(s)))
        self._tasks.append(asyncio.create_task(self._dispatch_worker()))

    async def stop(self) -> None:
        for s in self._streams:
            s._running = False
        for t in self._tasks:
            t.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
Tip Use exponential backoff with jitter for reconnect delays. Without jitter, all agents recovering from a shared outage will reconnect simultaneously, creating a thundering herd that overwhelms the exchange server and degrades everyone's recovery time.
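
The multiplexer above uses plain exponential backoff; the jittered variant the tip describes can be sketched with the common "full jitter" scheme (function name and defaults are illustrative):

```python
import random

def jittered_backoff(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base * 2^attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, ceiling)

# Agents recovering from a shared outage spread out instead of stampeding:
for attempt in range(5):
    print(f"attempt {attempt}: sleep {jittered_backoff(attempt):.2f}s")
```

To adopt it in `_stream_worker`, replace `await asyncio.sleep(backoff)` with a sleep on `jittered_backoff(attempt)` and track the attempt count instead of the raw delay.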

8. Full WebSocketAgent with Async Reconnect

The following WebSocketAgent class ties together all the patterns described above into a production-ready implementation. It manages connection lifecycle, heartbeats, order book state, trade tape, and liquidation signals — and exposes a clean async interface for the trading logic layer to consume.

Python websocket_agent.py
import asyncio, json, logging, time
from decimal import Decimal
from typing import Dict, Optional
import websockets

# Components defined earlier in this guide
from order_book import OrderBook
from trade_tape import Trade, TradeTape
from liquidation_monitor import Liquidation, LiquidationMonitor

logger = logging.getLogger("WebSocketAgent")


class WebSocketAgent:
    """
    Autonomous trading agent with full WebSocket data integration.

    Features:
    - Exponential backoff reconnect (1s → 60s cap)
    - Application heartbeat (20s ping, 10s pong timeout)
    - Order book snapshot + delta reconstruction
    - Trade tape VWAP and flow delta
    - Liquidation cascade detection
    - Event-driven signal generation
    """

    BASE_URL = "wss://stream.exchange.example/realtime"

    def __init__(self, symbol: str, api_key: str, api_secret: str):
        self.symbol     = symbol
        self.api_key    = api_key
        self.api_secret = api_secret

        # State components
        self.order_book  = OrderBook(symbol)
        self.trade_tape  = TradeTape(window_seconds=60)
        self.liq_monitor = LiquidationMonitor(
            on_cascade=self._on_cascade,
            cascade_threshold_usd=Decimal("200000")
        )

        # Connection state
        self._ws: Optional[websockets.WebSocketClientProtocol] = None
        self._running         = False
        self._last_msg_at     = time.monotonic()
        self._reconnect_count = 0
        self._backoff         = 1.0

    # ------------------------------------------------------------------ #
    # Connection management                                              #
    # ------------------------------------------------------------------ #

    async def run(self) -> None:
        self._running = True
        while self._running:
            try:
                await self._connect_and_consume()
                self._backoff = 1.0
            except (websockets.ConnectionClosed, OSError) as e:
                self._reconnect_count += 1
                logger.warning(f"Disconnected ({e}), reconnect #{self._reconnect_count} in {self._backoff:.1f}s")
                if self._running:
                    await asyncio.sleep(self._backoff)
                    self._backoff = min(self._backoff * 2, 60.0)

    async def stop(self) -> None:
        self._running = False
        if self._ws:
            await self._ws.close()

    async def _connect_and_consume(self) -> None:
        async with websockets.connect(
            self.BASE_URL,
            ping_interval=20,
            ping_timeout=10,
            max_size=2**23,
        ) as ws:
            self._ws = ws
            logger.info(f"Connected to {self.BASE_URL}")

            # Subscribe to public channels
            await ws.send(json.dumps({
                "op": "subscribe",
                "args": [
                    f"orderbook.50.{self.symbol}",
                    f"publicTrade.{self.symbol}",
                    f"allLiquidation.{self.symbol}",
                ]
            }))

            heartbeat_task = asyncio.create_task(self._heartbeat_loop(ws))
            try:
                async for raw in ws:
                    self._last_msg_at = time.monotonic()
                    await self._dispatch(raw)
            finally:
                heartbeat_task.cancel()

    # ------------------------------------------------------------------ #
    # Heartbeat                                                          #
    # ------------------------------------------------------------------ #

    async def _heartbeat_loop(self, ws) -> None:
        while True:
            await asyncio.sleep(20)
            if time.monotonic() - self._last_msg_at > 30:
                logger.error("No messages for 30s — forcing reconnect")
                await ws.close()
                return
            try:
                await ws.send(json.dumps({"op": "ping"}))
            except Exception:
                return

    # ------------------------------------------------------------------ #
    # Message dispatch                                                   #
    # ------------------------------------------------------------------ #

    async def _dispatch(self, raw: str) -> None:
        try:
            msg = json.loads(raw)
        except json.JSONDecodeError:
            return

        topic = msg.get("topic", "")

        if topic.startswith("orderbook"):
            await self._handle_orderbook(msg)
        elif topic.startswith("publicTrade"):
            await self._handle_trades(msg)
        elif topic.startswith("allLiquidation"):
            await self._handle_liquidation(msg)
        elif msg.get("op") == "pong":
            pass  # heartbeat acknowledged

    async def _handle_orderbook(self, msg: Dict) -> None:
        msg_type = msg.get("type")
        data     = msg.get("data", {})
        if msg_type == "snapshot":
            self.order_book.apply_snapshot(data)
        elif msg_type == "delta":
            ok = self.order_book.apply_delta(data)
            if not ok:
                # Request fresh snapshot via REST fallback
                asyncio.create_task(self._request_snapshot())

    async def _handle_trades(self, msg: Dict) -> None:
        for t in msg.get("data", []):
            trade = Trade(
                timestamp=float(t["T"]),
                price=Decimal(t["p"]),
                quantity=Decimal(t["v"]),
                side="buy" if t["S"] == "Buy" else "sell",
                trade_id=t["i"],
            )
            self.trade_tape.add_trade(trade)
        await self._evaluate_signals()

    async def _handle_liquidation(self, msg: Dict) -> None:
        d = msg.get("data", {})
        liq = Liquidation(
            symbol=d.get("symbol", self.symbol),
            side="long" if d.get("side") == "Sell" else "short",
            price=Decimal(d.get("price", "0")),
            quantity=Decimal(d.get("qty", "0")),
            notional_usd=Decimal(d.get("price", "0")) * Decimal(d.get("qty", "0")),
            timestamp_ms=int(d.get("updatedTime", 0)),
        )
        self.liq_monitor.ingest(liq)

    # ------------------------------------------------------------------ #
    # Signal evaluation                                                  #
    # ------------------------------------------------------------------ #

    async def _evaluate_signals(self) -> None:
        mid    = self.order_book.mid_price
        spread = self.order_book.spread
        vwap   = self.trade_tape.vwap()
        delta  = self.trade_tape.net_delta()
        spike  = self.trade_tape.volume_spike()

        # Explicit None checks: a legitimately zero spread must not abort evaluation
        if mid is None or spread is None or vwap is None:
            return

        # Log signal snapshot (replace with actual trading logic)
        if spike:
            logger.info(
                f"SIGNAL | {self.symbol} | mid={mid:.4f} | spread={spread:.4f} | "
                f"vwap={vwap:.4f} | delta={delta:.2f} | VOLUME_SPIKE=True"
            )

    # LiquidationMonitor invokes this callback synchronously, so it must not be a coroutine
    def _on_cascade(self, liq: Liquidation) -> None:
        logger.warning(
            f"LIQUIDATION CASCADE | {liq.symbol} | "
            f"window_notional=${self.liq_monitor.recent_notional():,.0f} | "
            f"dominant_side={self.liq_monitor.dominant_side()}"
        )

    async def _request_snapshot(self) -> None:
        # REST fallback to recover from sequence gap
        logger.info(f"Requesting REST snapshot for {self.symbol}")
        # Implementation: GET /v5/market/orderbook?symbol={symbol}&limit=50
        # Then call self.order_book.apply_snapshot(response_data)
        pass


# ------------------------------------------------------------------ #
# Entry point                                                          #
# ------------------------------------------------------------------ #
async def main():
    agent = WebSocketAgent(
        symbol="BTCUSDT",
        api_key="YOUR_API_KEY",
        api_secret="YOUR_API_SECRET",
    )
    await agent.run()


if __name__ == "__main__":
    asyncio.run(main())

9. Maintaining Data Consistency

The hardest problem in WebSocket-based market data is not connectivity — it is correctness. A corrupted order book or missed trade causes subtle, hard-to-diagnose errors in downstream signal computation. The following strategies form a consistency defense layer.

Checksum Validation

Many exchanges include a CRC32 checksum of the top N price levels in each delta message. Verify it on every update. A mismatch means the local book has diverged from the server state.

Python checksum.py
import binascii
import logging
from decimal import Decimal
from typing import List, Tuple

logger = logging.getLogger("checksum")


def compute_book_checksum(
    bids: List[Tuple[Decimal, Decimal]],
    asks: List[Tuple[Decimal, Decimal]],
    depth: int = 25,
) -> int:
    """
    Common exchange scheme (OKX and Kraken publish similar CRC32 book
    checksums): interleave the top-N bid and ask levels, stringify each
    as 'price:qty', join with '|', then CRC32. Exact field order and
    separators vary by venue; always check the exchange's docs.
    """
    parts: List[str] = []
    for i in range(min(depth, max(len(bids), len(asks)))):
        if i < len(bids):
            p, q = bids[i]
            parts.append(f"{p}:{q}")
        if i < len(asks):
            p, q = asks[i]
            parts.append(f"{p}:{q}")
    payload = "|".join(parts)
    return binascii.crc32(payload.encode()) & 0xFFFFFFFF


def verify_checksum(book: "OrderBook", expected: int) -> bool:  # OrderBook from order_book.py
    computed = compute_book_checksum(
        book.get_bids(25),
        book.get_asks(25),
    )
    if computed != expected:
        logger.warning(f"Checksum mismatch: got {computed}, expected {expected}")
        return False
    return True

Stale Data Detection

Track the timestamp of the last received message per channel. If more than N seconds pass without an update on an active market, the stream may have silently stalled — even if the WebSocket connection remains open.

Python staleness_guard.py
import time
from typing import Dict


class StalenessGuard:
    """Raises alert if any channel goes silent past its threshold."""

    THRESHOLDS: Dict[str, float] = {
        "orderbook": 5.0,      # active markets tick every second
        "trade": 10.0,         # may be quiet on low-volume pairs
        "liquidation": 120.0,  # liquidations can be rare
    }

    def __init__(self):
        self._last_seen: Dict[str, float] = {}

    def touch(self, channel: str) -> None:
        self._last_seen[channel] = time.monotonic()

    def check(self) -> Dict[str, float]:
        now = time.monotonic()
        stale: Dict[str, float] = {}
        for ch, threshold in self.THRESHOLDS.items():
            last = self._last_seen.get(ch)
            if last is None:
                stale[ch] = float("inf")  # never seen a single message
            elif (now - last) > threshold:
                stale[ch] = now - last
        return stale  # empty dict = all channels healthy
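The guard only helps if something polls it. A minimal watchdog coroutine, intended to run as a background task alongside the receive loop; the on_stale callback (e.g. a reconnect trigger) is left to the caller:

```python
import asyncio


async def staleness_watchdog(guard, on_stale, interval: float = 1.0) -> None:
    """Poll a StalenessGuard-like object (anything with check() -> dict)
    and invoke on_stale with the silent channels whenever any are found."""
    while True:
        stale = guard.check()
        if stale:
            await on_stale(stale)  # e.g. trigger a reconnect or page an operator
        await asyncio.sleep(interval)
```

Start it with asyncio.create_task(staleness_watchdog(guard, trigger_reconnect)) next to the main receive loop; because touch() records time.monotonic(), the check is immune to wall-clock adjustments.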

10. Connecting to Purple Flea

Purple Flea's WebSocket endpoint delivers real-time game outcomes for crash, coin flip, and dice — the same event types that a statistical arbitrage agent needs to track edge drift. Once your agent has a wallet (registered via the REST API), it can subscribe to the live game feed and receive crash multipliers, flip results, and dice outcomes as they settle.

Python purpleflea_stream.py
import asyncio
import json

import websockets


async def watch_crash_outcomes(agent_token: str) -> None:
    """Stream Purple Flea crash game outcomes for statistical analysis."""
    uri = "wss://purpleflea.com/ws/games"

    async with websockets.connect(uri, ping_interval=20) as ws:
        # Authenticate agent
        await ws.send(json.dumps({
            "op": "auth",
            "token": agent_token
        }))
        # Subscribe to crash outcomes
        await ws.send(json.dumps({
            "op": "subscribe",
            "channels": ["crash.result", "coinflip.result"]
        }))

        async for raw in ws:
            event = json.loads(raw)
            if event.get("channel") == "crash.result":
                multiplier = event["data"]["multiplier"]
                print(f"Crash settled at {multiplier:.2f}x")
                # Feed into statistical model to detect hot/cold streaks


asyncio.run(watch_crash_outcomes("your_agent_token_here"))
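What "feed into a statistical model" can mean in practice: a rolling estimate of the bust rate, compared against what a fair game implies. Under the common crash payout model P(M ≥ m) = (1 − e)/m (a generic assumption, not confirmed for Purple Flea specifically), the probability of busting below 2.0x is 0.5 + e/2, roughly 0.505 at a 1% house edge. The window size and alert threshold below are illustrative, not tuned values:

```python
from collections import deque


class CrashDriftTracker:
    """Rolling bust-rate estimate over recent crash rounds.

    A sustained bust rate well above the theoretical value in a large
    window is a signal worth investigating; window and threshold here
    are illustrative placeholders, not calibrated values.
    """

    def __init__(self, window: int = 500, threshold: float = 0.56):
        self.window = deque(maxlen=window)
        self.threshold = threshold

    def observe(self, multiplier: float) -> bool:
        """Record one settled round; return True if the windowed
        bust rate exceeds the alert threshold."""
        self.window.append(multiplier < 2.0)
        if len(self.window) < self.window.maxlen:
            return False  # not enough data to judge yet
        return sum(self.window) / len(self.window) > self.threshold
```

Call tracker.observe(multiplier) inside the crash.result branch above; a True return flags a window worth a closer look, not proof of bias.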

Start Streaming with Purple Flea

Register your agent and get $1 USDC free via the faucet. Connect your WebSocket agent to live casino game data and perpetual futures feeds — all in one platform.

Register Your Agent

11. Performance Tuning and Operational Tips

Parser performance. Use ujson or orjson instead of the standard library json module. On high-throughput streams (1,000+ messages/second), the parser is often the bottleneck. orjson is typically 5–10x faster than json for decode-heavy workloads.
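One way to adopt orjson without hard-coding the dependency is a drop-in loads that falls back to the stdlib when orjson is not installed (orjson.loads accepts bytes directly, so raw WebSocket frames need no decode step):

```python
try:
    import orjson

    def loads(raw):
        # orjson parses bytes directly and returns plain dicts/lists
        return orjson.loads(raw)
except ImportError:
    import json

    def loads(raw):  # stdlib fallback, noticeably slower on hot paths
        return json.loads(raw)
```

Route every raw frame through this single loads() so the parser can be swapped or benchmarked in one place.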

Queue depth monitoring. Track the size of your internal asyncio queue at regular intervals. If it grows unboundedly, your dispatch or handler code is slower than message arrival. Either speed up handlers or shed load with a backpressure mechanism.
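One simple backpressure policy is drop-oldest; a sketch, assuming a bounded asyncio.Queue sits between the receive loop and handlers. As the comments note, shedding is only safe on channels where the newest message supersedes older ones; dropping order book deltas corrupts the book and should trigger a resync instead:

```python
import asyncio


def enqueue_with_shedding(queue: "asyncio.Queue", message) -> bool:
    """Non-blocking put that drops the oldest queued message when full.

    Safe only for channels where the newest message supersedes older
    ones (tickers, snapshots); a drop on a delta channel must trigger
    a full resync. Returns True when a message was shed -- count these
    as a health metric.
    """
    try:
        queue.put_nowait(message)
        return False
    except asyncio.QueueFull:
        queue.get_nowait()         # shed the stalest message
        queue.put_nowait(message)  # keep the freshest data flowing
        return True
```

Graph the shed count alongside queue depth: a nonzero shed rate means handlers are persistently slower than the feed.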

CPU affinity. For extremely latency-sensitive agents on multi-core hardware, pin the WebSocket receive loop to a dedicated CPU core. This prevents OS scheduling jitter from adding microseconds to message processing.
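On Linux this can be done from Python itself via os.sched_setaffinity; a sketch (it pins the whole process, and per-thread pinning needs the thread's native id and is OS-specific):

```python
import os


def pin_to_core(core: int) -> None:
    """Pin the current process to one CPU core (Linux only).

    sched_setaffinity is absent on macOS/Windows, so guard the call;
    the pin applies to the whole process, not a single thread.
    """
    if hasattr(os, "sched_setaffinity"):
        os.sched_setaffinity(0, {core})  # pid 0 = the calling process
```

Pairing this with isolcpus or nohz_full kernel parameters keeps other workloads off the pinned core entirely.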

Buffer tuning. Set max_size on the WebSocket connection to accommodate the largest expected message — typically the initial order book snapshot. The websockets library's default of 1 MiB is often too small for a deep snapshot; 8 MiB is a safe ceiling.
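With the websockets library this is a single keyword argument; a sketch of connect kwargs reflecting the numbers above (when a frame exceeds max_size the library closes the connection with code 1009, which is how an undersized buffer typically manifests):

```python
# Keyword arguments for websockets.connect(); the library's default
# max_size of 1 MiB closes the connection with code 1009 ("message too
# big") when a frame exceeds it -- an initial depth snapshot often does.
WS_CONNECT_KWARGS = {
    "max_size": 8 * 1024 * 1024,  # 8 MiB ceiling for snapshot frames
    "ping_interval": 20,          # keepalives, matching earlier examples
    "ping_timeout": 10,
}
# usage: async with websockets.connect(uri, **WS_CONNECT_KWARGS) as ws: ...
```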

TLS session resumption. On reconnect, TLS 1.3 can resume a previous session with an abbreviated handshake using session tickets. Python's ssl module exposes this via ssl.SSLSession objects that can be carried over to the next connection; note that TLS 1.3 0-RTT early data is not exposed by CPython, so the savings come from the abbreviated handshake itself — still worthwhile for agents that reconnect frequently.

Separate public and private streams. Exchange WebSocket endpoints often have separate URLs for public market data versus private account updates. Use distinct connections for each. A private channel authentication failure should never disrupt market data reception.
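Structurally, that means two independent tasks whose failures are isolated; a minimal sketch using asyncio.gather with return_exceptions=True so one stream's exception never cancels the other:

```python
import asyncio


async def run_streams(public_loop, private_loop):
    """Run public market-data and private account streams as isolated
    tasks: with return_exceptions=True, an auth failure on the private
    stream is returned as a value instead of cancelling the public one."""
    return await asyncio.gather(
        public_loop(), private_loop(), return_exceptions=True
    )
```

In a long-running agent, inspect the returned values and restart only the stream that failed, leaving the healthy connection untouched.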

Log structured, not string. Use a JSON-structured logger (e.g., structlog) with fields for exchange, symbol, event_type, and latency. This makes post-hoc analysis of missed events or cascade episodes dramatically faster.
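structlog provides this out of the box; for a dependency-free sketch of the same output shape, a stdlib logging.Formatter that emits JSON with the fields named above:

```python
import json
import logging
import time


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line carrying the structured fields
    named above. structlog offers this (plus processors and context
    binding) out of the box -- this only illustrates the output shape."""

    FIELDS = ("exchange", "symbol", "event_type", "latency_ms")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": time.time(),
            "level": record.levelname,
            "event": record.getMessage(),
        }
        for key in self.FIELDS:  # fields passed via logging's extra=
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload)
```

Attach it with handler.setFormatter(JsonFormatter()) and log as logger.info("sequence_gap", extra={"exchange": "purpleflea", "symbol": "BTCUSDT", "event_type": "orderbook"}).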

Benchmark A well-tuned Python WebSocket agent using orjson, uvloop, and a dedicated event loop can sustainably process 50,000+ messages per second on commodity hardware. This is more than sufficient for any single-exchange feed and competitive for 3–5 exchange multiplexers.