On-Chain Analytics for AI Trading Agents
Blockchain data is the most transparent dataset in finance. AI agents that read on-chain signals — whale movements, exchange reserves, profit/loss ratios — gain a structural edge over agents relying solely on price feeds.
Why On-Chain Data Gives Agents an Edge
Traditional financial markets hide order books, institutional flows, and large wallet positions behind opaque intermediaries. Crypto is different. Every transaction, every wallet balance, every smart contract interaction is permanently recorded on a public ledger — available to any agent with an Etherscan key and a few lines of Python.
On-chain analytics converts this raw transparency into actionable signals: when whales move coins from cold storage to exchanges, it often precedes selling pressure. When exchange reserves fall while price rises, it signals strong demand. When the MVRV ratio climbs above 3.5, historically the market is overheated. These aren't guesses — they're measurable, repeatable patterns.
On-chain data operates on longer time horizons than order book data. It's most powerful for position sizing and macro regime detection, not tick-by-tick execution. Combine it with short-term price signals for best results.
Whale Wallet Tracking
Whale wallets — addresses holding more than 1,000 BTC or 10,000 ETH — move markets. Tracking their accumulation and distribution patterns is one of the highest-signal activities an on-chain analytics agent can perform.
Identifying Whale Wallets
Not all large wallets are whales in the trading sense. Exchange hot wallets, protocol treasuries, and bridging contracts hold enormous balances but don't reflect individual conviction. Your agent needs to filter these out using a known-address database before drawing conclusions.
| Wallet Type | Typical Balance | Signal Value | Filter Strategy |
|---|---|---|---|
| True whale (individual) | 1,000+ BTC | High | Cross-ref with exchange lists |
| Exchange hot wallet | 10,000+ BTC | Low (noise) | Exclude known exchange addresses |
| Protocol treasury | Variable | Medium | Tag via Etherscan labels |
| Mining pool | 100–5,000 BTC | Medium | Monitor outflow timing |
| OTC desk | 500–10,000 BTC | High | Identify by counterparty clusters |
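That filtering step can be sketched as a label lookup. The `ADDRESS_LABELS` set and `is_tradeable_whale` helper below are illustrative, not a real database; in practice you would populate the labels from Etherscan's public address tags or a commercial labeling provider:

```python
# Illustrative label set (two entries reuse known exchange hot wallets)
ADDRESS_LABELS = {
    "0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be": "exchange:binance",
    "0x71660c4005ba85c37ccec55d0c4493e66fe775d3": "exchange:coinbase",
}

def is_tradeable_whale(address: str, balance: float, threshold: float = 1000.0) -> bool:
    """True only for large balances on addresses not tagged as infrastructure."""
    label = ADDRESS_LABELS.get(address.lower(), "")
    category = label.split(":")[0]
    if category in {"exchange", "treasury", "bridge", "mining_pool"}:
        return False
    return balance >= threshold
```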
Python: Whale Movement Detector
The following polls the Etherscan API for large ETH transfers and flags whale-level movements in near real time. Note that Etherscan's account endpoints are per-address, so a production monitor runs this against a watchlist of known whale addresses:
import httpx
import asyncio
from datetime import datetime
ETHERSCAN_KEY = "<your_etherscan_key>"  # a free-tier Etherscan key works here
ETHERSCAN_BASE = "https://api.etherscan.io/api"
# Known exchange/protocol addresses to filter out
KNOWN_NON_WHALE = {
"0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be", # Binance
"0x28c6c06298d514db089934071355e5743bf21d60", # Binance 2
"0xda9dfa130df4de4673b89022ee50ff26f6ea73cf", # Kraken
"0x71660c4005ba85c37ccec55d0c4493e66fe775d3", # Coinbase
}
WHALE_THRESHOLD_ETH = 1000 # 1000 ETH = whale-level move
async def fetch_large_transactions(
client: httpx.AsyncClient,
start_block: int,
end_block: int = 99999999
) -> list[dict]:
    """Fetch large ETH transfers for a watched address in a block range.

    Etherscan's txlist endpoint is per-address, so a real monitor iterates
    over a watchlist of whale addresses rather than scanning every block.
    """
    watch_address = "<address_to_monitor>"  # the whale address to track
    params = {
        "module": "account",
        "action": "txlist",
        "address": watch_address,  # required by the txlist endpoint
        "startblock": start_block,
        "endblock": end_block,
        "sort": "desc",
        "apikey": ETHERSCAN_KEY,
    }
resp = await client.get(ETHERSCAN_BASE, params=params)
resp.raise_for_status()
data = resp.json()
if data["status"] != "1":
return []
txs = data["result"]
whale_txs = []
for tx in txs:
value_eth = int(tx["value"]) / 1e18
if value_eth < WHALE_THRESHOLD_ETH:
continue
sender = tx["from"].lower()
receiver = tx["to"].lower() if tx["to"] else ""
        # Skip internal shuffles where both sides are known exchange/protocol
        # wallets; transfers that merely touch an exchange are the signal
        if sender in KNOWN_NON_WHALE and receiver in KNOWN_NON_WHALE:
            continue
whale_txs.append({
"hash": tx["hash"],
"from": sender,
"to": receiver,
"value_eth": round(value_eth, 2),
"block": int(tx["blockNumber"]),
"timestamp": datetime.fromtimestamp(int(tx["timeStamp"])).isoformat(),
"direction": classify_direction(sender, receiver),
})
return whale_txs
def classify_direction(sender: str, receiver: str) -> str:
    """Classify whether this is an exchange inflow or outflow."""
    # Reuse the module-level KNOWN_NON_WHALE set rather than maintaining a
    # second, partial copy of the exchange address list here
    if receiver in KNOWN_NON_WHALE:
        return "exchange_inflow"  # Potential sell signal
    if sender in KNOWN_NON_WHALE:
        return "exchange_outflow"  # Potential buy/hold signal
    return "wallet_to_wallet"
async def get_current_block(client: httpx.AsyncClient) -> int:
"""Get the latest block number."""
params = {"module": "proxy", "action": "eth_blockNumber", "apikey": ETHERSCAN_KEY}
resp = await client.get(ETHERSCAN_BASE, params=params)
return int(resp.json()["result"], 16)
async def whale_monitor_loop():
"""Continuously monitor for whale transactions."""
async with httpx.AsyncClient(timeout=30) as client:
last_block = await get_current_block(client) - 100
print(f"Starting whale monitor from block {last_block}")
while True:
current_block = await get_current_block(client)
if current_block <= last_block:
await asyncio.sleep(15)
continue
txs = await fetch_large_transactions(client, last_block + 1, current_block)
for tx in txs:
signal = "BEARISH" if tx["direction"] == "exchange_inflow" else "BULLISH"
print(
f"[WHALE] {tx['value_eth']} ETH | {tx['direction']} | "
f"{signal} | tx: {tx['hash'][:10]}... | {tx['timestamp']}"
)
last_block = current_block
await asyncio.sleep(30) # Poll every 30 seconds
if __name__ == "__main__":
asyncio.run(whale_monitor_loop())
Exchange Inflow/Outflow Analysis
Exchange reserve data is one of the most reliable macro signals in crypto. When coins flow into exchanges in size, holders are typically positioning to sell; when coins flow out, they are moving to self-custody, a sign of longer-term conviction.
The Logic Behind Exchange Flows
- Increasing reserves + falling price — panic selling, potential capitulation bottom nearby
- Increasing reserves + rising price — distribution phase, whales selling into strength
- Decreasing reserves + falling price — accumulation, smart money buying the dip
- Decreasing reserves + rising price — supply squeeze, powerful bullish setup
The most bullish on-chain setup: exchange reserves at multi-year lows, MVRV below 1.0, and net exchange outflow for 30+ consecutive days. This combination has historically preceded the strongest rallies.
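The four combinations above collapse naturally into a single classifier. The function and labels below are an illustrative sketch; the inputs are signed percentage changes over whatever window your agent uses (e.g. 7-day reserve change vs. 7-day price change):

```python
def classify_flow_regime(reserve_change_pct: float, price_change_pct: float) -> str:
    """Map reserve and price direction onto the four regimes listed above."""
    if reserve_change_pct > 0 and price_change_pct < 0:
        return "capitulation_watch"  # panic selling into weakness
    if reserve_change_pct > 0:
        return "distribution"        # selling into strength
    if price_change_pct < 0:
        return "accumulation"        # coins leaving exchanges on the dip
    return "supply_squeeze"          # falling reserves, rising price
```

A zero change falls into the decreasing-reserves branches here; in practice you would treat near-zero readings as neutral.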
Querying Exchange Flows via Dune Analytics
Dune Analytics provides pre-built dashboards and an API for querying exchange flow data directly in Python:
import asyncio
import httpx
import time
from dataclasses import dataclass
DUNE_API_KEY = "<your_dune_key>"
DUNE_BASE = "https://api.dune.com/api/v1"
@dataclass
class ExchangeFlowResult:
date: str
net_flow_btc: float
exchange_reserve_btc: float
signal: str # "accumulation" | "distribution" | "neutral"
async def query_exchange_flows(query_id: int, days: int = 30) -> list[ExchangeFlowResult]:
"""
Execute a Dune Analytics query for exchange flow data.
query_id: Your Dune query ID for exchange reserve tracking
"""
headers = {"X-Dune-API-Key": DUNE_API_KEY}
# Execute query
async with httpx.AsyncClient(timeout=60) as client:
exec_resp = await client.post(
f"{DUNE_BASE}/query/{query_id}/execute",
headers=headers,
json={"query_parameters": {"days": days}}
)
exec_data = exec_resp.json()
execution_id = exec_data["execution_id"]
# Poll for completion
for _ in range(30):
status_resp = await client.get(
f"{DUNE_BASE}/execution/{execution_id}/status",
headers=headers
)
status = status_resp.json()
if status["state"] == "QUERY_STATE_COMPLETED":
break
            await asyncio.sleep(2)  # non-blocking wait; time.sleep would stall the event loop
# Fetch results
results_resp = await client.get(
f"{DUNE_BASE}/execution/{execution_id}/results",
headers=headers
)
rows = results_resp.json()["result"]["rows"]
results = []
for row in rows:
net_flow = row.get("net_flow_btc", 0)
reserve = row.get("total_reserve_btc", 0)
signal = "neutral"
if net_flow < -1000:
signal = "accumulation"
elif net_flow > 1000:
signal = "distribution"
results.append(ExchangeFlowResult(
date=row["date"],
net_flow_btc=net_flow,
exchange_reserve_btc=reserve,
signal=signal,
))
return results
MVRV Ratio
The Market Value to Realized Value (MVRV) ratio compares Bitcoin's market cap to its realized cap — the aggregate value of all coins at the price they last moved on-chain. It is among the most robust on-chain valuation metrics for cycle timing.
Interpreting MVRV
| MVRV Range | Market State | Signal | Historical Accuracy |
|---|---|---|---|
| < 1.0 | Below realized value | STRONG BUY | ~92% (2012–2025) |
| 1.0 – 2.0 | Fair value zone | NEUTRAL | N/A |
| 2.0 – 3.5 | Elevated premium | CAUTION | ~75% reduce exposure |
| > 3.5 | Historically overheated | SELL ZONE | ~87% preceded corrections |
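The raw ratio itself is one line given the two caps, and the zones in the table map directly onto a signal function (a sketch; the boundaries below come from the table above):

```python
def mvrv(market_cap_usd: float, realized_cap_usd: float) -> float:
    """MVRV = market cap / realized cap."""
    return market_cap_usd / realized_cap_usd

def mvrv_zone(ratio: float) -> str:
    """Map a raw MVRV ratio onto the zones in the table above."""
    if ratio < 1.0:
        return "strong_buy"   # trading below aggregate cost basis
    if ratio < 2.0:
        return "neutral"      # fair value zone
    if ratio < 3.5:
        return "caution"      # elevated premium
    return "sell_zone"        # historically overheated
```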
import httpx

GLASSNODE_KEY = "<your_glassnode_key>"
GLASSNODE_BASE = "https://api.glassnode.com/v1/metrics"

# Note: functools.lru_cache does not work on async functions (it caches the
# coroutine object, which can only be awaited once). If you need caching,
# cache the returned dict with an expiring cache instead.
async def get_mvrv_zscore(asset: str = "BTC") -> dict:
"""
Fetch MVRV Z-score from Glassnode.
Z-score > 7 = extreme greed; < 0 = undervalued.
"""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{GLASSNODE_BASE}/market/mvrv_z_score",
params={"a": asset, "api_key": GLASSNODE_KEY, "i": "24h"},
timeout=20,
)
resp.raise_for_status()
data = resp.json()
latest = data[-1]
z = latest["v"]
return {
"asset": asset,
"timestamp": latest["t"],
"mvrv_z_score": round(z, 3),
"signal": (
"strong_buy" if z < 0 else
"buy" if z < 2 else
"neutral" if z < 4 else
"sell" if z < 6 else
"strong_sell"
),
"cycle_position": (
"capitulation" if z < 0 else
"accumulation" if z < 2 else
"markup" if z < 5 else
"distribution"
),
}
HODL Waves
HODL waves visualize the age distribution of Bitcoin's UTXO set — how long coins have been unmoved. When old coins (1y+) represent a large percentage of supply, long-term holders are not selling. When they shrink rapidly, experienced holders are distributing into retail demand.
Reading HODL Wave Signals
- Short-term holder (STH) wave expanding — new money entering, often seen near tops
- Long-term holder (LTH) wave contracting — LTHs distributing to newcomers
- LTH wave expanding — experienced holders accumulating, bullish structural signal
- 1y+ bands growing during price decline — hodlers refusing to sell, supply shock building
import httpx
import pandas as pd
async def fetch_hodl_waves_data() -> pd.DataFrame:
    """
    Fetch HODL wave data from Glassnode (JSON endpoint, one request per age band).
    Returns a DataFrame with UTXO age band percentages.
    """
    api_key = "<your_glassnode_key>"  # Glassnode authenticates via query param
    age_bands = ["24h", "1d_1w", "1w_1m", "1m_3m", "3m_6m", "6m_12m", "1y_2y", "2y_3y", "3y_5y", "5y_plus"]
    async with httpx.AsyncClient(timeout=30) as client:
        frames = []
        for band in age_bands:
            resp = await client.get(
                "https://api.glassnode.com/v1/metrics/supply/hodl_waves",
                params={
                    "api_key": api_key,
                    "band": band,
                    "a": "BTC",
                    "i": "24h",
                    "f": "JSON",
                },
            )
            resp.raise_for_status()
            # Glassnode returns [{"t": <unix_ts>, "v": <value>}, ...]
            df = pd.DataFrame(resp.json()).rename(columns={"t": "timestamp", "v": band})
            df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
            frames.append(df.set_index("timestamp"))
combined = pd.concat(frames, axis=1)
combined["lth_total"] = combined[["6m_12m", "1y_2y", "2y_3y", "3y_5y", "5y_plus"]].sum(axis=1)
combined["sth_total"] = combined[["24h", "1d_1w", "1w_1m", "1m_3m", "3m_6m"]].sum(axis=1)
combined["lth_signal"] = combined["lth_total"].diff().apply(
lambda x: "accumulating" if x > 0 else "distributing"
)
return combined
Spent Output Profit Ratio (SOPR)
SOPR measures whether coins moved on any given day were in profit or loss at the time they moved. A SOPR above 1.0 means movers were in profit; below 1.0 means they sold at a loss. It's one of the best behavioral finance metrics for on-chain analysis.
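Conceptually, SOPR is the value-weighted ratio of spend value to creation value across all outputs spent in the window. A toy computation makes this concrete; the tuple format is an illustrative simplification of real UTXO data:

```python
def sopr(spent_outputs: list[tuple[float, float]]) -> float:
    """SOPR for a window of spent outputs.

    Each tuple is (usd_value_at_creation, usd_value_at_spend) for one spent
    output; SOPR = total value at spend / total value at creation.
    """
    created = sum(c for c, _ in spent_outputs)
    spent = sum(s for _, s in spent_outputs)
    return spent / created
```

For example, one output spent at a 20% profit and a twice-as-large output spent at a 10% loss net out to SOPR = 1.0: profit-takers and loss-takers exactly balance.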
SOPR Signal Patterns
- SOPR bouncing off 1.0 from below — bullish, loss-takers exhausted
- SOPR rejected at 1.0 from above — bearish, breakeven sellers blocking recovery
- Sustained SOPR > 1.2 — distribution phase, profit-taking accelerating
- Long-term SOPR (LTH-SOPR) spike — experienced holders taking profits, near market tops
import httpx

async def compute_sopr_signal(glassnode_key: str, lookback_days: int = 14) -> dict:
"""
Fetch SOPR and compute a short-term signal.
Returns a dict with current SOPR and trend signal.
"""
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(
"https://api.glassnode.com/v1/metrics/indicators/sopr",
params={
"api_key": glassnode_key,
"a": "BTC",
"i": "24h",
}
)
data = resp.json()
recent = data[-lookback_days:]
values = [r["v"] for r in recent]
current_sopr = values[-1]
avg_sopr = sum(values) / len(values)
above_1 = sum(1 for v in values if v > 1.0)
signal = "neutral"
if current_sopr < 0.98 and above_1 < 3:
signal = "buy" # Loss capitulation
elif current_sopr > 1.1 and avg_sopr > 1.05:
signal = "reduce_exposure"
return {
"sopr_current": round(current_sopr, 4),
"sopr_14d_avg": round(avg_sopr, 4),
"days_above_1": above_1,
"signal": signal,
}
Realized Cap and Net Unrealized Profit/Loss
The Realized Cap values each UTXO at its last-moved price, giving a more stable estimate of aggregate cost basis than market cap. NUPL (Net Unrealized Profit/Loss) divides the difference between market cap and realized cap by market cap, producing a sentiment oscillator that turns negative when the market sits on aggregate unrealized losses and approaches 1 as unrealized profit grows.
| NUPL Value | Sentiment Zone | Typical Market Phase |
|---|---|---|
| < 0 | Capitulation | Bear market bottom |
| 0 – 0.25 | Hope / Fear | Early recovery |
| 0.25 – 0.5 | Optimism | Mid bull market |
| 0.5 – 0.75 | Belief | Late bull market |
| > 0.75 | Euphoria | Cycle top region |
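NUPL itself is one line given the two caps, and a reading maps directly onto the zones above (a sketch; the zone boundaries come from the table):

```python
def nupl(market_cap_usd: float, realized_cap_usd: float) -> float:
    """NUPL = (market cap - realized cap) / market cap."""
    return (market_cap_usd - realized_cap_usd) / market_cap_usd

def nupl_zone(value: float) -> str:
    """Map a NUPL reading onto the sentiment zones in the table above."""
    if value < 0:
        return "capitulation"
    if value < 0.25:
        return "hope_fear"
    if value < 0.5:
        return "optimism"
    if value < 0.75:
        return "belief"
    return "euphoria"
```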
Building a Signal Aggregator
Individual on-chain metrics are noisy. The real power comes from combining them into a composite signal score. The following class aggregates MVRV Z-score, SOPR, exchange flows, and NUPL into a single directional bias score from -100 (strong bear) to +100 (strong bull):
import asyncio
from dataclasses import dataclass, field
@dataclass
class OnChainSignal:
metric: str
raw_value: float
normalized_score: float # -1.0 to +1.0
weight: float
description: str
@dataclass
class AggregateSignal:
score: float # -100 to +100
bias: str # "strong_bull", "bull", "neutral", "bear", "strong_bear"
signals: list[OnChainSignal] = field(default_factory=list)
confidence: float = 0.0 # 0.0 to 1.0
class OnChainAggregator:
"""Aggregate multiple on-chain signals into a single trading bias."""
WEIGHTS = {
"mvrv_zscore": 0.30,
"sopr": 0.20,
"nupl": 0.20,
"exchange_flow": 0.15,
"lth_wave": 0.15,
}
def __init__(self, glassnode_key: str, etherscan_key: str, dune_key: str):
self.glassnode_key = glassnode_key
self.etherscan_key = etherscan_key
self.dune_key = dune_key
def _normalize_mvrv(self, z: float) -> float:
"""Map MVRV Z-score to [-1, 1]. Z=0 → bullish, Z=7 → very bearish."""
if z < 0: return 1.0
if z > 7: return -1.0
return 1.0 - (z / 3.5)
def _normalize_sopr(self, sopr: float) -> float:
"""Map SOPR to [-1, 1]. SOPR=0.95 → buy, SOPR=1.15 → sell."""
if sopr < 0.95: return 1.0
if sopr > 1.15: return -1.0
return 1.0 - ((sopr - 0.95) / 0.10)
def _normalize_nupl(self, nupl: float) -> float:
"""Map NUPL to [-1, 1]. <0 → buy, >0.75 → sell."""
if nupl < 0: return 1.0
if nupl > 0.75: return -1.0
return 1.0 - (nupl / 0.375)
def compute_aggregate(self, metrics: dict) -> AggregateSignal:
signals = []
normalizers = {
"mvrv_zscore": (self._normalize_mvrv, "MVRV Z-Score cycle valuation"),
"sopr": (self._normalize_sopr, "Spent Output Profit Ratio"),
"nupl": (self._normalize_nupl, "Net Unrealized Profit/Loss"),
"exchange_flow": (lambda x: -x, "Exchange net flow (neg = outflow)"),
"lth_wave": (lambda x: x, "LTH wave momentum"),
}
weighted_sum = 0.0
for key, (normalizer, desc) in normalizers.items():
if key not in metrics:
continue
raw = metrics[key]
norm = max(-1.0, min(1.0, normalizer(raw)))
weight = self.WEIGHTS.get(key, 0.1)
weighted_sum += norm * weight
signals.append(OnChainSignal(
metric=key,
raw_value=raw,
normalized_score=round(norm, 3),
weight=weight,
description=desc,
))
        # Note: the score is not renormalized when metrics are missing, so a
        # partial input set shrinks both the score and the confidence
        score = weighted_sum * 100
        confidence = len(signals) / len(self.WEIGHTS)
bias = (
"strong_bull" if score > 60 else
"bull" if score > 20 else
"neutral" if score > -20 else
"bear" if score > -60 else
"strong_bear"
)
return AggregateSignal(
score=round(score, 1),
bias=bias,
signals=signals,
confidence=round(confidence, 2),
)
# Usage example:
async def get_trading_signal() -> AggregateSignal:
aggregator = OnChainAggregator(
        glassnode_key="<your_glassnode_key>",
        etherscan_key="<your_etherscan_key>",
        dune_key="<your_dune_key>",
)
# In production, fetch these concurrently from Glassnode/Dune
metrics = {
"mvrv_zscore": 2.4,
"sopr": 1.03,
"nupl": 0.41,
"exchange_flow": -0.3, # Normalized net flow
"lth_wave": 0.2, # LTH momentum
}
return aggregator.compute_aggregate(metrics)
Purple Flea Trading API Integration
Purple Flea's Trading API accepts on-chain signals as supplementary context for order execution. You can annotate your orders with signal scores to enable position sizing rules — for example, only deploy full position size when the on-chain aggregate score exceeds +40.
import httpx
PF_API_BASE = "https://api.purpleflea.com/v1"
PF_API_KEY = "pf_live_<your_purple_flea_key>"
async def execute_signal_weighted_trade(
asset: str,
direction: str, # "long" | "short"
base_size_usd: float,
on_chain_score: float, # -100 to +100
) -> dict:
"""
Submit a trade with on-chain signal-weighted position sizing.
Score multiplier: 0.25x at score=20, 1.0x at score=80+.
"""
if direction == "long" and on_chain_score < 20:
return {"skipped": True, "reason": "on_chain_score below long threshold"}
if direction == "short" and on_chain_score > -20:
return {"skipped": True, "reason": "on_chain_score above short threshold"}
size_multiplier = min(1.0, max(0.25, abs(on_chain_score) / 80))
adjusted_size = base_size_usd * size_multiplier
payload = {
"asset": asset,
"direction": direction,
"size_usd": round(adjusted_size, 2),
"metadata": {
"on_chain_score": on_chain_score,
"size_multiplier": size_multiplier,
"source": "on_chain_aggregator_v1",
}
}
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PF_API_BASE}/trading/orders",
json=payload,
headers={"Authorization": f"Bearer {PF_API_KEY}"},
timeout=15,
)
resp.raise_for_status()
return resp.json()
Glassnode's free tier is limited to 1 API call per minute and only provides daily data. For real-time on-chain monitoring, use the Professional tier or combine Etherscan (free, up to 5 calls/sec) with Dune Analytics for aggregated metrics.
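A small async throttle keeps a shared client under whichever limit applies. This `RateLimiter` is an illustrative sketch; check your provider's current documentation for the actual limits before setting `calls` and `period`:

```python
import asyncio
import time

class RateLimiter:
    """Allow at most `calls` requests per rolling `period` seconds."""

    def __init__(self, calls: int, period: float):
        self.calls = calls
        self.period = period
        self._stamps: list[float] = []
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            # Drop timestamps that have aged out of the window
            self._stamps = [t for t in self._stamps if now - t < self.period]
            if len(self._stamps) >= self.calls:
                # Wait until the oldest call in the window expires
                await asyncio.sleep(self.period - (now - self._stamps[0]))
            self._stamps.append(time.monotonic())
```

Call `await limiter.acquire()` immediately before each API request; for Glassnode's free tier that would be `RateLimiter(calls=1, period=60)`.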
Full On-Chain Data Pipeline
Here is an end-to-end pipeline skeleton that fetches signals concurrently, aggregates them, and emits a directional bias with a confidence score every 5 minutes:
import asyncio
import httpx
from datetime import datetime
async def run_onchain_pipeline(api_keys: dict) -> None:
"""
Main loop: fetch on-chain data concurrently, aggregate, and print signal.
Run every 5 minutes — most on-chain metrics update every 24h, but whale
transactions can arrive at any time.
"""
aggregator = OnChainAggregator(
glassnode_key=api_keys["glassnode"],
etherscan_key=api_keys["etherscan"],
dune_key=api_keys["dune"],
)
    async with httpx.AsyncClient(timeout=30) as client:
        while True:
            try:
                # Fetch all metrics concurrently, sharing one client instead
                # of leaking a new AsyncClient per call
                current_block = await get_current_block(client)
                mvrv_result, sopr_result, whale_txs = await asyncio.gather(
                    get_mvrv_zscore("BTC"),
                    compute_sopr_signal(api_keys["glassnode"]),
                    fetch_large_transactions(client, current_block - 50),
                )
                # Compute aggregate signal (exchange_flow, nupl, and lth_wave
                # are omitted here, so the confidence score reflects that)
                metrics = {
                    "mvrv_zscore": mvrv_result["mvrv_z_score"],
                    "sopr": sopr_result["sopr_current"],
                }
                signal = aggregator.compute_aggregate(metrics)
                print(f"\n[{datetime.utcnow().isoformat()}] ON-CHAIN SIGNAL")
                print(f"  Score: {signal.score:+.1f} | Bias: {signal.bias.upper()}")
                print(f"  Confidence: {signal.confidence:.0%}")
                for s in signal.signals:
                    print(f"  {s.metric}: {s.raw_value:.3f} → normalized {s.normalized_score:+.2f}")
                if whale_txs:
                    print(f"  Whale alerts: {len(whale_txs)} transactions flagged")
            except Exception as e:
                print(f"[ERROR] Pipeline failed: {e}")
            await asyncio.sleep(300)  # 5 minute interval
if __name__ == "__main__":
keys = {
        "glassnode": "<your_glassnode_key>",
        "etherscan": "<your_etherscan_key>",
        "dune": "<your_dune_key>",
}
asyncio.run(run_onchain_pipeline(keys))
Summary
On-chain analytics gives AI trading agents access to a layer of market intelligence unavailable to traditional finance participants. The key metrics — MVRV, SOPR, exchange flows, HODL waves, and realized cap — each capture a distinct behavioral dimension of the market.
- Use MVRV Z-Score for macro cycle positioning
- Use SOPR for short-term behavioral signals
- Use exchange flows for supply pressure context
- Use HODL waves for structural supply dynamics
- Use the composite aggregator for final position sizing decisions
Paired with Purple Flea's trading API, these signals can power an agent that adapts position sizes dynamically based on on-chain conviction — running lean in distribution phases and scaling in aggressively near capitulation bottoms.
Start Trading with On-Chain Signals
Connect your on-chain analytics pipeline to Purple Flea's trading infrastructure in minutes.
Get Trading API Key →