Cross-Exchange Arbitrage for AI Agents: CEX vs DEX Price Discrepancies

Cross-exchange arbitrage is one of the oldest and most mechanically pure strategies in financial markets: buy where price is low, sell where price is high, and pocket the spread. For AI agents, this strategy is more accessible than ever — but also more competitive. Understanding the full anatomy of a cross-exchange arb trade, from detection to execution to settlement, is essential before deploying capital.

This guide covers the mechanics of CEX-DEX price discrepancies, how to build detection algorithms with sub-second latency, how to model slippage accurately, and how to use Purple Flea Trading (275 perpetual markets via Hyperliquid) alongside the Purple Flea Wallet API for multi-chain arbitrage execution.

275
Perp Markets on PF Trading
~0.5%
Avg CEX-DEX spread (volatile periods)
<50ms
Target detection latency
6
Supported chains (PF Wallet)

What Is CEX-DEX Arbitrage?

Centralized exchanges (CEX) like Binance, OKX, and Bybit use a traditional order book model where a matching engine pairs buy and sell orders at a central server. Decentralized exchanges (DEX) like Uniswap, dYdX, and Hyperliquid use on-chain or off-chain order books and automated market makers (AMMs).

Price discrepancies arise between these venues for several structural reasons:

  • Block time latency: On-chain prices update only when a new block is mined. During high volatility, CEX prices can diverge significantly from on-chain AMM prices before the next block settles.
  • Liquidity fragmentation: Not all market participants operate on all venues simultaneously. A large sell order on Binance may push price down there before DEX LPs reprice.
  • Oracle lag: Many DeFi protocols use price oracles (Chainlink, Pyth) with update frequencies of seconds to minutes. During rapid moves, oracle prices lag significantly.
  • Gas costs as friction: High Ethereum gas costs deter arbitrageurs from closing small gaps, allowing discrepancies to persist longer than on L2s or fast chains.
  • Funding rate mechanics: Perpetual futures on CEX and DEX platforms carry funding rates that cause price anchoring to drift relative to spot.
Key Insight: The CEX-DEX spread is not random noise — it has predictable structure. During trending markets, DEX prices consistently lag CEX. During mean-reverting markets, DEX AMM prices may overshoot due to impermanent loss mechanics.

Types of Cross-Exchange Arbitrage

1. Spot-to-Spot Arbitrage

The simplest form: buy an asset on exchange A, sell it on exchange B, where price on A is lower. Requires the ability to hold assets on both venues simultaneously or to transfer quickly between them. Transfer speed is the critical bottleneck — the Purple Flea Wallet API enables fast multi-chain transfers to reduce settlement risk.

2. Perp-to-Spot Arbitrage (Basis Trading)

When a perpetual futures contract trades at a premium or discount to spot, an agent can long the cheaper instrument and short the more expensive one, capturing the basis as it converges. Purple Flea Trading supports 275 perp markets through Hyperliquid, providing the perp leg of such trades.

3. DEX AMM vs CEX Arbitrage

AMMs like Uniswap V3 or Curve price assets using a constant product or stableswap formula. When CEX prices move faster than on-chain liquidity providers can react, a profitable swap exists on the DEX relative to CEX prices. Agents can execute the DEX swap and hedge on CEX simultaneously.

4. Triangular Arbitrage (On-Chain)

Within a single DEX ecosystem, three token pairs can create a circular profit opportunity: A→B→C→A, where the product of exchange rates exceeds 1.0 minus fees. These opportunities are typically captured by MEV bots within the same block, making them more relevant for agents with mempool access.

5. Funding Rate Arbitrage

When funding rates on CEX and DEX perpetuals diverge, agents can long the instrument with negative funding and short the instrument with positive funding, collecting the spread. This is market-neutral if correlation remains high.

Detection Algorithms

Arbitrage detection is fundamentally a comparison problem: continuously compare prices across venues and flag when the spread exceeds the cost of execution. However, the implementation details matter enormously.

WebSocket Price Aggregator

The foundation is maintaining real-time price feeds from all target venues. REST polling is too slow — at best 200-500ms per poll cycle, which misses most opportunities. WebSocket subscriptions are necessary.

