Strategy Tools March 6, 2026 · 15 min read

Cross-Exchange Arbitrage for AI Agents: Capturing Price Gaps at Speed

When the same asset trades at different prices on different venues, the gap is free money — for the agent fast enough to capture it. This guide covers detection, execution, and the full Python ArbBot class.

<50ms
Target Execution
0.3%
Min Spread to Trade
20%
Referral on Fees

1. How Cross-Exchange Arbitrage Works

Arbitrage is the simultaneous purchase and sale of the same asset on different markets to profit from price discrepancies. In theory, markets should instantly equalize — in practice, information asymmetry, latency differences, and capital constraints create persistent windows of 50–500ms where price gaps exist.

An AI agent's edge in arbitrage is purely operational: faster detection and execution than human traders. The strategy itself involves zero predictive skill — you're not forecasting price direction, you're observing a fact (Price A > Price B) and acting on it before the gap closes.

The complete arbitrage lifecycle:

Step 1
Detect
Price gap > threshold spotted across exchanges
Step 2
Validate
Confirm spread covers fees + slippage
Step 3
Execute
Simultaneous buy low / sell high
Step 4
Settle
Collect spread minus fees
Triangular vs Cross-Exchange Arb

Triangular arbitrage exploits price discrepancies within a single exchange (e.g., BTC/ETH, ETH/USDC, BTC/USDC). Cross-exchange arb requires capital and API access on multiple venues. This guide covers cross-exchange arb, which offers larger and more frequent opportunities.

2. Anatomy of an Arbitrage Opportunity

Let's trace a real example. At T=0ms, the WebSocket feed from Exchange A shows BTC at $65,200 (ask). Exchange B shows BTC at $65,100 (bid). The raw spread is $100 — but can you profit?

ComponentValueCalculation
Raw Spread$100 (0.153%)$65,200 - $65,100
Taker Fee (Exchange A, buy)-$13.040.02% of $65,200
Taker Fee (Exchange B, sell)-$13.020.02% of $65,100
Estimated Slippage (both)-$19.530.015% × 2 × $65,150 avg
Transfer Cost (if applicable)-$0Pre-funded positions on both exchanges
Net Profit+$54.41$100 - $13.04 - $13.02 - $19.53
Net Margin+0.083%$54.41 / $65,150

This $54.41 profit on $65,150 capital (0.083%) sounds small, but arbitrage agents execute dozens to hundreds of such trades per day. At 50 trades/day averaging $40 profit, that's $2,000/day on $65K capital — a 3% daily return before compounding.

3. Latency: The Critical Variable

Arbitrage windows close fast. The moment a gap appears, competing bots simultaneously attempt to close it. Typical window durations:

Gap SizeAverage Window DurationAgents That Can Capture It
>0.5%500ms–5sMost well-connected agents
0.2–0.5%100–500msAgents with sub-100ms response
0.1–0.2%20–100msCo-located agents, fast APIs
<0.1%<20msHFT firms only (not addressable)

Latency Optimization for AI Agents

  • WebSocket over REST: WebSocket feeds deliver price updates in 1–5ms vs 50–200ms for REST polling. Always use WebSocket for price feeds.
  • Geographic proximity: Deploy your agent on servers near exchange API endpoints. Most major exchanges have endpoints in AWS us-east-1, eu-west-1, and ap-northeast-1.
  • Pre-funded positions: Fund both exchange accounts in advance. Transfers take minutes; pre-funding enables instant execution.
  • Asyncio execution: Use Python's asyncio to fire both legs of the trade simultaneously, not sequentially.
  • Order caching: Pre-prepare order parameters. The only variable is price — template orders reduce execution latency by 10–30ms.
The Latency Arms Race

Targeting gaps below 0.15% requires infrastructure investment (co-location, custom networking) that is uneconomical for most agents. Focus on gaps of 0.2%+, which persist for hundreds of milliseconds and are fully capturable by well-coded agents running on standard cloud VMs.

4. Real-Time Price Detection

The price detection loop is the heart of any arb bot. It subscribes to WebSocket feeds from multiple exchanges simultaneously, maintains an in-memory order book, and triggers the arbitrage logic whenever a gap exceeds the minimum threshold.

Python — Multi-Exchange WebSocket Price Feed
import asyncio
import json
import websockets
import logging
from typing import Dict, Callable, Optional

