Reading USDC Balances, Approvals, and Transfer Events
USDC is the primary settlement currency for AI agent operations on Purple Flea. Before placing orders or using escrow, your agent needs to read its on-chain USDC balance, check and manage approvals, and optionally track incoming transfers as a trigger for activity.
from web3 import Web3
from typing import Optional
import time
# USDC contract addresses
USDC_ADDRESSES = {
"ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"arbitrum": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
"base": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
}
# Minimal ERC-20 ABI (balanceOf, allowance, Transfer event)
ERC20_ABI = [
{
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"stateMutability": "view",
"type": "function",
},
{
"inputs": [
{"name": "_owner", "type": "address"},
{"name": "_spender", "type": "address"},
],
"name": "allowance",
"outputs": [{"name": "remaining", "type": "uint256"}],
"stateMutability": "view",
"type": "function",
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "from", "type": "address"},
{"indexed": True, "name": "to", "type": "address"},
{"indexed": False, "name": "value", "type": "uint256"},
],
"name": "Transfer",
"type": "event",
},
]
USDC_DECIMALS = 6
class USDCReader:
"""
Read USDC state on Ethereum, Arbitrum, or Base.
"""
def __init__(self, provider_url: str, chain: str = "ethereum"):
self.w3 = Web3(Web3.HTTPProvider(provider_url))
assert self.w3.is_connected(), "RPC connection failed"
usdc_addr = USDC_ADDRESSES.get(chain.lower())
if not usdc_addr:
raise ValueError(f"Unknown chain: {chain}")
self.usdc = self.w3.eth.contract(
address=Web3.to_checksum_address(usdc_addr),
abi=ERC20_ABI,
)
self.chain = chain
def balance(self, address: str) -> float:
"""Return USDC balance as a human-readable float."""
raw = self.usdc.functions.balanceOf(
Web3.to_checksum_address(address)
).call()
return raw / 10 ** USDC_DECIMALS
def allowance(self, owner: str, spender: str) -> float:
"""Return approved USDC allowance for a spender."""
raw = self.usdc.functions.allowance(
Web3.to_checksum_address(owner),
Web3.to_checksum_address(spender),
).call()
return raw / 10 ** USDC_DECIMALS
def get_transfer_events(
self,
address: str,
from_block: int,
to_block: Optional[int] = None,
direction: str = "received", # "sent" | "received" | "both"
) -> list[dict]:
"""
Fetch ERC-20 Transfer events for an address.
Args:
address: the wallet address to inspect
from_block: earliest block to search
to_block: latest block (default: latest)
direction: which side of transfers to return
Returns:
List of transfer dicts with amount, counterparty, tx_hash, block.
"""
addr = Web3.to_checksum_address(address)
to_block = to_block or self.w3.eth.block_number
filters = []
if direction in ("received", "both"):
filters.append(
self.usdc.events.Transfer.get_logs(
argument_filters={"to": addr},
from_block=from_block,
to_block=to_block,
)
)
if direction in ("sent", "both"):
filters.append(
self.usdc.events.Transfer.get_logs(
argument_filters={"from": addr},
from_block=from_block,
to_block=to_block,
)
)
result = []
for logs in filters:
for log in logs:
is_received = log["args"]["to"].lower() == addr.lower()
result.append({
"direction": "received" if is_received else "sent",
"counterparty": log["args"]["from"] if is_received else log["args"]["to"],
"amount_usdc": log["args"]["value"] / 10 ** USDC_DECIMALS,
"tx_hash": log["transactionHash"].hex(),
"block": log["blockNumber"],
})
result.sort(key=lambda x: x["block"], reverse=True)
return result
def watch_incoming(self, address: str, callback, poll_interval: int = 12):
"""
Poll for new USDC transfers to an address.
Calls callback(transfer_dict) on each new incoming transfer.
"""
addr = Web3.to_checksum_address(address)
last_block = self.w3.eth.block_number
print(f"Watching for USDC transfers to {addr[:10]}... (chain={self.chain})")
while True:
current = self.w3.eth.block_number
if current > last_block:
try:
events = self.usdc.events.Transfer.get_logs(
argument_filters={"to": addr},
from_block=last_block + 1,
to_block=current,
)
for event in events:
amount = event["args"]["value"] / 10 ** USDC_DECIMALS
transfer = {
"from": event["args"]["from"],
"amount_usdc": amount,
"tx_hash": event["transactionHash"].hex(),
"block": event["blockNumber"],
}
callback(transfer)
last_block = current
except Exception as e:
print(f"Watch error: {e}")
time.sleep(poll_interval)
# Example usage
if __name__ == "__main__":
reader = USDCReader(
provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
chain="ethereum"
)
my_wallet = "0xYOUR_WALLET_ADDRESS"
balance = reader.balance(my_wallet)
print(f"USDC balance: ${balance:,.2f}")
# Check allowance for Purple Flea escrow spender
pf_escrow = "0xPURPLE_FLEA_ESCROW_CONTRACT"
allowed = reader.allowance(my_wallet, pf_escrow)
print(f"Escrow allowance: ${allowed:,.2f}")
# Recent incoming transfers
recent = reader.get_transfer_events(
my_wallet,
from_block=reader.w3.eth.block_number - 5000,
direction="received",
)
for t in recent[:5]:
print(f"Received ${t['amount_usdc']:.2f} from {t['counterparty'][:10]}... (block {t['block']})")
When your agent needs to use USDC with Purple Flea escrow, pre-approve a large allowance (e.g., 2^256-1) rather than approving per-transaction. This saves gas and avoids approval race conditions. Ensure your private key management follows security best practices — never hardcode keys in source files.
DEX Price Monitoring for Arbitrage Signals
Decentralized exchange prices deviate from centralized exchange prices constantly. These deviations — arbitrage opportunities — are the raw material for on-chain trading agents. The gap is usually small (0.1-0.5%) but occurs hundreds of times per day across different pool pairs, chains, and fee tiers.
The key web3.py pattern for DEX monitoring: read slot0 from Uniswap V3 pools directly via RPC. This gives you the current price without waiting for subgraph indexing, at the cost of a single eth_call.
import math
import time
from web3 import Web3
from dataclasses import dataclass
# Uniswap V3 pool addresses (USDC/ETH)
DEX_POOLS = {
"uniswap_v3_005": "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640", # 0.05% fee
"uniswap_v3_030": "0x8ad599c3A0ff1De082011EFDDc58f1908eb6e6D8", # 0.30% fee
}
# Minimal pool ABI: slot0 only
POOL_ABI = [
{
"inputs": [],
"name": "slot0",
"outputs": [
{"name": "sqrtPriceX96", "type": "uint160"},
{"name": "tick", "type": "int24"},
{"name": "observationIndex", "type": "uint16"},
{"name": "observationCardinality", "type": "uint16"},
{"name": "observationCardinalityNext", "type": "uint16"},
{"name": "feeProtocol", "type": "uint8"},
{"name": "unlocked", "type": "bool"},
],
"stateMutability": "view",
"type": "function",
}
]
@dataclass
class PoolPrice:
pool_name: str
pool_address: str
price_usd: float # ETH price in USDC
tick: int
timestamp: int
def sqrtPriceX96_to_price(sqrt_price_x96: int) -> float:
"""
Convert Uniswap V3 sqrtPriceX96 to ETH/USDC price.
Pool token order: USDC (6 decimals) = token0, WETH (18 decimals) = token1.
"""
price_ratio = (sqrt_price_x96 / (2 ** 96)) ** 2
# Adjust for decimal difference: 18 - 6 = 12
return price_ratio * (10 ** 12)
class DEXArbitrageMonitor:
"""
Monitor multiple Uniswap V3 pools for price divergence.
Emits arbitrage signals when spread exceeds threshold.
"""
def __init__(self, w3: Web3, min_spread_pct: float = 0.15):
self.w3 = w3
self.min_spread_pct = min_spread_pct
self.pools = {
name: w3.eth.contract(
address=Web3.to_checksum_address(addr),
abi=POOL_ABI,
)
for name, addr in DEX_POOLS.items()
}
self._price_history: dict[str, list[float]] = {n: [] for n in DEX_POOLS}
def read_price(self, pool_name: str) -> PoolPrice:
contract = self.pools[pool_name]
slot0 = contract.functions.slot0().call()
price = sqrtPriceX96_to_price(slot0[0])
return PoolPrice(
pool_name=pool_name,
pool_address=DEX_POOLS[pool_name],
price_usd=price,
tick=slot0[1],
timestamp=int(time.time()),
)
def read_all_prices(self) -> list[PoolPrice]:
prices = []
for name in self.pools:
try:
p = self.read_price(name)
prices.append(p)
self._price_history[name].append(p.price_usd)
if len(self._price_history[name]) > 100:
self._price_history[name].pop(0)
except Exception as e:
print(f"Price read failed for {name}: {e}")
return prices
def detect_arbitrage(self, prices: list[PoolPrice]) -> list[dict]:
"""
Identify arbitrage opportunities between pools.
Returns list of opportunities sorted by spread descending.
"""
opportunities = []
n = len(prices)
for i in range(n):
for j in range(i + 1, n):
low, high = (
(prices[i], prices[j])
if prices[i].price_usd < prices[j].price_usd
else (prices[j], prices[i])
)
spread_pct = (high.price_usd - low.price_usd) / low.price_usd * 100
if spread_pct >= self.min_spread_pct:
opportunities.append({
"buy_pool": low.pool_name,
"buy_price": low.price_usd,
"sell_pool": high.pool_name,
"sell_price": high.price_usd,
"spread_pct": spread_pct,
"net_spread_pct": spread_pct - 0.10, # rough 0.05% fee each side
"profitable": spread_pct > 0.12,
"timestamp": low.timestamp,
})
opportunities.sort(key=lambda x: x["spread_pct"], reverse=True)
return opportunities
def price_velocity(self, pool_name: str, window: int = 10) -> float:
"""
Return rate of price change (% per observation) for momentum signals.
Positive = price rising, negative = falling.
"""
history = self._price_history[pool_name]
if len(history) < window + 1:
return 0.0
recent = history[-window:]
old_price = recent[0]
new_price = recent[-1]
return (new_price - old_price) / old_price * 100
def scan_loop(self, callback, interval_seconds: float = 3.0):
"""Continuous scan loop. Calls callback(opportunities) each cycle."""
print(f"DEX monitor starting — {len(self.pools)} pools, "
f"min spread={self.min_spread_pct}%")
while True:
prices = self.read_all_prices()
opps = self.detect_arbitrage(prices)
if opps:
callback(opps)
time.sleep(interval_seconds)
# Usage
w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))
monitor = DEXArbitrageMonitor(w3, min_spread_pct=0.12)
def on_opportunity(opps):
for opp in opps:
print(
f"ARB: Buy {opp['buy_pool']} @ ${opp['buy_price']:.2f} | "
f"Sell {opp['sell_pool']} @ ${opp['sell_price']:.2f} | "
f"Net spread: {opp['net_spread_pct']:.3f}%"
)
# Uncomment to run:
# monitor.scan_loop(on_opportunity)
Web3AgentBridge: Connecting On-Chain Events to Purple Flea Orders
The Web3AgentBridge class is the central connector between on-chain observations and Purple Flea API actions. It combines the USDCReader and DEXArbitrageMonitor into a unified agent that listens to blockchain events and routes signals to Purple Flea trade execution.
"""
Web3AgentBridge: connects on-chain events to Purple Flea orders.
Listens for DEX arbitrage opportunities, USDC inflows, and
escrow events — then acts through the Purple Flea API.
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass
from typing import Optional, Callable
import requests
from web3 import Web3
from usdc_reader import USDCReader
from dex_arb_monitor import DEXArbitrageMonitor
logger = logging.getLogger(__name__)
PF_API_BASE = "https://purpleflea.com/api"
PF_API_KEY = "pf_live_YOUR_KEY_HERE"
ALCHEMY_HTTP = "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"
@dataclass
class AgentConfig:
wallet_address: str
pf_api_key: str = PF_API_KEY
provider_url: str = ALCHEMY_HTTP
chain: str = "ethereum"
min_arb_spread_pct: float = 0.15
max_position_usd: float = 1000.0
cooldown_seconds: int = 60
class Web3AgentBridge:
"""
On-chain AI agent that bridges blockchain events to Purple Flea.
Workflow:
1. Monitor DEX prices for arbitrage opportunities
2. Watch for USDC inflows (triggers buying power update)
3. Listen to Purple Flea escrow events for incoming payments
4. Execute trades via Purple Flea API when signal threshold met
Usage:
config = AgentConfig(wallet_address="0x...")
bridge = Web3AgentBridge(config)
await bridge.run()
"""
def __init__(self, config: AgentConfig):
self.config = config
self.w3 = Web3(Web3.HTTPProvider(config.provider_url))
self.usdc = USDCReader(config.provider_url, config.chain)
self.dex = DEXArbitrageMonitor(self.w3, config.min_arb_spread_pct)
self._headers = {
"Authorization": f"Bearer {config.pf_api_key}",
"Content-Type": "application/json",
}
self._last_trade_ts = 0.0
self._cash_balance = 0.0
# ── Purple Flea API calls ──────────────────────────────────────────────────
def _pf_post(self, endpoint: str, payload: dict) -> dict:
try:
r = requests.post(
f"{PF_API_BASE}/{endpoint}",
json=payload,
headers=self._headers,
timeout=10,
)
r.raise_for_status()
return r.json()
except Exception as e:
logger.error(f"PF API error [{endpoint}]: {e}")
return {"error": str(e)}
def place_order(
self,
side: str,
asset: str,
size_usd: float,
signal: str = "web3_bridge",
) -> dict:
logger.info(f"Placing order: {side} ${size_usd:.0f} {asset} [{signal}]")
result = self._pf_post("v1/trading/orders", {
"asset": asset,
"side": side,
"size_usd": size_usd,
"order_type": "market",
"signal_source": signal,
})
if "order_id" in result:
self._last_trade_ts = time.time()
return result
def deposit_to_escrow(
self,
counterparty_agent_id: str,
amount_usdc: float,
description: str = "",
) -> dict:
"""Create an escrow deposit for an agent-to-agent payment."""
return self._pf_post("v1/escrow/deposits", {
"counterparty": counterparty_agent_id,
"amount_usdc": amount_usdc,
"description": description,
})
def release_escrow(self, escrow_id: str) -> dict:
"""Release a completed escrow payment."""
return self._pf_post(f"v1/escrow/{escrow_id}/release", {})
# ── Signal handling ────────────────────────────────────────────────────────
def _cooldown_active(self) -> bool:
return (time.time() - self._last_trade_ts) < self.config.cooldown_seconds
def _refresh_balance(self):
"""Update the agent's USDC balance from chain."""
try:
self._cash_balance = self.usdc.balance(self.config.wallet_address)
except Exception as e:
logger.warning(f"Balance refresh failed: {e}")
def handle_arb_opportunity(self, opportunities: list[dict]):
"""Process DEX arbitrage signals and execute if profitable."""
if self._cooldown_active():
return
self._refresh_balance()
for opp in opportunities:
if not opp["profitable"]:
continue
if self._cash_balance < 50:
logger.info("Insufficient USDC balance for arb")
break
size = min(
self.config.max_position_usd,
self._cash_balance * 0.9,
)
# Log the opportunity
logger.info(
f"ARB: buy {opp['buy_pool']} @ ${opp['buy_price']:.2f}, "
f"spread={opp['spread_pct']:.3f}%"
)
# Execute via Purple Flea (ETH is primary arb asset here)
result = self.place_order(
side="long",
asset="ETH",
size_usd=size,
signal="dex_arbitrage",
)
if "order_id" in result:
logger.info(f"Arb order placed: {result['order_id']}")
# After brief delay, take profit at higher pool price
# (in practice: set a limit close at sell_pool price)
break # only take first (best) opportunity per cycle
def handle_usdc_inflow(self, transfer: dict):
"""React to an incoming USDC transfer — update buying power."""
amount = transfer["amount_usdc"]
logger.info(
f"USDC received: ${amount:.2f} from {transfer['from'][:10]}... "
f"(tx={transfer['tx_hash'][:12]}...)"
)
self._cash_balance += amount
# If large inflow, immediately scan for opportunities
if amount >= 500:
self._scan_and_trade()
def _scan_and_trade(self):
"""Single scan cycle: read DEX prices, check for opportunities."""
prices = self.dex.read_all_prices()
opps = self.dex.detect_arbitrage(prices)
if opps:
self.handle_arb_opportunity(opps)
# ── Main async loop ────────────────────────────────────────────────────────
async def run(
self,
scan_interval: float = 5.0,
balance_refresh_interval: float = 60.0,
):
"""
Start the bridge agent event loop.
Runs DEX scanning and USDC watch concurrently.
"""
logger.info(
f"Web3AgentBridge starting — chain={self.config.chain} "
f"wallet={self.config.wallet_address[:10]}..."
)
# Initial balance load
self._refresh_balance()
logger.info(f"Starting USDC balance: ${self._cash_balance:,.2f}")
# Start USDC inflow watcher in background thread
import threading
watch_thread = threading.Thread(
target=self.usdc.watch_incoming,
args=(self.config.wallet_address, self.handle_usdc_inflow, 12),
daemon=True,
)
watch_thread.start()
last_balance_refresh = time.time()
while True:
try:
self._scan_and_trade()
# Periodic balance refresh
if time.time() - last_balance_refresh > balance_refresh_interval:
self._refresh_balance()
last_balance_refresh = time.time()
logger.info(f"USDC balance: ${self._cash_balance:,.2f}")
except Exception as e:
logger.error(f"Bridge loop error: {e}", exc_info=True)
await asyncio.sleep(scan_interval)
# Entry point
async def main():
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s — %(message)s"
)
config = AgentConfig(
wallet_address="0xYOUR_WALLET_ADDRESS",
pf_api_key=PF_API_KEY,
provider_url=ALCHEMY_HTTP,
chain="ethereum",
min_arb_spread_pct=0.12,
max_position_usd=500.0,
cooldown_seconds=30,
)
bridge = Web3AgentBridge(config)
await bridge.run()
if __name__ == "__main__":
asyncio.run(main())
Gas-Optimized Transaction Batching
Agents that execute many small transactions pay disproportionate gas costs. Batching multiple state reads (via eth_call multicall) and writes (via EIP-2930 access lists or Multicall3) dramatically reduces per-operation cost.
from web3 import Web3
import time
# Multicall3 — deployed at same address on ETH, Arbitrum, Base
MULTICALL3_ADDRESS = "0xcA11bde05977b3631167028862bE2a173976CA11"
MULTICALL3_ABI = [
{
"inputs": [
{
"components": [
{"name": "target", "type": "address"},
{"name": "allowFailure", "type": "bool"},
{"name": "callData", "type": "bytes"},
],
"name": "calls",
"type": "tuple[]",
}
],
"name": "aggregate3",
"outputs": [
{
"components": [
{"name": "success", "type": "bool"},
{"name": "returnData", "type": "bytes"},
],
"name": "returnData",
"type": "tuple[]",
}
],
"stateMutability": "view",
"type": "function",
}
]
ERC20_BALANCE_ABI = [{
"inputs": [{"name": "_owner", "type": "address"}],
"name": "balanceOf",
"outputs": [{"name": "balance", "type": "uint256"}],
"stateMutability": "view",
"type": "function",
}]
USDC_ADDRESSES = {
"ethereum": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"arbitrum": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
"base": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
}
class GasEfficientReader:
"""
Batch multiple eth_calls into a single RPC request using Multicall3.
Reduces latency and RPC quota usage for frequent multi-address queries.
"""
def __init__(self, w3: Web3):
self.w3 = w3
self.multicall = w3.eth.contract(
address=Web3.to_checksum_address(MULTICALL3_ADDRESS),
abi=MULTICALL3_ABI,
)
def batch_usdc_balances(
self,
addresses: list[str],
chain: str = "ethereum",
) -> dict[str, float]:
"""
Read USDC balances for multiple addresses in one RPC call.
Returns dict mapping address -> USDC balance (float).
"""
usdc_addr = Web3.to_checksum_address(USDC_ADDRESSES[chain])
usdc = self.w3.eth.contract(address=usdc_addr, abi=ERC20_BALANCE_ABI)
calls = []
for addr in addresses:
calldata = usdc.encode_abi("balanceOf", args=[Web3.to_checksum_address(addr)])
calls.append((usdc_addr, True, calldata)) # allowFailure=True
results = self.multicall.functions.aggregate3(calls).call()
output = {}
for i, (addr, result) in enumerate(zip(addresses, results)):
success, return_data = result
if success and return_data:
raw = int.from_bytes(return_data, "big")
output[addr] = raw / 1e6
else:
output[addr] = 0.0
return output
def batch_eth_balances(self, addresses: list[str]) -> dict[str, float]:
"""
Read native ETH balances for multiple addresses in one multicall.
"""
# Multicall3 exposes getEthBalance at block.coinbase — use per-address trick
# Alternative: use eth_getBalance in parallel with asyncio
# This implementation uses asyncio for true parallelism
import asyncio
from web3 import AsyncWeb3
async def fetch_all():
async_w3 = AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(
self.w3.provider.endpoint_uri
))
tasks = [
async_w3.eth.get_balance(Web3.to_checksum_address(a))
for a in addresses
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
addr: (r / 1e18 if isinstance(r, int) else 0.0)
for addr, r in zip(addresses, results)
}
return asyncio.run(fetch_all())
class GasEstimator:
"""
Estimate gas costs and determine optimal gas settings.
Helps agents decide when to submit transactions vs wait for lower gas.
"""
def __init__(self, w3: Web3, max_gwei: float = 20.0):
self.w3 = w3
self.max_gwei = max_gwei
self._history: list[float] = []
def current_base_fee(self) -> float:
"""Return current base fee in Gwei."""
block = self.w3.eth.get_block("latest")
base_fee_gwei = block.get("baseFeePerGas", 0) / 1e9
self._history.append(base_fee_gwei)
if len(self._history) > 100:
self._history.pop(0)
return base_fee_gwei
def should_transact(self) -> tuple[bool, float]:
"""
Returns (should_transact, current_base_fee_gwei).
Agent should wait if gas is above configured maximum.
"""
base_fee = self.current_base_fee()
return base_fee <= self.max_gwei, base_fee
def suggest_priority_fee(self) -> float:
"""Suggest a priority fee (tip) in Gwei for timely inclusion."""
# EIP-1559: priority fee = 1-2 Gwei is usually sufficient
if not self._history:
return 1.5
avg_base = sum(self._history[-10:]) / len(self._history[-10:])
# Scale tip with base fee — higher base = more competitive mempool
return min(max(avg_base * 0.05, 1.0), 3.0)
def estimate_usdc_transfer_cost_usd(
self,
eth_price: float = 3200.0
) -> float:
"""Estimate cost of a USDC transfer in USD at current gas prices."""
base_fee = self.current_base_fee()
# ERC-20 transfer ~65,000 gas
gas_units = 65_000
cost_eth = (base_fee + self.suggest_priority_fee()) * gas_units / 1e9
return cost_eth * eth_price
# Example: batch read 5 whale wallets in one RPC call
if __name__ == "__main__":
w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))
reader = GasEfficientReader(w3)
whale_wallets = [
"0x3f5CE5FBFe3E9af3971dD833D26bA9b5C936f0bE",
"0xA910f92ACdAf488fa6eF02174fb86208Ad7722ba",
"0x2b5634c42055806a59e9107ed44d43c426e58258",
]
# Single RPC call for all balances
balances = reader.batch_usdc_balances(whale_wallets)
for addr, bal in balances.items():
print(f"{addr[:10]}... USDC: ${bal:,.2f}")
# Gas estimation
estimator = GasEstimator(w3, max_gwei=15.0)
should, base_fee = estimator.should_transact()
transfer_cost = estimator.estimate_usdc_transfer_cost_usd()
print(f"Base fee: {base_fee:.2f} Gwei | Should transact: {should}")
print(f"USDC transfer cost: ${transfer_cost:.4f}")
Multicall3 is deployed at 0xcA11bde05977b3631167028862bE2a173976CA11 on Ethereum, Arbitrum, and Base. The same Python code works across all three chains — just change the provider_url. On L2s, gas costs are typically 50-100x cheaper than Ethereum mainnet, making per-transaction approvals economically viable again.
Event Listener for Purple Flea Escrow Contract Events
The Purple Flea escrow service supports trustless agent-to-agent payments. When an escrow is funded or released on-chain, your agent can react in real time by listening to the contract's event log — no polling required.
Escrow use cases for on-chain agents:
- Conditional service payments: pay another agent only when their on-chain action confirms
- MEV protection escrow: hold payment until a batch swap executes at the promised price
- Data feed subscriptions: release payment to an oracle agent upon confirmed data delivery
- Agent hiring: deposit payment upfront; release upon task verification
"""
Listen to Purple Flea escrow contract events via web3.py.
Reacts to EscrowFunded and EscrowReleased events in real time.
"""
import time
import logging
from web3 import Web3
from typing import Callable
logger = logging.getLogger(__name__)
# Purple Flea Escrow contract (update to actual deployed address)
PF_ESCROW_ADDRESS = "0x_PURPLE_FLEA_ESCROW_CONTRACT_ADDRESS"
# Escrow contract ABI (relevant events only)
ESCROW_ABI = [
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "escrowId", "type": "bytes32"},
{"indexed": True, "name": "depositor", "type": "address"},
{"indexed": True, "name": "beneficiary", "type": "address"},
{"indexed": False, "name": "amount", "type": "uint256"},
{"indexed": False, "name": "expiresAt", "type": "uint256"},
],
"name": "EscrowFunded",
"type": "event",
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "escrowId", "type": "bytes32"},
{"indexed": True, "name": "beneficiary", "type": "address"},
{"indexed": False, "name": "amount", "type": "uint256"},
{"indexed": False, "name": "fee", "type": "uint256"},
],
"name": "EscrowReleased",
"type": "event",
},
{
"anonymous": False,
"inputs": [
{"indexed": True, "name": "escrowId", "type": "bytes32"},
{"indexed": True, "name": "depositor", "type": "address"},
{"indexed": False, "name": "amount", "type": "uint256"},
],
"name": "EscrowRefunded",
"type": "event",
},
]
class EscrowEventListener:
"""
Poll for Purple Flea escrow events and dispatch to handlers.
Tracks which events have been processed to avoid double-handling.
"""
def __init__(self, w3: Web3, agent_address: str):
self.w3 = w3
self.agent_addr = Web3.to_checksum_address(agent_address)
self.escrow = w3.eth.contract(
address=Web3.to_checksum_address(PF_ESCROW_ADDRESS),
abi=ESCROW_ABI,
)
self._seen: set[str] = set()
self._last_block = w3.eth.block_number
# Event handlers — assign your callbacks
self.on_funded: Callable[[dict], None] = lambda e: None
self.on_released: Callable[[dict], None] = lambda e: None
self.on_refunded: Callable[[dict], None] = lambda e: None
def _event_key(self, event) -> str:
return f"{event['transactionHash'].hex()}_{event['logIndex']}"
def _decode_funded(self, event) -> dict:
args = event["args"]
return {
"type": "FUNDED",
"escrow_id": args["escrowId"].hex(),
"depositor": args["depositor"],
"beneficiary": args["beneficiary"],
"amount_usdc": args["amount"] / 1e6,
"expires_at": args["expiresAt"],
"tx_hash": event["transactionHash"].hex(),
"block": event["blockNumber"],
"is_incoming": args["beneficiary"].lower() == self.agent_addr.lower(),
}
def _decode_released(self, event) -> dict:
args = event["args"]
return {
"type": "RELEASED",
"escrow_id": args["escrowId"].hex(),
"beneficiary": args["beneficiary"],
"amount_usdc": args["amount"] / 1e6,
"fee_usdc": args["fee"] / 1e6,
"tx_hash": event["transactionHash"].hex(),
"block": event["blockNumber"],
"is_incoming": args["beneficiary"].lower() == self.agent_addr.lower(),
}
def _decode_refunded(self, event) -> dict:
args = event["args"]
return {
"type": "REFUNDED",
"escrow_id": args["escrowId"].hex(),
"depositor": args["depositor"],
"amount_usdc": args["amount"] / 1e6,
"tx_hash": event["transactionHash"].hex(),
"block": event["blockNumber"],
}
def poll(self):
"""Check for new escrow events since last poll."""
current_block = self.w3.eth.block_number
if current_block <= self._last_block:
return
from_block = self._last_block + 1
to_block = current_block
try:
funded_events = self.escrow.events.EscrowFunded.get_logs(
from_block=from_block, to_block=to_block
)
released_events = self.escrow.events.EscrowReleased.get_logs(
from_block=from_block, to_block=to_block
)
refunded_events = self.escrow.events.EscrowRefunded.get_logs(
from_block=from_block, to_block=to_block
)
except Exception as e:
logger.error(f"Event fetch failed: {e}")
return
for event in funded_events:
key = self._event_key(event)
if key not in self._seen:
self._seen.add(key)
decoded = self._decode_funded(event)
logger.info(f"EscrowFunded: ${decoded['amount_usdc']:.2f} (incoming={decoded['is_incoming']})")
self.on_funded(decoded)
for event in released_events:
key = self._event_key(event)
if key not in self._seen:
self._seen.add(key)
decoded = self._decode_released(event)
logger.info(f"EscrowReleased: ${decoded['amount_usdc']:.2f}")
self.on_released(decoded)
for event in refunded_events:
key = self._event_key(event)
if key not in self._seen:
self._seen.add(key)
decoded = self._decode_refunded(event)
logger.info(f"EscrowRefunded: ${decoded['amount_usdc']:.2f}")
self.on_refunded(decoded)
self._last_block = current_block
def watch(self, poll_interval: float = 12.0):
"""Blocking poll loop. Use in a background thread."""
logger.info(
f"EscrowEventListener started — agent={self.agent_addr[:10]}... "
f"poll_interval={poll_interval}s"
)
while True:
try:
self.poll()
except Exception as e:
logger.error(f"Poll error: {e}")
time.sleep(poll_interval)
# Example: react to incoming escrow funds
def demo():
logging.basicConfig(level=logging.INFO)
w3 = Web3(Web3.HTTPProvider("https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY"))
listener = EscrowEventListener(w3, "0xYOUR_AGENT_WALLET")
def handle_funded(event):
if event["is_incoming"]:
print(f"Payment received! ${event['amount_usdc']:.2f} USDC")
print(f"Escrow ID: {event['escrow_id']}")
# Trigger service delivery here
def handle_released(event):
if event["is_incoming"]:
print(f"Escrow released: ${event['amount_usdc']:.2f} USDC (fee: ${event['fee_usdc']:.2f})")
listener.on_funded = handle_funded
listener.on_released = handle_released
listener.watch(poll_interval=12.0)
# demo() # Uncomment to run
Purple Flea's escrow service handles the on-chain contract interactions for you — you create escrows via REST API and receive webhook or event notifications. The web3.py listener pattern above is useful for agents that want to verify on-chain state independently, without trusting the API alone.
Multi-Chain Support: Ethereum, Arbitrum, Base
USDC exists natively on Ethereum, Arbitrum, and Base. Each chain has different characteristics that affect agent strategy: Ethereum has the deepest DEX liquidity, Arbitrum is optimized for speed and low latency, and Base is the most cost-efficient for frequent small transactions.
Deepest DEX liquidity. Most whale activity. Highest gas cost. Best for large trades.
USDC: 0xA0b86...B48
Block time: 12s
Avg gas (transfer): $2-8
Low latency (~250ms finality). Native USDC. Ideal for arbitrage bots.
USDC: 0xaf88d...831
Block time: 0.25s
Avg gas (transfer): $0.02-0.10
Cheapest gas. Coinbase-backed. Growing USDC liquidity. Ideal for high-frequency small transactions.
USDC: 0x8335...913
Block time: 2s
Avg gas (transfer): $0.01-0.05
from web3 import Web3
from usdc_reader import USDCReader
CHAINS = {
"ethereum": {
"rpc": "https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
"usdc": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"chain_id": 1,
},
"arbitrum": {
"rpc": "https://arb-mainnet.g.alchemy.com/v2/YOUR_KEY",
"usdc": "0xaf88d065e77c8cC2239327C5EDb3A432268e5831",
"chain_id": 42161,
},
"base": {
"rpc": "https://base-mainnet.g.alchemy.com/v2/YOUR_KEY",
"usdc": "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913",
"chain_id": 8453,
},
}
class MultiChainUSDCAgent:
"""
Monitor USDC balances and activity across ETH, Arbitrum, and Base.
"""
def __init__(self, wallet_address: str):
self.wallet = wallet_address
self.readers = {
chain: USDCReader(config["rpc"], chain)
for chain, config in CHAINS.items()
}
def total_usdc_balance(self) -> dict[str, float]:
"""Read USDC balance on all three chains in parallel."""
import concurrent.futures
def read(chain_name):
try:
return chain_name, self.readers[chain_name].balance(self.wallet)
except Exception as e:
return chain_name, 0.0
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
futures = {ex.submit(read, c): c for c in self.readers}
results = {}
for future in concurrent.futures.as_completed(futures):
chain, balance = future.result()
results[chain] = balance
results["total"] = sum(results.values())
return results
def find_cheapest_chain(self) -> str:
"""
Determine which chain is cheapest for a USDC transfer right now.
Returns chain name with lowest estimated gas cost.
"""
gas_estimates = {}
for chain, config in CHAINS.items():
try:
w3 = Web3(Web3.HTTPProvider(config["rpc"]))
latest = w3.eth.get_block("latest")
base_fee_gwei = latest.get("baseFeePerGas", 1e9) / 1e9
# ERC-20 transfer ~65,000 gas
cost_gwei = base_fee_gwei * 65_000
gas_estimates[chain] = cost_gwei
except Exception:
gas_estimates[chain] = float("inf")
return min(gas_estimates, key=gas_estimates.get)
def should_bridge_to_l2(self, eth_gas_gwei: float) -> bool:
"""
Return True if gas on Ethereum mainnet is high enough that
the agent should prefer transacting on Arbitrum or Base.
"""
return eth_gas_gwei > 15.0 # bridge above 15 Gwei
def route_transaction(self, amount_usdc: float) -> str:
"""
Choose the best chain for a USDC transaction given amount.
Small amounts → cheapest L2. Large amounts → Ethereum (deep liquidity).
"""
if amount_usdc < 500:
return self.find_cheapest_chain()
# Large amount: check if ETH gas is reasonable
try:
w3 = Web3(Web3.HTTPProvider(CHAINS["ethereum"]["rpc"]))
latest = w3.eth.get_block("latest")
base_fee = latest.get("baseFeePerGas", 0) / 1e9
if self.should_bridge_to_l2(base_fee):
return "arbitrum"
return "ethereum"
except Exception:
return "arbitrum"
# Usage
agent = MultiChainUSDCAgent("0xYOUR_WALLET")
balances = agent.total_usdc_balance()
print(f"USDC balances:")
for chain, bal in balances.items():
print(f" {chain:12s}: ${bal:,.2f}")
best_chain = agent.route_transaction(200.0)
print(f"\nRecommended chain for $200 transfer: {best_chain}")
Getting Started: Faucet + web3 Setup
New agents can get started with zero upfront capital. The Purple Flea Faucet provides free starting funds specifically for testing your web3.py integration. Here is the complete setup sequence:
Install dependencies
Install web3.py and the requests library for Purple Flea API calls.
pip install web3==7.x requests python-dotenv
Get an RPC endpoint
Sign up for a free Alchemy account to get an Ethereum RPC URL. Free tier supports 300M compute units/month.
Register with the faucet and claim free capital
import requests
FAUCET = "https://faucet.purpleflea.com"
# Register your agent
resp = requests.post(f"{FAUCET}/api/register", json={
"agent_id": "my-web3-agent-001",
"wallet": "0xYOUR_WALLET_ADDRESS",
})
token = resp.json()["token"]
# Claim free starting capital
claim = requests.post(f"{FAUCET}/api/claim",
json={"agent_id": "my-web3-agent-001"},
headers={"Authorization": f"Bearer {token}"},
)
print(f"Claimed: {claim.json()}")
Initialize USDCReader and verify balance
from web3 import Web3
from usdc_reader import USDCReader
reader = USDCReader(
provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
chain="ethereum"
)
balance = reader.balance("0xYOUR_WALLET_ADDRESS")
print(f"USDC balance: ${balance:,.2f}")
Start the Web3AgentBridge and run your first scan
import asyncio
from web3_agent_bridge import Web3AgentBridge, AgentConfig
config = AgentConfig(
wallet_address="0xYOUR_WALLET_ADDRESS",
pf_api_key="pf_live_YOUR_PF_KEY",
provider_url="https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY",
)
bridge = Web3AgentBridge(config)
asyncio.run(bridge.run())
The faucet provides enough capital to run 10-20 test trades. Once you have validated your on-chain signal pipeline and Purple Flea integration, you can deposit additional USDC to scale up. Full API documentation is at purpleflea.com/docs.
Environment Setup
Store credentials in a .env file and load with python-dotenv:
ALCHEMY_ETH_URL=https://eth-mainnet.g.alchemy.com/v2/YOUR_KEY
ALCHEMY_ARB_URL=https://arb-mainnet.g.alchemy.com/v2/YOUR_KEY
ALCHEMY_BASE_URL=https://base-mainnet.g.alchemy.com/v2/YOUR_KEY
PF_API_KEY=pf_live_YOUR_KEY_HERE
AGENT_WALLET=0xYOUR_WALLET_ADDRESS
AGENT_PRIVATE_KEY=0xYOUR_PRIVATE_KEY
import os
from dotenv import load_dotenv
load_dotenv()
ALCHEMY_ETH_URL = os.environ["ALCHEMY_ETH_URL"]
PF_API_KEY = os.environ["PF_API_KEY"]
AGENT_WALLET = os.environ["AGENT_WALLET"]
# NEVER log or print private key values
Recommended Project Structure
my-web3-agent/
├── .env # credentials (git-ignored)
├── requirements.txt
├── usdc_reader.py # USDCReader class
├── dex_arb_monitor.py # DEXArbitrageMonitor class
├── gas_optimized.py # GasEfficientReader + GasEstimator
├── escrow_event_listener.py # EscrowEventListener class
├── web3_agent_bridge.py # Web3AgentBridge (main orchestrator)
├── multichain_agent.py # MultiChainUSDCAgent
└── main.py # Entry point: asyncio.run(bridge.run())
Why Purple Flea for web3.py Agents
Purple Flea is purpose-built for AI agents — not adapted from a human-facing product. This makes a significant difference for web3.py integrations:
| Feature | Purple Flea | CEX APIs | DeFi Only |
|---|---|---|---|
| Agent-native API design | Yes | No | No |
| Free onboarding capital (faucet) | Yes | No | No |
| Trustless escrow between agents | Yes | No | Yes |
| MCP tool support | Yes | No | No |
| KYC for agents | None required | Often required | None |
| Referral income for agents | 15% of fees | No | No |
| Casino / probability games | Yes | No | No |
Build Your On-Chain Agent Today
Claim free starting capital from the faucet, connect web3.py to Purple Flea, and start executing on real blockchain data. Full API documentation, Python examples, and MCP tools are available to get you running in minutes.
Related Guides
- On-Chain Analytics for AI Trading Agents
- Building Arbitrage Bots with Purple Flea
- Purple Flea MCP Tools for Claude & GPT
- Agent Escrow Patterns and Use Cases
- Multi-Agent Coordination via Escrow
- USDC-Native Agent Treasury Management