# Price aggregator using async WebSockets
import asyncio
import json
import time
import websockets
from dataclasses import dataclass, field
from typing import Dict, Optional
import httpx

@dataclass
class PriceQuote:
    venue: str
    symbol: str
    bid: float
    ask: float
    mid: float
    timestamp_ms: int
    bid_size: float = 0.0
    ask_size: float = 0.0

@dataclass
class ArbSignal:
    buy_venue: str
    sell_venue: str
    symbol: str
    buy_price: float      # best ask on buy side
    sell_price: float     # best bid on sell side
    raw_spread_pct: float
    estimated_cost_pct: float
    net_spread_pct: float
    detected_at_ms: int
    max_size: float

class CrossExchangeDetector:
    def __init__(self, symbols: list[str], min_net_spread_pct: float = 0.05):
        self.symbols = symbols
        self.min_net_spread_pct = min_net_spread_pct
        self.prices: Dict[str, Dict[str, PriceQuote]] = {}
        # venue -> symbol -> quote
        self.callbacks = []

    def update_price(self, quote: PriceQuote):
        venue = quote.venue
        sym = quote.symbol
        if venue not in self.prices:
            self.prices[venue] = {}
        self.prices[venue][sym] = quote
        self._check_arb(sym)

    def _estimate_cost(self, buy_venue: str, sell_venue: str,
                          symbol: str, size: float) -> float:
        """Estimate total round-trip cost as % of trade size."""
        FEE_TABLE = {
            'binance': 0.10,    # 0.10% taker
            'okx': 0.08,
            'bybit': 0.10,
            'purpleflea': 0.035, # Hyperliquid taker
            'hyperliquid': 0.035,
            'uniswap_v3': 0.30,  # 0.30% default pool
            'curve': 0.04,
        }
        buy_fee = FEE_TABLE.get(buy_venue, 0.10)
        sell_fee = FEE_TABLE.get(sell_venue, 0.10)
        slippage_est = self._estimate_slippage(buy_venue, symbol, size)
        gas_pct = self._estimate_gas_pct(buy_venue, sell_venue, size)
        transfer_cost = 0.01  # bridge/transfer cost if cross-chain
        return buy_fee + sell_fee + slippage_est + gas_pct + transfer_cost

    def _estimate_slippage(self, venue: str, symbol: str, size: float) -> float:
        """Simple slippage model: 0.01% per $100K of size on liquid venues."""
        liquidity_factor = {
            'binance': 100_000,
            'hyperliquid': 50_000,
            'purpleflea': 50_000,
            'uniswap_v3': 20_000,
        }.get(venue, 30_000)
        return (size / liquidity_factor) * 0.01

    def _estimate_gas_pct(self, buy_venue: str, sell_venue: str, size: float) -> float:
        if 'uniswap' in buy_venue or 'uniswap' in sell_venue:
            gas_usd = 8.0  # L2 gas cost in USD
            return (gas_usd / size) * 100
        return 0.0

    def _check_arb(self, symbol: str):
        venues_with_quote = [
            v for v in self.prices if symbol in self.prices[v]
        ]
        if len(venues_with_quote) < 2:
            return

        now_ms = int(time.time() * 1000)
        best_signal: Optional[ArbSignal] = None

        for buy_v in venues_with_quote:
            for sell_v in venues_with_quote:
                if buy_v == sell_v:
                    continue
                bq = self.prices[buy_v][symbol]
                sq = self.prices[sell_v][symbol]

                # Staleness check: reject quotes older than 2s
                if now_ms - bq.timestamp_ms > 2000 or now_ms - sq.timestamp_ms > 2000:
                    continue

                raw_spread_pct = (sq.bid - bq.ask) / bq.ask * 100
                if raw_spread_pct <= 0:
                    continue

                max_size = min(bq.ask_size * bq.ask, sq.bid_size * sq.bid)
                max_size = min(max_size, 50_000)  # cap per trade

                cost_pct = self._estimate_cost(buy_v, sell_v, symbol, max_size)
                net = raw_spread_pct - cost_pct

                if net >= self.min_net_spread_pct:
                    sig = ArbSignal(
                        buy_venue=buy_v, sell_venue=sell_v, symbol=symbol,
                        buy_price=bq.ask, sell_price=sq.bid,
                        raw_spread_pct=raw_spread_pct,
                        estimated_cost_pct=cost_pct,
                        net_spread_pct=net,
                        detected_at_ms=now_ms,
                        max_size=max_size
                    )
                    if best_signal is None or sig.net_spread_pct > best_signal.net_spread_pct:
                        best_signal = sig

        if best_signal:
            for cb in self.callbacks:
                cb(best_signal)

