What Is Order Flow?
Every price move is caused by orders. Order flow analysis studies the sequence and size of individual trades: who is hitting the bid versus lifting the offer, and in what volumes. Unlike candlestick analysis, which shows only open, high, low, and close, order flow reveals the aggression and conviction behind each price tick.
For an AI agent, order flow is the closest thing to reading the market's mind in real time. It answers questions that price charts cannot: Are buyers aggressive or passive? Is the selling pressure being absorbed? Is this rally running out of steam as participants take profit?
The data source is a tick-by-tick trade feed: every single executed trade with its price, size, and aggressor direction (buyer-initiated vs. seller-initiated). From this raw stream, we build progressively more powerful indicators.
Volume Delta
Volume delta is the simplest order flow metric: delta = buy_volume - sell_volume for a given candle or time bucket. A positive delta means buyers were more aggressive; a negative delta means sellers were more aggressive.
Crucially, delta and price often diverge, and that divergence is the signal:
- Bearish divergence: Price makes a new high, but delta makes a lower high. Buyers are pushing price up with decreasing conviction, so reversal risk is elevated.
- Bullish divergence: Price makes a new low, but delta makes a higher low. Sellers are pushing price down but losing steam, so a bounce becomes more likely.
- Delta confirmation: Price and delta both make new highs, so trend continuation is likely.
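The three rules can be expressed as a comparison of two consecutive swing points. Below is a minimal sketch. It assumes you already have the price and delta values at two swing highs (or two swing lows). Swing detection itself is out of scope. The function name `classify_delta_swing` is hypothetical. The "both at new lows" branch is the assumed mirror of the confirmation rule for downtrends.

```python
from typing import Literal


def classify_delta_swing(kind: Literal["high", "low"],
                         prev_price: float, curr_price: float,
                         prev_delta: float, curr_delta: float) -> str:
    """Compare two consecutive swing highs (kind="high") or swing lows
    (kind="low") in price and in delta. Hypothetical helper."""
    if kind == "high" and curr_price > prev_price:
        if curr_delta < prev_delta:
            return "bearish_divergence"   # higher price high, lower delta high
        if curr_delta > prev_delta:
            return "confirmation"         # price and delta both at new highs
    elif kind == "low" and curr_price < prev_price:
        if curr_delta > prev_delta:
            return "bullish_divergence"   # lower price low, higher delta low
        if curr_delta < prev_delta:
            return "confirmation"         # assumed mirror case: both at new lows
    return "none"                         # no new price extreme, or delta flat


# Hypothetical swing highs: price 100.0 -> 101.5, delta 420 -> 310
print(classify_delta_swing("high", 100.0, 101.5, 420.0, 310.0))  # bearish_divergence
```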
Classifying Trades with the Tick Rule
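When a trade feed does not include aggressor direction, use the tick rule:
- If a trade executes above the previous trade price, classify it as buyer-initiated.
- If it executes below, classify it as seller-initiated.
- If it executes at the same price (a zero tick), inherit the previous classification.

The tick rule is commonly cited as roughly 85% accurate against actual exchange aggressor data, although accuracy varies by market and data quality.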
tick_rule.py
```python
from dataclasses import dataclass
from typing import List, Literal


@dataclass
class Trade:
    ts: float  # Unix timestamp in seconds
    price: float
    size: float
    side: Literal["buy", "sell", "unknown"] = "unknown"


def classify_trades_tick_rule(trades: List[Trade]) -> List[Trade]:
    """Classify aggressor direction with the tick rule (the trade-only rule
    that the Lee-Ready algorithm falls back on for trades at the quote midpoint)."""
    last_price = None
    last_side = "buy"  # arbitrary default for the very first trade
    result = []
    for t in trades:
        if last_price is None or t.price > last_price:
            side = "buy"       # uptick (or first trade)
        elif t.price < last_price:
            side = "sell"      # downtick
        else:
            side = last_side   # zero tick: inherit previous classification
        last_price = t.price
        last_side = side
        result.append(Trade(ts=t.ts, price=t.price, size=t.size, side=side))
    return result


def compute_delta(trades: List[Trade]) -> float:
    """Volume delta for a list of trades: buy volume minus sell volume."""
    buy_vol = sum(t.size for t in trades if t.side == "buy")
    sell_vol = sum(t.size for t in trades if t.side == "sell")
    return buy_vol - sell_vol


def compute_imbalance_ratio(trades: List[Trade]) -> float:
    """Order flow imbalance ratio: (buy_vol - sell_vol) / total_vol. Range [-1, 1]."""
    buy_vol = sum(t.size for t in trades if t.side == "buy")
    sell_vol = sum(t.size for t in trades if t.side == "sell")
    total = buy_vol + sell_vol
    if total == 0:
        return 0.0
    return (buy_vol - sell_vol) / total
```
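To see the zero-tick inheritance at work, here is a dependency-free trace on a hypothetical six-trade tape. It uses bare price and size lists instead of the `Trade` dataclass so it runs on its own. As in `classify_trades_tick_rule`, the first trade defaults to a buy (an arbitrary choice).

```python
from typing import List


def tick_rule_sides(prices: List[float]) -> List[str]:
    """Tick rule on a bare price sequence; the first trade defaults to 'buy'."""
    sides: List[str] = []
    for i, p in enumerate(prices):
        if i == 0 or p > prices[i - 1]:
            sides.append("buy")        # uptick
        elif p < prices[i - 1]:
            sides.append("sell")       # downtick
        else:
            sides.append(sides[-1])    # zero tick: inherit previous side
    return sides


# Hypothetical six-trade tape
prices = [100.0, 100.5, 100.5, 100.0, 100.0, 101.0]
sizes = [2.0, 1.0, 3.0, 4.0, 1.0, 2.0]
sides = tick_rule_sides(prices)  # ['buy', 'buy', 'buy', 'sell', 'sell', 'buy']

buy_vol = sum(s for s, d in zip(sizes, sides) if d == "buy")    # 2 + 1 + 3 + 2 = 8
sell_vol = sum(s for s, d in zip(sizes, sides) if d == "sell")  # 4 + 1 = 5
delta = buy_vol - sell_vol                                      # 3.0
imbalance = delta / (buy_vol + sell_vol)                        # 3 / 13, about 0.23
```

The third and fifth trades print at the same price as their predecessors. They therefore take on the previous side rather than being left unclassified.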
Cumulative Delta
Cumulative delta (CD) is the running total of delta over a session or time window. It provides a continuous "pressure gauge". Think of it as a volume-weighted tide that shows whether buyers or sellers have been in structural control over the entire period.
The most powerful signal is a CD divergence: price trends in one direction while CD trends in the opposite direction. Practitioners often watch for a reversal within roughly 3–8 candles after such a divergence. This is the order flow equivalent of a bearish/bullish price-momentum divergence.
cumulative_delta.py
```python
from typing import List

import pandas as pd

from tick_rule import Trade  # Trade dataclass from tick_rule.py


def build_ohlcv_delta(trades: List[Trade], freq: str = "1min") -> pd.DataFrame:
    """
    Aggregate trades into OHLCV + delta bars.
    Returns DataFrame with columns: open, high, low, close, volume,
    buy_vol, sell_vol, delta, cum_delta, imbalance.
    """
    df = pd.DataFrame([
        {"ts": pd.Timestamp(t.ts, unit="s"), "price": t.price,
         "size": t.size, "side": t.side}
        for t in trades
    ])
    df.set_index("ts", inplace=True)
    df["buy"] = df["size"].where(df["side"] == "buy", 0)
    df["sell"] = df["size"].where(df["side"] == "sell", 0)
    # Price OHLC and summed volumes per time bucket
    ohlc = df["price"].resample(freq).ohlc()
    vols = df[["size", "buy", "sell"]].resample(freq).sum()
    vols.columns = ["volume", "buy_vol", "sell_vol"]
    # Buckets with no trades have NaN OHLC values; drop them
    bars = ohlc.join(vols).dropna()
    bars["delta"] = bars["buy_vol"] - bars["sell_vol"]
    bars["cum_delta"] = bars["delta"].cumsum()
    bars["imbalance"] = bars["delta"] / (bars["volume"] + 1e-9)
    return bars


def detect_cd_divergence(bars: pd.DataFrame, lookback: int = 10) -> pd.Series:
    """
    Returns a Series of divergence signals:
    +1 = bullish divergence (price down, CD up)
    -1 = bearish divergence (price up, CD down)
     0 = no divergence
    """
    price_change = bars["close"].diff(lookback)
    cd_change = bars["cum_delta"].diff(lookback)
    signal = pd.Series(0, index=bars.index)
    signal.loc[(price_change < 0) & (cd_change > 0)] = 1   # bullish
    signal.loc[(price_change > 0) & (cd_change < 0)] = -1  # bearish
    return signal
```
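A toy run makes the divergence logic concrete. The sketch below uses five hypothetical 1-minute bars in which price grinds higher while sellers dominate. It repeats the `detect_cd_divergence` computation inline so it runs without the other modules.

```python
import pandas as pd

# Hypothetical bars: price rises while net delta turns and stays negative
bars = pd.DataFrame({
    "close": [100.0, 100.4, 100.9, 101.2, 101.5],
    "delta": [120.0, -40.0, -60.0, -35.0, -50.0],
})
bars["cum_delta"] = bars["delta"].cumsum()  # 120, 80, 20, -15, -65

lookback = 3
price_change = bars["close"].diff(lookback)
cd_change = bars["cum_delta"].diff(lookback)
signal = pd.Series(0, index=bars.index)
signal.loc[(price_change < 0) & (cd_change > 0)] = 1    # bullish divergence
signal.loc[(price_change > 0) & (cd_change < 0)] = -1   # bearish divergence
print(signal.tolist())  # [0, 0, 0, -1, -1]
```

The first `lookback` rows have no prior value to diff against, so their comparisons are false and they stay at 0.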
Footprint Charts
A footprint chart shows buy and sell volume at each price level within each candle. It is the highest-resolution visualization of order flow: you can see exactly which price levels attracted the most aggression and where the market found support or resistance.
Key patterns in footprint charts:
- Stacked imbalances: Multiple consecutive price levels where buy volume significantly exceeds sell volume (3:1 or better), indicating strong directional conviction.
- Absorption: Large sell volume at a price level, but price does not drop. The aggressive selling is being absorbed by passive buyers resting on the bid. Bullish setup.
- Exhaustion: The last few price levels in a move show decreasing delta despite continued price extension. Buyers are running out of steam.
- POC (Point of Control): The price level with the highest total volume in a candle. It acts as a magnet and often becomes future support/resistance.
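The "3:1 or better" ratio maps directly onto the normalized per-level imbalance (b - s) / (b + s) used in the footprint code below. A ratio of r:1 gives (r - 1) / (r + 1), so 3:1 corresponds to 0.5. A small sketch of the conversion and of a POC lookup follows. The helper names and the level data are hypothetical.

```python
def ratio_to_imbalance(ratio: float) -> float:
    """Buy:sell ratio r:1 -> normalized imbalance (b - s) / (b + s)."""
    return (ratio - 1) / (ratio + 1)


def imbalance_to_ratio(imbalance: float) -> float:
    """Inverse mapping, valid for imbalance strictly between -1 and 1."""
    return (1 + imbalance) / (1 - imbalance)


print(ratio_to_imbalance(3.0))  # 0.5, the default stacked-imbalance threshold
print(imbalance_to_ratio(0.5))  # 3.0

# POC on hypothetical levels: price -> (buy_vol, sell_vol)
levels = {100.0: (40.0, 12.0), 101.0: (95.0, 18.0), 102.0: (20.0, 30.0)}
poc = max(levels, key=lambda p: sum(levels[p]))  # 101.0 (113 units traded)
```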
Figure (footprint example): POC at 67,245 shows high-volume absorption with +77 delta despite sell pressure above. Stacked buy dominance at 67,240–67,245 suggests a bullish setup for continuation.
footprint.py
```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

from tick_rule import Trade


@dataclass
class FootprintLevel:
    price: float
    buy_vol: float = 0.0
    sell_vol: float = 0.0

    @property
    def delta(self) -> float:
        return self.buy_vol - self.sell_vol

    @property
    def total(self) -> float:
        return self.buy_vol + self.sell_vol

    @property
    def imbalance(self) -> float:
        # (buy - sell) / (buy + sell); 0.5 corresponds to a 3:1 buy/sell ratio
        if self.total == 0:
            return 0.0
        return self.delta / self.total


@dataclass
class FootprintCandle:
    open: float
    high: float
    low: float
    close: float
    levels: Dict[float, FootprintLevel] = field(default_factory=dict)

    @property
    def poc(self) -> float:
        """Point of Control: price level with the highest total volume."""
        if not self.levels:
            return self.close
        return max(self.levels, key=lambda p: self.levels[p].total)

    @property
    def total_delta(self) -> float:
        return sum(lvl.delta for lvl in self.levels.values())

    def stacked_imbalances(self, threshold: float = 0.5,
                           min_levels: int = 3) -> List[Tuple[str, float, float]]:
        """
        Detect stacked imbalances: consecutive traded levels all biased in the
        same direction. Returns a list of (direction, start_price, end_price).
        """
        results = []
        streak: List[float] = []
        streak_dir: Optional[str] = None
        for p in sorted(self.levels):
            lvl = self.levels[p]
            if lvl.imbalance > threshold:
                d = "buy"
            elif lvl.imbalance < -threshold:
                d = "sell"
            else:
                d = None
            if d is not None and d == streak_dir:
                streak.append(p)
            else:
                # Direction changed: record the previous streak if long enough
                if streak_dir and len(streak) >= min_levels:
                    results.append((streak_dir, streak[0], streak[-1]))
                streak = [p] if d else []
                streak_dir = d
        # Flush a streak that runs all the way to the top of the candle
        if streak_dir and len(streak) >= min_levels:
            results.append((streak_dir, streak[0], streak[-1]))
        return results


def build_footprint(trades: List[Trade], tick_size: float = 1.0) -> FootprintCandle:
    """Build a footprint candle from a list of classified trades."""
    if not trades:
        raise ValueError("No trades provided")
    prices = [t.price for t in trades]
    candle = FootprintCandle(
        open=prices[0], high=max(prices),
        low=min(prices), close=prices[-1],
    )
    for t in trades:
        # Snap to the nearest tick; the outer round() avoids float keys like 0.30000000000000004
        rounded = round(round(t.price / tick_size) * tick_size, 10)
        lvl = candle.levels.setdefault(rounded, FootprintLevel(price=rounded))
        if t.side == "buy":
            lvl.buy_vol += t.size
        elif t.side == "sell":
            lvl.sell_vol += t.size
        # "unknown" trades are skipped rather than counted as selling
    return candle
```
VPIN: Volume-Synchronized Probability of Informed Trading
VPIN was developed by Easley, López de Prado, and O'Hara as a real-time measure of order flow toxicity. It estimates the probability that trading is driven by informed participants (those with an information edge) rather than noise traders. High VPIN indicates a market dominated by informed flow. Such flow tends to precede adverse price movements for market makers and liquidity providers.
VPIN is computed over volume buckets rather than time buckets. Each bucket contains exactly V units of traded volume. For each bucket, VPIN_bucket = |buy_vol - sell_vol| / V. The rolling VPIN is the average over the last N buckets (typically N = 50).
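The same definition in notation, under the following conventions:
- $V^B_\tau$ and $V^S_\tau$ are the buy and sell volume in bucket $\tau$.
- $V = V^B_\tau + V^S_\tau$ is the fixed bucket size.
- $n$ is the number of buckets in the rolling window ending at bucket $t$.

The form $\lvert 2V^B_\tau - V \rvert / V$ is what the calculator below computes. A worked bucket follows the definition.

$$
\text{VPIN}_\tau = \frac{\lvert V^B_\tau - V^S_\tau \rvert}{V} = \frac{\lvert 2V^B_\tau - V \rvert}{V},
\qquad
\text{VPIN}_t = \frac{1}{n} \sum_{\tau = t-n+1}^{t} \text{VPIN}_\tau
$$

$$
V = 50,\; V^B_\tau = 35,\; V^S_\tau = 15 \;\Rightarrow\; \text{VPIN}_\tau = \frac{\lvert 35 - 15 \rvert}{50} = 0.40
$$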
Threshold: as a rule of thumb, VPIN above 0.50 has historically tended to precede significant volatility events within the next 1–4 hours. At VPIN of 0.70 or more, the market is in a "toxic flow" regime, where naive market-making strategies tend to lose money consistently.
vpin.py
```python
from collections import deque
from typing import Deque, Optional

import numpy as np

from tick_rule import Trade


class VPINCalculator:
    """
    VPIN (Volume-Synchronized Probability of Informed Trading).
    Paper: Easley, López de Prado, O'Hara (2012).
    Note: this sketch splits volume by each trade's side; the paper itself
    uses bulk volume classification.
    """

    def __init__(self, bucket_size: float = 50.0, n_buckets: int = 50):
        self.bucket_size = bucket_size
        self.n_buckets = n_buckets
        self._current_bucket_vol = 0.0
        self._current_buy_vol = 0.0
        self._bucket_vpins: Deque[float] = deque(maxlen=n_buckets)

    def update(self, trade: Trade) -> Optional[float]:
        """
        Process a single trade. Returns the rolling VPIN when a bucket completes
        and the window is full, otherwise None. Non-buy trades count as sells.
        """
        size = trade.size
        is_buy = trade.side == "buy"
        completed = False
        # A large trade may fill several buckets, so split it until fully consumed
        while size > 0:
            remaining = self.bucket_size - self._current_bucket_vol
            fill = min(size, remaining)
            self._current_bucket_vol += fill
            if is_buy:
                self._current_buy_vol += fill
            size -= fill
            if fill >= remaining:
                # Bucket full: |buy - sell| / V, where sell = V - buy
                bucket_vpin = abs(2 * self._current_buy_vol - self.bucket_size) / self.bucket_size
                self._bucket_vpins.append(bucket_vpin)
                self._current_bucket_vol = 0.0
                self._current_buy_vol = 0.0
                completed = True
        if completed and len(self._bucket_vpins) == self.n_buckets:
            return float(np.mean(self._bucket_vpins))
        return None

    @property
    def current_vpin(self) -> float:
        # Neutral 0.5 prior until the first bucket completes
        if not self._bucket_vpins:
            return 0.5
        return float(np.mean(self._bucket_vpins))

    @property
    def regime(self) -> str:
        v = self.current_vpin
        if v < 0.30:
            return "low_toxicity"
        if v < 0.50:
            return "normal"
        if v < 0.70:
            return "elevated"
        return "toxic"
```
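Because buckets are defined by volume rather than time, one large trade can span several of them. Its volume then has to be split at each bucket boundary. The sketch below shows that bucketing in isolation. It uses `(size, is_buy)` tuples, a simplification of `Trade`, and the helper name `fill_buckets` is hypothetical.

```python
from typing import List, Tuple


def fill_buckets(trades: List[Tuple[float, bool]], bucket_size: float) -> List[float]:
    """Split (size, is_buy) trades into equal-volume buckets and return
    |buy - sell| / V for each completed bucket. Leftover volume stays open."""
    vpins: List[float] = []
    vol = buy = 0.0
    for size, is_buy in trades:
        while size > 0:
            remaining = bucket_size - vol
            fill = min(size, remaining)
            vol += fill
            buy += fill if is_buy else 0.0
            size -= fill
            if fill >= remaining:  # bucket is now full
                vpins.append(abs(2 * buy - bucket_size) / bucket_size)
                vol = buy = 0.0
    return vpins


# 30 units of selling, then one 130-unit buy, with V = 50:
# bucket 1 = 30 sell + 20 buy, buckets 2 and 3 = 50 buy each, 10 buy left open
print(fill_buckets([(30.0, False), (130.0, True)], 50.0))  # [0.2, 1.0, 1.0]
```

Comparing `fill` with `remaining` decides when a bucket is full. This avoids relying on floating-point sums landing exactly on `bucket_size`.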
Order Flow Imbalance Signals
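Order flow imbalance (OFI) measures the directional pressure on prices at the top of the order book. Volume delta looks at executed trades; OFI instead tracks changes in the limit order book. Specifically, it takes the net change in best-bid size minus the net change in best-ask size between consecutive book snapshots. When the best price itself moves, the whole queue counts as appearing or disappearing.
Key insight from Cont, Kukanov, and Stoikov (2014): over short horizons, price changes are related to OFI approximately linearly, with a slope inversely proportional to market depth. OFI explains a large share (on the order of 50%) of the variance in short-term (15-second to 5-minute) price changes. That makes it one of the most predictive microstructure variables for short-horizon price impact.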
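In the indicator notation of Cont, Kukanov, and Stoikov:
- $P^B_n, q^B_n$ are the best bid price and size at snapshot $n$.
- $P^A_n, q^A_n$ are the best ask price and size.

Each snapshot contributes $e_n$, and OFI over an interval is the sum of those contributions:

$$
e_n = \mathbf{1}\{P^B_n \ge P^B_{n-1}\}\, q^B_n
    - \mathbf{1}\{P^B_n \le P^B_{n-1}\}\, q^B_{n-1}
    - \mathbf{1}\{P^A_n \le P^A_{n-1}\}\, q^A_n
    + \mathbf{1}\{P^A_n \ge P^A_{n-1}\}\, q^A_{n-1},
\qquad
\text{OFI}_{(t_0, t_1]} = \sum_{n \,:\, t_0 < t_n \le t_1} e_n
$$

The three cases in `compute_ofi` below (price improved, unchanged, retreated) are this expression with the indicators evaluated.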
ofi.py
```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, Optional


@dataclass
class BookSnapshot:
    ts: float
    best_bid: float
    best_ask: float
    bid_size: float
    ask_size: float


def compute_ofi(prev: BookSnapshot, curr: BookSnapshot) -> float:
    """
    Order Flow Imbalance between two consecutive top-of-book snapshots:
        OFI = delta_bid - delta_ask
    delta_bid = curr.bid_size                  if the bid price rose
              = curr.bid_size - prev.bid_size  if it is unchanged
              = -prev.bid_size                 if it fell
    delta_ask mirrors this, with a falling ask price as the "improving" case.
    """
    # Bid side
    if curr.best_bid > prev.best_bid:
        delta_bid = curr.bid_size                  # new bid level: full size is net new
    elif curr.best_bid == prev.best_bid:
        delta_bid = curr.bid_size - prev.bid_size  # same level: net change
    else:
        delta_bid = -prev.bid_size                 # bid retreated: all size gone
    # Ask side
    if curr.best_ask < prev.best_ask:
        delta_ask = curr.ask_size                  # new ask level: full size
    elif curr.best_ask == prev.best_ask:
        delta_ask = curr.ask_size - prev.ask_size
    else:
        delta_ask = -prev.ask_size                 # ask retreated
    return delta_bid - delta_ask


class RollingOFI:
    """Rolling OFI sum over a sliding window of N snapshot-to-snapshot changes."""

    def __init__(self, window: int = 100):
        self.window = window
        self._values: Deque[float] = deque(maxlen=window)  # O(1) eviction
        self._prev: Optional[BookSnapshot] = None

    def update(self, snap: BookSnapshot) -> Optional[float]:
        if self._prev is not None:
            self._values.append(compute_ofi(self._prev, snap))
        self._prev = snap
        if len(self._values) == self.window:
            return sum(self._values)
        return None
```
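A short numeric trace helps check the sign conventions. The sketch below evaluates the indicator form of $e_n$ directly on hypothetical top-of-book tuples. It should agree with `compute_ofi` on every step. The tuple layout and the helper name `ofi_step` are assumptions made for this illustration.

```python
from typing import Tuple

Snap = Tuple[float, float, float, float]  # (bid_px, bid_sz, ask_px, ask_sz)


def ofi_step(prev: Snap, curr: Snap) -> float:
    """e_n from the indicator form of the OFI formula."""
    pb, qb, pa, qa = prev
    cb, cqb, ca, cqa = curr
    e = 0.0
    if cb >= pb:
        e += cqb   # bid held or improved: add current bid size
    if cb <= pb:
        e -= qb    # bid held or dropped: remove previous bid size
    if ca <= pa:
        e -= cqa   # ask held or improved: subtract current ask size
    if ca >= pa:
        e += qa    # ask held or lifted: add back previous ask size
    return e


snaps = [
    (100.0, 5.0, 100.5, 4.0),
    (100.0, 9.0, 100.5, 4.0),   # bid queue grows by 4
    (100.0, 9.0, 100.5, 1.0),   # ask queue shrinks by 3
    (100.5, 2.0, 101.0, 6.0),   # both sides tick up
    (100.0, 7.0, 101.0, 6.0),   # bid drops back
]
steps = [ofi_step(a, b) for a, b in zip(snaps, snaps[1:])]
print(steps, sum(steps))  # [4.0, 3.0, 3.0, -2.0] 8.0
```

A positive window sum (here +8) indicates net buying pressure at the top of the book over the window.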
Absorptions and Exhaustions
Absorption occurs when a large flow of market orders in one direction fails to move the price. The opposing side is providing deep passive liquidity and absorbing the aggressor. This signals strong directional intent from the absorber and often precedes a sharp reversal.
Detection criteria:
- Buy absorption requires delta / volume > threshold AND (close - open) / open < -0.1% in the same candle.
- Sell absorption requires delta / volume < -threshold AND (close - open) / open > +0.1%.
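The criteria can be checked by hand on a single candle. Below is a minimal sketch using the same default thresholds as `detect_absorptions` (30% net delta, 0.1% counter-move). The helper name `classify_absorption` and the candle values are hypothetical.

```python
from typing import Optional


def classify_absorption(open_px: float, close_px: float, delta: float, volume: float,
                        delta_pct_threshold: float = 0.30,
                        price_reversal_threshold: float = 0.001) -> Optional[str]:
    """Apply the absorption criteria to one candle; returns None if neither fires."""
    if volume <= 0:
        return None
    delta_pct = delta / volume
    price_chg = (close_px - open_px) / open_px
    if delta_pct > delta_pct_threshold and price_chg < -price_reversal_threshold:
        return "buy_absorption"    # heavy buying, price still fell
    if delta_pct < -delta_pct_threshold and price_chg > price_reversal_threshold:
        return "sell_absorption"   # heavy selling, price still rose
    return None


# Hypothetical candle: 1,000 units traded, delta +400 (40% net buying),
# yet price fell from 100.00 to 99.80 (-0.2%)
print(classify_absorption(100.0, 99.8, 400.0, 1000.0))  # buy_absorption
```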
Exhaustion is the opposite: the aggressor is still winning (price moving in their direction), but with decreasing delta over consecutive candles. The move is losing momentum from within. This makes exhaustion a leading indicator that can appear before price confirms the turn.
absorption_detection.py
```python
import pandas as pd


def detect_absorptions(bars: pd.DataFrame,
                       delta_pct_threshold: float = 0.30,
                       price_reversal_threshold: float = 0.001) -> pd.DataFrame:
    """
    Detect absorption candles.
    delta_pct_threshold: delta / volume must exceed this (e.g., 0.30 = 30% net bias)
    price_reversal_threshold: price must move against the delta direction (e.g., 0.001 = 0.1%)
    """
    bars = bars.copy()
    bars["delta_pct"] = bars["delta"] / (bars["volume"] + 1e-9)
    bars["price_chg"] = (bars["close"] - bars["open"]) / bars["open"]
    # Buy absorption: heavy buying, but price fell (passive sellers absorbed the buying)
    bars["buy_absorption"] = (
        (bars["delta_pct"] > delta_pct_threshold) &
        (bars["price_chg"] < -price_reversal_threshold)
    )
    # Sell absorption: heavy selling, but price rose (passive buyers absorbed the selling)
    bars["sell_absorption"] = (
        (bars["delta_pct"] < -delta_pct_threshold) &
        (bars["price_chg"] > price_reversal_threshold)
    )
    return bars


def detect_exhaustion(bars: pd.DataFrame, lookback: int = 4,
                      min_decline_pct: float = 0.50) -> pd.Series:
    """
    Detect delta exhaustion: price still trending, but delta fading.
    Returns -1 for buyer exhaustion (uptrend losing buy pressure: top signal),
    +1 for seller exhaustion (downtrend losing sell pressure: bottom signal),
    0 otherwise. Sign convention matches detect_cd_divergence (-1 = bearish).
    min_decline_pct: fraction by which delta must fade relative to the delta
    at the start of the window (0.50 = at least halved).
    """
    signal = pd.Series(0, index=bars.index)
    for i in range(lookback, len(bars)):
        window = bars.iloc[i - lookback:i + 1]
        price_trend = window["close"].iloc[-1] - window["close"].iloc[0]
        d_start = window["delta"].iloc[0]
        d_end = window["delta"].iloc[-1]
        # Uptrend that began with net buying, but the buying has faded
        if price_trend > 0 and d_start > 0 and d_end <= d_start * (1 - min_decline_pct):
            signal.iloc[i] = -1
        # Downtrend that began with net selling, but the selling has faded
        elif price_trend < 0 and d_start < 0 and d_end >= d_start * (1 - min_decline_pct):
            signal.iloc[i] = 1
    return signal
```
Full Order Flow Analyzer
The OrderFlowAnalyzer class combines all the above components. It consumes a WebSocket trade stream and emits real-time signals:
order_flow_analyzer.py
```python
import asyncio
import inspect
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable, List, Optional

import websockets

from footprint import build_footprint
from tick_rule import Trade, classify_trades_tick_rule, compute_delta
from vpin import VPINCalculator


@dataclass
class OFASignal:
    ts: datetime
    price: float
    delta: float
    cum_delta: float
    vpin: float
    regime: str
    patterns: List[str]
    strength: float  # -1 to 1, negative = bearish


class OrderFlowAnalyzer:
    """
    Real-time order flow analyzer connected to the Purple Flea WebSocket feed.
    Emits OFASignal on significant order flow events.
    """

    def __init__(self, symbol: str = "BTC/USD",
                 bar_seconds: int = 60,
                 on_signal: Optional[Callable] = None,
                 delta_pct_threshold: float = 0.30,
                 price_reversal_threshold: float = 0.001):
        self.symbol = symbol
        self.bar_seconds = bar_seconds
        self.on_signal = on_signal  # sync or async callback
        self.delta_pct_threshold = delta_pct_threshold
        self.price_reversal_threshold = price_reversal_threshold
        self._trades: List[Trade] = []
        self._bar_start: Optional[float] = None
        self._cum_delta = 0.0
        self._vpin = VPINCalculator(bucket_size=50.0, n_buckets=50)
        self._prev_bars: List[dict] = []  # recent bars, e.g. for exhaustion checks

    async def connect(self, ws_url: str, api_key: str):
        """Connect to the Purple Flea trade feed WebSocket."""
        async with websockets.connect(ws_url) as ws:
            await ws.send(json.dumps({
                "method": "subscribe",
                "params": {"channel": "trades", "symbol": self.symbol},
                "auth": api_key,
            }))
            async for message in ws:
                data = json.loads(message)
                if data.get("type") == "trade":
                    await self._on_trade(Trade(
                        ts=float(data["ts"]),  # assumed Unix seconds
                        price=float(data["price"]),
                        size=float(data["size"]),
                        side=data.get("side", "unknown"),
                    ))

    async def _on_trade(self, trade: Trade):
        if self._bar_start is None:
            self._bar_start = trade.ts
        # Close the current bar before adding a trade that falls outside it
        if trade.ts - self._bar_start >= self.bar_seconds:
            await self._flush_bar()
            self._bar_start = trade.ts
            self._trades = []
        self._trades.append(trade)
        self._vpin.update(trade)  # VPIN runs on volume buckets, not time bars

    async def _flush_bar(self):
        if not self._trades:
            return
        trades = self._trades
        # Fall back to the tick rule only when the feed omits aggressor side
        if any(t.side not in ("buy", "sell") for t in trades):
            trades = classify_trades_tick_rule(trades)
        delta = compute_delta(trades)
        self._cum_delta += delta
        stacked = build_footprint(trades).stacked_imbalances(threshold=0.5, min_levels=3)

        prices = [t.price for t in trades]
        volume = sum(t.size for t in trades)
        bar = {
            "open": prices[0], "high": max(prices),
            "low": min(prices), "close": prices[-1],
            "delta": delta, "volume": volume,
        }
        self._prev_bars.append(bar)
        if len(self._prev_bars) > 20:
            self._prev_bars.pop(0)

        patterns: List[str] = []
        strength = 0.0
        # Absorption, with the same thresholds as detect_absorptions
        delta_pct = delta / (volume + 1e-9)
        price_chg = (bar["close"] - bar["open"]) / bar["open"]
        if delta_pct > self.delta_pct_threshold and price_chg < -self.price_reversal_threshold:
            patterns.append("buy_absorption")
            strength -= 0.7  # bearish signal
        elif delta_pct < -self.delta_pct_threshold and price_chg > self.price_reversal_threshold:
            patterns.append("sell_absorption")
            strength += 0.7  # bullish signal
        # Stacked imbalances
        for direction, p_start, p_end in stacked:
            patterns.append(f"stacked_{direction}@{p_start:.0f}-{p_end:.0f}")
            strength += 0.5 if direction == "buy" else -0.5
        # VPIN regime
        vpin_regime = self._vpin.regime
        if vpin_regime == "toxic":
            patterns.append("vpin_toxic")

        signal = OFASignal(
            ts=datetime.fromtimestamp(trades[-1].ts, tz=timezone.utc),
            price=bar["close"],
            delta=delta,
            cum_delta=self._cum_delta,
            vpin=self._vpin.current_vpin,
            regime=vpin_regime,
            patterns=patterns,
            strength=max(-1.0, min(1.0, strength)),
        )
        if self.on_signal and patterns:
            result = self.on_signal(signal)
            if inspect.isawaitable(result):  # support async callbacks too
                await result


# Entry point
async def main():
    def on_signal(sig: OFASignal):
        print(f"[{sig.ts:%H:%M:%S}] price={sig.price:.0f} "
              f"delta={sig.delta:+.1f} vpin={sig.vpin:.3f} "
              f"patterns={sig.patterns} strength={sig.strength:+.2f}")

    analyzer = OrderFlowAnalyzer(symbol="BTC/USD", on_signal=on_signal)
    await analyzer.connect(
        ws_url="wss://purpleflea.com/ws/trades",
        api_key="pf_live_",
    )


if __name__ == "__main__":
    asyncio.run(main())
```
Real-Time Signal Generation and Trading
The order flow signals from the analyzer feed directly into trade execution logic. The key principle: never trade against absorption signals. Suppose you see buy absorption: heavy buying that the market absorbed without moving price up. Larger participants are then likely selling into strength, so align with them.
| Pattern | Interpretation | Trade Action | Confidence |
|---|---|---|---|
| Buy absorption | Sellers absorbing buyers | Sell / short bias | High |
| Sell absorption | Buyers absorbing sellers | Buy / long bias | High |
| Stacked buy imbalance | Aggressive sequential buying | Buy momentum | Medium |
| Stacked sell imbalance | Aggressive sequential selling | Sell momentum | Medium |
| CD bearish divergence | Price up, cumulative delta down | Reduce longs | Medium-High |
| VPIN toxic (>0.7) | Informed flow dominant | Widen stops, reduce size | High |
| Delta exhaustion (up) | Rally losing internal strength | Tighten profit targets | Medium |
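One way to turn part of the table into code is a small bias function over the analyzer's pattern names. The sketch below is hypothetical throughout:
- The function name and the "neutral/long/short" labels are illustrative.
- The 0.5 size multiplier for toxic VPIN is an arbitrary choice.
- Absorption overrides stacked imbalances, reflecting the table's higher confidence for absorption.

```python
from typing import List, Tuple


def bias_from_patterns(patterns: List[str]) -> Tuple[str, float]:
    """Map OFASignal-style pattern names to (bias, position-size multiplier).
    Hypothetical helper; multipliers are illustrative only."""
    bias = "neutral"
    size_mult = 1.0
    if "buy_absorption" in patterns:
        bias = "short"            # sellers absorbing buyers: never fight absorption
    elif "sell_absorption" in patterns:
        bias = "long"             # buyers absorbing sellers
    else:
        buys = sum(p.startswith("stacked_buy") for p in patterns)
        sells = sum(p.startswith("stacked_sell") for p in patterns)
        if buys > sells:
            bias = "long"         # buy momentum
        elif sells > buys:
            bias = "short"        # sell momentum
    if "vpin_toxic" in patterns:
        size_mult = 0.5           # informed flow dominant: reduce size
    return bias, size_mult


print(bias_from_patterns(["buy_absorption", "stacked_buy@100-103"]))  # ('short', 1.0)
```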
Important: Order flow signals are most reliable at key structural price levels (POC from previous session, daily open, high-volume nodes). A buy absorption signal at a weekly POC is far more significant than one in open air. Always combine order flow with structural context.
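A simple way to encode structural context is to test whether a signal's price lies near a known level before acting on it. The sketch below makes several assumptions:
- The level names and values are hypothetical.
- The 0.1% tolerance is an arbitrary choice.
- The helper name `nearest_structural_level` is illustrative.

```python
from typing import Dict, Optional


def nearest_structural_level(price: float, levels: Dict[str, float],
                             tolerance_pct: float = 0.001) -> Optional[str]:
    """Return the name of the closest level within tolerance_pct of price
    (0.001 = 0.1%), or None if the signal fired "in open air"."""
    best_name, best_dist = None, float("inf")
    for name, level in levels.items():
        dist = abs(price - level) / level
        if dist <= tolerance_pct and dist < best_dist:
            best_name, best_dist = name, dist
    return best_name


# Hypothetical structural levels for the current session
levels = {"prev_session_poc": 67000.0, "daily_open": 66500.0}
print(nearest_structural_level(66980.0, levels))  # prev_session_poc
print(nearest_structural_level(67600.0, levels))  # None
```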
Trade With Purple Flea's Live Feed
Connect your OrderFlowAnalyzer to Purple Flea's real-time WebSocket trade stream. Tick-level data, sub-50ms latency, 6 trading pairs.
Summary
Order flow analysis gives AI agents a structural edge by revealing the intentions behind price moves rather than just the price itself. The four-layer framework covered here provides a complete picture of market microstructure:
- volume delta for directional pressure,
- cumulative delta for structural bias,
- footprint charts for level-by-level precision,
- VPIN for toxicity regime detection.
The OrderFlowAnalyzer class integrates all layers into a real-time signal generator that connects directly to Purple Flea's WebSocket trade feed. Used in combination with the sentiment index and structural price levels, order flow signals form the core of a robust systematic trading framework for autonomous agents.