01 On-Chain Metrics Overview
On-chain data encompasses every piece of information publicly recorded on a blockchain: wallet balances, transaction history, smart contract interactions, DEX swaps, liquidity pool changes, mempool pending transactions, gas fees, and block-level statistics. For an AI agent, this data is raw alpha — unfiltered financial behavior of every participant in the market.
Key On-Chain Signal Categories
Whale Movements
Large transfers between whale wallets and exchanges. Exchange deposits signal selling intent; withdrawals signal long-term holding.
Smart Money Flows
Wallet clusters associated with profitable historical trades. High-confidence wallets accumulating = bullish signal.
DEX Volume & Liquidity
Uniswap/Curve volume surges often precede centralized exchange price movements. LP position changes reveal institutional intent.
Mempool Intelligence
Pending transactions reveal incoming large moves before they are confirmed. Sandwich detection, front-run risk analysis.
Network Activity
Active addresses, transaction counts, gas usage. Rising network activity at lower prices = accumulation signal.
Contract Events
Token transfers, approval events, governance votes, vault deposit/withdrawals. Each is a data point on market participant intent.
02 Whale Tracking
Whale wallets — addresses holding more than approximately $10M in assets — move markets. A single whale transferring 10,000 BTC to an exchange is a credible selling signal. The key insight is that exchange inflows from whale wallets predict short-term selling pressure, while large withdrawals from exchanges to cold wallets signal long-term conviction.
Exchange Flow Classification
| Flow Type | Direction | Signal Interpretation | Agent Response |
|---|---|---|---|
| Large Deposit | Whale → Exchange | Likely sell incoming | Reduce long exposure |
| Large Withdrawal | Exchange → Cold Wallet | Long-term accumulation | Add long exposure |
| Wallet-to-Wallet | Whale → Unknown | Ambiguous — research needed | Monitor destination |
| DeFi Deposit | Whale → Protocol | Yield seeking — bullish long-term | Mild bullish bias |
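The table maps directly onto a lookup an agent can act on. A minimal sketch — the first three flow-type strings mirror the WhaleTracker classification below, the `defi_deposit` label and the action names are illustrative, not an API:

```python
# Hypothetical playbook mirroring the flow-classification table above.
FLOW_PLAYBOOK = {
    "exchange_inflow":  {"bias": "bearish",      "action": "reduce_long_exposure"},
    "exchange_outflow": {"bias": "bullish",      "action": "add_long_exposure"},
    "wallet_transfer":  {"bias": "neutral",      "action": "monitor_destination"},
    "defi_deposit":     {"bias": "mild_bullish", "action": "mild_bullish_bias"},
}

def respond_to_flow(flow_type: str) -> dict:
    """Look up the playbook entry for a classified flow, defaulting to neutral."""
    return FLOW_PLAYBOOK.get(flow_type, {"bias": "neutral", "action": "monitor_destination"})
```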
import requests
import time
from dataclasses import dataclass
from typing import List, Set
import logging
log = logging.getLogger("whale_tracker")
# Known exchange hot wallets (Binance, Coinbase, Kraken, etc.)
EXCHANGE_ADDRESSES: Set[str] = {
"0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be", # Binance 1
"0xd551234ae421e3bcba99a0da6d736074f22192ff", # Binance 2
"0xa910f92acdaf488fa6ef02174fb86208ad7722ba", # Coinbase
"0xfe9e8709d3215310075d67e3ed32a380ccf451c8", # Kraken
# Add more from Etherscan labels
}
ETHERSCAN_BASE = "https://api.etherscan.io/api"
@dataclass
class WhaleAlert:
tx_hash: str
from_addr: str
to_addr: str
value_eth: float
value_usd: float
flow_type: str # "exchange_inflow" | "exchange_outflow" | "wallet_transfer"
timestamp: int
class WhaleTracker:
def __init__(self, etherscan_key: str, min_value_eth: float = 500):
self.key = etherscan_key
self.min_value_eth = min_value_eth # alert threshold
self.seen_txs: Set[str] = set()
self.eth_price_usd = self._get_eth_price()
def _get_eth_price(self) -> float:
resp = requests.get(
ETHERSCAN_BASE,
params={"module": "stats", "action": "ethprice", "apikey": self.key}
)
return float(resp.json()["result"]["ethusd"])
def get_large_transfers(self, address: str, start_block: int = 0) -> List[WhaleAlert]:
"""
Query ETH transfers above threshold for a single address via the
Etherscan API (the txlist action requires an address parameter).
In production: use a node subscription (eth_subscribe) for real-time.
"""
resp = requests.get(ETHERSCAN_BASE, params={
"module": "account",
"action": "txlist",
"address": address,
"startblock": start_block,
"sort": "desc",
"apikey": self.key
})
txs = resp.json().get("result", [])
alerts = []
for tx in txs:
if tx["hash"] in self.seen_txs:
continue
value_eth = int(tx["value"]) / 1e18
if value_eth < self.min_value_eth:
continue
from_addr = tx["from"].lower()
to_addr = (tx["to"] or "").lower()  # "to" is None for contract creation
if to_addr in EXCHANGE_ADDRESSES:
flow_type = "exchange_inflow"  # bearish signal
elif from_addr in EXCHANGE_ADDRESSES:
flow_type = "exchange_outflow"  # bullish signal
else:
flow_type = "wallet_transfer"  # neutral / needs research
alerts.append(WhaleAlert(
tx_hash=tx["hash"],
from_addr=from_addr,
to_addr=to_addr,
value_eth=value_eth,
value_usd=value_eth * self.eth_price_usd,
flow_type=flow_type,
timestamp=int(tx["timeStamp"])
))
self.seen_txs.add(tx["hash"])
return alerts
def aggregate_exchange_flow(self, window_hours: int = 24) -> dict:
"""
Net exchange flow over window: positive = net inflow (bearish pressure),
negative = net outflow (bullish accumulation).
Note: seen_txs dedupes across polls, so each call only counts newly
observed transfers; persist alerts externally for longer windows.
"""
cutoff = int(time.time()) - window_hours * 3600
# txlist is per-address, so poll each known exchange hot wallet
alerts = [a for addr in EXCHANGE_ADDRESSES
for a in self.get_large_transfers(addr)]
inflows = sum(a.value_usd for a in alerts
if a.flow_type == "exchange_inflow" and a.timestamp > cutoff)
outflows = sum(a.value_usd for a in alerts
if a.flow_type == "exchange_outflow" and a.timestamp > cutoff)
net_flow = inflows - outflows
signal = "bearish" if net_flow > 0 else "bullish"
return {
"window_hours": window_hours,
"net_flow_usd": round(net_flow, 0),
"inflows_usd": round(inflows, 0),
"outflows_usd": round(outflows, 0),
"signal": signal,
"alert_count": len(alerts)
}
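The dict returned by `aggregate_exchange_flow` can be collapsed into a signed position bias for downstream sizing. A small sketch — the $50M full-scale constant is an illustrative assumption, not a calibrated value:

```python
def flow_to_bias(flow: dict, full_scale_usd: float = 50_000_000) -> float:
    """
    Map net exchange flow to a bias in [-1, 1].
    Positive net inflow (bearish) -> negative bias; net outflow -> positive.
    full_scale_usd is the flow at which the bias saturates (assumed, not calibrated).
    """
    net = flow.get("net_flow_usd", 0.0)
    bias = -net / full_scale_usd  # inflows pressure price down
    return max(-1.0, min(1.0, bias))
```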
For production-grade whale tracking, use a dedicated node (Infura, Alchemy, or self-hosted) with WebSocket subscriptions via `eth_subscribe` (`newHeads` or `newPendingTransactions` — note that native ETH transfers emit no logs, so a `logs` subscription alone will miss them) rather than polling Etherscan. This reduces latency from minutes to milliseconds and avoids API rate limits.
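A raw-WebSocket version of that subscription is easy to sketch. This assumes the third-party `websockets` package and a provider WSS URL; the handler receives pending tx hashes, which you would then resolve via `eth_getTransactionByHash`:

```python
import json

def extract_pending_hash(msg: dict):
    """Return the tx hash from an eth_subscription push message, else None."""
    if msg.get("method") != "eth_subscription":
        return None
    return msg["params"]["result"]

async def stream_pending_hashes(ws_url: str, handler):
    """Subscribe to newPendingTransactions over a raw JSON-RPC WebSocket."""
    import websockets  # third-party: pip install websockets
    async with websockets.connect(ws_url) as ws:
        await ws.send(json.dumps({
            "jsonrpc": "2.0", "id": 1,
            "method": "eth_subscribe",
            "params": ["newPendingTransactions"],
        }))
        await ws.recv()  # first reply confirms the subscription id
        async for raw in ws:
            tx_hash = extract_pending_hash(json.loads(raw))
            if tx_hash is not None:
                handler(tx_hash)

# Usage (endpoint URL is yours to supply):
# asyncio.run(stream_pending_hashes("wss://mainnet.infura.io/ws/v3/KEY", print))
```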
03 Smart Money Flows
Smart money refers to wallet addresses that have historically demonstrated superior trading performance — they consistently buy near bottoms and sell near tops. Tracking these wallets is one of the highest-signal on-chain strategies available to agents.
Smart Money Wallet Scoring
Not all profitable wallets are smart money. Some wallets appear profitable because they received tokens early (insiders), had lucky timing, or are wash trading. A rigorous scoring system must separate genuine skill from noise:
import requests
import time
import numpy as np
from dataclasses import dataclass
from typing import List, Dict
DUNE_BASE = "https://api.dune.com/api/v1"
ETHERSCAN_BASE = "https://api.etherscan.io/api"  # used by get_current_holdings below
@dataclass
class WalletProfile:
address: str
total_pnl_usd: float
win_rate: float
avg_hold_days: float
trade_count: int
tokens_traded: list
first_seen_days_ago: int
class DuneAnalytics:
def __init__(self, api_key: str):
self.headers = {"X-Dune-API-Key": api_key, "Content-Type": "application/json"}
def execute_query(self, query_id: int, params: dict = None) -> list:
"""Execute a Dune query and return results."""
body = {}
if params:
body["query_parameters"] = params
# Execute query
exec_resp = requests.post(
f"{DUNE_BASE}/query/{query_id}/execute",
json=body, headers=self.headers
)
exec_id = exec_resp.json()["execution_id"]
# Poll for results
import time
while True:
result_resp = requests.get(
f"{DUNE_BASE}/execution/{exec_id}/results",
headers=self.headers
)
data = result_resp.json()
if data.get("is_execution_finished"):
return data["result"]["rows"]
time.sleep(2)
def get_wallet_pnl(self, wallet: str) -> "WalletProfile | None":
"""
Query wallet's historical P&L across all ERC-20 token trades.
Uses a pre-built Dune query (customize query_id for your analysis).
"""
rows = self.execute_query(3456789, {"wallet_address": wallet})
if not rows:
return None
row = rows[0]
return WalletProfile(
address=wallet,
total_pnl_usd=row["total_pnl_usd"],
win_rate=row["win_rate"],
avg_hold_days=row["avg_hold_days"],
trade_count=row["trade_count"],
tokens_traded=row["tokens_traded"],
first_seen_days_ago=row["first_seen_days_ago"]
)
class SmartMoneyScorer:
def score(self, wallet: WalletProfile) -> float:
"""Score a wallet 0-100 for 'smart money' classification."""
if wallet.trade_count < 50:
return 0.0 # not enough history
if wallet.first_seen_days_ago < 180:
return 0.0 # wallet too new
pnl_score = np.clip(wallet.total_pnl_usd / 1_000_000, 0, 1)
winrate_score = wallet.win_rate
hold_score = np.clip(wallet.avg_hold_days / 30, 0, 1) # prefer medium-term holds
experience_score = np.clip(wallet.trade_count / 500, 0, 1)
score = (pnl_score * 0.40 + winrate_score * 0.30 +
hold_score * 0.15 + experience_score * 0.15)
return round(score * 100, 1)
def get_current_holdings(self, wallet: str, etherscan_key: str) -> list:
"""
Fetch all ERC-20 token holdings for a smart money wallet.
Used to generate 'smart money is accumulating TOKEN_X' signal.
"""
resp = requests.get(ETHERSCAN_BASE, params={
"module": "account",
"action": "tokentx",
"address": wallet,
"sort": "desc",
"apikey": etherscan_key
})
txs = resp.json().get("result", [])
# Net token balances from recent transfers
holdings: Dict[str, float] = {}
for tx in txs[:200]: # last 200 token transfers
symbol = tx["tokenSymbol"]
value = int(tx["value"]) / (10 ** int(tx["tokenDecimal"]))
sign = 1 if tx["to"].lower() == wallet.lower() else -1
holdings[symbol] = holdings.get(symbol, 0) + sign * value
return [
{"token": sym, "net_balance": bal}
for sym, bal in holdings.items() if bal > 0
]
Dune Analytics (dune.com) is the most powerful tool for building custom on-chain queries. Their SQL-based query engine runs directly against indexed Ethereum data. Build a query that identifies wallets with >$500K realized profits, >60% win rate, and >100 trades over 12 months — this is your smart money universe.
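Those screening thresholds can also be applied client-side once the Dune rows are fetched. A sketch, with a hypothetical `WalletStats` row shape standing in for the real query output:

```python
from dataclasses import dataclass

@dataclass
class WalletStats:
    """Hypothetical row shape mirroring the Dune query's output columns."""
    address: str
    realized_pnl_usd: float
    win_rate: float        # fraction in [0, 1]
    trade_count: int

def build_universe(rows: list) -> list:
    """Apply the screen from the text: >$500K realized profit, >60% win rate, >100 trades."""
    return [w.address for w in rows
            if w.realized_pnl_usd > 500_000
            and w.win_rate > 0.60
            and w.trade_count > 100]
```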
04 DEX Volume Analysis
Decentralized exchange (DEX) data is among the most predictive on-chain signals for agents. Because DEX transactions are visible in the mempool before CEX price discovery, large DEX volume surges often precede price movements on centralized exchanges by 2–10 minutes.
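That lead-lag relationship can be measured rather than assumed. A sketch using lagged correlation between a DEX volume series and subsequent CEX price moves — the bar interval and data sourcing are up to you, and the synthetic test data below is purely illustrative:

```python
import numpy as np

def best_lead_lag(dex_volume: np.ndarray, cex_returns: np.ndarray, max_lag: int = 10) -> int:
    """
    Return the lag (in bars) at which past DEX volume best correlates with
    absolute CEX returns. A positive result means DEX activity leads CEX moves.
    """
    best_lag, best_corr = 0, -1.0
    moves = np.abs(cex_returns)
    for lag in range(1, max_lag + 1):
        # correlate volume at t with |return| at t+lag
        c = np.corrcoef(dex_volume[:-lag], moves[lag:])[0, 1]
        if c > best_corr:
            best_corr, best_lag = c, lag
    return best_lag
```

Run this over your own aligned series; if the best lag collapses to zero, the edge claimed above is not present in your data.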
Uniswap V3 Signals
Uniswap V3's concentrated liquidity model gives agents additional signal sources:
- Volume/Liquidity Ratio: High volume relative to available liquidity means large slippage is occurring — a sign of urgent, informed trading.
- LP Position Changes: Large LPs removing liquidity from a pool often precede sharp price movements (informed withdrawal).
- Price Impact per Trade: Consistently high price impact signals thin order books and potential for sharp moves.
- Token Pair Rotation: When volume suddenly appears in a previously illiquid pair, it often signals early accumulation of a token before broader discovery.
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
UNISWAP_SUBGRAPH = "https://api.thegraph.com/subgraphs/name/uniswap/uniswap-v3"  # legacy hosted-service URL; newer deployments are served via a Graph gateway key
DUNE_BASE = "https://api.dune.com/api/v1"
def query_uniswap_volume(token_address: str, hours: int = 24) -> dict:
"""
Query Uniswap V3 subgraph for token pair volume and liquidity.
Returns hourly volume data for anomaly detection.
"""
ts_from = int((datetime.utcnow() - timedelta(hours=hours)).timestamp())
query = f"""
{{
tokenHourDatas(
where: {{
token: "{token_address.lower()}",
periodStartUnix_gt: {ts_from}
}}
orderBy: periodStartUnix
orderDirection: desc
first: 100
) {{
periodStartUnix
volume
volumeUSD
totalValueLockedUSD
priceUSD
}}
}}
"""
resp = requests.post(UNISWAP_SUBGRAPH, json={"query": query})
resp.raise_for_status()
return resp.json()["data"]["tokenHourDatas"]
def detect_volume_anomaly(hourly_data: list, z_threshold: float = 2.5) -> dict:
"""
Detect anomalous volume spikes using z-score.
Returns a signal if the latest hour is an outlier vs. the trailing hourly baseline of the supplied window.
"""
df = pd.DataFrame(hourly_data)
df["volumeUSD"] = df["volumeUSD"].astype(float)
df["totalValueLockedUSD"] = df["totalValueLockedUSD"].astype(float)
df["vol_tvl_ratio"] = df["volumeUSD"] / df["totalValueLockedUSD"].replace(0, np.nan)
if len(df) < 24:
return {"signal": "insufficient_data"}
mu = df["volumeUSD"][1:].mean() # exclude latest hour
sigma = df["volumeUSD"][1:].std()
latest_vol = df["volumeUSD"].iloc[0]
z = (latest_vol - mu) / (sigma + 1e-9)
latest_ratio = df["vol_tvl_ratio"].iloc[0]
avg_ratio = df["vol_tvl_ratio"][1:].mean()
return {
"z_score": round(z, 2),
"latest_volume_usd": round(latest_vol, 0),
"baseline_volume_usd": round(mu, 0),
"vol_tvl_ratio": round(latest_ratio, 4),
"vol_tvl_vs_baseline": round(latest_ratio / (avg_ratio + 1e-9), 2),
"signal": "volume_spike" if z > z_threshold else "normal",
"strength": "strong" if z > 4.0 else ("moderate" if z > 2.5 else "weak")
}
def scan_emerging_tokens(dune_api_key: str) -> list:
"""
Use Dune to find tokens with sudden volume emergence in the last 6 hours.
Tokens appearing for the first time in the top-100 DEX pairs by volume.
"""
dune = DuneAnalytics(dune_api_key)
# Query ID 3892011 = custom query: "new tokens in top-100 DEX pairs last 6h"
rows = dune.execute_query(3892011)
return [{
"token": r["token_symbol"],
"address": r["token_address"],
"volume_6h_usd": r["volume_6h_usd"],
"unique_traders": r["unique_traders"],
"pool_pair": r["pool_pair"]
} for r in rows]
class DEXSignalAgent:
def __init__(self, trading_api_key: str, dune_key: str):
self.trading_key = trading_api_key
self.dune_key = dune_key
self.watchlist = [
"0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", # WETH
"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", # USDC
"0x2260fac5e5542a773aa44fbcfedf7c193bc2c599", # WBTC
]
def scan_and_generate_signals(self) -> list:
signals = []
for token_addr in self.watchlist:
hourly = query_uniswap_volume(token_addr, hours=72)
anomaly = detect_volume_anomaly(hourly)
if anomaly["signal"] == "volume_spike":
signals.append({
"token_address": token_addr,
"signal_type": "dex_volume_spike",
"z_score": anomaly["z_score"],
"strength": anomaly["strength"],
"action": "consider_long"
})
return signals
05 Mempool Monitoring
The mempool is the waiting room for unconfirmed transactions — transactions broadcast to the network but not yet included in a block. It is a window into the near future of on-chain activity. Agents monitoring the mempool can detect large pending swaps before they execute, anticipate price impact, and adjust positions accordingly.
What Agents Watch in the Mempool
| Mempool Event | What It Signals | Agent Response |
|---|---|---|
| Large DEX swap (pending) | Price will move when confirmed | Pre-position in direction of swap |
| High gas whale TX | Urgent transaction, willing to overpay | Treat as high-priority signal |
| Large exchange withdrawal (pending) | Imminent long-term accumulation | Bullish signal ahead of confirmation |
| NFT/token contract deployment | New asset launch incoming | Monitor for launch volume signal |
Mempool-based front-running is regulated in many jurisdictions and may violate exchange terms. Agents should only use mempool data for macroscopic flow analysis (large whale signals) and should never attempt to sandwich or front-run individual users' transactions.
from web3 import Web3
import requests
import asyncio
import logging
from typing import Callable
log = logging.getLogger("mempool_monitor")
UNISWAP_V3_ROUTER = "0xE592427A0AEce92De3Edee1F18E0157C05861564".lower()
UNISWAP_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D".lower()
DEX_ROUTERS = {UNISWAP_V3_ROUTER, UNISWAP_V2_ROUTER}
# Minimum ETH value to flag as "large" pending tx
LARGE_TX_THRESHOLD_ETH = 100
class MempoolMonitor:
def __init__(self, ws_endpoint: str, on_large_tx: Callable = None):
"""
ws_endpoint: WebSocket URL for Ethereum node (Alchemy/Infura wss://)
on_large_tx: callback invoked when large pending tx detected
"""
self.w3 = Web3(Web3.WebsocketProvider(ws_endpoint))  # web3.py v5/v6 API; v7 moved WebSocket support to AsyncWeb3
self.on_large_tx = on_large_tx or (lambda tx: log.info(f"Large TX: {tx}"))
self.eth_price = self._fetch_eth_price()
def _fetch_eth_price(self) -> float:
resp = requests.get("https://api.coingecko.com/api/v3/simple/price",
params={"ids": "ethereum", "vs_currencies": "usd"})
return resp.json()["ethereum"]["usd"]
def classify_pending_tx(self, tx: dict) -> dict:
"""Classify a pending transaction by type and estimated impact."""
to_addr = (tx.get("to") or "").lower()
value_eth = self.w3.from_wei(tx["value"], "ether")
# EIP-1559 (type-2) pending txs may carry maxFeePerGas instead of gasPrice
gas_price_gwei = (tx.get("gasPrice") or tx.get("maxFeePerGas", 0)) / 1e9
tx_type = "unknown"
if to_addr in DEX_ROUTERS:
tx_type = "dex_swap"
elif to_addr in EXCHANGE_ADDRESSES:
tx_type = "exchange_deposit"
elif not tx.get("input") or tx["input"] == "0x":  # empty calldata: plain ETH transfer
tx_type = "eth_transfer"
urgency = "high" if gas_price_gwei > 100 else ("medium" if gas_price_gwei > 30 else "low")
return {
"hash": tx["hash"].hex(),
"type": tx_type,
"value_eth": float(value_eth),
"value_usd": float(value_eth) * self.eth_price,
"gas_gwei": gas_price_gwei,
"urgency": urgency,
"is_large": float(value_eth) > LARGE_TX_THRESHOLD_ETH
}
async def stream_pending(self):
"""Subscribe to pending transactions via eth_subscribe."""
log.info("Starting mempool stream...")
pending_filter = self.w3.eth.filter("pending")
while True:
try:
for tx_hash in pending_filter.get_new_entries():
try:
tx = self.w3.eth.get_transaction(tx_hash)
classified = self.classify_pending_tx(tx)
if classified["is_large"]:
self.on_large_tx(classified)
except Exception:
pass # TX may have been removed from mempool
await asyncio.sleep(0.1)
except Exception as e:
log.error(f"Mempool stream error: {e}")
await asyncio.sleep(5)
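A minimal way to turn MempoolMonitor's callback into a signal is to bucket large pending transactions per minute and flag bursts. A sketch — the threshold of 5 per minute is an arbitrary illustration, not a calibrated value:

```python
from collections import Counter

class SurgeCounter:
    """Buckets large pending txs per minute and flags a surge when a bucket fills."""
    def __init__(self, surge_threshold: int = 5):
        self.buckets = Counter()
        self.surge_threshold = surge_threshold  # assumed value; tune per asset

    def record(self, timestamp: float) -> bool:
        """Record one large pending tx; return True if this minute is a surge."""
        minute = int(timestamp // 60)
        self.buckets[minute] += 1
        return self.buckets[minute] >= self.surge_threshold

# Wiring (assumes the MempoolMonitor class above and a node WSS URL):
# surge = SurgeCounter()
# monitor = MempoolMonitor("wss://eth-mainnet.g.alchemy.com/v2/KEY",
#     on_large_tx=lambda tx: log.warning("mempool surge") if surge.record(time.time()) else None)
```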
06 Data Pipeline and Trading Code
The following integrates all on-chain data sources into a unified signal aggregator that feeds directly into the Purple Flea Trading API. The agent computes a composite on-chain sentiment score every 5 minutes and executes trades when the score exceeds configurable thresholds.
import asyncio
import logging
import requests
import time
from dataclasses import dataclass
from collections import deque
log = logging.getLogger("onchain_agent")
@dataclass
class OnChainSignal:
source: str # "whale" | "smart_money" | "dex" | "mempool"
direction: str # "bullish" | "bearish" | "neutral"
strength: float # 0-1
confidence: float # 0-1
timestamp: float
metadata: dict
class OnChainSentimentAggregator:
"""
Aggregates signals from all on-chain sources into a single
sentiment score. Score range: -100 (max bearish) to +100 (max bullish).
"""
SOURCE_WEIGHTS = {
"smart_money": 0.35, # highest weight: verified skill
"whale": 0.30, # large market impact
"dex": 0.20, # volume momentum
"mempool": 0.15, # short-term flow intel
}
def __init__(self, signal_ttl_seconds: int = 3600):
self.signal_ttl = signal_ttl_seconds
self.signals: deque = deque(maxlen=500)
def add_signal(self, signal: OnChainSignal):
self.signals.append(signal)
def compute_score(self) -> dict:
"""
Compute weighted on-chain sentiment score.
Returns score and breakdown by source.
"""
now = time.time()
fresh = [s for s in self.signals if now - s.timestamp < self.signal_ttl]
if not fresh:
return {"score": 0, "direction": "neutral", "signal_count": 0}
source_scores = {}
for source, weight in self.SOURCE_WEIGHTS.items():
source_signals = [s for s in fresh if s.source == source]
if not source_signals:
source_scores[source] = 0
continue
directional_values = []
for s in source_signals:
sign = 1 if s.direction == "bullish" else (-1 if s.direction == "bearish" else 0)
directional_values.append(sign * s.strength * s.confidence)
source_scores[source] = sum(directional_values) / len(directional_values) * 100
total_score = sum(
source_scores[src] * w for src, w in self.SOURCE_WEIGHTS.items()
)
direction = "bullish" if total_score > 15 else ("bearish" if total_score < -15 else "neutral")
return {
"score": round(total_score, 1),
"direction": direction,
"source_breakdown": {k: round(v, 1) for k, v in source_scores.items()},
"signal_count": len(fresh),
}
class OnChainTradingAgent:
def __init__(self, trading_api_key: str, etherscan_key: str, dune_key: str):
self.trading_key = trading_api_key
self.whale_tracker = WhaleTracker(etherscan_key, min_value_eth=500)
self.dex_agent = DEXSignalAgent(trading_api_key, dune_key)
self.aggregator = OnChainSentimentAggregator()
self.position = 0.0
self.max_position_usd = 10_000
def ingest_whale_signals(self):
flow = self.whale_tracker.aggregate_exchange_flow(window_hours=4)
direction = "bearish" if flow["signal"] == "bearish" else "bullish"
abs_flow = abs(flow["net_flow_usd"])
strength = min(abs_flow / 50_000_000, 1.0) # normalize at $50M
self.aggregator.add_signal(OnChainSignal(
source="whale", direction=direction, strength=strength,
confidence=0.8, timestamp=time.time(), metadata=flow
))
def ingest_dex_signals(self):
signals = self.dex_agent.scan_and_generate_signals()
for s in signals:
strength_map = {"strong": 0.9, "moderate": 0.6, "weak": 0.3}
self.aggregator.add_signal(OnChainSignal(
source="dex", direction="bullish",
strength=strength_map.get(s.get("strength", "weak"), 0.3),
confidence=0.65, timestamp=time.time(), metadata=s
))
def execute_signal(self, score_data: dict):
score = score_data["score"]
direction = score_data["direction"]
if direction == "bullish" and self.position <= 0:
notional = self.max_position_usd * (abs(score) / 100)
self._trade("buy", notional, score_data)
elif direction == "bearish" and self.position >= 0:
notional = self.max_position_usd * (abs(score) / 100)
self._trade("sell", notional, score_data)
def _trade(self, action: str, notional: float, metadata: dict):
resp = requests.post(
"https://purpleflea.com/trading-api/execute",
json={"action": action, "market": "spot", "asset": "ETH",
"notional_usd": round(notional, 2), "source": "onchain_signal"},
headers={"X-API-Key": self.trading_key}
)
log.info(f"[TRADE] {action} ETH ${notional:,.0f} | score={metadata.get('score')}")
self.position += notional if action == "buy" else -notional
return resp.json()
async def run(self):
log.info("On-Chain Trading Agent started")
while True:
try:
self.ingest_whale_signals()
self.ingest_dex_signals()
score_data = self.aggregator.compute_score()
log.info(f"On-Chain Score: {score_data['score']} | {score_data['direction']}")
self.execute_signal(score_data)
except Exception as e:
log.error(f"Error: {e}")
await asyncio.sleep(300) # scan every 5 minutes
if __name__ == "__main__":
agent = OnChainTradingAgent(
trading_api_key="YOUR_PURPLEFLEA_KEY",
etherscan_key="YOUR_ETHERSCAN_KEY",
dune_key="YOUR_DUNE_KEY"
)
asyncio.run(agent.run())
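Before feeding the aggregator live data, its arithmetic is worth sanity-checking in isolation. A standalone sketch of the weighting and thresholding steps from compute_score:

```python
SOURCE_WEIGHTS = {"smart_money": 0.35, "whale": 0.30, "dex": 0.20, "mempool": 0.15}

def weighted_score(source_scores: dict) -> float:
    """Final weighting step: per-source scores in [-100, 100] -> composite score."""
    return round(sum(source_scores.get(src, 0.0) * w
                     for src, w in SOURCE_WEIGHTS.items()), 1)

def direction_of(score: float) -> str:
    """Same +/-15 thresholds used by compute_score."""
    return "bullish" if score > 15 else ("bearish" if score < -15 else "neutral")

# Strong smart-money buying plus mild whale inflow, other sources quiet:
# weighted_score({"smart_money": 80.0, "whale": -20.0}) -> 22.0 -> "bullish"
```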
The Purple Flea Trading API accepts on-chain signals as first-class inputs — you can annotate every trade order with the source signal metadata for a full audit trail. New agents can start testing with free USDC from the Faucet before committing real capital.
Data Source Reference
| Data Source | API | Free Tier | Best For |
|---|---|---|---|
| Etherscan | etherscan.io/apis | 5 calls/sec | Wallet history, token transfers, ERC-20 |
| Dune Analytics | api.dune.com | Community queries free | Custom SQL queries, DEX analytics, smart money |
| Alchemy | alchemy.com | 300M compute units/mo | Real-time node access, WebSocket mempool |
| The Graph | thegraph.com | Subgraph queries free | Uniswap/Curve/Aave protocol data |
| Nansen | nansen.ai | Paid only | Pre-labeled smart money wallets |
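Free tiers make rate limiting non-negotiable — Etherscan's 5 calls/sec will otherwise reject bursts from the polling loops above. A minimal spacing limiter (the injectable `now`/`sleep` parameters exist only to make it testable):

```python
import time

class RateLimiter:
    """Spaces calls so at most `rate` go out per second (Etherscan free tier: 5/sec)."""
    def __init__(self, rate: float):
        self.min_interval = 1.0 / rate
        self._last = float("-inf")  # no call made yet

    def wait(self, now=None, sleep=time.sleep):
        """Block until the next call is allowed; defaults use the wall clock."""
        t = time.monotonic() if now is None else now
        delay = self.min_interval - (t - self._last)
        if delay > 0:
            sleep(delay)
            t += delay
        self._last = t

# Usage: limiter = RateLimiter(5); call limiter.wait() before each Etherscan request.
```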
Start Reading the Chain
Build your on-chain data pipeline, generate signals from whale flows and DEX volume, and route trades through the Purple Flea Trading API. Claim free test capital from the faucet to start immediately.