Execution Latency: The Real Bottleneck

Detection is only half the problem. An agent that detects an opportunity in 10ms but takes 800ms to execute will find the spread gone by the time orders fill. Latency budgets must be tracked precisely at every stage.

Warning: Most developers underestimate execution latency. Network round-trips, order validation, rate limits, and signing overhead compound. Always measure p99 latency, not p50.

Latency Budget Breakdown

Stage Typical Latency Optimization
WebSocket price receipt 1-5ms Colocate with exchange data center
Opportunity detection 0.1-1ms Keep in-memory, avoid I/O
Order signing (CEX) 0.5-2ms Pre-compute HMAC template
HTTP order submission 20-80ms HTTP/2, persistent connections
Order acknowledgment 5-50ms Exchange-dependent
On-chain tx submission 50-200ms Pre-sign, fast RPC node
Block confirmation 1000-12000ms Use L2s (Arbitrum: ~250ms)

The key insight from this table: for CEX-to-CEX arb, the window is typically 50-200ms total. For CEX-to-DEX arb involving on-chain settlement, the effective window must remain open for the full block confirmation time — meaning the opportunity must persist for 250ms (Arbitrum) to 12s (Ethereum mainnet).

Latency Measurement in Python

import time
import asyncio
import httpx
from contextlib import asynccontextmanager

class LatencyTracker:
    def __init__(self):
        self.samples: dict[str, list[float]] = {}

    @asynccontextmanager
    async def measure(self, label: str):
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            elapsed_ms = (time.perf_counter_ns() - start) / 1_000_000
            self.samples.setdefault(label, []).append(elapsed_ms)

    def report(self) -> dict:
        import statistics
        result = {}
        for label, samples in self.samples.items():
            if not samples:
                continue
            result[label] = {
                'p50': statistics.median(samples),
                'p99': sorted(samples)[int(len(samples) * 0.99)],
                'mean': statistics.mean(samples),
                'count': len(samples),
            }
        return result

tracker = LatencyTracker()

async def submit_order_with_tracking(client: httpx.AsyncClient,
                                    venue: str, order: dict) -> dict:
    async with tracker.measure(f'{venue}_order_submit'):
        resp = await client.post(
            f'https://api.{venue}.com/v1/order',
            json=order,
            timeout=5.0
        )
    return resp.json()

Slippage Modeling

Slippage is the difference between the expected price and the actual fill price. For arbitrage agents, slippage on one leg can eliminate profits across both legs. Accurate slippage modeling is non-negotiable.

Order Book Depth Model

The most accurate slippage model uses real order book depth. For a buy order of size Q in a market with order book [(p1, s1), (p2, s2), ...] ordered by ascending ask price:

VWAP_fill = Σ(pᵢ × min(sᵢ, remaining)) / Q
Slippage = VWAP_fill - best_ask
from typing import List, Tuple

def compute_slippage(
    order_book: List[Tuple[float, float]],  # (price, size) sorted by price
    trade_size_usd: float,
    side: str  # 'buy' or 'sell'
) -> Tuple[float, float]:
    """
    Returns (vwap_fill_price, slippage_pct).
    order_book is asks for 'buy', bids for 'sell'.
    """
    if not order_book:
        raise ValueError("Empty order book")

    best_price = order_book[0][0]
    remaining_usd = trade_size_usd
    total_cost = 0.0
    total_units = 0.0

    for price, size_units in order_book:
        level_usd = price * size_units
        fill_usd = min(level_usd, remaining_usd)
        fill_units = fill_usd / price

        total_cost += fill_usd
        total_units += fill_units
        remaining_usd -= fill_usd

        if remaining_usd <= 0:
            break

    if remaining_usd > 0:
        # Insufficient liquidity
        return (float('inf'), float('inf'))

    vwap = total_cost / total_units

    if side == 'buy':
        slippage_pct = (vwap - best_price) / best_price * 100
    else:
        slippage_pct = (best_price - vwap) / best_price * 100

    return vwap, slippage_pct


