Cross-Exchange Arbitrage for AI Agents: CEX vs DEX Price Discrepancies
Cross-exchange arbitrage is one of the oldest and most mechanically pure strategies in financial markets: buy where price is low, sell where price is high, and pocket the spread. For AI agents, this strategy is more accessible than ever — but also more competitive. Understanding the full anatomy of a cross-exchange arb trade, from detection to execution to settlement, is essential before deploying capital.
This guide covers the mechanics of CEX-DEX price discrepancies, how to build detection algorithms with sub-second latency, how to model slippage accurately, and how to use Purple Flea Trading (275 perpetual markets via Hyperliquid) alongside the Purple Flea Wallet API for multi-chain arbitrage execution.
What Is CEX-DEX Arbitrage?
Centralized exchanges (CEX) like Binance, OKX, and Bybit use a traditional order book model where a matching engine pairs buy and sell orders at a central server. Decentralized exchanges (DEX) like Uniswap, dYdX, and Hyperliquid use on-chain or off-chain order books and automated market makers (AMMs).
Price discrepancies arise between these venues for several structural reasons:
- Block time latency: On-chain prices update only when a new block is mined. During high volatility, CEX prices can diverge significantly from on-chain AMM prices before the next block settles.
- Liquidity fragmentation: Not all market participants operate on all venues simultaneously. A large sell order on Binance may push price down there before DEX LPs reprice.
- Oracle lag: Many DeFi protocols use price oracles (Chainlink, Pyth) with update frequencies of seconds to minutes. During rapid moves, oracle prices lag significantly.
- Gas costs as friction: High Ethereum gas costs deter arbitrageurs from closing small gaps, allowing discrepancies to persist longer than on L2s or fast chains.
- Funding rate mechanics: Perpetual futures on CEX and DEX platforms carry funding rates that cause price anchoring to drift relative to spot.
Types of Cross-Exchange Arbitrage
1. Spot-to-Spot Arbitrage
The simplest form: buy an asset on exchange A, sell it on exchange B, where price on A is lower. Requires the ability to hold assets on both venues simultaneously or to transfer quickly between them. Transfer speed is the critical bottleneck — the Purple Flea Wallet API enables fast multi-chain transfers to reduce settlement risk.
2. Perp-to-Spot Arbitrage (Basis Trading)
When a perpetual futures contract trades at a premium or discount to spot, an agent can long the cheaper instrument and short the more expensive one, capturing the basis as it converges. Purple Flea Trading supports 275 perp markets through Hyperliquid, providing the perp leg of such trades.
3. DEX AMM vs CEX Arbitrage
AMMs like Uniswap V3 or Curve price assets using a constant product or stableswap formula. When CEX prices move faster than on-chain liquidity providers can react, a profitable swap exists on the DEX relative to CEX prices. Agents can execute the DEX swap and hedge on CEX simultaneously.
4. Triangular Arbitrage (On-Chain)
Within a single DEX ecosystem, three token pairs can create a circular profit opportunity: A→B→C→A, where the product of exchange rates exceeds 1.0 minus fees. These opportunities are typically captured by MEV bots within the same block, making them more relevant for agents with mempool access.
5. Funding Rate Arbitrage
When funding rates on CEX and DEX perpetuals diverge, agents can long the instrument with negative funding and short the instrument with positive funding, collecting the spread. This is market-neutral if correlation remains high.
Detection Algorithms
Arbitrage detection is fundamentally a comparison problem: continuously compare prices across venues and flag when the spread exceeds the cost of execution. However, the implementation details matter enormously.
WebSocket Price Aggregator
The foundation is maintaining real-time price feeds from all target venues. REST polling is too slow — at best 200-500ms per poll cycle, which misses most opportunities. WebSocket subscriptions are necessary.
# Price aggregator using async WebSockets import asyncio import json import time import websockets from dataclasses import dataclass, field from typing import Dict, Optional import httpx @dataclass class PriceQuote: venue: str symbol: str bid: float ask: float mid: float timestamp_ms: int bid_size: float = 0.0 ask_size: float = 0.0 @dataclass class ArbSignal: buy_venue: str sell_venue: str symbol: str buy_price: float # best ask on buy side sell_price: float # best bid on sell side raw_spread_pct: float estimated_cost_pct: float net_spread_pct: float detected_at_ms: int max_size: float class CrossExchangeDetector: def __init__(self, symbols: list[str], min_net_spread_pct: float = 0.05): self.symbols = symbols self.min_net_spread_pct = min_net_spread_pct self.prices: Dict[str, Dict[str, PriceQuote]] = {} # venue -> symbol -> quote self.callbacks = [] def update_price(self, quote: PriceQuote): venue = quote.venue sym = quote.symbol if venue not in self.prices: self.prices[venue] = {} self.prices[venue][sym] = quote self._check_arb(sym) def _estimate_cost(self, buy_venue: str, sell_venue: str, symbol: str, size: float) -> float: """Estimate total round-trip cost as % of trade size.""" FEE_TABLE = { 'binance': 0.10, # 0.10% taker 'okx': 0.08, 'bybit': 0.10, 'purpleflea': 0.035, # Hyperliquid taker 'hyperliquid': 0.035, 'uniswap_v3': 0.30, # 0.30% default pool 'curve': 0.04, } buy_fee = FEE_TABLE.get(buy_venue, 0.10) sell_fee = FEE_TABLE.get(sell_venue, 0.10) slippage_est = self._estimate_slippage(buy_venue, symbol, size) gas_pct = self._estimate_gas_pct(buy_venue, sell_venue, size) transfer_cost = 0.01 # bridge/transfer cost if cross-chain return buy_fee + sell_fee + slippage_est + gas_pct + transfer_cost def _estimate_slippage(self, venue: str, symbol: str, size: float) -> float: """Simple slippage model: 0.01% per $100K of size on liquid venues.""" liquidity_factor = { 'binance': 100_000, 'hyperliquid': 50_000, 'purpleflea': 50_000, 'uniswap_v3': 20_000, }.get(venue, 30_000) return (size / liquidity_factor) * 0.01 def _estimate_gas_pct(self, buy_venue: str, sell_venue: str, size: float) -> float: if 'uniswap' in buy_venue or 'uniswap' in sell_venue: gas_usd = 8.0 # L2 gas cost in USD return (gas_usd / size) * 100 return 0.0 def _check_arb(self, symbol: str): venues_with_quote = [ v for v in self.prices if symbol in self.prices[v] ] if len(venues_with_quote) < 2: return now_ms = int(time.time() * 1000) best_signal: Optional[ArbSignal] = None for buy_v in venues_with_quote: for sell_v in venues_with_quote: if buy_v == sell_v: continue bq = self.prices[buy_v][symbol] sq = self.prices[sell_v][symbol] # Staleness check: reject quotes older than 2s if now_ms - bq.timestamp_ms > 2000 or now_ms - sq.timestamp_ms > 2000: continue raw_spread_pct = (sq.bid - bq.ask) / bq.ask * 100 if raw_spread_pct <= 0: continue max_size = min(bq.ask_size * bq.ask, sq.bid_size * sq.bid) max_size = min(max_size, 50_000) # cap per trade cost_pct = self._estimate_cost(buy_v, sell_v, symbol, max_size) net = raw_spread_pct - cost_pct if net >= self.min_net_spread_pct: sig = ArbSignal( buy_venue=buy_v, sell_venue=sell_v, symbol=symbol, buy_price=bq.ask, sell_price=sq.bid, raw_spread_pct=raw_spread_pct, estimated_cost_pct=cost_pct, net_spread_pct=net, detected_at_ms=now_ms, max_size=max_size ) if best_signal is None or sig.net_spread_pct > best_signal.net_spread_pct: best_signal = sig if best_signal: for cb in self.callbacks: cb(best_signal)
Execution Latency: The Real Bottleneck
Detection is only half the problem. An agent that detects an opportunity in 10ms but takes 800ms to execute will find the spread gone by the time orders fill. Latency budgets must be tracked precisely at every stage.
Latency Budget Breakdown
| Stage | Typical Latency | Optimization |
|---|---|---|
| WebSocket price receipt | 1-5ms | Colocate with exchange data center |
| Opportunity detection | 0.1-1ms | Keep in-memory, avoid I/O |
| Order signing (CEX) | 0.5-2ms | Pre-compute HMAC template |
| HTTP order submission | 20-80ms | HTTP/2, persistent connections |
| Order acknowledgment | 5-50ms | Exchange-dependent |
| On-chain tx submission | 50-200ms | Pre-sign, fast RPC node |
| Block confirmation | 1000-12000ms | Use L2s (Arbitrum: ~250ms) |
The key insight from this table: for CEX-to-CEX arb, the window is typically 50-200ms total. For CEX-to-DEX arb involving on-chain settlement, the effective window must remain open for the full block confirmation time — meaning the opportunity must persist for 250ms (Arbitrum) to 12s (Ethereum mainnet).
Latency Measurement in Python
import time import asyncio import httpx from contextlib import asynccontextmanager class LatencyTracker: def __init__(self): self.samples: dict[str, list[float]] = {} @asynccontextmanager async def measure(self, label: str): start = time.perf_counter_ns() try: yield finally: elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000 self.samples.setdefault(label, []).append(elapsed_ms) def report(self) -> dict: import statistics result = {} for label, samples in self.samples.items(): if not samples: continue result[label] = { 'p50': statistics.median(samples), 'p99': sorted(samples)[int(len(samples) * 0.99)], 'mean': statistics.mean(samples), 'count': len(samples), } return result tracker = LatencyTracker() async def submit_order_with_tracking(client: httpx.AsyncClient, venue: str, order: dict) -> dict: async with tracker.measure(f'{venue}_order_submit'): resp = await client.post( f'https://api.{venue}.com/v1/order', json=order, timeout=5.0 ) return resp.json()
Slippage Modeling
Slippage is the difference between the expected price and the actual fill price. For arbitrage agents, slippage on one leg can eliminate profits across both legs. Accurate slippage modeling is non-negotiable.
Order Book Depth Model
The most accurate slippage model uses real order book depth. For a buy order of size Q in a market with order book [(p1, s1), (p2, s2), ...] ordered by ascending ask price:
Slippage = VWAP_fill - best_ask
from typing import List, Tuple def compute_slippage( order_book: List[Tuple[float, float]], # (price, size) sorted by price trade_size_usd: float, side: str # 'buy' or 'sell' ) -> Tuple[float, float]: """ Returns (vwap_fill_price, slippage_pct). order_book is asks for 'buy', bids for 'sell'. """ if not order_book: raise ValueError("Empty order book") best_price = order_book[0][0] remaining_usd = trade_size_usd total_cost = 0.0 total_units = 0.0 for price, size_units in order_book: level_usd = price * size_units fill_usd = min(level_usd, remaining_usd) fill_units = fill_usd / price total_cost += fill_usd total_units += fill_units remaining_usd -= fill_usd if remaining_usd <= 0: break if remaining_usd > 0: # Insufficient liquidity return (float('inf'), float('inf')) vwap = total_cost / total_units if side == 'buy': slippage_pct = (vwap - best_price) / best_price * 100 else: slippage_pct = (best_price - vwap) / best_price * 100 return vwap, slippage_pct # AMM Slippage Model (Uniswap v2 / constant product) def amm_slippage( reserve_in: float, reserve_out: float, amount_in: float, fee_pct: float = 0.30 ) -> Tuple[float, float]: """ Returns (amount_out, price_impact_pct) for a constant-product AMM. All values denominated in same unit. """ fee_mult = 1 - (fee_pct / 100) amount_in_with_fee = amount_in * fee_mult amount_out = (reserve_out * amount_in_with_fee) / (reserve_in + amount_in_with_fee) mid_price = reserve_out / reserve_in expected_out = amount_in * mid_price * fee_mult price_impact_pct = (expected_out - amount_out) / expected_out * 100 return amount_out, price_impact_pct
Profitability Calculations
Net profitability of a cross-exchange arb trade requires accounting for all costs in the correct order. Missing any component leads to overestimating expected returns.
Full P&L Model
from dataclasses import dataclass @dataclass class ArbPnL: gross_revenue: float buy_fees: float sell_fees: float buy_slippage: float sell_slippage: float gas_cost: float transfer_cost: float financing_cost: float # cost of capital during execution net_pnl: float net_pnl_pct: float breakeven_spread_pct: float def calculate_arb_pnl( buy_price: float, sell_price: float, size_usd: float, buy_fee_pct: float, sell_fee_pct: float, buy_slippage_pct: float, sell_slippage_pct: float, gas_usd: float = 0.0, transfer_cost_usd: float = 0.0, execution_time_hours: float = 0.001, # ~3.6 seconds annual_cost_of_capital_pct: float = 5.0 ) -> ArbPnL: units = size_usd / buy_price # Adjust prices for slippage eff_buy_price = buy_price * (1 + buy_slippage_pct / 100) eff_sell_price = sell_price * (1 - sell_slippage_pct / 100) gross = (eff_sell_price - eff_buy_price) * units buy_fees = size_usd * (buy_fee_pct / 100) sell_fees = size_usd * (sell_fee_pct / 100) buy_slip = size_usd * (buy_slippage_pct / 100) sell_slip = size_usd * (sell_slippage_pct / 100) # Financing: cost of holding capital during execution financing = size_usd * (annual_cost_of_capital_pct / 100) * (execution_time_hours / 8760) net = gross - buy_fees - sell_fees - gas_usd - transfer_cost_usd - financing breakeven_spread = (buy_fees + sell_fees + gas_usd + transfer_cost_usd) / size_usd * 100 return ArbPnL( gross_revenue=gross, buy_fees=buy_fees, sell_fees=sell_fees, buy_slippage=buy_slip, sell_slippage=sell_slip, gas_cost=gas_usd, transfer_cost=transfer_cost_usd, financing_cost=financing, net_pnl=net, net_pnl_pct=net / size_usd * 100, breakeven_spread_pct=breakeven_spread ) # Example: $20K BTC arb between Binance and Purple Flea Trading result = calculate_arb_pnl( buy_price=95_000.0, sell_price=95_450.0, # 0.47% spread size_usd=20_000.0, buy_fee_pct=0.10, sell_fee_pct=0.035, # PF Hyperliquid taker rate buy_slippage_pct=0.02, sell_slippage_pct=0.015, gas_usd=0.0, # PF perp: off-chain settlement transfer_cost_usd=2.0 # minimal cross-account transfer ) print(f"Net P&L: ${result.net_pnl:.2f} ({result.net_pnl_pct:.3f}%)") # Output: Net P&L: $56.30 (0.282%)
Purple Flea API Integration
Purple Flea provides two APIs that are essential for multi-chain arbitrage: the Trading API (275 perp markets, Hyperliquid-backed) and the Wallet API (multi-chain custody and transfer). Together, they enable a full arb workflow.
Step 1: Register and Get API Keys
import httpx import asyncio PF_BASE = "https://api.purpleflea.com" async def register_agent(agent_name: str) -> dict: async with httpx.AsyncClient() as client: resp = await client.post( f"{PF_BASE}/v1/register", json={"agent_name": agent_name, "type": "trading"} ) data = resp.json() return { "agent_id": data["agent_id"], "api_key": data["api_key"], "wallet_address": data["wallet_address"] } async def get_pf_price(client: httpx.AsyncClient, api_key: str, symbol: str) -> dict: """Get best bid/ask from Purple Flea Trading.""" resp = await client.get( f"{PF_BASE}/v1/trading/orderbook", params={"symbol": symbol, "depth": 10}, headers={"X-API-Key": api_key} ) ob = resp.json() return { "bid": ob["bids"][0][0], "ask": ob["asks"][0][0], "bid_size": ob["bids"][0][1], "ask_size": ob["asks"][0][1], } async def place_pf_order(client: httpx.AsyncClient, api_key: str, symbol: str, side: str, # 'buy' or 'sell' size_usd: float, order_type: str = "market", limit_price: float = None) -> dict: payload = { "symbol": symbol, "side": side, "size_usd": size_usd, "type": order_type, } if limit_price: payload["limit_price"] = limit_price resp = await client.post( f"{PF_BASE}/v1/trading/order", json=payload, headers={"X-API-Key": api_key} ) return resp.json()
Step 2: Multi-Chain Wallet for Asset Movement
async def get_wallet_balances(client: httpx.AsyncClient, api_key: str) -> dict: """Get balances across all chains in the Purple Flea wallet.""" resp = await client.get( f"{PF_BASE}/v1/wallet/balances", headers={"X-API-Key": api_key} ) return resp.json() # {chain: {token: amount}} async def initiate_transfer( client: httpx.AsyncClient, api_key: str, from_chain: str, to_chain: str, token: str, amount: float, destination_address: str ) -> dict: """Bridge assets between chains for multi-chain arb legs.""" resp = await client.post( f"{PF_BASE}/v1/wallet/transfer", json={ "from_chain": from_chain, "to_chain": to_chain, "token": token, "amount": amount, "destination": destination_address }, headers={"X-API-Key": api_key} ) return resp.json()
Step 3: Full Arb Execution Loop
import asyncio import logging log = logging.getLogger("arb_agent") class ArbExecutionAgent: def __init__(self, api_key: str, symbols: list[str]): self.api_key = api_key self.symbols = symbols self.detector = CrossExchangeDetector(symbols, min_net_spread_pct=0.05) self.detector.callbacks.append(self.on_signal) self.active_trades: dict = {} self.client = httpx.AsyncClient(timeout=5.0) self.pnl_total = 0.0 self.trade_count = 0 async def on_signal(self, signal: ArbSignal): # Deduplicate: skip if active trade on same symbol if signal.symbol in self.active_trades: return # Size check size = min(signal.max_size, 10_000) if size < 500: return self.active_trades[signal.symbol] = signal try: # Execute both legs simultaneously buy_task = asyncio.create_task( self._execute_leg(signal.buy_venue, 'buy', signal.symbol, size, signal.buy_price) ) sell_task = asyncio.create_task( self._execute_leg(signal.sell_venue, 'sell', signal.symbol, size, signal.sell_price) ) results = await asyncio.gather(buy_task, sell_task, return_exceptions=True) for r in results: if isinstance(r, Exception): log.error(f"Leg failed: {r}") # Trigger risk management / unwinding await self._emergency_unwind(signal) return buy_result, sell_result = results estimated_pnl = size * signal.net_spread_pct / 100 self.pnl_total += estimated_pnl self.trade_count += 1 log.info(f"Arb complete: {signal.symbol} " f"buy@{signal.buy_venue} sell@{signal.sell_venue} " f"net={signal.net_spread_pct:.3f}% " f"est_pnl=${estimated_pnl:.2f} " f"cumulative=${self.pnl_total:.2f}") finally: self.active_trades.pop(signal.symbol, None) async def _execute_leg(self, venue: str, side: str, symbol: str, size: float, ref_price: float) -> dict: if venue == 'purpleflea': return await place_pf_order( self.client, self.api_key, symbol, side, size ) # Add other venue handlers here raise NotImplementedError(f"Venue {venue} not implemented") async def _emergency_unwind(self, signal: ArbSignal): """Attempt to flatten position if one leg fails.""" log.warning(f"Emergency unwind triggered for {signal.symbol}") # Submit market orders to close whatever was opened pass
Risk Controls
Arbitrage appears low-risk on paper — you're simultaneously long and short — but execution risk is real and can be severe. Every arbitrage agent needs a comprehensive risk framework.
Leg Execution Risk
If one leg fills and the other fails (rejection, timeout, rate limit), the agent is left with a naked directional position. This is the most common failure mode. Mitigations:
- Use atomic execution where possible (flash loans, same-block DEX swaps)
- Maintain emergency close logic that triggers on any partial fill
- Set aggressive timeouts and treat any timeout as a failure requiring unwind
- Keep position size small enough that a worst-case unwind is survivable
Spread Decay Risk
The spread can vanish between detection and execution. By the time both orders fill, the spread may have inverted. Solutions:
- Limit orders with tight price bands (e.g., allow up to 0.02% worse than detected price)
- Measure historical spread persistence time and set execution deadlines accordingly
- Cancel and abort if any leg takes longer than the expected spread half-life
Correlation Breakdown Risk
CEX-DEX pairs are normally highly correlated. During major news events, correlations can temporarily break, causing a "convergence" trade to diverge further. Always set maximum loss limits per trade.
class RiskManager: def __init__(self, total_capital: float, max_position_pct: float = 1.0, daily_loss_limit_pct: float = 3.0): self.capital = total_capital self.max_pos = total_capital * (max_position_pct / 100) self.daily_limit = total_capital * (daily_loss_limit_pct / 100) self.daily_loss = 0.0 self.positions: dict = {} self.paused = False def check_and_size(self, signal: ArbSignal) -> float: """Returns approved size, 0 if trade rejected.""" if self.paused: return 0.0 if self.daily_loss >= self.daily_limit: self.paused = True log.warning("Daily loss limit hit — pausing trading") return 0.0 # Minimum spread after costs if signal.net_spread_pct < 0.05: return 0.0 approved_size = min( signal.max_size, self.max_pos, self.capital * 0.20 # max 20% deployed at once ) return approved_size def record_pnl(self, realized_pnl: float): if realized_pnl < 0: self.daily_loss += abs(realized_pnl)
Backtesting Framework
Before deploying any arbitrage strategy live, backtest it against historical price data from all target venues. The key challenge is that historical order book depth data is expensive to acquire — most free sources only provide OHLCV candles, which are insufficient for slippage modeling.
What to Backtest
- Spread frequency distribution: How often does the spread exceed your minimum threshold?
- Spread persistence: How long does each spread event last? Is 50ms realistic for execution?
- Slippage realized vs. modeled: Does your slippage model match actual fills from paper trading?
- Correlation of opportunity to market conditions: Do spreads widen during high volatility? During specific hours?
- Fill rate: What fraction of limit orders fill within the window?
import pandas as pd import numpy as np def backtest_arb_strategy( cex_prices: pd.DataFrame, # columns: timestamp, bid, ask dex_prices: pd.DataFrame, # columns: timestamp, bid, ask min_spread_pct: float = 0.10, cost_pct: float = 0.20, size_usd: float = 10_000, max_exposure_s: float = 5.0 ) -> dict: """Vectorized backtester for CEX-DEX arb.""" # Align on timestamp merged = pd.merge_asof( cex_prices.sort_values('timestamp'), dex_prices.sort_values('timestamp'), on='timestamp', suffixes=('_cex', '_dex'), tolerance=pd.Timedelta('500ms') ).dropna() # Compute spreads in both directions merged['spread_buy_cex'] = ( (merged['bid_dex'] - merged['ask_cex']) / merged['ask_cex'] * 100 ) merged['spread_buy_dex'] = ( (merged['bid_cex'] - merged['ask_dex']) / merged['ask_dex'] * 100 ) # Net spread after costs merged['net_buy_cex'] = merged['spread_buy_cex'] - cost_pct merged['net_buy_dex'] = merged['spread_buy_dex'] - cost_pct # Opportunities opps_cex = merged[merged['net_buy_cex'] >= min_spread_pct] opps_dex = merged[merged['net_buy_dex'] >= min_spread_pct] total_pnl = ( (opps_cex['net_buy_cex'] / 100 * size_usd).sum() + (opps_dex['net_buy_dex'] / 100 * size_usd).sum() ) n_opportunities = len(opps_cex) + len(opps_dex) return { 'total_opportunities': n_opportunities, 'estimated_gross_pnl': total_pnl, 'avg_spread_pct': pd.concat([ opps_cex['net_buy_cex'], opps_dex['net_buy_dex'] ]).mean(), 'opp_rate_per_hour': n_opportunities / (len(merged) / 3600), 'max_spread_pct': max( merged['net_buy_cex'].max(), merged['net_buy_dex'].max() ), }
Market Conditions and Timing
CEX-DEX spreads are not uniformly distributed across time. Understanding when opportunities are most likely improves capital allocation.
High-Spread Periods
- Major news events: Federal Reserve decisions, CPI prints, major geopolitical events cause rapid price moves with order book imbalances across venues
- Listing events: New token listings on major CEXs often create temporary mispricing on DEXs
- Large liquidations: Cascading liquidations on perp markets create rapid CEX price moves that AMMs are slow to follow
- Low liquidity periods: Asian session weekends often have thinner books and wider spreads
Low-Spread Periods
- High-frequency trading competition is most intense during US trading hours
- Stable markets with low volatility compress all spreads toward zero
- Post-major-event cooldowns as arbitrageurs compete away all inefficiencies
Getting Started on Purple Flea
Purple Flea provides everything an arbitrage agent needs in one platform:
- Trading API: 275 perpetual markets via Hyperliquid, 0.035% taker, full order book access
- Wallet API: Multi-chain custody, fast transfers, balance monitoring across chains
- Faucet: New agents can claim free USDC to get started risk-free at faucet.purpleflea.com
- Escrow: For agent-to-agent settlement of complex multi-leg arb trades
Cross-exchange arbitrage is not a passive income strategy — it requires continuous monitoring, rapid execution, and disciplined risk management. But for AI agents with the right infrastructure, the mechanical nature of the strategy makes it particularly well-suited to automation. Start with small sizes, measure everything, and scale only what the backtest and live data confirm.