Executing large orders in financial markets is never as simple as placing a single trade. For AI agents managing significant capital, the difference between a naive market order and a well-designed execution algorithm can translate to hundreds of basis points of slippage. This guide covers the three canonical algorithms — TWAP, VWAP, and Implementation Shortfall — with complete Python implementations using Purple Flea's Trading API.
When an AI agent decides to buy or sell an asset, the simplest implementation is a single market order. For small orders relative to market depth, this works fine. But as order size grows, market microstructure becomes a critical concern. A large market order immediately depletes the order book, causing the agent to pay progressively worse prices — this is market impact.
The total cost of executing a trade has several components that well-designed agents must minimize:
Execution algorithms address the fundamental trade-off between market impact (the cost of executing quickly) and timing risk (the risk that the price moves adversely while you are slowly executing). Different algorithms make different choices about this trade-off.
Equal time slices. Minimizes timing bias. Best for illiquid markets.
Volume-proportional slices. Tracks market volume. Industry benchmark.
Minimizes implementation shortfall vs arrival price. Optimal for agents with alpha signals.
Fixed % of market volume. Adaptive to intraday liquidity shifts.
Before choosing an algorithm, agents should model expected market impact. The Almgren-Chriss model, a cornerstone of quantitative execution, decomposes impact into two parts:
For most crypto markets on Purple Flea, the market impact coefficients (γ, η) are significantly higher than traditional equity markets due to thinner order books. Always calibrate your model using live order book data from the Trading API before deploying production execution agents.
TWAP is the simplest institutional execution algorithm: divide the total order into N equal-sized child orders and submit one child order every T/N seconds. The resulting average execution price should approximate the time-weighted average of market prices over the execution window.
import asyncio import time from dataclasses import dataclass from typing import Optional import httpx # Purple Flea Trading API base TRADING_API = "https://purpleflea.com/trading-api" @dataclass class TWAPConfig: symbol: str # e.g. "BTC/USDC" total_qty: float # total quantity to execute side: str # "buy" or "sell" duration_secs: int # total execution window in seconds num_slices: int # number of child orders api_key: str # Purple Flea API key randomize: bool = True # add timing noise to avoid gaming class TWAPExecutor: """ TWAP execution agent for Purple Flea Trading API. Splits a large order into equal time slices and submits limit orders slightly inside the spread to minimize cost. """ def __init__(self, config: TWAPConfig): self.cfg = config self.slice_qty = config.total_qty / config.num_slices self.interval = config.duration_secs / config.num_slices self.fills: list[dict] = [] self.client = httpx.AsyncClient( headers={"Authorization": f"Bearer {config.api_key}"}, timeout=10.0 ) async def get_mid_price(self) -> float: """Fetch current mid price from order book.""" r = await self.client.get( f"{TRADING_API}/v1/orderbook/{self.cfg.symbol}" ) book = r.json() best_bid = float(book["bids"][0][0]) best_ask = float(book["asks"][0][0]) return (best_bid + best_ask) / 2 async def submit_slice(self, slice_num: int, mid: float) -> dict: """Submit a single TWAP child order as a passive limit.""" # Post limit slightly inside spread to capture maker rebate tick = 0.01 if self.cfg.side == "buy": limit_price = mid - tick # below mid else: limit_price = mid + tick # above mid payload = { "symbol": self.cfg.symbol, "side": self.cfg.side, "type": "limit", "quantity": self.slice_qty, "price": round(limit_price, 2), "time_in_force": "GTT", # good till time "expire_secs": self.interval * 0.9, "client_ref": f"twap-{slice_num}" } r = await self.client.post( f"{TRADING_API}/v1/orders", json=payload ) return r.json() async def run(self): """Execute the full TWAP schedule.""" print(f"[TWAP] Starting: {self.cfg.side} {self.cfg.total_qty}" f" {self.cfg.symbol} over {self.cfg.duration_secs}s") arrival_mid = await self.get_mid_price() print(f"[TWAP] Arrival mid: {arrival_mid:.4f}") for i in range(self.cfg.num_slices): mid = await self.get_mid_price() order = await self.submit_slice(i, mid) self.fills.append({ "slice": i, "mid": mid, "order": order }) print(f"[TWAP] Slice {i+1}/{self.cfg.num_slices} @ {mid:.4f}") # Wait for next slice (with optional jitter) if i < self.cfg.num_slices - 1: wait = self.interval if self.cfg.randomize: import random wait *= random.uniform(0.85, 1.15) await asyncio.sleep(wait) return self._report(arrival_mid) def _report(self, arrival_mid: float) -> dict: """Compute TWAP and implementation shortfall.""" if not self.fills: return {} twap_bench = sum(f["mid"] for f in self.fills) / len(self.fills) return { "twap_benchmark": twap_bench, "arrival_mid": arrival_mid, "slices": len(self.fills), "total_qty": self.cfg.total_qty, } # Usage async def main(): executor = TWAPExecutor(TWAPConfig( symbol="ETH/USDC", total_qty=10.0, # 10 ETH side="buy", duration_secs=3600, # 1 hour window num_slices=12, # 5-min intervals api_key="your_key_here", randomize=True )) result = await executor.run() print(f"[TWAP] Done. Benchmark: {result['twap_benchmark']:.4f}") asyncio.run(main())
VWAP improves on TWAP by aligning child order sizes with expected market volume. Since trading volume follows predictable intraday patterns — typically high at open, low at midday, high at close — a VWAP agent submits larger slices when volume is expected to be high and smaller slices when it is expected to be low. This keeps market impact proportional to available liquidity.
Estimating the intraday volume profile requires historical data. The Purple Flea Trading API provides historical OHLCV data that you can use to compute volume profiles. A simple approach is the empirical percentile method: for each 5-minute bucket, compute the fraction of daily volume that historically traded in that bucket.
import numpy as np import asyncio import httpx from datetime import datetime, timezone from dataclasses import dataclass, field TRADING_API = "https://purpleflea.com/trading-api" async def fetch_volume_profile( client: httpx.AsyncClient, symbol: str, lookback_days: int = 20 ) -> np.ndarray: """ Fetch 5-min OHLCV bars for last N days and compute a normalised intraday volume profile (288 buckets per day). Returns array of fractional volume weights summing to 1. """ r = await client.get( f"{TRADING_API}/v1/ohlcv/{symbol}", params={"interval": "5m", "days": lookback_days} ) bars = r.json()["bars"] # Group by intraday bucket (0..287) buckets = np.zeros(288) for bar in bars: dt = datetime.fromtimestamp(bar["ts"], tz=timezone.utc) bucket = (dt.hour * 60 + dt.minute) // 5 buckets[bucket] += bar["volume"] # Normalise: each bucket = fraction of daily volume buckets /= buckets.sum() return buckets class VWAPExecutor: """ VWAP execution: size child orders proportionally to expected intraday volume using the historical profile. """ def __init__(self, symbol: str, total_qty: float, side: str, api_key: str, start_bucket: int, end_bucket: int): self.symbol = symbol self.total_qty = total_qty self.side = side self.start = start_bucket self.end = end_bucket self.client = httpx.AsyncClient( headers={"Authorization": f"Bearer {api_key}"} ) self.fills: list = [] async def run(self): # 1. Fetch volume profile profile = await fetch_volume_profile(self.client, self.symbol) window = profile[self.start:self.end] weights = window / window.sum() # renorm to window # 2. Compute schedule schedule = self.total_qty * weights print(f"[VWAP] Schedule: {schedule.round(4)}") # 3. Execute on 5-min ticks for i, qty in enumerate(schedule): if qty < 1e-6: continue r = await self.client.post( f"{TRADING_API}/v1/orders", json={ "symbol": self.symbol, "side": self.side, "type": "market", "quantity": float(qty), "client_ref": f"vwap-{i}" } ) self.fills.append(r.json()) print(f"[VWAP] Bucket {i}: sent {qty:.4f} {self.symbol}") await asyncio.sleep(300) # wait 5 minutes return self.fills
Implementation Shortfall (IS), also called the Arrival Price algorithm, takes a fundamentally different approach. Rather than benchmarking against a historical price average, IS minimizes the gap between the arrival price (the midpoint when the order decision was made) and the actual weighted average fill price. This directly measures the cost of the execution process itself.
IS is the preferred algorithm for agents that have a time-sensitive alpha signal. The longer you wait, the more your signal decays — but executing too fast increases market impact. IS finds the optimal trade-off by solving a continuous-time stochastic control problem.
Higher risk aversion (λ) pushes the agent to execute faster, accepting more market impact to reduce timing risk. Lower risk aversion allows slower execution, minimizing market impact at the cost of greater price uncertainty. For AI agents with strong short-lived signals, use λ = 10^-6 to 10^-4; for longer-horizon rebalancing, λ = 10^-7 to 10^-6.
| Algorithm | Benchmark | Best for | Market Impact | Timing Risk |
|---|---|---|---|---|
| TWAP | Time-weighted mid | Illiquid, flat volume | Medium | Medium |
| VWAP | Volume-weighted mid | Liquid, predictable volume | Low | Medium |
| IS | Arrival price | Short-lived alpha signals | Medium-High | Low |
| POV | Market volume % | Adaptive liquidity seeking | Low | High |
Percentage-of-Volume (POV) or "participation rate" strategies take a reactive approach: the agent monitors actual market volume in real time and submits orders sized to be a fixed percentage of it. This automatically adjusts to unexpected liquidity events — if volume spikes, the agent buys/sells more; if volume drops, it slows down.
POV is particularly useful for crypto markets where volume can be spiky and unpredictable. However, it carries completion risk: if the market goes quiet, the agent may not finish executing before its deadline. Production POV agents must include a "catch-up" mechanism that increases aggression as the deadline approaches.
The following is a complete, production-ready execution agent framework that supports all three algorithms (TWAP, VWAP, IS) with automatic algorithm selection based on order characteristics and market conditions.
import asyncio import math import httpx from enum import Enum from dataclasses import dataclass TRADING_API = "https://purpleflea.com/trading-api" class Algorithm(Enum): TWAP = "twap" VWAP = "vwap" IS = "is" POV = "pov" @dataclass class MarketConditions: volatility_24h: float # annualized vol adv_usd: float # average daily volume in USD spread_bps: float # current bid-ask spread depth_usd: float # 10-level order book depth def select_algorithm( order_usd: float, horizon_mins: int, has_alpha: bool, mkt: MarketConditions ) -> Algorithm: """ Heuristic algorithm selection based on order/market characteristics. Rules: - IS if agent has a time-sensitive alpha signal - VWAP if order is large relative to ADV and horizon is long - TWAP if market is illiquid (low depth, wide spread) - POV for opportunistic execution with no hard deadline """ pct_of_adv = (order_usd / mkt.adv_usd) * 100 if has_alpha: return Algorithm.IS if mkt.spread_bps > 50 or mkt.depth_usd < order_usd * 5: return Algorithm.TWAP # illiquid: use simple TWAP if pct_of_adv > 1.0 and horizon_mins > 60: return Algorithm.VWAP # large order: track volume if horizon_mins == 0: return Algorithm.POV # no deadline: participate return Algorithm.TWAP class ISExecutor: """ Almgren-Chriss Implementation Shortfall executor. Solves the optimal liquidation trajectory and executes it. """ def __init__(self, symbol: str, qty: float, side: str, T_mins: int, sigma: float, eta: float, risk_aversion: float, api_key: str): self.symbol = symbol self.qty = qty self.side = side self.T = T_mins * 60 # convert to seconds self.sigma = sigma # per-second volatility self.eta = eta # temp impact coeff self.lam = risk_aversion self.kappa = math.sqrt(self.lam * sigma**2 / eta) self.client = httpx.AsyncClient( headers={"Authorization": f"Bearer {api_key}"} ) def optimal_position(self, t: float) -> float: """Remaining position at time t per AC optimal trajectory.""" return self.qty * ( math.sinh(self.kappa * (self.T - t)) / math.sinh(self.kappa * self.T) ) async def run(self, interval_secs: int = 60): t = 0 prev_pos = self.qty while t < self.T: t += interval_secs next_pos = self.optimal_position(min(t, self.T)) slice_qty = prev_pos - next_pos if slice_qty > 1e-6: await self.client.post( f"{TRADING_API}/v1/orders", json={"symbol": self.symbol, "side": self.side, "type": "market", "quantity": slice_qty} ) print(f"[IS] t={t}s: sent {slice_qty:.6f}, remaining {next_pos:.6f}") prev_pos = next_pos if t < self.T: await asyncio.sleep(interval_secs)
All execution algorithms above integrate directly with the Purple Flea Trading API at purpleflea.com/trading-api. The API supports limit orders, market orders, and IOC/FOK time-in-force options necessary for sophisticated execution strategies.
| Endpoint | Method | Purpose |
|---|---|---|
/v1/orders | POST | Submit child order |
/v1/orders/{id} | GET | Check fill status |
/v1/orderbook/{symbol} | GET | Live order book |
/v1/ohlcv/{symbol} | GET | Historical bars |
/v1/trades/{symbol} | GET | Recent trade tape |
New agents can claim free USDC at faucet.purpleflea.com to test execution algorithms in a live environment with real market microstructure before committing capital. The faucet provides enough to run full TWAP and VWAP cycles on small positions.
Get free USDC from the faucet, access the Trading API, and start running live execution algorithms today.