# AMM Slippage Model (Uniswap v2 / constant product)
def amm_slippage(
    reserve_in: float,
    reserve_out: float,
    amount_in: float,
    fee_pct: float = 0.30
) -> Tuple[float, float]:
    """
    Returns (amount_out, price_impact_pct) for a constant-product AMM.
    All values denominated in same unit.
    """
    fee_mult = 1 - (fee_pct / 100)
    amount_in_with_fee = amount_in * fee_mult
    amount_out = (reserve_out * amount_in_with_fee) / (reserve_in + amount_in_with_fee)
    mid_price = reserve_out / reserve_in
    expected_out = amount_in * mid_price * fee_mult
    price_impact_pct = (expected_out - amount_out) / expected_out * 100
    return amount_out, price_impact_pct
Rule of Thumb: On a liquid CEX (Binance BTC/USDT), $50K buy orders typically incur less than 0.02% slippage. On Uniswap V3 concentrated liquidity, the same order might incur 0.05-0.15% depending on liquidity distribution around the current tick.

Profitability Calculations

Net profitability of a cross-exchange arb trade requires accounting for all costs in the correct order. Missing any component leads to overestimating expected returns.

Net P&L = (Sell Price × Size) - (Buy Price × Size) - Fees - Slippage - Gas - Transfer Costs - Financing

Full P&L Model

from dataclasses import dataclass

@dataclass
class ArbPnL:
    gross_revenue: float
    buy_fees: float
    sell_fees: float
    buy_slippage: float
    sell_slippage: float
    gas_cost: float
    transfer_cost: float
    financing_cost: float  # cost of capital during execution
    net_pnl: float
    net_pnl_pct: float
    breakeven_spread_pct: float

def calculate_arb_pnl(
    buy_price: float,
    sell_price: float,
    size_usd: float,
    buy_fee_pct: float,
    sell_fee_pct: float,
    buy_slippage_pct: float,
    sell_slippage_pct: float,
    gas_usd: float = 0.0,
    transfer_cost_usd: float = 0.0,
    execution_time_hours: float = 0.001,  # ~3.6 seconds
    annual_cost_of_capital_pct: float = 5.0
) -> ArbPnL:
    units = size_usd / buy_price

    # Adjust prices for slippage
    eff_buy_price = buy_price * (1 + buy_slippage_pct / 100)
    eff_sell_price = sell_price * (1 - sell_slippage_pct / 100)

    gross = (eff_sell_price - eff_buy_price) * units
    buy_fees = size_usd * (buy_fee_pct / 100)
    sell_fees = size_usd * (sell_fee_pct / 100)
    buy_slip = size_usd * (buy_slippage_pct / 100)
    sell_slip = size_usd * (sell_slippage_pct / 100)

    # Financing: cost of holding capital during execution
    financing = size_usd * (annual_cost_of_capital_pct / 100) * (execution_time_hours / 8760)

    net = gross - buy_fees - sell_fees - gas_usd - transfer_cost_usd - financing

    breakeven_spread = (buy_fees + sell_fees + gas_usd + transfer_cost_usd) / size_usd * 100

    return ArbPnL(
        gross_revenue=gross,
        buy_fees=buy_fees,
        sell_fees=sell_fees,
        buy_slippage=buy_slip,
        sell_slippage=sell_slip,
        gas_cost=gas_usd,
        transfer_cost=transfer_cost_usd,
        financing_cost=financing,
        net_pnl=net,
        net_pnl_pct=net / size_usd * 100,
        breakeven_spread_pct=breakeven_spread
    )


# Example: $20K BTC arb between Binance and Purple Flea Trading
result = calculate_arb_pnl(
    buy_price=95_000.0,
    sell_price=95_450.0,  # 0.47% spread
    size_usd=20_000.0,
    buy_fee_pct=0.10,
    sell_fee_pct=0.035,   # PF Hyperliquid taker rate
    buy_slippage_pct=0.02,
    sell_slippage_pct=0.015,
    gas_usd=0.0,          # PF perp: off-chain settlement
    transfer_cost_usd=2.0  # minimal cross-account transfer
)
print(f"Net P&L: ${result.net_pnl:.2f} ({result.net_pnl_pct:.3f}%)")
# Output: Net P&L: $56.30 (0.282%)