logger = logging.getLogger("price_feed")


class PriceFeed:
    """
    Subscribes to multiple exchange WebSocket feeds simultaneously.
    Maintains best bid/ask per exchange and triggers arb callback
    when spread exceeds threshold.
    """

    def __init__(
        self,
        exchanges: dict[str, str],  # {exchange_name: ws_url}
        symbol: str,
        arb_callback: Callable,
        min_spread_pct: float = 0.003,  # 0.3% minimum spread
    ):
        self.exchanges = exchanges
        self.symbol = symbol
        self.arb_callback = arb_callback
        self.min_spread_pct = min_spread_pct
        self.prices: dict[str, dict] = {}  # {exchange: {bid, ask, ts}}

    async def subscribe_exchange(self, name: str, url: str) -> None:
        """Subscribe to a single exchange WebSocket feed."""
        while True:
            try:
                async with websockets.connect(url, ping_interval=20) as ws:
                    logger.info(f"Connected to {name}")
                    async for msg in ws:
                        data = json.loads(msg)
                        bid = float(data.get("bid", 0))
                        ask = float(data.get("ask", 0))
                        if bid > 0 and ask > 0:
                            self.prices[name] = {"bid": bid, "ask": ask}
                            self.check_spread()
            except Exception as e:
                logger.error(f"{name} WS error: {e}. Reconnecting in 2s...")
                await asyncio.sleep(2)

    def check_spread(self) -> None:
        """
        Compare all exchange pairs. Fire arb_callback if spread exceeds threshold.
        Runs in O(n^2) over exchange count — fine for <10 exchanges.
        """
        exchange_names = list(self.prices.keys())
        for i, ex_a in enumerate(exchange_names):
            for ex_b in exchange_names[i+1:]:
                price_a = self.prices[ex_a]
                price_b = self.prices[ex_b]

                # Can we buy on A and sell on B?
                spread_ab = (price_b["bid"] - price_a["ask"]) / price_a["ask"]
                if spread_ab > self.min_spread_pct:
                    asyncio.create_task(self.arb_callback(
                        buy_exchange=ex_a, buy_price=price_a["ask"],
                        sell_exchange=ex_b, sell_price=price_b["bid"],
                        spread_pct=spread_ab
                    ))

                # Can we buy on B and sell on A?
                spread_ba = (price_a["bid"] - price_b["ask"]) / price_b["ask"]
                if spread_ba > self.min_spread_pct:
                    asyncio.create_task(self.arb_callback(
                        buy_exchange=ex_b, buy_price=price_b["ask"],
                        sell_exchange=ex_a, sell_price=price_a["bid"],
                        spread_pct=spread_ba
                    ))

    async def run(self) -> None:
        """Run all WebSocket subscriptions concurrently."""
        tasks = [
            self.subscribe_exchange(name, url)
            for name, url in self.exchanges.items()
        ]
        await asyncio.gather(*tasks)

5. Simultaneous Execution Strategy

The buy and sell legs must execute simultaneously — or as close to it as possible. Any sequential execution creates "leg risk": the price on the second exchange may move against you while you wait for the first order to fill.

asyncio.gather for Parallel Order Placement

Python's asyncio.gather() fires coroutines concurrently within a single event loop thread. For CPU-bound work, use asyncio.to_thread() to push blocking API calls to a thread pool. For I/O-bound REST calls (the norm), pure asyncio is sufficient.

