On-Chain Data Analysis · Whale Tracking · March 4, 2026 · 24 min read

On-Chain Data Analysis: AI Agents Reading the Blockchain

The blockchain is a permanent, public ledger of every transaction ever made — a real-time feed of financial intelligence that most traders never fully exploit. AI agents that read on-chain data gain a structural edge: they see whale movements before they hit exchanges, track smart money before it impacts price, and detect DEX volume surges minutes before momentum traders pile in.

Table of Contents
  1. On-Chain Metrics Overview
  2. Whale Tracking
  3. Smart Money Flows
  4. DEX Volume Analysis
  5. Mempool Monitoring
  6. Data Pipeline and Trading Code

01 On-Chain Metrics Overview

On-chain data encompasses every piece of information publicly recorded on a blockchain: wallet balances, transaction history, smart contract interactions, DEX swaps, liquidity pool changes, mempool pending transactions, gas fees, and block-level statistics. For an AI agent, this data is raw alpha — unfiltered financial behavior of every participant in the market.
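Before any of the pipelines below, the basic unit of work is turning one raw transaction record into a feature an agent can act on. A minimal sketch (the transaction dict and the exchange label here are illustrative, not real data):

```python
# Minimal sketch: turning one raw transaction record into a trading feature.
# The transaction dict and exchange address below are illustrative.
WEI_PER_ETH = 10**18

KNOWN_EXCHANGE = "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be"  # labeled hot wallet

def to_feature(tx: dict) -> dict:
    """Extract value in ETH and a coarse flow label from a raw tx record."""
    value_eth = int(tx["value"]) / WEI_PER_ETH   # values arrive as wei strings
    to_addr = (tx.get("to") or "").lower()       # "to" is None for contract creation
    return {
        "value_eth": value_eth,
        "is_exchange_inflow": to_addr == KNOWN_EXCHANGE,
    }

tx = {"value": str(2500 * WEI_PER_ETH), "to": KNOWN_EXCHANGE.upper()}
feature = to_feature(tx)
# feature == {"value_eth": 2500.0, "is_exchange_inflow": True}
```

Everything that follows is elaboration of this step: richer labels, more sources, and aggregation over time.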

- 1.2M+ ETH transactions per day
- $4B+ daily Uniswap volume
- ~12s Ethereum block time
- 50k+ tracked whale wallets

Key On-Chain Signal Categories

🐋 Whale Movements: Large wallet inflows/outflows to exchanges. Exchange deposits signal selling intent; withdrawals signal long-term holding.

🧠 Smart Money Flows: Wallet clusters associated with profitable historical trades. High-confidence wallets accumulating is a bullish signal.

📉 DEX Volume & Liquidity: Uniswap/Curve volume surges often precede centralized exchange price movements. LP position changes reveal institutional intent.

Mempool Intelligence: Pending transactions reveal incoming large moves before they are confirmed. Sandwich detection, front-run risk analysis.

💸 Network Activity: Active addresses, transaction counts, gas usage. Rising network activity at lower prices is an accumulation signal.

📜 Contract Events: Token transfers, approval events, governance votes, vault deposits/withdrawals. Each is a data point on market participant intent.

On-Chain Data Pipeline
  1. Source: Blockchain Node (raw transactions, blocks, logs)
  2. Indexer: Etherscan / Dune (structured query API)
  3. Process: Signal Engine (whale detection, scoring)
  4. Aggregate: Signal Store (time-series DB, rolling metrics)
  5. Trade: Purple Flea API (execute on signals)

02 Whale Tracking

Whale wallets — addresses holding more than approximately $10M in assets — move markets. A single whale transferring 10,000 BTC to an exchange is a credible selling signal. The key insight is that exchange inflows from whale wallets predict short-term selling pressure, while large withdrawals from exchanges to cold wallets signal long-term conviction.

Exchange Flow Classification

| Flow Type | Direction | Signal Interpretation | Agent Response |
|---|---|---|---|
| Large Deposit | Whale → Exchange | Likely sell incoming | Reduce long exposure |
| Large Withdrawal | Exchange → Cold Wallet | Long-term accumulation | Add long exposure |
| Wallet-to-Wallet | Whale → Unknown | Ambiguous; research needed | Monitor destination |
| DeFi Deposit | Whale → Protocol | Yield seeking; bullish long-term | Mild bullish bias |
whale_tracker.py Python
import requests
import time
from dataclasses import dataclass
from typing import List, Set
import logging

log = logging.getLogger("whale_tracker")

# Known exchange hot wallets (Binance, Coinbase, Kraken, etc.)
EXCHANGE_ADDRESSES: Set[str] = {
    "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be",  # Binance 1
    "0xd551234ae421e3bcba99a0da6d736074f22192ff",  # Binance 2
    "0xa910f92acdaf488fa6ef02174fb86208ad7722ba",  # Coinbase
    "0xfe9e8709d3215310075d67e3ed32a380ccf451c8",  # Kraken
    # Add more from Etherscan labels
}

ETHERSCAN_BASE = "https://api.etherscan.io/api"

@dataclass
class WhaleAlert:
    tx_hash: str
    from_addr: str
    to_addr: str
    value_eth: float
    value_usd: float
    flow_type: str       # "exchange_inflow" | "exchange_outflow" | "wallet_transfer"
    timestamp: int

class WhaleTracker:
    def __init__(self, etherscan_key: str, min_value_eth: float = 500,
                 watch_addresses: Set[str] = None):
        self.key = etherscan_key
        self.min_value_eth = min_value_eth  # alert threshold
        # Etherscan's txlist endpoint is per-address, so we poll a watchlist.
        # Defaulting to the exchange hot wallets captures large deposits and
        # withdrawals without needing a list of individual whale wallets.
        self.watch_addresses = {a.lower() for a in (watch_addresses or EXCHANGE_ADDRESSES)}
        self.seen_txs: Set[str] = set()
        self.eth_price_usd = self._get_eth_price()

    def _get_eth_price(self) -> float:
        resp = requests.get(
            ETHERSCAN_BASE,
            params={"module": "stats", "action": "ethprice", "apikey": self.key},
            timeout=10,
        )
        return float(resp.json()["result"]["ethusd"])

    def get_large_transfers(self, start_block: int = 0) -> List[WhaleAlert]:
        """
        Query ETH transfers above threshold for each watched address via the
        Etherscan API. In production: use a node subscription (eth_subscribe)
        for real-time coverage instead of polling.
        """
        alerts = []
        for address in self.watch_addresses:
            resp = requests.get(ETHERSCAN_BASE, params={
                "module": "account",
                "action": "txlist",
                "address": address,
                "startblock": start_block,
                "sort": "desc",
                "apikey": self.key
            }, timeout=10)
            txs = resp.json().get("result", [])
            if not isinstance(txs, list):
                continue  # Etherscan returns an error string on bad requests

            for tx in txs:
                if tx["hash"] in self.seen_txs:
                    continue

                value_eth = int(tx["value"]) / 1e18
                if value_eth < self.min_value_eth:
                    continue

                from_addr = tx["from"].lower()
                to_addr = (tx.get("to") or "").lower()  # empty for contract creation

                if to_addr in EXCHANGE_ADDRESSES:
                    flow_type = "exchange_inflow"   # bearish signal
                elif from_addr in EXCHANGE_ADDRESSES:
                    flow_type = "exchange_outflow"  # bullish signal
                else:
                    flow_type = "wallet_transfer"   # neutral / needs research

                alerts.append(WhaleAlert(
                    tx_hash=tx["hash"],
                    from_addr=from_addr,
                    to_addr=to_addr,
                    value_eth=value_eth,
                    value_usd=value_eth * self.eth_price_usd,
                    flow_type=flow_type,
                    timestamp=int(tx["timeStamp"])
                ))
                self.seen_txs.add(tx["hash"])

        return alerts

    def aggregate_exchange_flow(self, window_hours: int = 24) -> dict:
        """
        Net exchange flow over window: positive = net inflow (bearish pressure),
        negative = net outflow (bullish accumulation).
        """
        cutoff = int(time.time()) - window_hours * 3600
        alerts = self.get_large_transfers()

        inflows = sum(a.value_usd for a in alerts
                      if a.flow_type == "exchange_inflow" and a.timestamp > cutoff)
        outflows = sum(a.value_usd for a in alerts
                       if a.flow_type == "exchange_outflow" and a.timestamp > cutoff)

        net_flow = inflows - outflows
        if net_flow > 0:
            signal = "bearish"
        elif net_flow < 0:
            signal = "bullish"
        else:
            signal = "neutral"

        return {
            "window_hours": window_hours,
            "net_flow_usd": round(net_flow, 0),
            "inflows_usd": round(inflows, 0),
            "outflows_usd": round(outflows, 0),
            "signal": signal,
            "alert_count": len(alerts)
        }

For production-grade whale tracking, use a dedicated node (Infura, Alchemy, or self-hosted) with WebSocket subscriptions to `eth_subscribe("logs")` rather than polling Etherscan. This reduces latency from minutes to milliseconds and avoids API rate limits.
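A consumer of such a log subscription typically filters for large ERC-20 Transfer events before doing anything else. A minimal sketch of that filter (the log entry below is synthetic; real entries arrive from the node):

```python
# Sketch: filtering eth_subscribe("logs") events for large ERC-20 transfers.
# The log entry below is synthetic; in production these arrive from the node.
TRANSFER_TOPIC = (
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
)  # keccak256("Transfer(address,address,uint256)")

def is_large_transfer(log_entry: dict, min_raw_value: int) -> bool:
    """True if the log is an ERC-20 Transfer whose raw value exceeds the threshold."""
    topics = log_entry.get("topics", [])
    if not topics or topics[0] != TRANSFER_TOPIC:
        return False
    # For a standard Transfer, from/to are indexed topics and the value
    # is the unindexed data field, hex-encoded
    return int(log_entry["data"], 16) >= min_raw_value

log_entry = {
    "topics": [
        TRANSFER_TOPIC,
        "0x" + "00" * 12 + "ab" * 20,  # from (indexed, left-padded to 32 bytes)
        "0x" + "00" * 12 + "cd" * 20,  # to (indexed)
    ],
    "data": hex(750_000 * 10**6),      # e.g. 750k units of a 6-decimal token
}
assert is_large_transfer(log_entry, min_raw_value=500_000 * 10**6)
```

Note the threshold is in raw token units; a production filter would also map the log's contract address to a token decimal count and USD price.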

03 Smart Money Flows

Smart money refers to wallet addresses that have historically demonstrated superior trading performance — they consistently buy near bottoms and sell near tops. Tracking these wallets is one of the highest-signal on-chain strategies available to agents.

Smart Money Wallet Scoring

Not all profitable wallets are smart money. Some wallets appear profitable because they received tokens early (insiders), had lucky timing, or are wash trading. A rigorous scoring system must separate genuine skill from noise:

smart_money_analyzer.py Python
import requests
import time
import numpy as np
from dataclasses import dataclass
from typing import Dict, List

DUNE_BASE = "https://api.dune.com/api/v1"
ETHERSCAN_BASE = "https://api.etherscan.io/api"  # used by get_current_holdings below

@dataclass
class WalletProfile:
    address: str
    total_pnl_usd: float
    win_rate: float
    avg_hold_days: float
    trade_count: int
    tokens_traded: list
    first_seen_days_ago: int

class DuneAnalytics:
    def __init__(self, api_key: str):
        self.headers = {"X-Dune-API-Key": api_key, "Content-Type": "application/json"}

    def execute_query(self, query_id: int, params: dict = None) -> list:
        """Execute a Dune query and return results."""
        body = {}
        if params:
            body["query_parameters"] = params

        # Execute query
        exec_resp = requests.post(
            f"{DUNE_BASE}/query/{query_id}/execute",
            json=body, headers=self.headers
        )
        exec_id = exec_resp.json()["execution_id"]

        # Poll for results (bounded so a stuck execution cannot hang the agent)
        import time
        for _ in range(150):  # up to ~5 minutes at 2s per poll
            result_resp = requests.get(
                f"{DUNE_BASE}/execution/{exec_id}/results",
                headers=self.headers
            )
            data = result_resp.json()
            if data.get("is_execution_finished"):
                return data["result"]["rows"]
            time.sleep(2)
        raise TimeoutError(f"Dune execution {exec_id} did not finish")

    def get_wallet_pnl(self, wallet: str) -> WalletProfile:
        """
        Query wallet's historical P&L across all ERC-20 token trades.
        Uses a pre-built Dune query (customize query_id for your analysis).
        """
        rows = self.execute_query(3456789, {"wallet_address": wallet})
        if not rows:
            return None

        row = rows[0]
        return WalletProfile(
            address=wallet,
            total_pnl_usd=row["total_pnl_usd"],
            win_rate=row["win_rate"],
            avg_hold_days=row["avg_hold_days"],
            trade_count=row["trade_count"],
            tokens_traded=row["tokens_traded"],
            first_seen_days_ago=row["first_seen_days_ago"]
        )


class SmartMoneyScorer:
    def score(self, wallet: WalletProfile) -> float:
        """Score a wallet 0-100 for 'smart money' classification."""
        if wallet.trade_count < 50:
            return 0.0  # not enough history
        if wallet.first_seen_days_ago < 180:
            return 0.0  # wallet too new

        pnl_score = np.clip(wallet.total_pnl_usd / 1_000_000, 0, 1)
        winrate_score = wallet.win_rate
        hold_score = np.clip(wallet.avg_hold_days / 30, 0, 1)  # reward holds up to ~30 days over pure scalping
        experience_score = np.clip(wallet.trade_count / 500, 0, 1)

        score = (pnl_score * 0.40 + winrate_score * 0.30 +
                 hold_score * 0.15 + experience_score * 0.15)
        return round(score * 100, 1)

    def get_current_holdings(self, wallet: str, etherscan_key: str) -> list:
        """
        Fetch all ERC-20 token holdings for a smart money wallet.
        Used to generate 'smart money is accumulating TOKEN_X' signal.
        """
        resp = requests.get(ETHERSCAN_BASE, params={
            "module": "account",
            "action": "tokentx",
            "address": wallet,
            "sort": "desc",
            "apikey": etherscan_key
        })
        txs = resp.json().get("result", [])

        # Net token balances from recent transfers
        holdings: Dict[str, float] = {}
        for tx in txs[:200]:  # last 200 token transfers
            symbol = tx["tokenSymbol"]
            value = int(tx["value"]) / (10 ** int(tx["tokenDecimal"]))
            sign = 1 if tx["to"].lower() == wallet.lower() else -1
            holdings[symbol] = holdings.get(symbol, 0) + sign * value

        return [
            {"token": sym, "net_balance": bal}
            for sym, bal in holdings.items() if bal > 0
        ]

Dune Analytics (dune.com) is the most powerful tool for building custom on-chain queries. Their SQL-based query engine runs directly against indexed Ethereum data. Build a query that identifies wallets with >$500K realized profits, >60% win rate, and >100 trades over 12 months — this is your smart money universe.
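Those selection criteria reduce to a simple filter over query results. The sketch below applies them to synthetic rows standing in for a Dune result set (the field names are illustrative, not a fixed Dune schema):

```python
# Sketch: applying the smart-money universe criteria (> $500K realized profit,
# > 60% win rate, > 100 trades) to rows returned by a Dune query.
# The rows here are synthetic examples with illustrative field names.
def smart_money_universe(rows: list) -> list:
    return [
        r["address"] for r in rows
        if r["realized_pnl_usd"] > 500_000
        and r["win_rate"] > 0.60
        and r["trade_count"] > 100
    ]

rows = [
    {"address": "0xaaa", "realized_pnl_usd": 2_100_000, "win_rate": 0.71, "trade_count": 340},
    {"address": "0xbbb", "realized_pnl_usd": 900_000, "win_rate": 0.55, "trade_count": 210},   # win rate too low
    {"address": "0xccc", "realized_pnl_usd": 120_000, "win_rate": 0.80, "trade_count": 150},   # profit too low
]
assert smart_money_universe(rows) == ["0xaaa"]
```

In practice you would push these thresholds into the SQL itself so Dune returns only the qualifying wallets.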

04 DEX Volume Analysis

Decentralized exchange (DEX) data is among the most predictive on-chain signals for agents. Because DEX transactions are visible in the mempool before CEX price discovery, large DEX volume surges often precede price movements on centralized exchanges by 2–10 minutes.

Uniswap V3 Signals

Uniswap V3's concentrated liquidity model gives agents additional signal sources:

dex_volume_analyzer.py Python
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

from smart_money_analyzer import DuneAnalytics  # Dune client from section 03

# Hosted subgraph endpoint; The Graph's hosted service is being sunset, so
# production agents should switch to a decentralized-network gateway URL.
UNISWAP_SUBGRAPH = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"


def query_uniswap_volume(token_address: str, hours: int = 24) -> dict:
    """
    Query Uniswap V3 subgraph for token pair volume and liquidity.
    Returns hourly volume data for anomaly detection.
    """
    ts_from = int((datetime.utcnow() - timedelta(hours=hours)).timestamp())

    query = f"""
    {{
      tokenHourDatas(
        where: {{
          token: "{token_address.lower()}",
          periodStartUnix_gt: {ts_from}
        }}
        orderBy: periodStartUnix
        orderDirection: desc
        first: 100
      ) {{
        periodStartUnix
        volume
        volumeUSD
        totalValueLockedUSD
        priceUSD
      }}
    }}
    """

    resp = requests.post(UNISWAP_SUBGRAPH, json={"query": query})
    resp.raise_for_status()
    return resp.json()["data"]["tokenHourDatas"]


def detect_volume_anomaly(hourly_data: list, z_threshold: float = 2.5) -> dict:
    """
    Detect anomalous volume spikes using z-score.
    Returns signal if latest hour is an outlier vs. trailing 30-day baseline.
    """
    df = pd.DataFrame(hourly_data)
    df["volumeUSD"] = df["volumeUSD"].astype(float)
    df["totalValueLockedUSD"] = df["totalValueLockedUSD"].astype(float)
    df["vol_tvl_ratio"] = df["volumeUSD"] / df["totalValueLockedUSD"].replace(0, np.nan)

    if len(df) < 24:
        return {"signal": "insufficient_data"}

    mu = df["volumeUSD"][1:].mean()  # exclude latest hour
    sigma = df["volumeUSD"][1:].std()
    latest_vol = df["volumeUSD"].iloc[0]

    z = (latest_vol - mu) / (sigma + 1e-9)
    latest_ratio = df["vol_tvl_ratio"].iloc[0]
    avg_ratio = df["vol_tvl_ratio"][1:].mean()

    return {
        "z_score": round(z, 2),
        "latest_volume_usd": round(latest_vol, 0),
        "baseline_volume_usd": round(mu, 0),
        "vol_tvl_ratio": round(latest_ratio, 4),
        "vol_tvl_vs_baseline": round(latest_ratio / (avg_ratio + 1e-9), 2),
        "signal": "volume_spike" if z > z_threshold else "normal",
        "strength": "strong" if z > 4.0 else ("moderate" if z > z_threshold else "weak")
    }


def scan_emerging_tokens(dune_api_key: str) -> list:
    """
    Use Dune to find tokens with sudden volume emergence in the last 6 hours.
    Tokens appearing for the first time in the top-100 DEX pairs by volume.
    """
    dune = DuneAnalytics(dune_api_key)
    # Query ID 3892011 = custom query: "new tokens in top-100 DEX pairs last 6h"
    rows = dune.execute_query(3892011)

    return [{
        "token": r["token_symbol"],
        "address": r["token_address"],
        "volume_6h_usd": r["volume_6h_usd"],
        "unique_traders": r["unique_traders"],
        "pool_pair": r["pool_pair"]
    } for r in rows]


class DEXSignalAgent:
    def __init__(self, trading_api_key: str, dune_key: str):
        self.trading_key = trading_api_key
        self.dune_key = dune_key
        self.watchlist = [
            "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",  # WETH
            "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",  # USDC
            "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599",  # WBTC
        ]

    def scan_and_generate_signals(self) -> list:
        signals = []
        for token_addr in self.watchlist:
            hourly = query_uniswap_volume(token_addr, hours=72)
            anomaly = detect_volume_anomaly(hourly)

            if anomaly["signal"] == "volume_spike":
                signals.append({
                    "token_address": token_addr,
                    "signal_type": "dex_volume_spike",
                    "z_score": anomaly["z_score"],
                    "strength": anomaly["strength"],
                    "action": "consider_long"
                })

        return signals
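To see the anomaly detector's core math in isolation, here is the same z-score computation run on synthetic hourly volumes, newest first, with one 5x spike over a flat baseline:

```python
# Worked example of the volume z-score on synthetic data (no network calls).
import statistics

# Hourly volumes, newest first: one 5x spike, then a near-flat baseline.
volumes = [5_000_000.0] + [1_000_000.0 + 10_000.0 * i for i in range(23)]

mu = statistics.mean(volumes[1:])       # baseline excludes the latest hour
sigma = statistics.stdev(volumes[1:])
z = (volumes[0] - mu) / (sigma + 1e-9)  # epsilon guards a zero-variance baseline

assert z > 2.5  # flagged as a volume spike at the article's default threshold
```

The same arithmetic runs inside detect_volume_anomaly; the epsilon term simply prevents a division by zero when the baseline has no variance.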

05 Mempool Monitoring

The mempool is the waiting room for unconfirmed transactions — transactions broadcast to the network but not yet included in a block. It is a window into the near future of on-chain activity. Agents monitoring the mempool can detect large pending swaps before they execute, anticipate price impact, and adjust positions accordingly.
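One thing an agent wants from a large pending swap is its expected price impact. A rough constant-product estimate (V2-style pool, fees and Uniswap V3 tick math ignored; the reserve numbers are synthetic) can be sketched as:

```python
# Sketch: estimating the price impact of a pending swap via the
# constant-product invariant (x * y = k). Ignores fees and V3 tick math;
# reserve numbers are synthetic.
def price_impact(reserve_in: float, reserve_out: float, amount_in: float) -> float:
    """Fractional worsening of the execution price vs. the spot price."""
    spot_price = reserve_out / reserve_in
    # Output amount that keeps reserve_in * reserve_out constant
    amount_out = reserve_out - (reserve_in * reserve_out) / (reserve_in + amount_in)
    exec_price = amount_out / amount_in
    return 1 - exec_price / spot_price

# A pending 500 ETH sell into a pool holding 10,000 ETH / 30,000,000 USDC
impact = price_impact(reserve_in=10_000, reserve_out=30_000_000, amount_in=500)
assert 0.04 < impact < 0.05  # roughly 4.8% price impact
```

A swap this size moving price nearly 5% on one venue is exactly the kind of pending transaction worth reacting to before confirmation.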

What Agents Watch in the Mempool

| Mempool Event | What It Signals | Agent Response |
|---|---|---|
| Large DEX swap (pending) | Price will move when confirmed | Pre-position in direction of swap |
| High-gas whale TX | Urgent transaction, willing to overpay | Treat as high-priority signal |
| Large exchange withdrawal (pending) | Imminent long-term accumulation | Bullish signal ahead of confirmation |
| NFT/token contract deployment | New asset launch incoming | Monitor for launch volume signal |

Mempool-based front-running may be illegal in some jurisdictions and can violate exchange or RPC-provider terms of service. Agents should use mempool data only for macroscopic flow analysis (large whale signals) and should never attempt to sandwich or front-run individual users' transactions.

mempool_monitor.py Python
from web3 import Web3
import requests
import asyncio
import logging
from typing import Callable

from whale_tracker import EXCHANGE_ADDRESSES  # known exchange hot wallets from section 02

log = logging.getLogger("mempool_monitor")

UNISWAP_V3_ROUTER = "0xE592427A0AEce92De3Edee1F18E0157C05861564".lower()
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D".lower()
DEX_ROUTERS = {UNISWAP_V3_ROUTER, UNISWAP_V2_ROUTER}

# Minimum ETH value to flag as "large" pending tx
LARGE_TX_THRESHOLD_ETH = 100


class MempoolMonitor:
    def __init__(self, ws_endpoint: str, on_large_tx: Callable = None):
        """
        ws_endpoint: WebSocket URL for Ethereum node (Alchemy/Infura wss://)
        on_large_tx: callback invoked when large pending tx detected
        """
        # Note: newer web3.py releases rename this provider (LegacyWebSocketProvider)
        self.w3 = Web3(Web3.WebsocketProvider(ws_endpoint))
        self.on_large_tx = on_large_tx or (lambda tx: log.info(f"Large TX: {tx}"))
        self.eth_price = self._fetch_eth_price()

    def _fetch_eth_price(self) -> float:
        resp = requests.get("https://api.coingecko.com/api/v3/simple/price",
                           params={"ids": "ethereum", "vs_currencies": "usd"})
        return resp.json()["ethereum"]["usd"]

    def classify_pending_tx(self, tx: dict) -> dict:
        """Classify a pending transaction by type and estimated impact."""
        to_addr = (tx.get("to") or "").lower()
        value_eth = self.w3.from_wei(tx["value"], "ether")
        # Type-2 (EIP-1559) transactions carry maxFeePerGas instead of gasPrice
        gas_price_gwei = (tx.get("gasPrice") or tx.get("maxFeePerGas") or 0) / 1e9

        tx_type = "unknown"
        if to_addr in DEX_ROUTERS:
            tx_type = "dex_swap"
        elif to_addr in EXCHANGE_ADDRESSES:
            tx_type = "exchange_deposit"
        elif tx.get("input") == "0x":
            tx_type = "eth_transfer"

        urgency = "high" if gas_price_gwei > 100 else ("medium" if gas_price_gwei > 30 else "low")

        return {
            "hash": tx["hash"].hex(),
            "type": tx_type,
            "value_eth": float(value_eth),
            "value_usd": float(value_eth) * self.eth_price,
            "gas_gwei": gas_price_gwei,
            "urgency": urgency,
            "is_large": float(value_eth) > LARGE_TX_THRESHOLD_ETH
        }

    async def stream_pending(self):
        """
        Poll the node's pending-transaction filter. For true push-based
        streaming, use eth_subscribe("newPendingTransactions") on an
        async WebSocket provider instead of polling.
        """
        log.info("Starting mempool stream...")
        pending_filter = self.w3.eth.filter("pending")

        while True:
            try:
                for tx_hash in pending_filter.get_new_entries():
                    try:
                        tx = self.w3.eth.get_transaction(tx_hash)
                        classified = self.classify_pending_tx(tx)

                        if classified["is_large"]:
                            self.on_large_tx(classified)

                    except Exception:
                        pass  # TX may have been removed from mempool

                await asyncio.sleep(0.1)
            except Exception as e:
                log.error(f"Mempool stream error: {e}")
                await asyncio.sleep(5)

06 Data Pipeline and Trading Code

The following integrates all on-chain data sources into a unified signal aggregator that feeds directly into the Purple Flea Trading API. The agent computes a composite on-chain sentiment score every 5 minutes and executes trades when the score exceeds configurable thresholds.

onchain_trading_agent.py Python
import asyncio
import logging
import requests
import time
from dataclasses import dataclass
from collections import deque

from whale_tracker import WhaleTracker          # section 02
from dex_volume_analyzer import DEXSignalAgent  # section 04

log = logging.getLogger("onchain_agent")

@dataclass
class OnChainSignal:
    source: str           # "whale" | "smart_money" | "dex" | "mempool"
    direction: str        # "bullish" | "bearish" | "neutral"
    strength: float       # 0-1
    confidence: float     # 0-1
    timestamp: float
    metadata: dict


class OnChainSentimentAggregator:
    """
    Aggregates signals from all on-chain sources into a single
    sentiment score. Score range: -100 (max bearish) to +100 (max bullish).
    """

    SOURCE_WEIGHTS = {
        "smart_money": 0.35,   # highest weight: verified skill
        "whale": 0.30,          # large market impact
        "dex": 0.20,            # volume momentum
        "mempool": 0.15,        # short-term flow intel
    }

    def __init__(self, signal_ttl_seconds: int = 3600):
        self.signal_ttl = signal_ttl_seconds
        self.signals: deque = deque(maxlen=500)

    def add_signal(self, signal: OnChainSignal):
        self.signals.append(signal)

    def compute_score(self) -> dict:
        """
        Compute weighted on-chain sentiment score.
        Returns score and breakdown by source.
        """
        now = time.time()
        fresh = [s for s in self.signals if now - s.timestamp < self.signal_ttl]

        if not fresh:
            return {"score": 0, "direction": "neutral", "signal_count": 0}

        source_scores = {}
        for source, weight in self.SOURCE_WEIGHTS.items():
            source_signals = [s for s in fresh if s.source == source]
            if not source_signals:
                source_scores[source] = 0
                continue

            directional_values = []
            for s in source_signals:
                sign = 1 if s.direction == "bullish" else (-1 if s.direction == "bearish" else 0)
                directional_values.append(sign * s.strength * s.confidence)

            source_scores[source] = sum(directional_values) / len(directional_values) * 100

        total_score = sum(
            source_scores[src] * w for src, w in self.SOURCE_WEIGHTS.items()
        )

        direction = "bullish" if total_score > 15 else ("bearish" if total_score < -15 else "neutral")

        return {
            "score": round(total_score, 1),
            "direction": direction,
            "source_breakdown": {k: round(v, 1) for k, v in source_scores.items()},
            "signal_count": len(fresh),
        }


class OnChainTradingAgent:
    def __init__(self, trading_api_key: str, etherscan_key: str, dune_key: str):
        self.trading_key = trading_api_key
        self.whale_tracker = WhaleTracker(etherscan_key, min_value_eth=500)
        self.dex_agent = DEXSignalAgent(trading_api_key, dune_key)
        self.aggregator = OnChainSentimentAggregator()
        self.position = 0.0
        self.max_position_usd = 10_000

    def ingest_whale_signals(self):
        flow = self.whale_tracker.aggregate_exchange_flow(window_hours=4)
        direction = flow["signal"] if flow["signal"] in ("bullish", "bearish") else "neutral"
        abs_flow = abs(flow["net_flow_usd"])
        strength = min(abs_flow / 50_000_000, 1.0)  # normalize: $50M net flow = max strength

        self.aggregator.add_signal(OnChainSignal(
            source="whale", direction=direction, strength=strength,
            confidence=0.8, timestamp=time.time(), metadata=flow
        ))

    def ingest_dex_signals(self):
        signals = self.dex_agent.scan_and_generate_signals()
        for s in signals:
            strength_map = {"strong": 0.9, "moderate": 0.6, "weak": 0.3}
            self.aggregator.add_signal(OnChainSignal(
                source="dex", direction="bullish",
                strength=strength_map.get(s.get("strength", "weak"), 0.3),
                confidence=0.65, timestamp=time.time(), metadata=s
            ))

    def execute_signal(self, score_data: dict):
        score = score_data["score"]
        direction = score_data["direction"]

        if direction == "bullish" and self.position <= 0:
            notional = self.max_position_usd * (abs(score) / 100)
            self._trade("buy", notional, score_data)
        elif direction == "bearish" and self.position >= 0:
            notional = self.max_position_usd * (abs(score) / 100)
            self._trade("sell", notional, score_data)

    def _trade(self, action: str, notional: float, metadata: dict):
        resp = requests.post(
            "https://purpleflea.com/trading-api/execute",
            json={"action": action, "market": "spot", "asset": "ETH",
                  "notional_usd": round(notional, 2), "source": "onchain_signal"},
            headers={"X-API-Key": self.trading_key}
        )
        log.info(f"[TRADE] {action} ETH ${notional:,.0f} | score={metadata.get('score')}")
        self.position += notional if action == "buy" else -notional
        return resp.json()

    async def run(self):
        log.info("On-Chain Trading Agent started")
        while True:
            try:
                self.ingest_whale_signals()
                self.ingest_dex_signals()
                score_data = self.aggregator.compute_score()
                log.info(f"On-Chain Score: {score_data['score']} | {score_data['direction']}")
                self.execute_signal(score_data)
            except Exception as e:
                log.error(f"Error: {e}")
            await asyncio.sleep(300)  # scan every 5 minutes


if __name__ == "__main__":
    agent = OnChainTradingAgent(
        trading_api_key="YOUR_PURPLEFLEA_KEY",
        etherscan_key="YOUR_ETHERSCAN_KEY",
        dune_key="YOUR_DUNE_KEY"
    )
    asyncio.run(agent.run())
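For intuition on how the source weights combine, here is the aggregator's scoring math worked through by hand on synthetic signals (tuples of sign, strength, confidence):

```python
# Worked example of the aggregator's weighted scoring with synthetic signals.
# Mirrors OnChainSentimentAggregator: per-source mean of sign*strength*confidence,
# scaled to +/-100, then combined with the source weights.
WEIGHTS = {"smart_money": 0.35, "whale": 0.30, "dex": 0.20, "mempool": 0.15}

signals = {
    "smart_money": [(+1, 0.8, 0.9)],   # bullish accumulation
    "whale":       [(-1, 0.5, 0.8)],   # net exchange inflow
    "dex":         [(+1, 0.6, 0.65)],  # moderate volume spike
    "mempool":     [],                 # no fresh signals
}

score = 0.0
for source, weight in WEIGHTS.items():
    vals = [sign * strength * conf for sign, strength, conf in signals[source]]
    source_score = (sum(vals) / len(vals) * 100) if vals else 0.0
    score += source_score * weight

direction = "bullish" if score > 15 else ("bearish" if score < -15 else "neutral")
# score = 0.72*35 - 0.40*30 + 0.39*20 = 25.2 - 12.0 + 7.8 = 21.0 -> "bullish"
assert round(score, 1) == 21.0 and direction == "bullish"
```

Note how a single strong bearish whale signal drags the composite down by 12 points but cannot flip it alone; that is the point of weighting by historical signal quality.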

The Purple Flea Trading API accepts on-chain signals as first-class inputs — you can annotate every trade order with the source signal metadata for full audit trail. New agents can start testing with free USDC from the Faucet before committing real capital.

Data Source Reference

| Data Source | API | Free Tier | Best For |
|---|---|---|---|
| Etherscan | etherscan.io/apis | 5 calls/sec | Wallet history, token transfers, ERC-20 |
| Dune Analytics | api.dune.com | Community queries free | Custom SQL queries, DEX analytics, smart money |
| Alchemy | alchemy.com | 300M compute units/mo | Real-time node access, WebSocket mempool |
| The Graph | thegraph.com | Subgraph queries free | Uniswap/Curve/Aave protocol data |
| Nansen | nansen.ai | Paid only | Pre-labeled smart money wallets |

Start Reading the Chain

Build your on-chain data pipeline, generate signals from whale flows and DEX volume, and route trades through the Purple Flea Trading API. Claim free test capital from the faucet to start immediately.