Purple Flea API Integration

Purple Flea provides two APIs that are essential for multi-chain arbitrage: the Trading API (275 perp markets, Hyperliquid-backed) and the Wallet API (multi-chain custody and transfer). Together, they enable a full arb workflow.

Step 1: Register and Get API Keys

import httpx
import asyncio

PF_BASE = "https://api.purpleflea.com"

async def register_agent(agent_name: str) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{PF_BASE}/v1/register",
            json={"agent_name": agent_name, "type": "trading"}
        )
        data = resp.json()
        return {
            "agent_id": data["agent_id"],
            "api_key": data["api_key"],
            "wallet_address": data["wallet_address"]
        }

async def get_pf_price(client: httpx.AsyncClient,
                       api_key: str, symbol: str) -> dict:
    """Get best bid/ask from Purple Flea Trading."""
    resp = await client.get(
        f"{PF_BASE}/v1/trading/orderbook",
        params={"symbol": symbol, "depth": 10},
        headers={"X-API-Key": api_key}
    )
    ob = resp.json()
    return {
        "bid": ob["bids"][0][0],
        "ask": ob["asks"][0][0],
        "bid_size": ob["bids"][0][1],
        "ask_size": ob["asks"][0][1],
    }

async def place_pf_order(client: httpx.AsyncClient,
                         api_key: str,
                         symbol: str,
                         side: str,         # 'buy' or 'sell'
                         size_usd: float,
                         order_type: str = "market",
                         limit_price: float = None) -> dict:
    payload = {
        "symbol": symbol,
        "side": side,
        "size_usd": size_usd,
        "type": order_type,
    }
    if limit_price:
        payload["limit_price"] = limit_price

    resp = await client.post(
        f"{PF_BASE}/v1/trading/order",
        json=payload,
        headers={"X-API-Key": api_key}
    )
    return resp.json()

Step 2: Multi-Chain Wallet for Asset Movement

async def get_wallet_balances(client: httpx.AsyncClient, api_key: str) -> dict:
    """Get balances across all chains in the Purple Flea wallet."""
    resp = await client.get(
        f"{PF_BASE}/v1/wallet/balances",
        headers={"X-API-Key": api_key}
    )
    return resp.json()  # {chain: {token: amount}}

async def initiate_transfer(
    client: httpx.AsyncClient,
    api_key: str,
    from_chain: str,
    to_chain: str,
    token: str,
    amount: float,
    destination_address: str
) -> dict:
    """Bridge assets between chains for multi-chain arb legs."""
    resp = await client.post(
        f"{PF_BASE}/v1/wallet/transfer",
        json={
            "from_chain": from_chain,
            "to_chain": to_chain,
            "token": token,
            "amount": amount,
            "destination": destination_address
        },
        headers={"X-API-Key": api_key}
    )
    return resp.json()

Step 3: Full Arb Execution Loop

import asyncio
import logging

log = logging.getLogger("arb_agent")

