
Order Flow Analysis for AI Agents

March 6, 2026 · 20 min read · Purple Flea Research

What order flow tells you about market direction: volume delta, cumulative delta, footprint charts, VPIN, order flow imbalance, absorptions and exhaustions. Build a Python order flow analyzer from raw trade data and generate real-time signals via Purple Flea's trading feed.

What Is Order Flow?

Every price move is caused by orders. Order flow analysis studies the sequence and size of individual trades: who is hitting the bid versus lifting the offer, and in what volumes. Unlike candlestick analysis, which shows only open, high, low, and close, order flow reveals the aggression and conviction behind each price tick.

For an AI agent, order flow is the closest thing to reading the market's mind in real time. It answers questions that price charts cannot: Are buyers aggressive or passive? Is the selling pressure being absorbed? Is this rally running out of steam as participants take profit?

The data source is a tick-by-tick trade feed: every single executed trade with its price, size, and aggressor direction (buyer-initiated vs. seller-initiated). From this raw stream, we build progressively more powerful indicators.

Core metric: Delta (buy volume minus sell volume)
Key pattern: Absorption (delta diverges from price)
Toxicity metric: VPIN (0–1, informed-flow intensity)
Signal latency: <50 ms (real-time from WebSocket feed)

Volume Delta

Volume delta is the simplest order flow metric: delta = buy_volume - sell_volume for a given candle or time bucket. A positive delta means buyers were more aggressive; a negative delta means sellers were more aggressive.

Crucially, delta and price often diverge, and that divergence is the signal.

Classifying Trades with the Tick Rule

When a trade feed does not include aggressor direction, use the tick rule (also called the tick test): if a trade executes above the previous trade price, classify it as buyer-initiated; if below, seller-initiated; if at the same price, inherit the previous classification. This is reported to reach roughly 85% accuracy against exchange-labeled aggressor data, although accuracy varies by market and feed.

tick_rule.py
from dataclasses import dataclass
from typing import List, Literal

@dataclass
class Trade:
    ts: float       # Unix timestamp (seconds)
    price: float
    size: float
    side: Literal["buy", "sell", "unknown"] = "unknown"

def classify_trades_tick_rule(trades: List[Trade]) -> List[Trade]:
    """Classify aggressor direction with the tick test (the tick-rule step of Lee-Ready)."""
    last_price = None
    last_side = "buy"
    result = []
    for t in trades:
        if last_price is None or t.price > last_price:
            side = "buy"   # uptick; the very first trade defaults to "buy"
        elif t.price < last_price:
            side = "sell"  # downtick
        else:
            side = last_side  # zero-tick: inherit
        last_price = t.price
        last_side = side
        result.append(Trade(ts=t.ts, price=t.price, size=t.size, side=side))
    return result

def compute_delta(trades: List[Trade]) -> float:
    """Volume delta for a list of trades."""
    buy_vol  = sum(t.size for t in trades if t.side == "buy")
    sell_vol = sum(t.size for t in trades if t.side == "sell")
    return buy_vol - sell_vol

def compute_imbalance_ratio(trades: List[Trade]) -> float:
    """Order flow imbalance ratio: (buy_vol - sell_vol) / total_vol. Range [-1, 1]."""
    buy_vol  = sum(t.size for t in trades if t.side == "buy")
    sell_vol = sum(t.size for t in trades if t.side == "sell")
    total = buy_vol + sell_vol
    if total == 0: return 0.0
    return (buy_vol - sell_vol) / total

Cumulative Delta

Cumulative delta (CD) is the running total of delta over a session or time window. It provides a continuous "pressure gauge": think of it as a volume-weighted tide that shows whether buyers or sellers have been in structural control over the entire period.

The most important signal is a CD divergence: price trends in one direction while CD trends in the opposite direction. This is treated as a warning that a reversal may follow, typically within 3–8 candles, rather than as a trigger on its own. It is the order flow equivalent of a bearish/bullish price-momentum divergence.

cumulative_delta.py
import pandas as pd
from typing import List

from tick_rule import Trade  # Trade dataclass defined in tick_rule.py

def build_ohlcv_delta(trades: List[Trade], freq: str = "1min") -> pd.DataFrame:
    """
    Aggregate trades into OHLCV + delta bars.
    Returns DataFrame with columns: open, high, low, close, volume,
    buy_vol, sell_vol, delta, cum_delta, imbalance.
    """
    df = pd.DataFrame([
        {"ts": pd.Timestamp(t.ts, unit="s"), "price": t.price,
         "size": t.size, "side": t.side}
        for t in trades
    ])
    df.set_index("ts", inplace=True)
    df["buy"]  = df["size"].where(df["side"] == "buy", 0)
    df["sell"] = df["size"].where(df["side"] == "sell", 0)

    bars = df.resample(freq).agg(
        open=("price", "first"),
        high=("price", "max"),
        low=("price", "min"),
        close=("price", "last"),
        volume=("size", "sum"),
        buy_vol=("buy", "sum"),
        sell_vol=("sell", "sum"),
    ).dropna()

    bars["delta"]     = bars["buy_vol"] - bars["sell_vol"]
    bars["cum_delta"] = bars["delta"].cumsum()
    bars["imbalance"] = bars["delta"] / (bars["volume"] + 1e-9)
    return bars

def detect_cd_divergence(bars: pd.DataFrame, lookback: int = 10) -> pd.Series:
    """
    Returns a Series of divergence signals:
    +1 = bullish divergence (price down, CD up)
    -1 = bearish divergence (price up, CD down)
     0 = no divergence
    """
    price_change = bars["close"].diff(lookback)
    cd_change    = bars["cum_delta"].diff(lookback)
    signal = pd.Series(0, index=bars.index)
    signal[(price_change < 0) & (cd_change > 0)] =  1  # bullish
    signal[(price_change > 0) & (cd_change < 0)] = -1  # bearish
    return signal
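The prose describes cumulative delta as a running total over a session, while build_ohlcv_delta accumulates across every bar it receives. A minimal sketch of a per-session reset, assuming sessions are UTC calendar days (an assumption; substitute your venue's session boundaries), groups the delta column by day before taking the running sum. The function name session_cum_delta is illustrative:

```python
import pandas as pd

def session_cum_delta(bars: pd.DataFrame) -> pd.Series:
    """Cumulative delta that restarts at each UTC calendar day.

    Assumes `bars` has a DatetimeIndex and a numeric "delta" column,
    as produced by build_ohlcv_delta. The daily boundary is an assumption.
    """
    session = bars.index.normalize()  # midnight timestamp of each bar's day
    return bars["delta"].groupby(session).cumsum()

# Usage: four 12-hour bars spanning two days
idx = pd.to_datetime(["2026-03-05 00:00", "2026-03-05 12:00",
                      "2026-03-06 00:00", "2026-03-06 12:00"])
bars = pd.DataFrame({"delta": [5.0, -2.0, 3.0, 4.0]}, index=idx)
print(session_cum_delta(bars).tolist())  # [5.0, 3.0, 3.0, 7.0]
```

The result keeps the original bar index, so it can be assigned straight back as a column (for example `bars["session_cd"] = session_cum_delta(bars)`) and used in place of cum_delta in detect_cd_divergence.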

Footprint Charts

A footprint chart shows buy and sell volume at each price level within each candle. It is the highest-resolution visualization of order flow: you can see exactly which price levels attracted the most aggression and where the market found support or resistance.

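Key patterns in footprint charts include one-sided dominance at a single level, stacked imbalances across adjacent levels, and absorption at the point of control (POC), the level with the highest traded volume. The example below shows all three.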

Footprint chart: 1-minute BTC/USD candle at $67,240 (example)

| Price | Bid×Ask | Delta | Total Vol | Imbalance | Signal |
|---|---|---|---|---|---|
| 67,260 | 18×4 | -14 | 22 | -64% | SELL DOM |
| 67,255 | 34×11 | -23 | 45 | -51% | – |
| 67,250 | 28×22 | -6 | 50 | -12% | – |
| 67,245 ★ | 12×89 | +77 | 101 | +76% | ABSORB |
| 67,240 | 9×67 | +58 | 76 | +76% | BUY DOM |
| 67,235 | 6×41 | +35 | 47 | +74% | BUY DOM |

★ POC at 67,245: high-volume absorption with +77 delta despite sell pressure above. Stacked buy dominance across 67,235–67,245 (three consecutive levels above +70% imbalance) reads as a bullish continuation setup.
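To check the arithmetic in the example, the sketch below recomputes delta, total volume, and imbalance from the Bid×Ask column, reading the first number as sell volume (trades hitting the bid) and the second as buy volume (trades lifting the offer), the convention the rows follow. It then picks the POC as the level with the highest total volume, mirroring FootprintCandle.poc. Under this convention the 67,250 row comes out at a delta of -6 (-12%).

```python
# Bid×Ask pairs from the example: (sell volume at bid, buy volume at ask)
levels = {
    67260: (18, 4),
    67255: (34, 11),
    67250: (28, 22),
    67245: (12, 89),
    67240: (9, 67),
    67235: (6, 41),
}

rows = {}
for price, (sell_vol, buy_vol) in levels.items():
    total = buy_vol + sell_vol
    delta = buy_vol - sell_vol
    rows[price] = (delta, total, round(100 * delta / total))  # imbalance in %

# Point of control: the level with the highest total traded volume
poc = max(rows, key=lambda p: rows[p][1])

for price in sorted(rows, reverse=True):
    delta, total, imb = rows[price]
    print(f"{price}: delta={delta:+d} total={total} imbalance={imb:+d}%")
print("POC:", poc)  # POC: 67245
```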

footprint.py
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

from tick_rule import Trade  # Trade dataclass defined in tick_rule.py

@dataclass
class FootprintLevel:
    price: float
    buy_vol: float = 0.0
    sell_vol: float = 0.0

    @property
    def delta(self) -> float: return self.buy_vol - self.sell_vol
    @property
    def total(self) -> float: return self.buy_vol + self.sell_vol
    @property
    def imbalance(self) -> float:
        if self.total == 0: return 0.0
        return self.delta / self.total

@dataclass
class FootprintCandle:
    open: float
    high: float
    low: float
    close: float
    levels: Dict[float, FootprintLevel] = field(default_factory=dict)

    @property
    def poc(self) -> float:
        """Point of Control (POC): price level with highest total volume."""
        if not self.levels: return self.close
        return max(self.levels, key=lambda p: self.levels[p].total)

    @property
    def total_delta(self) -> float:
        return sum(l.delta for l in self.levels.values())

    def stacked_imbalances(self, threshold: float = 0.5,
                           min_levels: int = 3) -> List[Tuple[str, float, float]]:
        """
        Detect stacked imbalances: consecutive levels all biased in same direction.
        Returns list of (direction, start_price, end_price).
        """
        sorted_prices = sorted(self.levels.keys())
        results = []
        streak = []
        streak_dir = None
        for p in sorted_prices:
            lvl = self.levels[p]
            if lvl.imbalance > threshold:
                d = "buy"
            elif lvl.imbalance < -threshold:
                d = "sell"
            else:
                d = None
            if d == streak_dir and d is not None:
                streak.append(p)
            else:
                # Streak broken: record it if it was long enough
                if streak_dir and len(streak) >= min_levels:
                    results.append((streak_dir, streak[0], streak[-1]))
                streak = [p] if d else []
                streak_dir = d
        # Flush a streak that runs up to the highest price level
        if streak_dir and len(streak) >= min_levels:
            results.append((streak_dir, streak[0], streak[-1]))
        return results

def build_footprint(trades: List[Trade], tick_size: float = 1.0) -> FootprintCandle:
    """Build a footprint candle from a list of classified trades."""
    if not trades:
        raise ValueError("No trades provided")
    prices = [t.price for t in trades]
    candle = FootprintCandle(
        open=prices[0], high=max(prices),
        low=min(prices), close=prices[-1]
    )
    for t in trades:
        # Round price to nearest tick
        rounded = round(t.price / tick_size) * tick_size
        if rounded not in candle.levels:
            candle.levels[rounded] = FootprintLevel(price=rounded)
        lvl = candle.levels[rounded]
        if t.side == "buy":
            lvl.buy_vol += t.size
        elif t.side == "sell":
            lvl.sell_vol += t.size
        # side == "unknown" is skipped: classify trades before building the footprint
    return candle

VPIN: Volume-Synchronized Probability of Informed Trading

VPIN was developed by Easley, López de Prado, and O'Hara as a real-time measure of order flow toxicity: it estimates how much of recent volume comes from informed participants (traders with an information edge) rather than noise traders. High VPIN indicates a market dominated by informed flow, which tends to precede adverse price moves for market makers and liquidity providers.

VPIN is computed over volume buckets rather than time buckets. Each bucket contains exactly V units of traded volume. For each bucket, VPIN_bucket = |buy_vol - sell_vol| / V. The rolling VPIN is the average over the last N buckets (typically N = 50).
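A batch sketch of the same formula helps make the volume synchronization concrete. It assumes trades already carry a buy/sell label and uses a deliberately tiny bucket size so the arithmetic is easy to follow: trades fill equal-volume buckets (a trade larger than the space left spills into the next bucket), each full bucket contributes |buy_vol - sell_vol| / V, and VPIN is the mean over the last N buckets. The function name vpin_from_trades is illustrative, not part of any library.

```python
from collections import deque
from typing import Iterable, List, Tuple

def vpin_from_trades(trades: Iterable[Tuple[str, float]],
                     bucket_size: float,
                     n_buckets: int) -> Tuple[List[float], float]:
    """Split trades into equal-volume buckets and compute VPIN.

    trades: (side, size) pairs, side in {"buy", "sell"}.
    Returns (per-bucket |buy - sell| / V values, mean of the last n_buckets).
    A partially filled final bucket is ignored.
    """
    bucket_vals: List[float] = []
    filled = 0.0  # volume already in the current bucket
    buy = 0.0     # buy volume in the current bucket
    for side, size in trades:
        while size > 0:
            space = bucket_size - filled
            take = min(size, space)  # a large trade spills into the next bucket
            filled += take
            size -= take
            if side == "buy":
                buy += take
            if take == space:  # bucket is full
                sell = bucket_size - buy
                bucket_vals.append(abs(buy - sell) / bucket_size)
                filled, buy = 0.0, 0.0
    recent = deque(bucket_vals, maxlen=n_buckets)  # last n buckets only
    vpin = sum(recent) / len(recent) if recent else 0.0
    return bucket_vals, vpin

# V = 10: bucket 1 = 8 buy / 2 sell, bucket 2 = 3 buy / 7 sell, plus a partial bucket
trades = [("buy", 6), ("sell", 2), ("buy", 5), ("sell", 9), ("buy", 1)]
vals, vpin = vpin_from_trades(trades, bucket_size=10, n_buckets=50)
print([round(v, 3) for v in vals], round(vpin, 3))  # [0.6, 0.4] 0.5
```

Note how the 5-unit buy is split: 2 units complete the first bucket and the remaining 3 open the second.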

Thresholds: VPIN above 0.50 has been reported to precede significant volatility events within the next 1–4 hours. At 0.70 and above, the market is treated as being in a "toxic flow" regime, in which naive market-making strategies tend to lose money consistently.

vpin.py
import numpy as np
from collections import deque
from typing import Deque, Optional

from tick_rule import Trade  # Trade dataclass defined in tick_rule.py

class VPINCalculator:
    """
    VPIN (Volume-Synchronized Probability of Informed Trading).
    Paper: Easley, López de Prado, O'Hara (2012).
    """
    def __init__(self, bucket_size: float = 50.0, n_buckets: int = 50):
        self.bucket_size = bucket_size
        self.n_buckets = n_buckets
        self._current_bucket_vol = 0.0
        self._current_buy_vol = 0.0
        self._bucket_vpins: Deque[float] = deque(maxlen=n_buckets)

    def update(self, trade: Trade) -> Optional[float]:
        """
        Process a single trade. Returns the rolling VPIN once n_buckets buckets
        have completed, otherwise None. A large trade may fill several buckets;
        trades not labeled "buy" count as sell volume.
        """
        size = trade.size
        while size > 0:
            space = self.bucket_size - self._current_bucket_vol
            take = min(size, space)          # portion of the trade that fits
            self._current_bucket_vol += take
            if trade.side == "buy":
                self._current_buy_vol += take
            size -= take
            if take == space:
                # Bucket complete: |buy - sell| / V, with sell = V - buy
                bucket_vpin = abs(2 * self._current_buy_vol - self.bucket_size) / self.bucket_size
                self._bucket_vpins.append(bucket_vpin)
                self._current_bucket_vol = 0.0
                self._current_buy_vol = 0.0

        if len(self._bucket_vpins) == self.n_buckets:
            return float(np.mean(self._bucket_vpins))
        return None

    @property
    def current_vpin(self) -> float:
        if not self._bucket_vpins:
            return 0.5  # placeholder until the first bucket completes
        return float(np.mean(self._bucket_vpins))

    @property
    def regime(self) -> str:
        v = self.current_vpin
        if v < 0.30: return "low_toxicity"
        if v < 0.50: return "normal"
        if v < 0.70: return "elevated"
        return "toxic"

Order Flow Imbalance Signals

Order flow imbalance (OFI) measures the directional pressure on prices at the top of the order book. Unlike volume delta, which looks at executed trades, OFI tracks changes in the limit order book: specifically, the net change in best-bid size minus the net change in best-ask size between consecutive book updates, where a move in the best price counts as the whole level being added or removed.

Key insight from Cont, Kukanov, and Stoikov (2014): OFI explains roughly 50% of the variance in short-term (15-second to 5-minute) price changes, which makes it one of the most predictive microstructure variables for short-horizon price impact.
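For reference, the per-update contribution in Cont, Kukanov, and Stoikov's formulation can be written with indicator functions, where P^B_n and q^B_n are the best bid price and size after book update n, and P^A_n and q^A_n the best ask price and size. This is the quantity compute_ofi returns for one pair of snapshots, and RollingOFI sums it over its window of N updates:

```latex
e_n = \mathbf{1}\{P^B_n \ge P^B_{n-1}\}\, q^B_n
    - \mathbf{1}\{P^B_n \le P^B_{n-1}\}\, q^B_{n-1}
    - \mathbf{1}\{P^A_n \le P^A_{n-1}\}\, q^A_n
    + \mathbf{1}\{P^A_n \ge P^A_{n-1}\}\, q^A_{n-1},
\qquad
\mathrm{OFI}_N = \sum_{n=1}^{N} e_n
```

When the best bid is unchanged, both bid terms are active and reduce to q^B_n - q^B_{n-1}; when it moves, only one survives. That gives the three branches per side in the code.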

ofi.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class BookSnapshot:
    ts: float
    best_bid: float
    best_ask: float
    bid_size: float
    ask_size: float

def compute_ofi(prev: BookSnapshot, curr: BookSnapshot) -> float:
    """
    Order Flow Imbalance between two consecutive book snapshots.
    OFI = delta_bid - delta_ask
    delta_bid = bid_size_change if bid_price unchanged, else +/-bid_size
    delta_ask = ask_size_change if ask_price unchanged, else +/-ask_size
    """
    # Bid side
    if curr.best_bid > prev.best_bid:
        delta_bid = curr.bid_size           # new bid level: full size is net new
    elif curr.best_bid == prev.best_bid:
        delta_bid = curr.bid_size - prev.bid_size  # same level: net change
    else:
        delta_bid = -prev.bid_size          # bid retreated: all size gone

    # Ask side
    if curr.best_ask < prev.best_ask:
        delta_ask = curr.ask_size           # new ask level: full size
    elif curr.best_ask == prev.best_ask:
        delta_ask = curr.ask_size - prev.ask_size
    else:
        delta_ask = -prev.ask_size          # ask retreated

    return delta_bid - delta_ask

class RollingOFI:
    """Rolling OFI sum over a sliding window of N snapshots."""
    def __init__(self, window: int = 100):
        self.window = window
        self._values: list[float] = []
        self._prev: Optional[BookSnapshot] = None

    def update(self, snap: BookSnapshot) -> Optional[float]:
        if self._prev is not None:
            ofi = compute_ofi(self._prev, snap)
            self._values.append(ofi)
            if len(self._values) > self.window:
                self._values.pop(0)
        self._prev = snap
        if len(self._values) == self.window:
            return sum(self._values)
        return None

Absorptions and Exhaustions

Absorption occurs when a large flow of market orders in one direction fails to move the price: the opposing side is providing deep liquidity and absorbing the aggressor. This signals strong directional bias from the absorber and often precedes a sharp reversal.

Detection criteria: buy absorption requires delta / volume above a threshold (30% by default) and a candle return (close - open) / open below -0.1%. Sell absorption requires delta / volume below the negative threshold and a candle return above +0.1%.
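Applied to a single bar, the criteria reduce to two comparisons. The sketch below uses the same default thresholds as detect_absorptions (30% net delta, 0.1% adverse move); the function name absorption_type and the sample bar are illustrative:

```python
def absorption_type(buy_vol: float, sell_vol: float, open_: float, close: float,
                    delta_pct_threshold: float = 0.30,
                    move_threshold: float = 0.001) -> str:
    """Classify one bar as 'buy_absorption', 'sell_absorption', or 'none'."""
    volume = buy_vol + sell_vol
    if volume == 0:
        return "none"
    delta_pct = (buy_vol - sell_vol) / volume  # net aggressor bias in [-1, 1]
    ret = (close - open_) / open_              # candle return
    if delta_pct > delta_pct_threshold and ret < -move_threshold:
        return "buy_absorption"    # aggressive buying, yet price fell
    if delta_pct < -delta_pct_threshold and ret > move_threshold:
        return "sell_absorption"   # aggressive selling, yet price rose
    return "none"

# 70 bought vs 30 sold (delta_pct = 0.40), price fell 67,300 -> 67,200 (about -0.149%)
print(absorption_type(70, 30, 67_300, 67_200))  # buy_absorption
```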

Exhaustion is the opposite: the aggressor is still winning (price moving in their direction) but with decreasing delta over consecutive candles. The move is losing momentum from within โ€” a leading indicator of trend exhaustion before price confirms it.

absorption_detection.py
import pandas as pd

def detect_absorptions(bars: pd.DataFrame,
                       delta_pct_threshold: float = 0.30,
                       price_reversal_threshold: float = 0.001) -> pd.DataFrame:
    """
    Detect absorption candles.
    delta_pct_threshold: delta / volume must exceed this (e.g., 0.30 = 30% net bias)
    price_reversal_threshold: price must move against the delta direction (e.g., 0.1%)
    """
    bars = bars.copy()
    bars["delta_pct"]   = bars["delta"] / (bars["volume"] + 1e-9)
    bars["price_chg"]   = (bars["close"] - bars["open"]) / bars["open"]

    # Buy absorption: heavy buying, but price fell (sellers absorbed the buying)
    bars["buy_absorption"] = (
        (bars["delta_pct"] > delta_pct_threshold) &
        (bars["price_chg"] < -price_reversal_threshold)
    )
    # Sell absorption: heavy selling, but price rose (buyers absorbed the selling)
    bars["sell_absorption"] = (
        (bars["delta_pct"] < -delta_pct_threshold) &
        (bars["price_chg"] > price_reversal_threshold)
    )
    return bars

def detect_exhaustion(bars: pd.DataFrame, lookback: int = 4,
                      min_decline_pct: float = 0.50) -> pd.Series:
    """
    Detect delta exhaustion: price still trending, but delta fading.
    Returns -1 for buying exhaustion (uptrend, delta falling: top signal),
            +1 for selling exhaustion (downtrend, delta rising: bottom signal),
             0 otherwise. Same sign convention as detect_cd_divergence.
    min_decline_pct: delta must move against the trend by at least this
    fraction of the window's starting delta magnitude.
    """
    signal = pd.Series(0, index=bars.index)
    for i in range(lookback, len(bars)):
        window = bars.iloc[i - lookback:i + 1]
        price_trend = window["close"].iloc[-1] - window["close"].iloc[0]
        delta_start = window["delta"].iloc[0]
        delta_trend = window["delta"].iloc[-1] - delta_start
        # Compare the delta change with the starting delta (both in volume units),
        # not with the price change (different units)
        min_move = abs(delta_start) * min_decline_pct
        if price_trend > 0 and delta_trend < -min_move:
            signal.iloc[i] = -1  # buyers exhausting: bearish
        elif price_trend < 0 and delta_trend > min_move:
            signal.iloc[i] = 1   # sellers exhausting: bullish
    return signal

Full Order Flow Analyzer

The OrderFlowAnalyzer class combines all the components above: it consumes a WebSocket trade stream and emits real-time signals.

order_flow_analyzer.py
import asyncio
import json
import websockets
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Awaitable, Callable, List, Optional

# Components defined earlier in this article
from tick_rule import Trade, classify_trades_tick_rule, compute_delta
from footprint import build_footprint
from vpin import VPINCalculator

@dataclass
class OFASignal:
    ts: datetime
    price: float
    delta: float
    cum_delta: float
    vpin: float
    regime: str
    patterns: List[str]
    strength: float   # -1 to 1, negative = bearish

class OrderFlowAnalyzer:
    """
    Real-time order flow analyzer connected to Purple Flea WebSocket feed.
    Emits OFASignal on significant order flow events.
    """
    def __init__(self, symbol: str = "BTC/USD",
                 bar_seconds: int = 60,
                 on_signal: Optional[Callable[[OFASignal], Awaitable[None]]] = None,
                 delta_pct_threshold: float = 0.30,
                 price_reversal_threshold: float = 0.001):
        self.symbol = symbol
        self.bar_seconds = bar_seconds
        self.on_signal = on_signal  # async callback, awaited for each signal
        # Same absorption thresholds as detect_absorptions()
        self.delta_pct_threshold = delta_pct_threshold
        self.price_reversal_threshold = price_reversal_threshold

        self._trades: List[Trade] = []
        self._bar_start: Optional[float] = None
        self._cum_delta = 0.0
        self._vpin = VPINCalculator(bucket_size=50.0, n_buckets=50)
        self._prev_bars: List[dict] = []

    async def connect(self, ws_url: str, api_key: str):
        """Connect to Purple Flea trade feed WebSocket."""
        async with websockets.connect(ws_url) as ws:
            await ws.send(json.dumps({
                "method": "subscribe",
                "params": {"channel": "trades", "symbol": self.symbol},
                "auth": api_key
            }))
            async for message in ws:
                data = json.loads(message)
                if data.get("type") == "trade":
                    await self._on_trade(Trade(
                        ts=data["ts"],
                        price=data["price"],
                        size=data["size"],
                        side=data.get("side", "unknown")
                    ))

    async def _on_trade(self, trade: Trade):
        now = trade.ts
        if self._bar_start is None:
            self._bar_start = now

        # Close the current bar before adding a trade that falls outside it,
        # so the boundary trade opens the next bar instead of ending this one
        if now - self._bar_start >= self.bar_seconds:
            await self._flush_bar()
            self._bar_start = now
            self._trades = []

        # Accumulate trades in current bar
        self._trades.append(trade)

        # Update VPIN continuously (trades not labeled "buy" count as sells)
        self._vpin.update(trade)

    async def _flush_bar(self):
        if not self._trades:
            return
        # Keep exchange-provided aggressor sides; fall back to the tick rule
        # only when the feed did not supply them
        if all(t.side in ("buy", "sell") for t in self._trades):
            trades_classified = self._trades
        else:
            trades_classified = classify_trades_tick_rule(self._trades)
        delta = compute_delta(trades_classified)
        self._cum_delta += delta

        fp = build_footprint(trades_classified)
        stacked = fp.stacked_imbalances(threshold=0.5, min_levels=3)

        # Keep a short bar history (e.g. for exhaustion or divergence checks)
        prices = [t.price for t in trades_classified]
        volume = sum(t.size for t in trades_classified)
        bar = {
            "open": prices[0], "high": max(prices),
            "low": min(prices), "close": prices[-1],
            "delta": delta, "volume": volume
        }
        self._prev_bars.append(bar)
        if len(self._prev_bars) > 20:
            self._prev_bars.pop(0)

        patterns = []
        strength = 0.0

        # Absorption: strong net delta while the candle moves the other way
        delta_pct = delta / (volume + 1e-9)
        price_chg = (bar["close"] - bar["open"]) / bar["open"]
        if delta_pct > self.delta_pct_threshold and price_chg < -self.price_reversal_threshold:
            patterns.append("buy_absorption")
            strength -= 0.7  # bearish signal
        elif delta_pct < -self.delta_pct_threshold and price_chg > self.price_reversal_threshold:
            patterns.append("sell_absorption")
            strength += 0.7  # bullish signal

        # Stacked imbalances
        for direction, p_start, p_end in stacked:
            if direction == "buy":
                patterns.append(f"stacked_buy@{p_start:.0f}-{p_end:.0f}")
                strength += 0.5
            else:
                patterns.append(f"stacked_sell@{p_start:.0f}-{p_end:.0f}")
                strength -= 0.5

        # VPIN regime
        vpin_val = self._vpin.current_vpin
        vpin_regime = self._vpin.regime
        if vpin_regime == "toxic":
            patterns.append("vpin_toxic")

        signal = OFASignal(
            ts=datetime.fromtimestamp(self._trades[-1].ts, tz=timezone.utc),  # last trade in bar
            price=bar["close"],
            delta=delta,
            cum_delta=self._cum_delta,
            vpin=vpin_val,
            regime=vpin_regime,
            patterns=patterns,
            strength=max(-1.0, min(1.0, strength))
        )
        if self.on_signal and patterns:
            await self.on_signal(signal)

# Entry point
async def main():
    async def on_signal(sig: OFASignal):  # async: the analyzer awaits this callback
        print(f"[{sig.ts:%H:%M:%S}] price={sig.price:.0f} "
              f"delta={sig.delta:+.1f} vpin={sig.vpin:.3f} "
              f"patterns={sig.patterns} strength={sig.strength:+.2f}")
    analyzer = OrderFlowAnalyzer(symbol="BTC/USD", on_signal=on_signal)
    await analyzer.connect(
        ws_url="wss://purpleflea.com/ws/trades",
        api_key="pf_live_"
    )

if __name__ == "__main__":
    asyncio.run(main())

Real-Time Signal Generation and Trading

The order flow signals from the analyzer feed directly into trade execution logic. The key principle: never trade against absorption signals. Buy absorption (heavy buying that the market absorbed without moving price up) suggests larger participants are selling into strength, so align with them rather than with the aggressive buyers.

| Pattern | Interpretation | Trade Action | Confidence |
|---|---|---|---|
| Buy absorption | Sellers absorbing buyers | Sell / short bias | High |
| Sell absorption | Buyers absorbing sellers | Buy / long bias | High |
| Stacked buy imbalance | Aggressive sequential buying | Buy momentum | Medium |
| Stacked sell imbalance | Aggressive sequential selling | Sell momentum | Medium |
| CD bearish divergence | Price up, cumulative delta down | Reduce longs | Medium-High |
| VPIN toxic (>0.7) | Informed flow dominant | Widen stops, reduce size | High |
| Delta exhaustion (up) | Rally losing internal strength | Tighten profit targets | Medium |
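One way to turn the table into execution logic is a small rule layer over the pattern strings the analyzer emits ("buy_absorption", "sell_absorption", "stacked_buy@…", "stacked_sell@…", "vpin_toxic"). The sketch is illustrative only: the weights (1.0 for High, 0.5 for Medium), the name position_bias, and the 50% size cut under toxic VPIN are assumptions chosen to mirror the confidence column, not calibrated values. Stop placement is left out.

```python
from typing import Iterable, Tuple

# Hypothetical weights mirroring the table: High = 1.0, Medium = 0.5.
# Positive = long bias, negative = short bias.
PATTERN_WEIGHTS = {
    "buy_absorption": -1.0,   # sellers absorbing buyers: short bias
    "sell_absorption": 1.0,   # buyers absorbing sellers: long bias
    "stacked_buy": 0.5,       # aggressive sequential buying
    "stacked_sell": -0.5,     # aggressive sequential selling
}

def position_bias(patterns: Iterable[str],
                  base_size: float = 1.0) -> Tuple[float, float]:
    """Map analyzer patterns to (direction score in [-1, 1], position size)."""
    score = 0.0
    toxic = False
    for p in patterns:
        name = p.split("@", 1)[0]  # "stacked_buy@67235-67245" -> "stacked_buy"
        if name == "vpin_toxic":
            toxic = True
        else:
            score += PATTERN_WEIGHTS.get(name, 0.0)
    score = max(-1.0, min(1.0, score))
    size = base_size * abs(score)
    if toxic:
        size *= 0.5  # toxic flow: reduce size (the 50% cut is an assumption)
    return score, size

print(position_bias(["sell_absorption", "stacked_buy@67235-67245", "vpin_toxic"]))
# (1.0, 0.5)
```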

Important: Order flow signals are most reliable at key structural price levels (POC from previous session, daily open, high-volume nodes). A buy absorption signal at a weekly POC is far more significant than one in open air. Always combine order flow with structural context.
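A minimal sketch of that advice, assuming you maintain your own list of structural levels (prior-session POC, daily open, high-volume nodes) and treat a signal as "at" a level when price is within a small percentage of it. The function name, the 0.05% tolerance, and the 2× versus 0.5× multipliers are hypothetical placeholders to tune on your own data:

```python
from typing import Sequence

def weight_by_structure(signal_strength: float, price: float,
                        levels: Sequence[float],
                        tolerance_pct: float = 0.0005,
                        at_level_mult: float = 2.0,
                        open_air_mult: float = 0.5) -> float:
    """Scale an order flow signal up near key levels and down in open air.

    Returns the adjusted strength, clamped to [-1, 1].
    """
    near = any(abs(price - lvl) / lvl <= tolerance_pct for lvl in levels)
    mult = at_level_mult if near else open_air_mult
    return max(-1.0, min(1.0, signal_strength * mult))

levels = [67_245.0, 66_900.0]  # e.g. prior-session POC and daily open (illustrative)
print(weight_by_structure(-0.4, 67_260.0, levels))  # -0.8: within 0.05% of 67,245
print(weight_by_structure(-0.4, 67_600.0, levels))  # -0.2: no level nearby
```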

Trade With Purple Flea's Live Feed

Connect your OrderFlowAnalyzer to Purple Flea's real-time WebSocket trade stream. Tick-level data, sub-50ms latency, 6 trading pairs.


Summary

Order flow analysis can give AI agents a structural edge by revealing the intentions behind price moves rather than just the price itself. The four-layer framework covered here (volume delta for directional pressure, cumulative delta for structural bias, footprint charts for level-by-level precision, and VPIN for toxicity regime detection) provides a detailed picture of market microstructure.

The OrderFlowAnalyzer class integrates all layers into a real-time signal generator that connects directly to Purple Flea's WebSocket trade feed. Used in combination with the sentiment index and structural price levels, order flow signals form the core of a robust systematic trading framework for autonomous agents.