web3.py Integration Guide

On-Chain AI Agents with web3.py + Purple Flea

Connect blockchain data directly to Purple Flea financial services. Read USDC balances, monitor DEX prices for arbitrage, listen to on-chain events, and execute trades — all from Python.

USDC balance + approvals
DEX price monitoring
Escrow event listener
Gas-optimized batching
ETH + Arbitrum + Base
Free faucet onboarding

3 chains supported
<50ms typical RPC latency
1% escrow fee
Free faucet starter capital

01

Reading USDC Balances, Approvals, and Transfer Events

USDC is the primary settlement currency for AI agent operations on Purple Flea. Before placing orders or using escrow, your agent needs to read its on-chain USDC balance, check and manage approvals, and optionally track incoming transfers as a trigger for activity.

python — usdc_reader.py web3.py 7.x
from web3 import Web3
from typing import Optional
import time

# USDC contract addresses
USDC_ADDRESSES = {
    "ethereum":  "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
    "arbitrum":  "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
    "base":      "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
}

# Minimal ERC-20 ABI (balanceOf, allowance, Transfer event)
ERC20_ABI = [
    {
        "inputs": [{"name": "_owner", "type": "address"}],
        "name": "balanceOf",
        "outputs": [{"name": "balance", "type": "uint256"}],
        "stateMutability": "view",
        "type": "function",
    },
    {
        "inputs": [
            {"name": "_owner", "type": "address"},
            {"name": "_spender", "type": "address"},
        ],
        "name": "allowance",
        "outputs": [{"name": "remaining", "type": "uint256"}],
        "stateMutability": "view",
        "type": "function",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True, "name": "from", "type": "address"},
            {"indexed": True, "name": "to", "type": "address"},
            {"indexed": False, "name": "value", "type": "uint256"},
        ],
        "name": "Transfer",
        "type": "event",
    },
]

USDC_DECIMALS = 6


class USDCReader:
    """
    Read USDC state on Ethereum, Arbitrum, or Base.
    """

    def __init__(self, provider_url: str, chain: str = "ethereum"):
        self.w3 = Web3(Web3.HTTPProvider(provider_url))
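        # Raise explicitly: assert statements are stripped when Python runs with -O.
        if not self.w3.is_connected():
            raise ConnectionError("RPC connection failed")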
        usdc_addr = USDC_ADDRESSES.get(chain.lower())
        if not usdc_addr:
            raise ValueError(f"Unknown chain: {chain}")
        self.usdc = self.w3.eth.contract(
            address=Web3.to_checksum_address(usdc_addr),
            abi=ERC20_ABI,
        )
        self.chain = chain

    def balance(self, address: str) -> float:
        """Return USDC balance as a human-readable float."""
        raw = self.usdc.functions.balanceOf(
            Web3.to_checksum_address(address)
        ).call()
        return raw / 10 ** USDC_DECIMALS

    def allowance(self, owner: str, spender: str) -> float:
        """Return approved USDC allowance for a spender."""
        raw = self.usdc.functions.allowance(
            Web3.to_checksum_address(owner),
            Web3.to_checksum_address(spender),
        ).call()
        return raw / 10 ** USDC_DECIMALS

    def get_transfer_events(
        self,
        address: str,
        from_block: int,
        to_block: Optional[int] = None,
        direction: str = "received",  # "sent" | "received" | "both"
    ) -> list[dict]:
        """
        Fetch ERC-20 Transfer events for an address.

        Args:
            address: the wallet address to inspect
            from_block: earliest block to search
            to_block: latest block (default: latest)
            direction: which side of transfers to return

        Returns:
            List of transfer dicts with amount, counterparty, tx_hash, block.
        """
        addr = Web3.to_checksum_address(address)
        to_block = to_block or self.w3.eth.block_number

        filters = []
        if direction in ("received", "both"):
            filters.append(
                self.usdc.events.Transfer.get_logs(
                    argument_filters={"to": addr},
                    from_block=from_block,
                    to_block=to_block,
                )
            )
        if direction in ("sent", "both"):
            filters.append(
                self.usdc.events.Transfer.get_logs(
                    argument_filters={"from": addr},
                    from_block=from_block,
                    to_block=to_block,
                )
            )

        result = []
        for logs in filters:
            for log in logs:
                is_received = log["args"]["to"].lower() == addr.lower()
                result.append({
                    "direction": "received" if is_received else "sent",
                    "counterparty": log["args"]["from"] if is_received else log["args"]["to"],
                    "amount_usdc": log["args"]["value"] / 10 ** USDC_DECIMALS,
                    "tx_hash": log["transactionHash"].hex(),
                    "block": log["blockNumber"],
                })

        result.sort(key=lambda x: x["block"], reverse=True)
        return result

    def watch_incoming(self, address: str, callback, poll_interval: int = 12):
        """
        Poll for new USDC transfers to an address.
        Calls callback(transfer_dict) on each new incoming transfer.
        """
        addr = Web3.to_checksum_address(address)
        last_block = self.w3.eth.block_number
        print(f"Watching for USDC transfers to {addr[:10]}... (chain={self.chain})")

        while True:
            try:
                # Read the chain head inside the try block so a transient
                # RPC error does not kill the watcher.
                current = self.w3.eth.block_number
                if current > last_block:
                    events = self.usdc.events.Transfer.get_logs(
                        argument_filters={"to": addr},
                        from_block=last_block + 1,
                        to_block=current,
                    )
                    for event in events:
                        amount = event["args"]["value"] / 10 ** USDC_DECIMALS
                        transfer = {
                            "from": event["args"]["from"],
                            "amount_usdc": amount,
                            "tx_hash": event["transactionHash"].hex(),
                            "block": event["blockNumber"],
                        }
                        callback(transfer)
                    # Advance only after success so a failed range is retried.
                    last_block = current
            except Exception as e:
                print(f"Watch error: {e}")
            time.sleep(poll_interval)


# Example usage
if __name__ == "__main__":
    reader = USDCReader(
        provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
        chain="ethereum"
    )

    my_wallet = "0xYOUR_WALLET_ADDRESS"
    balance = reader.balance(my_wallet)
    print(f"USDC balance: ${balance:,.2f}")

    # Check allowance for Purple Flea escrow spender
    pf_escrow = "0xPURPLE_FLEA_ESCROW_CONTRACT"
    allowed = reader.allowance(my_wallet, pf_escrow)
    print(f"Escrow allowance: ${allowed:,.2f}")

    # Recent incoming transfers
    recent = reader.get_transfer_events(
        my_wallet,
        from_block=reader.w3.eth.block_number - 5000,
        direction="received",
    )
    for t in recent[:5]:
        print(f"Received ${t['amount_usdc']:.2f} from {t['counterparty'][:10]}... (block {t['block']})")
Approval Best Practice

When your agent uses USDC with Purple Flea escrow, pre-approving a large allowance (e.g., 2^256-1) instead of approving per transaction saves gas and avoids approval race conditions. The trade-off is exposure: an unlimited allowance lets the spender contract move the wallet's entire USDC balance, so grant it only to contracts you trust. Keep private keys out of source files; load them from the environment or a secrets manager.
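
The allowance check behind this advice can be sketched in a few lines. This is a minimal illustration, assuming the raw allowance has already been read with the `allowance` call from `usdc_reader.py`; `MAX_UINT256`, `to_raw_usdc`, and `needs_approval` are hypothetical names, not part of web3.py or the Purple Flea API.

```python
from decimal import Decimal

USDC_DECIMALS = 6
MAX_UINT256 = 2**256 - 1  # the "unlimited" allowance value mentioned above


def to_raw_usdc(amount: str) -> int:
    """Convert a human-readable USDC amount (given as a string) to base units."""
    return int(Decimal(amount) * 10**USDC_DECIMALS)


def needs_approval(current_allowance_raw: int, spend_usdc: str) -> bool:
    """True when the existing allowance does not cover the intended spend."""
    return current_allowance_raw < to_raw_usdc(spend_usdc)


if __name__ == "__main__":
    print(needs_approval(0, "25.50"))              # no allowance yet
    print(needs_approval(MAX_UINT256, "1000000"))  # unlimited allowance
```

Comparing in integer base units avoids float rounding when the spend is close to the allowance.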

02

DEX Price Monitoring for Arbitrage Signals

Decentralized exchange prices deviate from centralized exchange prices constantly. These deviations — arbitrage opportunities — are the raw material for on-chain trading agents. The gap is usually small (0.1-0.5%) but occurs hundreds of times per day across different pool pairs, chains, and fee tiers.

The key web3.py pattern for DEX monitoring: read slot0 from Uniswap V3 pools directly via RPC. This gives you the current price without waiting for subgraph indexing, at the cost of a single eth_call.
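
To make the slot0 arithmetic concrete, here is a small worked sketch of the conversion for a pool where token0 is USDC (6 decimals) and token1 is WETH (18 decimals). The helper names are illustrative, and `sqrt_price_x96_for` exists only to build a sample value without an RPC call.

```python
import math

Q96 = 2**96
DECIMAL_GAP = 10**12  # WETH has 18 decimals, USDC has 6


def eth_price_from_sqrt_price_x96(sqrt_price_x96: int) -> float:
    """USDC per 1 ETH for a pool where token0 = USDC and token1 = WETH."""
    # Squaring gives the raw token1/token0 ratio: wei of WETH per base unit of USDC.
    raw_ratio = (sqrt_price_x96 / Q96) ** 2
    # Invert to USDC per WETH and correct for the decimal gap.
    return DECIMAL_GAP / raw_ratio


def sqrt_price_x96_for(eth_price_usdc: float) -> int:
    """Inverse helper, used here only to construct a sample input."""
    raw_ratio = DECIMAL_GAP / eth_price_usdc
    return int(math.sqrt(raw_ratio) * Q96)


if __name__ == "__main__":
    sample = sqrt_price_x96_for(2000.0)
    print(round(eth_price_from_sqrt_price_x96(sample), 6))
```

The inversion matters because sqrtPriceX96 encodes token1 per token0. With USDC as token0, the raw value is WETH per USDC, not the other way round.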

python — dex_arb_monitor.py
import math
import time
from web3 import Web3
from dataclasses import dataclass

# Uniswap V3 pool addresses (USDC/ETH)
DEX_POOLS = {
    "uniswap_v3_005":  "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640",  # 0.05% fee
    "uniswap_v3_030":  "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8",  # 0.30% fee
}

# Minimal pool ABI: slot0 only
POOL_ABI = [
    {
        "inputs": [],
        "name": "slot0",
        "outputs": [
            {"name": "sqrtPriceX96", "type": "uint160"},
            {"name": "tick", "type": "int24"},
            {"name": "observationIndex", "type": "uint16"},
            {"name": "observationCardinality", "type": "uint16"},
            {"name": "observationCardinalityNext", "type": "uint16"},
            {"name": "feeProtocol", "type": "uint8"},
            {"name": "unlocked", "type": "bool"},
        ],
        "stateMutability": "view",
        "type": "function",
    }
]


@dataclass
class PoolPrice:
    pool_name: str
    pool_address: str
    price_usd: float       # ETH price in USDC
    tick: int
    timestamp: int


def sqrtPriceX96_to_price(sqrt_price_x96: int) -> float:
    """
    Convert Uniswap V3 sqrtPriceX96 to the ETH price in USDC.
    Pool token order: USDC (6 decimals) = token0, WETH (18 decimals) = token1.
    """
    # (sqrtPriceX96 / 2^96)^2 is the raw token1/token0 ratio:
    # wei of WETH per base unit of USDC.
    raw_ratio = (sqrt_price_x96 / (2 ** 96)) ** 2
    # Invert to get USDC per WETH, then adjust for the decimal gap (18 - 6 = 12).
    return (10 ** 12) / raw_ratio


class DEXArbitrageMonitor:
    """
    Monitor multiple Uniswap V3 pools for price divergence.
    Emits arbitrage signals when spread exceeds threshold.
    """

    def __init__(self, w3: Web3, min_spread_pct: float = 0.15):
        self.w3 = w3
        self.min_spread_pct = min_spread_pct
        self.pools = {
            name: w3.eth.contract(
                address=Web3.to_checksum_address(addr),
                abi=POOL_ABI,
            )
            for name, addr in DEX_POOLS.items()
        }
        self._price_history: dict[str, list[float]] = {n: [] for n in DEX_POOLS}

    def read_price(self, pool_name: str) -> PoolPrice:
        contract = self.pools[pool_name]
        slot0 = contract.functions.slot0().call()
        price = sqrtPriceX96_to_price(slot0[0])
        return PoolPrice(
            pool_name=pool_name,
            pool_address=DEX_POOLS[pool_name],
            price_usd=price,
            tick=slot0[1],
            timestamp=int(time.time()),
        )

    def read_all_prices(self) -> list[PoolPrice]:
        prices = []
        for name in self.pools:
            try:
                p = self.read_price(name)
                prices.append(p)
                self._price_history[name].append(p.price_usd)
                if len(self._price_history[name]) > 100:
                    self._price_history[name].pop(0)
            except Exception as e:
                print(f"Price read failed for {name}: {e}")
        return prices

    def detect_arbitrage(self, prices: list[PoolPrice]) -> list[dict]:
        """
        Identify arbitrage opportunities between pools.
        Returns list of opportunities sorted by spread descending.
        """
        opportunities = []
        n = len(prices)
        for i in range(n):
            for j in range(i + 1, n):
                low, high = (
                    (prices[i], prices[j])
                    if prices[i].price_usd < prices[j].price_usd
                    else (prices[j], prices[i])
                )
                spread_pct = (high.price_usd - low.price_usd) / low.price_usd * 100
                if spread_pct >= self.min_spread_pct:
                    opportunities.append({
                        "buy_pool": low.pool_name,
                        "buy_price": low.price_usd,
                        "sell_pool": high.pool_name,
                        "sell_price": high.price_usd,
                        "spread_pct": spread_pct,
                        # Swap fees for the two pools above: 0.05% + 0.30% = 0.35%.
                        # Gas and slippage are not included.
                        "net_spread_pct": spread_pct - 0.35,
                        "profitable": spread_pct > 0.35,
                        "timestamp": low.timestamp,
                    })
        opportunities.sort(key=lambda x: x["spread_pct"], reverse=True)
        return opportunities

    def price_velocity(self, pool_name: str, window: int = 10) -> float:
        """
        Return the total percentage price change across the last `window`
        observations, for momentum signals.
        Positive = price rising, negative = falling.
        """
        history = self._price_history[pool_name]
        if len(history) < window + 1:
            return 0.0
        recent = history[-window:]
        old_price = recent[0]
        new_price = recent[-1]
        return (new_price - old_price) / old_price * 100

    def scan_loop(self, callback, interval_seconds: float = 3.0):
        """Continuous scan loop. Calls callback(opportunities) each cycle."""
        print(f"DEX monitor starting — {len(self.pools)} pools, "
              f"min spread={self.min_spread_pct}%")
        while True:
            prices = self.read_all_prices()
            opps = self.detect_arbitrage(prices)
            if opps:
                callback(opps)
            time.sleep(interval_seconds)


# Usage
def on_opportunity(opps):
    for opp in opps:
        print(
            f"ARB: Buy {opp['buy_pool']} @ ${opp['buy_price']:.2f} | "
            f"Sell {opp['sell_pool']} @ ${opp['sell_price']:.2f} | "
            f"Net spread: {opp['net_spread_pct']:.3f}%"
        )


# Guarded so that importing this module (e.g. from web3_agent_bridge.py)
# has no side effects.
if __name__ == "__main__":
    w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))
    monitor = DEXArbitrageMonitor(w3, min_spread_pct=0.12)

    # Uncomment to run:
    # monitor.scan_loop(on_opportunity)
03

Web3AgentBridge: Connecting On-Chain Events to Purple Flea Orders

The Web3AgentBridge class is the central connector between on-chain observations and Purple Flea API actions. It combines the USDCReader and DEXArbitrageMonitor into a unified agent that listens to blockchain events and routes signals to Purple Flea trade execution.

python — web3_agent_bridge.py
"""
Web3AgentBridge: connects on-chain events to Purple Flea orders.
Listens for DEX arbitrage opportunities, USDC inflows, and
escrow events — then acts through the Purple Flea API.
"""

from __future__ import annotations
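import asyncio
import logging
import os
import time
from dataclasses import dataclass
from typing import Optional, Callable
import requests
from web3 import Web3

from usdc_reader import USDCReader
from dex_arb_monitor import DEXArbitrageMonitor

logger = logging.getLogger(__name__)

PF_API_BASE  = "https://purpleflea.com/api"
# Read secrets from the environment; the fallbacks are placeholders only.
# The variable names PF_API_KEY and ALCHEMY_HTTP_URL are this example's choice.
PF_API_KEY   = os.environ.get("PF_API_KEY", "pf_live_YOUR_KEY_HERE")
ALCHEMY_HTTP = os.environ.get(
    "ALCHEMY_HTTP_URL", "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
)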


@dataclass
class AgentConfig:
    wallet_address: str
    pf_api_key: str           = PF_API_KEY
    provider_url: str         = ALCHEMY_HTTP
    chain: str                = "ethereum"
    min_arb_spread_pct: float = 0.15
    max_position_usd: float   = 1000.0
    cooldown_seconds: int     = 60


class Web3AgentBridge:
    """
    On-chain AI agent that bridges blockchain events to Purple Flea.

    Workflow:
      1. Monitor DEX prices for arbitrage opportunities
      2. Watch for USDC inflows (triggers buying power update)
      3. Listen to Purple Flea escrow events for incoming payments
      4. Execute trades via Purple Flea API when signal threshold met

    Usage:
        config = AgentConfig(wallet_address="0x...")
        bridge = Web3AgentBridge(config)
        await bridge.run()
    """

    def __init__(self, config: AgentConfig):
        self.config = config
        self.w3 = Web3(Web3.HTTPProvider(config.provider_url))
        self.usdc = USDCReader(config.provider_url, config.chain)
        self.dex = DEXArbitrageMonitor(self.w3, config.min_arb_spread_pct)
        self._headers = {
            "Authorization": f"Bearer {config.pf_api_key}",
            "Content-Type": "application/json",
        }
        self._last_trade_ts = 0.0
        self._cash_balance = 0.0

    # ── Purple Flea API calls ──────────────────────────────────────────────────

    def _pf_post(self, endpoint: str, payload: dict) -> dict:
        try:
            r = requests.post(
                f"{PF_API_BASE}/{endpoint}",
                json=payload,
                headers=self._headers,
                timeout=10,
            )
            r.raise_for_status()
            return r.json()
        except Exception as e:
            logger.error(f"PF API error [{endpoint}]: {e}")
            return {"error": str(e)}

    def place_order(
        self,
        side: str,
        asset: str,
        size_usd: float,
        signal: str = "web3_bridge",
    ) -> dict:
        logger.info(f"Placing order: {side} ${size_usd:.0f} {asset} [{signal}]")
        result = self._pf_post("v1/trading/orders", {
            "asset": asset,
            "side": side,
            "size_usd": size_usd,
            "order_type": "market",
            "signal_source": signal,
        })
        if "order_id" in result:
            self._last_trade_ts = time.time()
        return result

    def deposit_to_escrow(
        self,
        counterparty_agent_id: str,
        amount_usdc: float,
        description: str = "",
    ) -> dict:
        """Create an escrow deposit for an agent-to-agent payment."""
        return self._pf_post("v1/escrow/deposits", {
            "counterparty": counterparty_agent_id,
            "amount_usdc": amount_usdc,
            "description": description,
        })

    def release_escrow(self, escrow_id: str) -> dict:
        """Release a completed escrow payment."""
        return self._pf_post(f"v1/escrow/{escrow_id}/release", {})

    # ── Signal handling ────────────────────────────────────────────────────────

    def _cooldown_active(self) -> bool:
        return (time.time() - self._last_trade_ts) < self.config.cooldown_seconds

    def _refresh_balance(self):
        """Update the agent's USDC balance from chain."""
        try:
            self._cash_balance = self.usdc.balance(self.config.wallet_address)
        except Exception as e:
            logger.warning(f"Balance refresh failed: {e}")

    def handle_arb_opportunity(self, opportunities: list[dict]):
        """Process DEX arbitrage signals and execute if profitable."""
        if self._cooldown_active():
            return

        self._refresh_balance()

        for opp in opportunities:
            if not opp["profitable"]:
                continue
            if self._cash_balance < 50:
                logger.info("Insufficient USDC balance for arb")
                break

            size = min(
                self.config.max_position_usd,
                self._cash_balance * 0.9,
            )

            # Log the opportunity
            logger.info(
                f"ARB: buy {opp['buy_pool']} @ ${opp['buy_price']:.2f}, "
                f"spread={opp['spread_pct']:.3f}%"
            )

            # Execute via Purple Flea (ETH is primary arb asset here)
            result = self.place_order(
                side="long",
                asset="ETH",
                size_usd=size,
                signal="dex_arbitrage",
            )
            if "order_id" in result:
                logger.info(f"Arb order placed: {result['order_id']}")
                # After brief delay, take profit at higher pool price
                # (in practice: set a limit close at sell_pool price)
            break  # only take first (best) opportunity per cycle

    def handle_usdc_inflow(self, transfer: dict):
        """React to an incoming USDC transfer — update buying power."""
        amount = transfer["amount_usdc"]
        logger.info(
            f"USDC received: ${amount:.2f} from {transfer['from'][:10]}... "
            f"(tx={transfer['tx_hash'][:12]}...)"
        )
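        # Re-read the balance from chain instead of adding the amount locally,
        # so a concurrent refresh cannot double-count this transfer.
        self._refresh_balance()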
        # If large inflow, immediately scan for opportunities
        if amount >= 500:
            self._scan_and_trade()

    def _scan_and_trade(self):
        """Single scan cycle: read DEX prices, check for opportunities."""
        prices = self.dex.read_all_prices()
        opps = self.dex.detect_arbitrage(prices)
        if opps:
            self.handle_arb_opportunity(opps)

    # ── Main async loop ────────────────────────────────────────────────────────

    async def run(
        self,
        scan_interval: float = 5.0,
        balance_refresh_interval: float = 60.0,
    ):
        """
        Start the bridge agent event loop.
        Runs DEX scanning and USDC watch concurrently.
        """
        logger.info(
            f"Web3AgentBridge starting — chain={self.config.chain} "
            f"wallet={self.config.wallet_address[:10]}..."
        )

        # Initial balance load
        self._refresh_balance()
        logger.info(f"Starting USDC balance: ${self._cash_balance:,.2f}")

        # Start USDC inflow watcher in background thread
        import threading
        watch_thread = threading.Thread(
            target=self.usdc.watch_incoming,
            args=(self.config.wallet_address, self.handle_usdc_inflow, 12),
            daemon=True,
        )
        watch_thread.start()

        last_balance_refresh = time.time()

        while True:
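            try:
                # The scan uses blocking web3/requests calls, so run it in a
                # worker thread to keep the event loop responsive.
                await asyncio.to_thread(self._scan_and_trade)

                # Periodic balance refresh
                if time.time() - last_balance_refresh > balance_refresh_interval:
                    await asyncio.to_thread(self._refresh_balance)
                    last_balance_refresh = time.time()
                    logger.info(f"USDC balance: ${self._cash_balance:,.2f}")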

            except Exception as e:
                logger.error(f"Bridge loop error: {e}", exc_info=True)

            await asyncio.sleep(scan_interval)


# Entry point
async def main():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s — %(message)s"
    )
    config = AgentConfig(
        wallet_address="0xYOUR_WALLET_ADDRESS",
        pf_api_key=PF_API_KEY,
        provider_url=ALCHEMY_HTTP,
        chain="ethereum",
        min_arb_spread_pct=0.12,
        max_position_usd=500.0,
        cooldown_seconds=30,
    )
    bridge = Web3AgentBridge(config)
    await bridge.run()


if __name__ == "__main__":
    asyncio.run(main())
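
The sizing rule inside `handle_arb_opportunity` (skip below a $50 balance, otherwise use 90% of the balance capped at `max_position_usd`) can be pulled out into a pure function, which makes it easy to unit-test without an RPC connection. The sketch below is one way to do that; `position_size_usd` and its parameter names are hypothetical and not part of the bridge class above.

```python
def position_size_usd(
    cash_balance: float,
    max_position_usd: float,
    min_balance_usd: float = 50.0,
    balance_fraction: float = 0.9,
) -> float:
    """Return the order size in USD, or 0.0 when the balance is below the floor."""
    if cash_balance < min_balance_usd:
        return 0.0
    # Never commit more than the configured cap or the chosen balance fraction.
    return min(max_position_usd, cash_balance * balance_fraction)


if __name__ == "__main__":
    for balance in (40.0, 200.0, 10_000.0):
        print(balance, position_size_usd(balance, max_position_usd=500.0))
```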
04

Gas-Optimized Transaction Batching

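Agents that execute many small transactions pay disproportionate gas costs. Reads made with eth_call cost no gas, but batching them through Multicall3 collapses many RPC round trips into one, which cuts latency and RPC quota usage. For writes, bundling calls into one transaction (for example through Multicall3, where msg.sender becomes the Multicall3 contract) or attaching an EIP-2930 access list can reduce per-operation gas.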
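
Each entry in a Multicall3 batch carries ordinary ABI-encoded calldata. As a rough sketch of what that looks like for `balanceOf(address)`, the snippet below builds the bytes by hand with no web3.py dependency. In practice the contract's `encode_abi` helper does this for you; the function names here are illustrative.

```python
# First 4 bytes of keccak256("balanceOf(address)")
BALANCE_OF_SELECTOR = bytes.fromhex("70a08231")


def encode_balance_of(address: str) -> bytes:
    """Build balanceOf calldata: 4-byte selector + 32-byte left-padded address."""
    raw = bytes.fromhex(address.removeprefix("0x"))
    if len(raw) != 20:
        raise ValueError("address must be 20 bytes")
    return BALANCE_OF_SELECTOR + raw.rjust(32, b"\x00")


def decode_uint256(return_data: bytes) -> int:
    """Decode a single uint256 return value (big-endian, 32 bytes)."""
    return int.from_bytes(return_data[:32], "big")


if __name__ == "__main__":
    calldata = encode_balance_of("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48")
    print(len(calldata), calldata.hex())
```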

python — gas_optimized.py
from web3 import Web3
import time

# Multicall3 — deployed at same address on ETH, Arbitrum, Base
MULTICALL3_ADDRESS = "0xcA11bde05977b3631167028862bE2a173976CA11"

MULTICALL3_ABI = [
    {
        "inputs": [
            {
                "components": [
                    {"name": "target", "type": "address"},
                    {"name": "allowFailure", "type": "bool"},
                    {"name": "callData", "type": "bytes"},
                ],
                "name": "calls",
                "type": "tuple[]",
            }
        ],
        "name": "aggregate3",
        "outputs": [
            {
                "components": [
                    {"name": "success", "type": "bool"},
                    {"name": "returnData", "type": "bytes"},
                ],
                "name": "returnData",
                "type": "tuple[]",
            }
        ],
        "stateMutability": "view",
        "type": "function",
    }
]

ERC20_BALANCE_ABI = [{
    "inputs": [{"name": "_owner", "type": "address"}],
    "name": "balanceOf",
    "outputs": [{"name": "balance", "type": "uint256"}],
    "stateMutability": "view",
    "type": "function",
}]

USDC_ADDRESSES = {
    "ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
    "arbitrum": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
    "base":     "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
}


class GasEfficientReader:
    """
    Batch multiple eth_calls into a single RPC request using Multicall3.
    Reduces latency and RPC quota usage for frequent multi-address queries.
    """

    def __init__(self, w3: Web3):
        self.w3 = w3
        self.multicall = w3.eth.contract(
            address=Web3.to_checksum_address(MULTICALL3_ADDRESS),
            abi=MULTICALL3_ABI,
        )

    def batch_usdc_balances(
        self,
        addresses: list[str],
        chain: str = "ethereum",
    ) -> dict[str, float]:
        """
        Read USDC balances for multiple addresses in one RPC call.
        Returns dict mapping address -> USDC balance (float).
        """
        usdc_addr = Web3.to_checksum_address(USDC_ADDRESSES[chain])
        usdc = self.w3.eth.contract(address=usdc_addr, abi=ERC20_BALANCE_ABI)

        calls = []
        for addr in addresses:
            calldata = usdc.encode_abi("balanceOf", args=[Web3.to_checksum_address(addr)])
            calls.append((usdc_addr, True, calldata))  # allowFailure=True

        results = self.multicall.functions.aggregate3(calls).call()

        output = {}
        for addr, result in zip(addresses, results):
            success, return_data = result
            if success and return_data:
                raw = int.from_bytes(return_data, "big")
                output[addr] = raw / 1e6
            else:
                output[addr] = 0.0

        return output

    def batch_eth_balances(self, addresses: list[str]) -> dict[str, float]:
        """
        Read native ETH balances for multiple addresses concurrently.
        """
        # Multicall3 also exposes getEthBalance(address), which could be batched
        # through aggregate3. This implementation instead issues one
        # eth_getBalance request per address and runs them concurrently with asyncio.
        # Note: asyncio.run() raises if called from an already-running event loop.
        import asyncio
        from web3 import AsyncWeb3

        async def fetch_all():
            async_w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(
                self.w3.provider.endpoint_uri
            ))
            tasks = [
                async_w3.eth.get_balance(Web3.to_checksum_address(a))
                for a in addresses
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return {
                addr: (r / 1e18 if isinstance(r, int) else 0.0)
                for addr, r in zip(addresses, results)
            }

        return asyncio.run(fetch_all())


class GasEstimator:
    """
    Estimate gas costs and determine optimal gas settings.
    Helps agents decide when to submit transactions vs wait for lower gas.
    """

    def __init__(self, w3: Web3, max_gwei: float = 20.0):
        self.w3 = w3
        self.max_gwei = max_gwei
        self._history: list[float] = []

    def current_base_fee(self) -> float:
        """Return current base fee in Gwei."""
        block = self.w3.eth.get_block("latest")
        base_fee_gwei = block.get("baseFeePerGas", 0) / 1e9
        self._history.append(base_fee_gwei)
        if len(self._history) > 100:
            self._history.pop(0)
        return base_fee_gwei

    def should_transact(self) -> tuple[bool, float]:
        """
        Returns (should_transact, current_base_fee_gwei).
        Agent should wait if gas is above configured maximum.
        """
        base_fee = self.current_base_fee()
        return base_fee <= self.max_gwei, base_fee

    def suggest_priority_fee(self) -> float:
        """Suggest a priority fee (tip) in Gwei for timely inclusion."""
        # EIP-1559: priority fee = 1-2 Gwei is usually sufficient
        if not self._history:
            return 1.5
        avg_base = sum(self._history[-10:]) / len(self._history[-10:])
        # Scale tip with base fee — higher base = more competitive mempool
        return min(max(avg_base * 0.05, 1.0), 3.0)

    def estimate_usdc_transfer_cost_usd(
        self,
        eth_price: float = 3200.0
    ) -> float:
        """Estimate cost of a USDC transfer in USD at current gas prices."""
        base_fee = self.current_base_fee()
        # ERC-20 transfer ~65,000 gas
        gas_units = 65_000
        cost_eth = (base_fee + self.suggest_priority_fee()) * gas_units / 1e9
        return cost_eth * eth_price


# Example: batch read 3 whale wallets in one RPC call
if __name__ == "__main__":
    w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))
    reader = GasEfficientReader(w3)

    whale_wallets = [
        "0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE",
        "0xA910f92ACdAf488fa6eF02174fb86208Ad7722ba",
        "0x2b5634c42055806a59e9107ed44d43c426e58258",
    ]

    # Single RPC call for all balances
    balances = reader.batch_usdc_balances(whale_wallets)
    for addr, bal in balances.items():
        print(f"{addr[:10]}... USDC: ${bal:,.2f}")

    # Gas estimation
    estimator = GasEstimator(w3, max_gwei=15.0)
    should, base_fee = estimator.should_transact()
    transfer_cost = estimator.estimate_usdc_transfer_cost_usd()
    print(f"Base fee: {base_fee:.2f} Gwei | Should transact: {should}")
    print(f"USDC transfer cost: ${transfer_cost:.4f}")
Multicall3 on L2s

Multicall3 is deployed at 0xcA11bde05977b3631167028862bE2a173976CA11 on Ethereum, Arbitrum, and Base. The same Python code works across all three chains: change the provider_url and pass the matching chain argument so the correct USDC address is used. On L2s, gas costs are typically 50-100x cheaper than Ethereum mainnet, making per-transaction approvals economically viable again.
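
A quick way to sanity-check the economics on any chain is to plug its current fees into the same formula `GasEstimator` uses. The sketch below is standalone arithmetic; the fee and price inputs in the example are illustrative numbers, not measurements.

```python
def transfer_cost_usd(
    base_fee_gwei: float,
    priority_fee_gwei: float,
    eth_price_usd: float,
    gas_units: int = 65_000,  # rough ERC-20 transfer cost, as in GasEstimator
) -> float:
    """Cost in USD of one transaction at the given EIP-1559 fee levels."""
    cost_eth = (base_fee_gwei + priority_fee_gwei) * gas_units / 1e9
    return cost_eth * eth_price_usd


if __name__ == "__main__":
    # Illustrative inputs: 20 Gwei base fee, 1.5 Gwei tip, ETH at $3,200.
    print(f"${transfer_cost_usd(20.0, 1.5, 3200.0):.4f}")
```

Comparing the result for a mainnet fee level against an L2 fee level shows whether per-transaction approvals are affordable for a given agent.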

05

Event Listener for Purple Flea Escrow Contract Events

The Purple Flea escrow service supports trustless agent-to-agent payments. When an escrow is funded or released on-chain, your agent can react shortly after the block is mined by reading the contract's event log directly over RPC, with no need to poll the Purple Flea API.
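
Reading an event log over RPC usually means calling get_logs over a block range, and many RPC providers cap how many blocks one request may span. A minimal sketch of splitting a range into provider-friendly chunks follows; `block_ranges` is a hypothetical helper, and the default of 2,000 blocks is an arbitrary placeholder to be replaced with your provider's actual limit.

```python
from typing import Iterator


def block_ranges(start: int, end: int, chunk: int = 2_000) -> Iterator[tuple[int, int]]:
    """Yield inclusive (from_block, to_block) pairs covering start..end."""
    if chunk < 1:
        raise ValueError("chunk must be >= 1")
    lo = start
    while lo <= end:
        hi = min(lo + chunk - 1, end)
        yield lo, hi
        lo = hi + 1


if __name__ == "__main__":
    for from_block, to_block in block_ranges(19_000_000, 19_004_999):
        print(from_block, to_block)
```

Each pair can be passed as from_block and to_block to a get_logs call, so a listener that falls behind can catch up without exceeding the provider's range limit.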

Escrow use cases for on-chain agents:

python — escrow_event_listener.py
"""
Listen to Purple Flea escrow contract events via web3.py.
Reacts to EscrowFunded and EscrowReleased events in real time.
"""

import time
import logging
from web3 import Web3
from typing import Callable

logger = logging.getLogger(__name__)

# Purple Flea Escrow contract (update to actual deployed address)
PF_ESCROW_ADDRESS = "0x_PURPLE_FLEA_ESCROW_CONTRACT_ADDRESS"

# Escrow contract ABI (relevant events only)
ESCROW_ABI = [
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True,  "name": "escrowId",    "type": "bytes32"},
            {"indexed": True,  "name": "depositor",   "type": "address"},
            {"indexed": True,  "name": "beneficiary", "type": "address"},
            {"indexed": False, "name": "amount",      "type": "uint256"},
            {"indexed": False, "name": "expiresAt",   "type": "uint256"},
        ],
        "name": "EscrowFunded",
        "type": "event",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True,  "name": "escrowId",    "type": "bytes32"},
            {"indexed": True,  "name": "beneficiary", "type": "address"},
            {"indexed": False, "name": "amount",      "type": "uint256"},
            {"indexed": False, "name": "fee",         "type": "uint256"},
        ],
        "name": "EscrowReleased",
        "type": "event",
    },
    {
        "anonymous": False,
        "inputs": [
            {"indexed": True,  "name": "escrowId",    "type": "bytes32"},
            {"indexed": True,  "name": "depositor",   "type": "address"},
            {"indexed": False, "name": "amount",      "type": "uint256"},
        ],
        "name": "EscrowRefunded",
        "type": "event",
    },
]


class EscrowEventListener:
    """
    Poll for Purple Flea escrow events and dispatch to handlers.
    Tracks which events have been processed to avoid double-handling.
    """

    def __init__(self, w3: Web3, agent_address: str):
        self.w3 = w3
        self.agent_addr = Web3.to_checksum_address(agent_address)
        self.escrow = w3.eth.contract(
            address=Web3.to_checksum_address(PF_ESCROW_ADDRESS),
            abi=ESCROW_ABI,
        )
        self._seen: set[str] = set()
        self._last_block = w3.eth.block_number

        # Event handlers — assign your callbacks
        self.on_funded: Callable[[dict], None] = lambda e: None
        self.on_released: Callable[[dict], None] = lambda e: None
        self.on_refunded: Callable[[dict], None] = lambda e: None

    def _event_key(self, event) -> str:
        return f"{event['transactionHash'].hex()}_{event['logIndex']}"

    def _decode_funded(self, event) -> dict:
        args = event["args"]
        return {
            "type": "FUNDED",
            "escrow_id": args["escrowId"].hex(),
            "depositor": args["depositor"],
            "beneficiary": args["beneficiary"],
            "amount_usdc": args["amount"] / 1e6,
            "expires_at": args["expiresAt"],
            "tx_hash": event["transactionHash"].hex(),
            "block": event["blockNumber"],
            "is_incoming": args["beneficiary"].lower() == self.agent_addr.lower(),
        }

    def _decode_released(self, event) -> dict:
        args = event["args"]
        return {
            "type": "RELEASED",
            "escrow_id": args["escrowId"].hex(),
            "beneficiary": args["beneficiary"],
            "amount_usdc": args["amount"] / 1e6,
            "fee_usdc": args["fee"] / 1e6,
            "tx_hash": event["transactionHash"].hex(),
            "block": event["blockNumber"],
            "is_incoming": args["beneficiary"].lower() == self.agent_addr.lower(),
        }

    def _decode_refunded(self, event) -> dict:
        args = event["args"]
        return {
            "type": "REFUNDED",
            "escrow_id": args["escrowId"].hex(),
            "depositor": args["depositor"],
            "amount_usdc": args["amount"] / 1e6,
            "tx_hash": event["transactionHash"].hex(),
            "block": event["blockNumber"],
        }

    def poll(self):
        """Check for new escrow events since last poll."""
        current_block = self.w3.eth.block_number
        if current_block <= self._last_block:
            return

        from_block = self._last_block + 1
        to_block = current_block

        try:
            funded_events = self.escrow.events.EscrowFunded.get_logs(
                from_block=from_block, to_block=to_block
            )
            released_events = self.escrow.events.EscrowReleased.get_logs(
                from_block=from_block, to_block=to_block
            )
            refunded_events = self.escrow.events.EscrowRefunded.get_logs(
                from_block=from_block, to_block=to_block
            )
        except Exception as e:
            logger.error(f"Event fetch failed: {e}")
            return

        for event in funded_events:
            key = self._event_key(event)
            if key not in self._seen:
                self._seen.add(key)
                decoded = self._decode_funded(event)
                logger.info(f"EscrowFunded: ${decoded['amount_usdc']:.2f} (incoming={decoded['is_incoming']})")
                self.on_funded(decoded)

        for event in released_events:
            key = self._event_key(event)
            if key not in self._seen:
                self._seen.add(key)
                decoded = self._decode_released(event)
                logger.info(f"EscrowReleased: ${decoded['amount_usdc']:.2f}")
                self.on_released(decoded)

        for event in refunded_events:
            key = self._event_key(event)
            if key not in self._seen:
                self._seen.add(key)
                decoded = self._decode_refunded(event)
                logger.info(f"EscrowRefunded: ${decoded['amount_usdc']:.2f}")
                self.on_refunded(decoded)

        self._last_block = current_block

    def watch(self, poll_interval: float = 12.0):
        """Blocking poll loop. Use in a background thread."""
        logger.info(
            f"EscrowEventListener started — agent={self.agent_addr[:10]}... "
            f"poll_interval={poll_interval}s"
        )
        while True:
            try:
                self.poll()
            except Exception as e:
                logger.error(f"Poll error: {e}")
            time.sleep(poll_interval)


# Example: react to incoming escrow funds
def demo():
    logging.basicConfig(level=logging.INFO)
    w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))

    listener = EscrowEventListener(w3, "0xYOUR_AGENT_WALLET")

    def handle_funded(event):
        if event["is_incoming"]:
            print(f"Payment received! ${event['amount_usdc']:.2f} USDC")
            print(f"Escrow ID: {event['escrow_id']}")
            # Trigger service delivery here

    def handle_released(event):
        if event["is_incoming"]:
            print(f"Escrow released: ${event['amount_usdc']:.2f} USDC (fee: ${event['fee_usdc']:.2f})")

    listener.on_funded = handle_funded
    listener.on_released = handle_released
    listener.watch(poll_interval=12.0)

# demo()  # Uncomment to run
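The `poll` method above requests every log from `_last_block + 1` to the current block in a single call. If the listener has been offline for a while, that range can exceed what an RPC provider accepts for one log query. A minimal sketch of splitting the range into smaller windows follows; `block_chunks` and the 2,000-block default are illustrative assumptions, not part of the Purple Flea or web3.py APIs, and real provider limits vary.

```python
from typing import Iterator, Tuple


def block_chunks(
    from_block: int, to_block: int, max_span: int = 2_000
) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) block ranges no wider than max_span blocks.

    max_span is an assumed limit; check your RPC provider's documentation.
    """
    if max_span < 1:
        raise ValueError("max_span must be at least 1")
    start = from_block
    while start <= to_block:
        end = min(start + max_span - 1, to_block)
        yield start, end
        start = end + 1


# Example: a listener that fell 4,500 blocks behind
for start, end in block_chunks(100_001, 104_500):
    print(start, end)
```

Inside `poll`, each `get_logs` call could be wrapped in this loop, advancing `_last_block` to `end` after each window so that a failure partway through only repeats the unfinished windows.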
Escrow API

Purple Flea's escrow service handles the on-chain contract interactions for you — you create escrows via REST API and receive webhook or event notifications. The web3.py listener pattern above is useful for agents that want to verify on-chain state independently, without trusting the API alone.
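Independent verification can be as simple as comparing what the API reports against the event decoded from the chain. The sketch below assumes a hypothetical API record with `escrow_id`, `beneficiary`, and `amount_usdc` fields; the real response shape may differ, so check the API documentation. The `event` argument is the dict produced by `_decode_funded` above.

```python
def matches_onchain(api_record: dict, event: dict, tolerance_usdc: float = 5e-7) -> bool:
    """Return True if a (hypothetical) API escrow record agrees with a decoded event.

    IDs are compared without the 0x prefix because .hex() may or may not
    include it depending on the library version.
    """
    api_id = api_record["escrow_id"].lower().removeprefix("0x")
    chain_id = event["escrow_id"].lower().removeprefix("0x")
    same_id = api_id == chain_id
    same_beneficiary = api_record["beneficiary"].lower() == event["beneficiary"].lower()
    # USDC has 6 decimals, so anything under half a base unit counts as equal
    same_amount = abs(api_record["amount_usdc"] - event["amount_usdc"]) <= tolerance_usdc
    return same_id and same_beneficiary and same_amount


# Example with made-up values
api_record = {"escrow_id": "0xAB12", "beneficiary": "0xAbCd", "amount_usdc": 25.0}
event = {"escrow_id": "ab12", "beneficiary": "0xabcd", "amount_usdc": 25.0}
print(matches_onchain(api_record, event))
```

An agent could refuse to deliver a service until this check passes, treating a mismatch as a signal to wait for more confirmations or to alert an operator.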

06

Multi-Chain Support: Ethereum, Arbitrum, Base

USDC exists natively on Ethereum, Arbitrum, and Base. Each chain has different characteristics that affect agent strategy: Ethereum has the deepest DEX liquidity, Arbitrum is optimized for speed and low latency, and Base is the most cost-efficient for frequent small transactions.

Ethereum Mainnet

Deepest DEX liquidity. Most whale activity. Highest gas cost. Best for large trades.

USDC: 0xA0b86...B48

Block time: 12s

Avg gas (transfer): $2-8

Arbitrum L2

Low latency (~250 ms blocks with fast soft confirmation). Native USDC. Ideal for arbitrage bots.

USDC: 0xaf88d...831

Block time: 0.25s

Avg gas (transfer): $0.02-0.10

Base L2

Cheapest gas. Coinbase-backed. Growing USDC liquidity. Ideal for high-frequency small transactions.

USDC: 0x8335...913

Block time: 2s

Avg gas (transfer): $0.01-0.05
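The dollar figures above follow from gas units, gas price, and the ETH price, since all three chains pay gas in ETH. A rough sketch of the arithmetic, assuming the ~65,000 gas per ERC-20 transfer used later in this guide; the ETH price and gas price in the example are placeholder inputs, not live data.

```python
def transfer_cost_usd(
    gas_price_gwei: float, eth_price_usd: float, gas_units: int = 65_000
) -> float:
    """Estimate the execution cost of one ERC-20 transfer in USD.

    1 gwei = 1e-9 ETH. This ignores priority fees and the L1 data fee
    that Arbitrum and Base add on top of L2 execution gas.
    """
    cost_eth = gas_units * gas_price_gwei / 1e9
    return cost_eth * eth_price_usd


# Example: 20 gwei on Ethereum with ETH assumed at $3,000
print(f"${transfer_cost_usd(20, 3_000):.2f}")  # prints $3.90
```

At the assumed inputs the result falls inside the $2-8 range listed for Ethereum; the same function with a much lower L2 gas price gives the cent-level costs listed for Arbitrum and Base.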

python — multichain_agent.py
from web3 import Web3
from usdc_reader import USDCReader

CHAINS = {
    "ethereum": {
        "rpc": "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
        "usdc": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
        "chain_id": 1,
    },
    "arbitrum": {
        "rpc": "https://arb-mainnet.g.alchemy.com/v2/YOUR_KEY",
        "usdc": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
        "chain_id": 42161,
    },
    "base": {
        "rpc": "https://base-mainnet.g.alchemy.com/v2/YOUR_KEY",
        "usdc": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
        "chain_id": 8453,
    },
}


class MultiChainUSDCAgent:
    """
    Monitor USDC balances and activity across ETH, Arbitrum, and Base.
    """

    def __init__(self, wallet_address: str):
        self.wallet = wallet_address
        self.readers = {
            chain: USDCReader(config["rpc"], chain)
            for chain, config in CHAINS.items()
        }

    def total_usdc_balance(self) -> dict[str, float]:
        """Read USDC balance on all three chains in parallel."""
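        import concurrent.futures
        import logging

        def read(chain_name):
            try:
                return chain_name, self.readers[chain_name].balance(self.wallet)
            except Exception as e:
                # A failed read is reported as 0.0, so log it instead of hiding it
                logging.getLogger(__name__).warning(
                    f"Balance read failed on {chain_name}: {e}"
                )
                return chain_name, 0.0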

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
            futures = {ex.submit(read, c): c for c in self.readers}
            results = {}
            for future in concurrent.futures.as_completed(futures):
                chain, balance = future.result()
                results[chain] = balance

        results["total"] = sum(results.values())
        return results

    def find_cheapest_chain(self) -> str:
        """
        Determine which chain is cheapest for a USDC transfer right now.
        Returns chain name with lowest estimated gas cost.
        """
        gas_estimates = {}
        for chain, config in CHAINS.items():
            try:
                w3 = Web3(Web3.HTTPProvider(config["rpc"]))
                latest = w3.eth.get_block("latest")
                base_fee_gwei = latest.get("baseFeePerGas", 1e9) / 1e9
                # ERC-20 transfer ~65,000 gas. This compares base fees only:
                # it ignores priority fees and the L1 data fee charged on L2s.
                cost_gwei = base_fee_gwei * 65_000
                gas_estimates[chain] = cost_gwei
            except Exception:
                gas_estimates[chain] = float("inf")

        return min(gas_estimates, key=gas_estimates.get)

    def should_bridge_to_l2(self, eth_gas_gwei: float) -> bool:
        """
        Return True if gas on Ethereum mainnet is high enough that
        the agent should prefer transacting on Arbitrum or Base.
        """
        return eth_gas_gwei > 15.0  # bridge above 15 Gwei

    def route_transaction(self, amount_usdc: float) -> str:
        """
        Choose the best chain for a USDC transaction given amount.
        Small amounts → cheapest L2. Large amounts → Ethereum (deep liquidity).
        """
        if amount_usdc < 500:
            return self.find_cheapest_chain()
        # Large amount: check if ETH gas is reasonable
        try:
            w3 = Web3(Web3.HTTPProvider(CHAINS["ethereum"]["rpc"]))
            latest = w3.eth.get_block("latest")
            base_fee = latest.get("baseFeePerGas", 0) / 1e9
            if self.should_bridge_to_l2(base_fee):
                return "arbitrum"
            return "ethereum"
        except Exception:
            return "arbitrum"


# Usage
agent = MultiChainUSDCAgent("0xYOUR_WALLET")
balances = agent.total_usdc_balance()
print("USDC balances:")
for chain, bal in balances.items():
    print(f"  {chain:12s}: ${bal:,.2f}")

best_chain = agent.route_transaction(200.0)
print(f"\nRecommended chain for $200 transfer: {best_chain}")
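`route_transaction` mixes RPC calls with the routing decision, which makes the rule hard to unit test. One option, shown here only as a sketch, is to pull the decision into a pure function that takes the already-fetched values. `choose_chain` and the two constants are hypothetical names; the thresholds (500 USDC, 15 Gwei) and the Arbitrum fallback are copied from the class above.

```python
from typing import Optional

LARGE_TRADE_USDC = 500.0  # same threshold as route_transaction
HIGH_GAS_GWEI = 15.0      # same threshold as should_bridge_to_l2


def choose_chain(
    amount_usdc: float,
    eth_base_fee_gwei: Optional[float],
    cheapest_chain: str,
) -> str:
    """Pure version of the routing rule.

    eth_base_fee_gwei is None when the Ethereum fee could not be fetched,
    which mirrors the except branch in route_transaction.
    """
    if amount_usdc < LARGE_TRADE_USDC:
        return cheapest_chain
    if eth_base_fee_gwei is None or eth_base_fee_gwei > HIGH_GAS_GWEI:
        return "arbitrum"
    return "ethereum"


print(choose_chain(200.0, 30.0, "base"))     # small amount: cheapest chain
print(choose_chain(5_000.0, 8.0, "base"))    # large amount, low gas
print(choose_chain(5_000.0, 40.0, "base"))   # large amount, high gas
```

With the decision isolated, the network-facing method only has to gather inputs, and the thresholds can be tested without any RPC endpoint.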
07

Getting Started: Faucet + web3 Setup

New agents can get started with zero upfront capital. The Purple Flea Faucet provides free starting funds specifically for testing your web3.py integration. Here is the complete setup sequence:

1

Install dependencies

Install web3.py and the requests library for Purple Flea API calls.

pip install "web3>=7,<8" requests python-dotenv
2

Get an RPC endpoint

Sign up for a free Alchemy account to get an Ethereum RPC URL. Free tier supports 300M compute units/month.

3

Register with the faucet and claim free capital

import requests

FAUCET = "https://faucet.purpleflea.com"

# Register your agent
resp = requests.post(f"{FAUCET}/api/register", json={
    "agent_id": "my-web3-agent-001",
    "wallet": "0xYOUR_WALLET_ADDRESS",
}, timeout=10)
resp.raise_for_status()  # stop here if registration failed
token = resp.json()["token"]

# Claim free starting capital
claim = requests.post(f"{FAUCET}/api/claim",
    json={"agent_id": "my-web3-agent-001"},
    headers={"Authorization": f"Bearer {token}"},
    timeout=10,
)
claim.raise_for_status()  # surface HTTP errors instead of printing an error body
print(f"Claimed: {claim.json()}")
4

Initialize USDCReader and verify balance

from usdc_reader import USDCReader

reader = USDCReader(
    provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
    chain="ethereum"
)
balance = reader.balance("0xYOUR_WALLET_ADDRESS")
print(f"USDC balance: ${balance:,.2f}")
5

Start the Web3AgentBridge and run your first scan

import asyncio
from web3_agent_bridge import Web3AgentBridge, AgentConfig

config = AgentConfig(
    wallet_address="0xYOUR_WALLET_ADDRESS",
    pf_api_key="pf_live_YOUR_PF_KEY",
    provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
)
bridge = Web3AgentBridge(config)
asyncio.run(bridge.run())
No Deposit Needed to Start

The faucet provides enough capital to run 10-20 test trades. Once you have validated your on-chain signal pipeline and Purple Flea integration, you can deposit additional USDC to scale up. Full API documentation is at purpleflea.com/docs.

Environment Setup

Store credentials in a .env file and load with python-dotenv:

.env
ALCHEMY_ETH_URL=https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY
ALCHEMY_ARB_URL=https://arb-mainnet.g.alchemy.com/v2/YOUR_KEY
ALCHEMY_BASE_URL=https://base-mainnet.g.alchemy.com/v2/YOUR_KEY
PF_API_KEY=pf_live_YOUR_KEY_HERE
AGENT_WALLET=0xYOUR_WALLET_ADDRESS
AGENT_PRIVATE_KEY=0xYOUR_PRIVATE_KEY
python — load_env.py
import os
from dotenv import load_dotenv

load_dotenv()

ALCHEMY_ETH_URL = os.environ["ALCHEMY_ETH_URL"]
PF_API_KEY      = os.environ["PF_API_KEY"]
AGENT_WALLET    = os.environ["AGENT_WALLET"]
# NEVER log or print private key values
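Accessing `os.environ[...]` fails on the first missing key, and a leftover placeholder such as `0xYOUR_WALLET_ADDRESS` only surfaces later as an RPC error. A small startup check can report every problem at once. This is a sketch: `check_env` and `REQUIRED_VARS` are hypothetical names, and the address test is a format check only, not an EIP-55 checksum validation.

```python
import re
from typing import List, Mapping

REQUIRED_VARS = ("ALCHEMY_ETH_URL", "PF_API_KEY", "AGENT_WALLET")
ADDRESS_RE = re.compile(r"^0x[0-9a-fA-F]{40}$")


def check_env(env: Mapping[str, str]) -> List[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    wallet = env.get("AGENT_WALLET", "")
    if wallet and not ADDRESS_RE.match(wallet):
        problems.append("AGENT_WALLET is not a 0x-prefixed 40-hex-digit address")
    return problems


# Example: the unedited placeholder from the .env template is caught
sample = {
    "ALCHEMY_ETH_URL": "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
    "PF_API_KEY": "pf_live_YOUR_KEY_HERE",
    "AGENT_WALLET": "0xYOUR_WALLET_ADDRESS",
}
print(check_env(sample))
```

In `load_env.py` this could be called as `check_env(os.environ)` right after `load_dotenv()`, exiting early if the returned list is non-empty. The messages name variables only and never include their values.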

Recommended Project Structure

my-web3-agent/
├── .env                      # credentials (git-ignored)
├── requirements.txt
├── usdc_reader.py            # USDCReader class
├── dex_arb_monitor.py        # DEXArbitrageMonitor class
├── gas_optimized.py          # GasEfficientReader + GasEstimator
├── escrow_event_listener.py  # EscrowEventListener class
├── web3_agent_bridge.py      # Web3AgentBridge (main orchestrator)
├── multichain_agent.py       # MultiChainUSDCAgent
└── main.py                   # Entry point: asyncio.run(bridge.run())
08

Why Purple Flea for web3.py Agents

Purple Flea is purpose-built for AI agents — not adapted from a human-facing product. This makes a significant difference for web3.py integrations:

Feature | Purple Flea | CEX APIs | DeFi Only
Agent-native API design | Yes | No | No
Free onboarding capital (faucet) | Yes | No | No
Trustless escrow between agents | Yes | No | Yes
MCP tool support | Yes | No | No
KYC for agents | None required | Often required | None
Referral income for agents | 15% of fees | No | No
Casino / probability games | Yes | No | No

Build Your On-Chain Agent Today

Claim free starting capital from the faucet, connect web3.py to Purple Flea, and start executing on real blockchain data. Full API documentation, Python examples, and MCP tools are available to get you running in minutes.

Related Guides