class ArbExecutionAgent:
    def __init__(self, api_key: str, symbols: list[str]):
        self.api_key = api_key
        self.symbols = symbols
        self.detector = CrossExchangeDetector(symbols, min_net_spread_pct=0.05)
        self.detector.callbacks.append(self.on_signal)
        self.active_trades: dict = {}
        self.client = httpx.AsyncClient(timeout=5.0)
        self.pnl_total = 0.0
        self.trade_count = 0

    async def on_signal(self, signal: ArbSignal):
        # Deduplicate: skip if active trade on same symbol
        if signal.symbol in self.active_trades:
            return

        # Size check
        size = min(signal.max_size, 10_000)
        if size < 500:
            return

        self.active_trades[signal.symbol] = signal

        try:
            # Execute both legs simultaneously
            buy_task = asyncio.create_task(
                self._execute_leg(signal.buy_venue, 'buy',
                                  signal.symbol, size, signal.buy_price)
            )
            sell_task = asyncio.create_task(
                self._execute_leg(signal.sell_venue, 'sell',
                                  signal.symbol, size, signal.sell_price)
            )
            results = await asyncio.gather(buy_task, sell_task,
                                              return_exceptions=True)

            for r in results:
                if isinstance(r, Exception):
                    log.error(f"Leg failed: {r}")
                    # Trigger risk management / unwinding
                    await self._emergency_unwind(signal)
                    return

            buy_result, sell_result = results
            estimated_pnl = size * signal.net_spread_pct / 100
            self.pnl_total += estimated_pnl
            self.trade_count += 1
            log.info(f"Arb complete: {signal.symbol} "
                     f"buy@{signal.buy_venue} sell@{signal.sell_venue} "
                     f"net={signal.net_spread_pct:.3f}% "
                     f"est_pnl=${estimated_pnl:.2f} "
                     f"cumulative=${self.pnl_total:.2f}")

        finally:
            self.active_trades.pop(signal.symbol, None)

    async def _execute_leg(self, venue: str, side: str,
                            symbol: str, size: float, ref_price: float) -> dict:
        if venue == 'purpleflea':
            return await place_pf_order(
                self.client, self.api_key, symbol, side, size
            )
        # Add other venue handlers here
        raise NotImplementedError(f"Venue {venue} not implemented")

    async def _emergency_unwind(self, signal: ArbSignal):
        """Attempt to flatten position if one leg fails."""
        log.warning(f"Emergency unwind triggered for {signal.symbol}")
        # Submit market orders to close whatever was opened
        pass

Risk Controls

Arbitrage appears low-risk on paper — you're simultaneously long and short — but execution risk is real and can be severe. Every arbitrage agent needs a comprehensive risk framework.

Leg Execution Risk

If one leg fills and the other fails (rejection, timeout, rate limit), the agent is left with a naked directional position. This is the most common failure mode. Mitigations:

  • Use atomic execution where possible (flash loans, same-block DEX swaps)
  • Maintain emergency close logic that triggers on any partial fill
  • Set aggressive timeouts and treat any timeout as a failure requiring unwind
  • Keep position size small enough that a worst-case unwind is survivable

Spread Decay Risk

The spread can vanish between detection and execution. By the time both orders fill, the spread may have inverted. Solutions:

  • Limit orders with tight price bands (e.g., allow up to 0.02% worse than detected price)
  • Measure historical spread persistence time and set execution deadlines accordingly
  • Cancel and abort if any leg takes longer than the expected spread half-life

Correlation Breakdown Risk

CEX-DEX pairs are normally highly correlated. During major news events, correlations can temporarily break, causing a "convergence" trade to diverge further. Always set maximum loss limits per trade.

Risk Framework: Never risk more than 1% of total capital per arb trade. Set a daily drawdown limit of 3%. If limit is hit, pause all trading for 4 hours and review execution logs.
class RiskManager:
    def __init__(self, total_capital: float,
                 max_position_pct: float = 1.0,
                 daily_loss_limit_pct: float = 3.0):
        self.capital = total_capital
        self.max_pos = total_capital * (max_position_pct / 100)
        self.daily_limit = total_capital * (daily_loss_limit_pct / 100)
        self.daily_loss = 0.0
        self.positions: dict = {}
        self.paused = False

    def check_and_size(self, signal: ArbSignal) -> float:
        """Returns approved size, 0 if trade rejected."""
        if self.paused:
            return 0.0

        if self.daily_loss >= self.daily_limit:
            self.paused = True
            log.warning("Daily loss limit hit — pausing trading")
            return 0.0

        # Minimum spread after costs
        if signal.net_spread_pct < 0.05:
            return 0.0

        approved_size = min(
            signal.max_size,
            self.max_pos,
            self.capital * 0.20  # max 20% deployed at once
        )
        return approved_size

    def record_pnl(self, realized_pnl: float):
        if realized_pnl < 0:
            self.daily_loss += abs(realized_pnl)

Backtesting Framework

Before deploying any arbitrage strategy live, backtest it against historical price data from all target venues. The key challenge is that historical order book depth data is expensive to acquire — most free sources only provide OHLCV candles, which are insufficient for slippage modeling.

What to Backtest

  • Spread frequency distribution: How often does the spread exceed your minimum threshold?
  • Spread persistence: How long does each spread event last? Is 50ms realistic for execution?
  • Slippage realized vs. modeled: Does your slippage model match actual fills from paper trading?
  • Correlation of opportunity to market conditions: Do spreads widen during high volatility? During specific hours?
  • Fill rate: What fraction of limit orders fill within the window?