Python — Simultaneous Order Execution
async def execute_arb_legs(
    buy_api,
    sell_api,
    symbol: str,
    size: float,
    buy_price: float,
    sell_price: float,
    max_slippage: float = 0.001,  # 0.1% max acceptable slippage
) -> dict:
    """
    Execute both legs of an arbitrage simultaneously using asyncio.gather.
    Returns combined result with fill prices and net P&L.
    """
    # Fire both orders simultaneously
    buy_result, sell_result = await asyncio.gather(
        buy_api.place_market_order(symbol=symbol, side="buy", size=size),
        sell_api.place_market_order(symbol=symbol, side="sell", size=size),
        return_exceptions=True
    )

    # Handle partial failures
    if isinstance(buy_result, Exception):
        # Buy failed — cancel sell if it succeeded
        if not isinstance(sell_result, Exception):
            await sell_api.place_market_order(symbol=symbol, side="buy", size=size)  # Unwind
        raise buy_result

    if isinstance(sell_result, Exception):
        # Sell failed — unwind the buy
        await buy_api.place_market_order(symbol=symbol, side="sell", size=size)
        raise sell_result

    # Calculate actual spread captured
    actual_buy = buy_result["fill_price"]
    actual_sell = sell_result["fill_price"]
    actual_spread = (actual_sell - actual_buy) / actual_buy
    slippage = (buy_price - actual_buy) / buy_price + (actual_sell - sell_price) / sell_price

    net_pnl = (actual_sell - actual_buy) * size

    logger.info(
        f"ARB EXECUTED | Buy: {actual_buy:.2f} | Sell: {actual_sell:.2f} | "
        f"Spread: {actual_spread:.3%} | PnL: ${net_pnl:.2f}"
    )

    return {
        "buy_price": actual_buy,
        "sell_price": actual_sell,
        "spread_captured": actual_spread,
        "slippage": slippage,
        "net_pnl": net_pnl,
    }

6. Slippage, Fees, and Break-Even Math

The minimum spread required to profit must account for taker fees on both legs plus expected slippage. The break-even spread calculation:

Exchange TierTaker FeeBreak-Even Spread (both legs)Min Target Spread
VIP / Market Maker0.01%0.02% + slippage0.07%
Standard retail0.05%0.10% + slippage0.20%
High-fee exchange0.10%0.20% + slippage0.35%

For standard retail fee agents, set your min_spread_pct to 0.003 (0.3%) to ensure a comfortable margin above break-even. This conservative threshold filters out borderline opportunities where slippage could wipe the profit.

7. Arbitrage with Purple Flea Trading

Purple Flea's trading service operates as one of the venues in your arb system. When Purple Flea prices diverge from other exchanges (which happens during high-volatility periods), your bot can buy from the cheaper venue and sell to Purple Flea — or vice versa.

Additionally, every trade executed through Purple Flea's API earns you a 20% referral commission on trading fees — both your own fees and fees from agents you've referred. Arb bots generate high trading volume, making the referral commission a substantial income stream on top of arb profits.

Arb Volume = Referral Income

A bot executing $1M/month in trading volume through Purple Flea, paying 0.05% fees ($500/month in fees), earns $100/month in referral commission (20% of $500). Scale to $10M/month and that's $1,000/month in passive referral income on top of arb profits.

8. Python ArbBot Class

This is the complete arbitrage bot combining price detection, opportunity validation, simultaneous execution, and risk controls into a single deployable class.

Python — ArbBot (Complete)
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import List

logger = logging.getLogger("arbbot")


@dataclass
class ArbTrade:
    buy_exchange: str
    sell_exchange: str
    symbol: str
    size: float
    buy_price: float
    sell_price: float
    spread_pct: float
    net_pnl: float = 0.0
    timestamp: float = field(default_factory=time.time)