import pandas as pd
import numpy as np

def backtest_arb_strategy(
    cex_prices: pd.DataFrame,   # columns: timestamp, bid, ask
    dex_prices: pd.DataFrame,   # columns: timestamp, bid, ask
    min_spread_pct: float = 0.10,
    cost_pct: float = 0.20,
    size_usd: float = 10_000,
    max_exposure_s: float = 5.0
) -> dict:
    """Vectorized backtester for CEX-DEX arb."""

    # Align on timestamp
    merged = pd.merge_asof(
        cex_prices.sort_values('timestamp'),
        dex_prices.sort_values('timestamp'),
        on='timestamp',
        suffixes=('_cex', '_dex'),
        tolerance=pd.Timedelta('500ms')
    ).dropna()

    # Compute spreads in both directions
    merged['spread_buy_cex'] = (
        (merged['bid_dex'] - merged['ask_cex']) / merged['ask_cex'] * 100
    )
    merged['spread_buy_dex'] = (
        (merged['bid_cex'] - merged['ask_dex']) / merged['ask_dex'] * 100
    )

    # Net spread after costs
    merged['net_buy_cex'] = merged['spread_buy_cex'] - cost_pct
    merged['net_buy_dex'] = merged['spread_buy_dex'] - cost_pct

    # Opportunities
    opps_cex = merged[merged['net_buy_cex'] >= min_spread_pct]
    opps_dex = merged[merged['net_buy_dex'] >= min_spread_pct]

    total_pnl = (
        (opps_cex['net_buy_cex'] / 100 * size_usd).sum() +
        (opps_dex['net_buy_dex'] / 100 * size_usd).sum()
    )
    n_opportunities = len(opps_cex) + len(opps_dex)

    return {
        'total_opportunities': n_opportunities,
        'estimated_gross_pnl': total_pnl,
        'avg_spread_pct': pd.concat([
            opps_cex['net_buy_cex'], opps_dex['net_buy_dex']
        ]).mean(),
        'opp_rate_per_hour': n_opportunities / (len(merged) / 3600),
        'max_spread_pct': max(
            merged['net_buy_cex'].max(),
            merged['net_buy_dex'].max()
        ),
    }

Market Conditions and Timing

CEX-DEX spreads are not uniformly distributed across time. Understanding when opportunities are most likely improves capital allocation.

High-Spread Periods

  • Major news events: Federal Reserve decisions, CPI prints, major geopolitical events cause rapid price moves with order book imbalances across venues
  • Listing events: New token listings on major CEXs often create temporary mispricing on DEXs
  • Large liquidations: Cascading liquidations on perp markets create rapid CEX price moves that AMMs are slow to follow
  • Low liquidity periods: Asian session weekends often have thinner books and wider spreads

Low-Spread Periods

  • High-frequency trading competition is most intense during US trading hours
  • Stable markets with low volatility compress all spreads toward zero
  • Post-major-event cooldowns as arbitrageurs compete away all inefficiencies
Strategy Tip: Schedule your arbitrage agent to be most aggressive during news event windows and market open/close periods. Use Purple Flea Trading's 275 markets to find less-trafficked perp pairs where competition is lower and spreads persist longer.

Getting Started on Purple Flea

Purple Flea provides everything an arbitrage agent needs in one platform:

  • Trading API: 275 perpetual markets via Hyperliquid, 0.035% taker, full order book access
  • Wallet API: Multi-chain custody, fast transfers, balance monitoring across chains
  • Faucet: New agents can claim free USDC to get started risk-free at faucet.purpleflea.com
  • Escrow: For agent-to-agent settlement of complex multi-leg arb trades
Free Start: New agents can claim free USDC from the Purple Flea Faucet to paper-trade arbitrage strategies before committing real capital. Register at purpleflea.com/register.

Cross-exchange arbitrage is not a passive income strategy — it requires continuous monitoring, rapid execution, and disciplined risk management. But for AI agents with the right infrastructure, the mechanical nature of the strategy makes it particularly well-suited to automation. Start with small sizes, measure everything, and scale only what the backtest and live data confirm.