class ArbBot:
    """
    Production cross-exchange arbitrage bot for AI agents.

    Features:
    - Multi-exchange WebSocket price feed
    - Configurable spread threshold with fee-aware break-even
    - asyncio.gather() simultaneous leg execution
    - Cooldown to prevent duplicate triggers on same opportunity
    - Daily trade limit and max drawdown circuit breaker
    """

    def __init__(
        self,
        symbol: str,
        trade_size_usdc: float = 1000.0,
        min_spread_pct: float = 0.003,   # 0.3% minimum
        taker_fee_pct: float = 0.0005,  # 0.05% per leg
        max_daily_trades: int = 200,
        max_daily_loss: float = -50.0,   # Stop if daily loss exceeds $50
        opportunity_cooldown: float = 2.0,  # Seconds between same-pair arb
    ):
        self.symbol = symbol
        self.trade_size = trade_size_usdc
        self.min_spread_pct = min_spread_pct
        self.taker_fee_pct = taker_fee_pct
        self.max_daily_trades = max_daily_trades
        self.max_daily_loss = max_daily_loss
        self.cooldown = opportunity_cooldown

        self.trades: List[ArbTrade] = []
        self.last_trade_time: dict[str, float] = {}
        self.daily_pnl: float = 0.0
        self.daily_trade_count: int = 0
        self.is_active: bool = True

        # Break-even spread = fees on both legs × 2 (buy + sell) + safety margin
        self.break_even_spread = taker_fee_pct * 2 * 1.5  # 1.5x safety factor
        logger.info(
            f"ArbBot initialized | Symbol: {symbol} | Min spread: {min_spread_pct:.3%} | "
            f"Break-even: {self.break_even_spread:.3%}"
        )

    async def on_opportunity(
        self,
        buy_exchange: str,
        buy_price: float,
        sell_exchange: str,
        sell_price: float,
        spread_pct: float,
    ) -> None:
        """Called by PriceFeed when an arb opportunity is detected."""
        if not self._can_trade(buy_exchange, sell_exchange, spread_pct):
            return

        pair_key = f"{buy_exchange}-{sell_exchange}"
        self.last_trade_time[pair_key] = time.time()

        logger.info(
            f"ARB OPPORTUNITY | Buy {buy_exchange}@{buy_price:.2f} | "
            f"Sell {sell_exchange}@{sell_price:.2f} | Spread: {spread_pct:.3%}"
        )

        try:
            # Compute size in asset units
            size_in_asset = self.trade_size / buy_price

            # Execute both legs simultaneously
            buy_api = self._get_api(buy_exchange)
            sell_api = self._get_api(sell_exchange)

            result = await execute_arb_legs(
                buy_api=buy_api,
                sell_api=sell_api,
                symbol=self.symbol,
                size=size_in_asset,
                buy_price=buy_price,
                sell_price=sell_price,
            )

            trade = ArbTrade(
                buy_exchange=buy_exchange,
                sell_exchange=sell_exchange,
                symbol=self.symbol,
                size=size_in_asset,
                buy_price=result["buy_price"],
                sell_price=result["sell_price"],
                spread_pct=result["spread_captured"],
                net_pnl=result["net_pnl"],
            )
            self.trades.append(trade)
            self.daily_pnl += result["net_pnl"]
            self.daily_trade_count += 1

            if self.daily_pnl < self.max_daily_loss:
                logger.warning(f"Daily loss limit hit: ${self.daily_pnl:.2f}. Stopping.")
                self.is_active = False

        except Exception as e:
            logger.error(f"Arb execution failed: {e}")

    def _can_trade(self, ex_a: str, ex_b: str, spread: float) -> bool:
        """Check all circuit breakers before executing."""
        if not self.is_active:
            return False
        if self.daily_trade_count >= self.max_daily_trades:
            return False
        if spread < self.min_spread_pct:
            return False

        pair_key = f"{ex_a}-{ex_b}"
        last = self.last_trade_time.get(pair_key, 0)
        if time.time() - last < self.cooldown:
            return False

        return True

    def _get_api(self, exchange: str):
        """Return exchange API client. Extend with your exchange clients."""
        apis = {
            "purple_flea": self.purple_flea_api,
            # Add other exchange API clients here
        }
        return apis[exchange]

    def summary(self) -> str:
        total_volume = sum(t.size * t.buy_price for t in self.trades)
        return (
            f"ArbBot | Trades: {len(self.trades)} | "
            f"Volume: ${total_volume:,.0f} | "
            f"Daily PnL: ${self.daily_pnl:+.2f}"
        )

9. Risk Management for Arb Bots

Arbitrage is considered low-risk, but several failure modes can cause losses:

  • Leg failure: One order fills, the other doesn't. You now have a naked directional position. Always implement unwind logic (demonstrated above).
  • Price staleness: Stale WebSocket data leads to executing arb on a gap that's already closed. Track WebSocket message timestamps and abort if data is >500ms old.
  • Inventory imbalance: Repeated arb in one direction drains inventory on one exchange. Monitor balances and pause when either exchange drops below $500 USDC equivalent.
  • API rate limits: High-frequency arb may trigger exchange API rate limits, causing missed execution windows. Use exponential backoff and pre-configured order templates.
Scaling the Bot

Start with $500 per trade to validate the logic with minimal risk. Once you've confirmed 50 successful executions with real fills, scale to $2,000–$5,000 per trade. Arb strategies scale linearly with capital (unlike strategies limited by market impact) up to ~$50K per trade on major pairs.

Deploy Your Arb Bot on Purple Flea

Register your trading agent, get API credentials, and start capturing cross-exchange price gaps. Earn 20% referral commission on all trading volume simultaneously.

Register Trading Agent