Oracle Integration for AI Agents: Chainlink, Pyth, and Real-Time Price Feeds
The integrity of every on-chain trade, liquidation, and settlement depends on a single question: where does the price come from? Blockchain oracles are the answer — but integrating them correctly is one of the most overlooked challenges in AI agent development. This guide covers everything from first principles to production-grade oracle monitoring.
What Are Blockchain Oracles?
Smart contracts are deterministic programs that run in an isolated computational environment — they cannot natively access data outside the blockchain. An AI agent executing a strategy on-chain faces a fundamental problem: how does a smart contract know the current price of ETH, BTC, or a perpetual futures contract?
Oracles are the answer. They are systems that fetch external data (prices, weather, sports scores, interest rates) and submit it to the blockchain in a form smart contracts can read. For financial AI agents, price oracles are the most critical infrastructure component — more important than execution routing or gas optimization.
There are two fundamental oracle architectures AI agents must understand:
- Push oracles — A network of nodes periodically pushes price updates on-chain. Data is always available but may be slightly stale. Chainlink Data Feeds use this model.
- Pull oracles — Price data is published off-chain in a signed format. Users (or agents) pull the data and submit it on-chain when needed. Pyth Network uses this model. More real-time but requires an extra transaction step.
PUSH MODEL (Chainlink)
Off-chain nodes ──→ Aggregate on-chain ──→ Smart contract reads price
Update frequency: Every N minutes OR when price moves > threshold
PULL MODEL (Pyth)
Price publishers ──→ Wormhole/Pythnet ──→ Agent fetches signed VAA ──→ Agent submits update + reads price
Update frequency: <400ms (near real-time)
Chainlink Data Feeds — The Industry Standard
Chainlink is the dominant oracle provider by market share, securing over $50 billion in DeFi value as of 2026. Its Data Feeds are available on 15+ EVM-compatible blockchains and cover hundreds of price pairs.
How Chainlink Works
Each Chainlink price feed is a smart contract (exposing the AggregatorV3Interface) that aggregates responses from a decentralized network of independent node operators. The nodes independently fetch prices from multiple premium data providers (Kaiko, Brave New Coin, CoinGecko Pro, etc.), submit them on-chain, and an on-chain median calculation produces the final price.
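To see why a median is used for the final answer, here is a minimal sketch of median aggregation. It is a simplified illustration, not Chainlink's actual contract code; the function name `aggregate_answers` and the `min_responses` parameter are assumptions made for the example.

```python
from statistics import median


def aggregate_answers(node_answers: list[int], min_responses: int = 3) -> int:
    """Return the median of node answers (integers already scaled by the feed's decimals)."""
    if len(node_answers) < min_responses:
        raise ValueError(f"only {len(node_answers)} responses, need {min_responses}")
    return int(median(node_answers))


# Four honest nodes reporting ETH/USD near $3,400.1 with 8 decimals
honest = [340_012_000_000, 340_015_000_000, 340_009_000_000, 340_020_000_000]
# One faulty or malicious node reports roughly $10,000
with_outlier = honest + [999_999_000_000]

print(aggregate_answers(honest))
print(aggregate_answers(with_outlier))  # still an honest value: 340015000000
```

A single bad report cannot move the median outside the range of the honest answers, whereas it would drag a plain average far from the market price.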
Key parameters for every Chainlink feed:
| Parameter | Description | Typical Value (ETH/USD) |
|---|---|---|
| Heartbeat | Maximum time between updates regardless of price change | 3600 seconds (1 hour) |
| Deviation Threshold | Price must change by this % before triggering an update | 0.5% |
| Decimals | Number of decimal places in the price answer | 8 |
| Round ID | Monotonically increasing round counter for each update | Increments per update |
| Updated At | Unix timestamp of last price update | Within last heartbeat |
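The heartbeat and deviation threshold together decide when a push feed writes a new round. The sketch below models that rule using the typical ETH/USD values from the table; `should_update` is a hypothetical helper for reasoning about feed behavior, not part of any Chainlink library.

```python
def should_update(
    last_price: float,
    new_price: float,
    seconds_since_update: float,
    deviation_threshold_pct: float = 0.5,
    heartbeat_seconds: float = 3600,
) -> bool:
    """Model of the push rule: update on heartbeat expiry OR on sufficient deviation."""
    if seconds_since_update >= heartbeat_seconds:
        return True
    deviation_pct = abs(new_price - last_price) / last_price * 100
    return deviation_pct >= deviation_threshold_pct


print(should_update(3400.0, 3410.0, 600))   # 0.29% move, 10 min old -> False
print(should_update(3400.0, 3420.0, 600))   # 0.59% move -> True
print(should_update(3400.0, 3400.0, 3600))  # heartbeat reached -> True
```

The practical consequence for an agent: with these parameters the on-chain price can lag the market by just under 0.5% and be up to an hour old while still being a perfectly healthy feed.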
Reading Chainlink Feeds in Python
```python
"""
Chainlink Data Feed reader for AI agents
Reads ETH/USD price from Ethereum mainnet via web3.py
"""
from web3 import Web3
from datetime import datetime
import json

# Connect to Ethereum (use your own RPC)
w3 = Web3(Web3.HTTPProvider("https://eth.llamarpc.com"))

# AggregatorV3Interface ABI (minimal)
AGGREGATOR_ABI = json.loads("""[
    {
        "inputs": [],
        "name": "latestRoundData",
        "outputs": [
            {"name": "roundId", "type": "uint80"},
            {"name": "answer", "type": "int256"},
            {"name": "startedAt", "type": "uint256"},
            {"name": "updatedAt", "type": "uint256"},
            {"name": "answeredInRound", "type": "uint80"}
        ],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [],
        "name": "decimals",
        "outputs": [{"name": "", "type": "uint8"}],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "inputs": [],
        "name": "description",
        "outputs": [{"name": "", "type": "string"}],
        "stateMutability": "view",
        "type": "function"
    }
]""")

# Mainnet feed addresses
CHAINLINK_FEEDS = {
    "ETH/USD": "0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419",
    "BTC/USD": "0xF4030086522a5bEEa4988F8cA5B36dbC97BeE88c",
    "SOL/USD": "0x4ffC43a60e009B551865A93d232E33Fce9f01507",
    "LINK/USD": "0x2c1d072e956AFFC0D435Cb7AC38EF18d24d9127c",
    "MATIC/USD": "0x7bAC85A8a13A4BcD8abb3eB7d6b4d632c1a52a2a",
}


class ChainlinkReader:
    def __init__(self, max_staleness_seconds: int = 3600):
        self.contracts = {}
        self.max_staleness = max_staleness_seconds
        for pair, addr in CHAINLINK_FEEDS.items():
            self.contracts[pair] = w3.eth.contract(
                address=Web3.to_checksum_address(addr),
                abi=AGGREGATOR_ABI
            )

    def get_price(self, pair: str) -> dict:
        """
        Fetch latest price with staleness check.
        Raises ValueError if feed is stale or answer is not positive.
        """
        if pair not in self.contracts:
            raise ValueError(f"Unknown pair: {pair}")
        contract = self.contracts[pair]
        decimals = contract.functions.decimals().call()
        round_id, answer, started_at, updated_at, answered_in_round = \
            contract.functions.latestRoundData().call()

        # Staleness check
        age_seconds = datetime.now().timestamp() - updated_at
        if age_seconds > self.max_staleness:
            raise ValueError(
                f"{pair} feed is stale: last updated {age_seconds:.0f}s ago "
                f"(max: {self.max_staleness}s)"
            )

        # Answer validity check
        if answer <= 0:
            raise ValueError(f"{pair} feed returned invalid answer: {answer}")

        # Round completeness check (answeredInRound should == roundId)
        if answered_in_round < round_id:
            raise ValueError(f"{pair}: answeredInRound ({answered_in_round}) < roundId ({round_id})")

        price = answer / (10 ** decimals)
        return {
            "pair": pair,
            "price": price,
            "round_id": round_id,
            "updated_at": datetime.fromtimestamp(updated_at).isoformat(),
            "age_seconds": round(age_seconds, 1),
            "decimals": decimals
        }

    def get_all_prices(self) -> dict:
        results = {}
        for pair in CHAINLINK_FEEDS:
            try:
                results[pair] = self.get_price(pair)
            except ValueError as e:
                results[pair] = {"error": str(e)}
        return results


# Usage
reader = ChainlinkReader(max_staleness_seconds=3600)
eth_data = reader.get_price("ETH/USD")
print(f"ETH/USD: ${eth_data['price']:,.2f} (age: {eth_data['age_seconds']}s)")
```
Pyth Network — Ultra-Low Latency for Crypto
While Chainlink excels at securing high-value DeFi protocols with battle-tested infrastructure, Pyth Network was designed for a different problem: delivering price data at frequencies suitable for high-frequency trading and real-time agent decision-making.
Pyth sources prices from over 90 first-party publishers — exchanges, market makers, and trading firms that submit their own proprietary data. This includes major institutions like Jump Crypto, Jane Street, and CBOE. Prices update every 400 milliseconds on Pythnet (Pyth's appchain), then bridge to 50+ target blockchains via Wormhole.
Pyth's Pull Model in Practice
The pull architecture means agents must fetch a signed price attestation (a VAA, or Verified Action Approval) from Pyth's Hermes HTTP service and submit it on-chain before using the price. This adds a small but important step to agent execution loops:
```python
"""
Pyth Network pull oracle integration
Fetches fresh price attestations from Hermes for later on-chain submission
"""
import asyncio
import time
from decimal import Decimal

import httpx

# Hermes endpoint for fetching latest price attestations
HERMES_BASE = "https://hermes.pyth.network"

# Pyth price feed IDs (unique per asset)
PYTH_FEED_IDS = {
    "BTC/USD": "0xe62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43",
    "ETH/USD": "0xff61491a931112ddf1bd8147cd1b641375f79f5825126d665480874634fd0ace",
    "SOL/USD": "0xef0d8b6fda2ceba41da15d4095d1da392a0d2f8ed0c6c7bc0f4cfac8c280b56d",
    "BNB/USD": "0x2f95862b045670cd22bee3114c39763a4a08beeb663b145d283c31d7d1101c4f",
    "MATIC/USD": "0x5de33a9112c2b700b8d30b8a3402c103578ccfa2765696471cc672bd5cf6ac52",
}


class PythReader:
    def __init__(self, max_age_ms: int = 5000):
        self.max_age_ms = max_age_ms  # Reject prices older than this (default: 5 seconds)

    async def fetch_latest_price(self, asset: str) -> dict:
        """Fetch latest price from Pyth Hermes API."""
        feed_id = PYTH_FEED_IDS.get(asset)
        if not feed_id:
            raise ValueError(f"Unknown Pyth asset: {asset}")
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{HERMES_BASE}/v2/updates/price/latest",
                params={"ids[]": feed_id},
                timeout=5.0
            )
            resp.raise_for_status()
            data = resp.json()

        parsed = data["parsed"][0]
        # Hermes nests price, conf, expo and publish_time together under "price"
        price_data = parsed["price"]
        publish_time = price_data["publish_time"]

        # Check age (publish_time is in whole Unix seconds)
        age_ms = (time.time() - publish_time) * 1000
        if age_ms > self.max_age_ms:
            raise ValueError(f"{asset} price is {age_ms:.0f}ms old (max: {self.max_age_ms}ms)")

        # Convert from fixed-point representation
        exponent = price_data["expo"]
        price = Decimal(price_data["price"]) * Decimal(10) ** exponent
        confidence = Decimal(price_data["conf"]) * Decimal(10) ** exponent
        return {
            "asset": asset,
            "price": float(price),
            "confidence": float(confidence),
            "confidence_pct": float(confidence / price * 100),
            "publish_time": publish_time,
            "age_ms": round(age_ms, 1),
            "vaa": data["binary"]["data"][0]  # Encoded update for on-chain submission
        }

    async def fetch_multiple(self, assets: list) -> dict:
        tasks = [self.fetch_latest_price(a) for a in assets]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {
            asset: result if not isinstance(result, Exception) else {"error": str(result)}
            for asset, result in zip(assets, results)
        }


# Usage
async def main():
    pyth = PythReader(max_age_ms=3000)
    prices = await pyth.fetch_multiple(["BTC/USD", "ETH/USD", "SOL/USD"])
    for asset, data in prices.items():
        if "error" not in data:
            print(f"{asset}: ${data['price']:,.2f} ±{data['confidence_pct']:.3f}% (age: {data['age_ms']}ms)")

asyncio.run(main())
```
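The reader above stops at fetching the signed update; the on-chain half of the pull model is a separate transaction. The following is a minimal sketch of building that transaction, assuming `pyth_contract` is a web3.py contract object bound to the Pyth contract on your target chain with the `IPyth` ABI (the address and ABI are not shown here and must come from Pyth's documentation). The helper name `build_pyth_update_tx` is hypothetical, and it assumes the update payload is hex-encoded.

```python
def build_pyth_update_tx(pyth_contract, update_hex: str, sender: str) -> dict:
    """Build (but do not sign or send) a transaction posting one Hermes update on-chain."""
    # Assumption: the payload is a hex string, with or without a 0x prefix
    update_data = [bytes.fromhex(update_hex.removeprefix("0x"))]
    # IPyth.getUpdateFee returns the fee (in wei) required for this update payload
    fee = pyth_contract.functions.getUpdateFee(update_data).call()
    # IPyth.updatePriceFeeds is payable; the fee is attached as the transaction value
    return pyth_contract.functions.updatePriceFeeds(update_data).build_transaction(
        {"from": sender, "value": fee}
    )
```

The returned dictionary still needs a nonce, gas settings, a signature, and submission through your own wallet flow. Once the update lands, a consuming contract can read the price in the same block.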
Using Pyth's Confidence Interval
A unique feature of Pyth is its confidence interval — an estimate of price uncertainty derived from spread across publishers. During normal markets, confidence on ETH/USD might be $0.50 (0.01% of price). During volatile periods, it can widen to $50+.
Agents should use confidence intervals as a signal filter:
- If confidence_pct > 0.5%: market is uncertain; reduce position sizing or pause trading
- If confidence_pct > 2.0%: severe uncertainty; implement emergency procedures
- Normal trading condition: confidence_pct < 0.1%
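The tiers above can be sketched as a small classifier. The function name and the returned labels are illustrative assumptions; in particular, the band between 0.1% and 0.5% is not defined in the list, so the sketch labels it "caution" as a placeholder.

```python
def confidence_action(price: float, confidence: float) -> str:
    """Map a Pyth price and its confidence (same units) to a coarse trading posture."""
    if price <= 0:
        raise ValueError("price must be positive")
    confidence_pct = confidence / price * 100
    if confidence_pct > 2.0:
        return "halt"      # severe uncertainty: emergency procedures
    if confidence_pct > 0.5:
        return "reduce"    # uncertain market: smaller size or pause
    if confidence_pct < 0.1:
        return "normal"
    return "caution"       # assumed label for the undefined 0.1%-0.5% band


print(confidence_action(3400.0, 0.50))   # normal  (about 0.015%)
print(confidence_action(3400.0, 50.0))   # reduce  (about 1.47%)
print(confidence_action(3400.0, 100.0))  # halt    (about 2.94%)
```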
TWAP vs Spot Pricing
A critical decision for any oracle-integrated agent is whether to use spot prices (current price at time of query) or TWAP prices (Time-Weighted Average Price over a recent window, typically 5-30 minutes).
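As a concrete reference for what a TWAP is, the sketch below computes an arithmetic time-weighted average from `(timestamp, price)` samples, assuming each price holds until the next sample. This is an illustration of the concept only: the function name is hypothetical, and on-chain implementations differ (Uniswap v3, for example, derives a geometric mean from tick accumulators).

```python
def twap(samples: list[tuple[float, float]], window_end: float) -> float:
    """Arithmetic TWAP; samples are (unix_ts, price) sorted by time, ending at window_end."""
    if not samples:
        raise ValueError("need at least one sample")
    # Each price is weighted by how long it stayed in effect
    boundaries = [ts for ts, _ in samples[1:]] + [window_end]
    weighted_sum = 0.0
    total_time = 0.0
    for (ts, price), next_ts in zip(samples, boundaries):
        dt = next_ts - ts
        if dt < 0:
            raise ValueError("samples must be sorted and not exceed window_end")
        weighted_sum += price * dt
        total_time += dt
    if total_time == 0:
        return samples[-1][1]
    return weighted_sum / total_time


# 30-minute window, price spikes from 3400 to 5000 for the final 12 seconds
spike = [(0.0, 3400.0), (1788.0, 5000.0)]
print(round(twap(spike, 1800.0), 2))  # 3410.67
```

In this example the spot price jumps by about 47%, while the 30-minute TWAP moves by roughly 0.3%, which is the manipulation resistance the comparison below refers to.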
Comparison
| Characteristic | Spot Price | TWAP (e.g., 30-min) |
|---|---|---|
| Latency | Real-time (seconds) | Lags by window duration |
| Manipulation resistance | Low — single block attack possible | High — requires sustained manipulation |
| Flash crash response | Immediate (good for protective stops) | Delayed (misses early warning) |
| Best for | Order execution, entry/exit signals | Collateral valuation, liquidation triggers |
| Volatility sensitivity | High | Smoothed |
The industry best practice, used by Aave, Compound, and Uniswap v3, is a dual-oracle approach: use the spot price for execution but require the TWAP to be within a tolerance band before proceeding. If the spot price diverges from the TWAP by more than 2-5%, halt the operation — something unusual is happening.
```python
from typing import Optional


def check_price_sanity(
    spot_price: float,
    twap_price: float,
    max_deviation_pct: float = 2.0,
    chainlink_price: Optional[float] = None,
    max_chainlink_deviation_pct: float = 5.0
) -> dict:
    """
    Dual-oracle sanity check before executing any trade.
    Returns: {valid: bool, reason: str, deviation_pct: float}
    """
    deviation_pct = abs(spot_price - twap_price) / twap_price * 100
    if deviation_pct > max_deviation_pct:
        return {
            "valid": False,
            "reason": f"Spot-TWAP deviation {deviation_pct:.2f}% exceeds {max_deviation_pct}% threshold",
            "deviation_pct": deviation_pct
        }
    if chainlink_price is not None:
        cl_deviation = abs(spot_price - chainlink_price) / chainlink_price * 100
        if cl_deviation > max_chainlink_deviation_pct:
            return {
                "valid": False,
                "reason": f"Spot-Chainlink deviation {cl_deviation:.2f}% exceeds {max_chainlink_deviation_pct}% threshold",
                "deviation_pct": cl_deviation
            }
    return {"valid": True, "reason": "OK", "deviation_pct": deviation_pct}


# Example: Pyth spot vs Uniswap TWAP vs Chainlink
result = check_price_sanity(
    spot_price=3402.15,        # Pyth real-time
    twap_price=3396.80,        # Uniswap v3 30-min TWAP
    max_deviation_pct=2.0,
    chainlink_price=3398.00,   # Chainlink ETH/USD
    max_chainlink_deviation_pct=5.0
)
print(result)  # valid=True, reason='OK', deviation_pct is approximately 0.1575
```
Oracle Manipulation Attacks
Oracle manipulation has been responsible for some of the largest DeFi exploits in history. Understanding attack vectors is essential for building agents that don't get rekt.
Flash Loan + Spot Oracle Attack
The classic oracle attack pattern:
- Attacker takes a flash loan of $100M in asset X
- Dumps it on a thin DEX pool, crashing the spot price
- Protocol's oracle reads the manipulated spot price
- Attacker liquidates victim positions or borrows against inflated collateral
- Repays flash loan in same transaction; profit
Mango Markets (Oct 2022): $114M. Attacker manipulated MNGO spot price on thin markets, borrowed against inflated collateral. Used a push oracle that read DEX prices directly.
CREAM Finance (2021): Multiple exploits totaling $200M+. Flash loan + price manipulation of illiquid tokens. Protocol used single-source price feeds.
Lesson: Never use a spot price from a single low-liquidity DEX pool for risk calculations.
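The arithmetic behind that lesson can be sketched with a constant-product pool (x * y = k). The sketch ignores swap fees and uses made-up reserve sizes; `spot_after_sell` is a hypothetical helper, not any DEX's API.

```python
def spot_after_sell(token_reserve: float, usd_reserve: float, tokens_sold: float) -> float:
    """Spot price (USD per token) of a fee-less constant-product pool after a sale."""
    k = token_reserve * usd_reserve          # invariant before the trade
    new_token_reserve = token_reserve + tokens_sold
    new_usd_reserve = k / new_token_reserve  # invariant preserved after the trade
    return new_usd_reserve / new_token_reserve


# Both pools start at $1,000 per token; the attacker dumps 1,000 tokens into each
thin = spot_after_sell(1_000, 1_000_000, 1_000)
deep = spot_after_sell(100_000, 100_000_000, 1_000)
print(f"thin pool: ${thin:,.2f}")  # $250.00, a 75% crash
print(f"deep pool: ${deep:,.2f}")  # about $980.30, a 2% move
```

The same trade that barely moves a deep pool cuts the thin pool's spot price by three quarters, which is why a protocol reading that pool directly can be pushed into bad liquidations or bad loans within one transaction.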
Price Feed Sandwich Attacks
A subtler attack on push oracles: an adversary monitors the mempool for upcoming price updates and front-runs them. If Chainlink is about to push an ETH price from $3,400 to $3,500, an attacker can:
- Front-run: open a large long position before the update goes on-chain
- Let the oracle update execute (now price shows $3,500)
- Back-run: close the position at the new, higher price
Agents should randomize execution timing and avoid predictable patterns around known heartbeat intervals (e.g., avoid always trading on the hour for hourly-heartbeat feeds).
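One simple way to randomize timing is to add jitter to the agent's execution interval. The sketch below is an assumption about how this could be done; the function name and the 30% default jitter are illustrative, not a recommendation from any oracle provider.

```python
import random
from typing import Optional


def jittered_delay(
    base_seconds: float,
    jitter_fraction: float = 0.3,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a delay drawn uniformly from base * (1 - jitter) to base * (1 + jitter)."""
    if not 0 <= jitter_fraction < 1:
        raise ValueError("jitter_fraction must be in [0, 1)")
    rng = rng or random.Random()
    low = base_seconds * (1 - jitter_fraction)
    high = base_seconds * (1 + jitter_fraction)
    return rng.uniform(low, high)


# A nominal 60-second loop now fires somewhere between 42 and 78 seconds apart
for _ in range(3):
    print(f"{jittered_delay(60.0):.1f}s")
```

Jitter makes the agent's own schedule harder to predict; it does not remove the underlying exposure to oracle update front-running, so it complements rather than replaces the deviation checks in the next section.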
Agent Safeguards: Deviation Thresholds and Circuit Breakers
Deviation Thresholds
A deviation threshold is a maximum acceptable difference between two price sources before an agent halts. Every production agent should implement at minimum:
- Cross-oracle deviation: |Chainlink - Pyth| / Chainlink < 1% for normal operations
- Spot-TWAP deviation: |spot - TWAP| / TWAP < 2% for DeFi interactions
- Feed staleness: reject any price older than 2× the advertised heartbeat
- Confidence interval: reject Pyth prices with confidence > 0.5% of price
Circuit Breaker Implementation
```python
"""
Production-grade circuit breaker for oracle-dependent AI agents.
Implements multi-source validation with a fixed recovery timeout.
"""
import time
from enum import Enum
from collections import deque
from dataclasses import dataclass, field
from typing import Optional


class CircuitState(Enum):
    CLOSED = "CLOSED"        # Normal: all trading allowed
    OPEN = "OPEN"            # Triggered: all trading halted
    HALF_OPEN = "HALF_OPEN"  # Recovery: test trades only


@dataclass
class OracleCircuitBreaker:
    # Configuration
    max_cross_deviation_pct: float = 1.0   # Chainlink vs Pyth
    max_twap_deviation_pct: float = 2.0    # Spot vs TWAP
    max_staleness_seconds: float = 120.0
    max_confidence_pct: float = 0.5
    failure_threshold: int = 3             # Failures before tripping
    recovery_timeout: float = 300.0        # Seconds in OPEN before half-open
    max_price_velocity_pct_per_min: float = 5.0  # 5% in 60s = flash crash alert

    # State
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    last_failure_time: float = 0.0
    trip_time: float = 0.0
    # Keeps the last 60 samples (a 60-second window only if sampled once per second)
    price_history: deque = field(default_factory=lambda: deque(maxlen=60))

    def record_price(self, price: float):
        """Track price history for velocity calculation."""
        self.price_history.append((time.time(), price))

    def check_velocity(self) -> Optional[str]:
        """Detect abnormal price velocity (flash crash signal)."""
        if len(self.price_history) < 2:
            return None
        oldest_ts, oldest_price = self.price_history[0]
        latest_ts, latest_price = self.price_history[-1]
        time_delta_min = (latest_ts - oldest_ts) / 60
        if time_delta_min < 0.01:
            return None
        pct_change = abs(latest_price - oldest_price) / oldest_price * 100
        velocity = pct_change / time_delta_min
        if velocity > self.max_price_velocity_pct_per_min:
            return f"Price velocity {velocity:.1f}%/min exceeds {self.max_price_velocity_pct_per_min}%/min"
        return None

    def validate(
        self,
        spot_price: float,
        pyth_price: Optional[float] = None,
        chainlink_price: Optional[float] = None,
        twap_price: Optional[float] = None,
        pyth_confidence_pct: Optional[float] = None,
        oracle_age_seconds: Optional[float] = None
    ) -> tuple[bool, str]:
        """
        Full oracle validation. Returns (allowed, reason).
        Updates circuit breaker state based on results.
        """
        # Check state machine
        if self.state == CircuitState.OPEN:
            recovery_elapsed = time.time() - self.trip_time
            if recovery_elapsed >= self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("[CIRCUIT] Entering HALF_OPEN recovery mode")
            else:
                return False, f"Circuit OPEN: {self.recovery_timeout - recovery_elapsed:.0f}s to recovery"

        failures = []

        # 1. Staleness check
        if oracle_age_seconds is not None and oracle_age_seconds > self.max_staleness_seconds:
            failures.append(f"Feed stale: {oracle_age_seconds:.0f}s")

        # 2. Cross-oracle deviation
        if pyth_price and chainlink_price:
            dev = abs(pyth_price - chainlink_price) / chainlink_price * 100
            if dev > self.max_cross_deviation_pct:
                failures.append(f"Cross-oracle deviation {dev:.2f}%")

        # 3. TWAP deviation
        if twap_price:
            dev = abs(spot_price - twap_price) / twap_price * 100
            if dev > self.max_twap_deviation_pct:
                failures.append(f"Spot-TWAP deviation {dev:.2f}%")

        # 4. Pyth confidence
        if pyth_confidence_pct is not None and pyth_confidence_pct > self.max_confidence_pct:
            failures.append(f"Pyth confidence too wide: {pyth_confidence_pct:.3f}%")

        # 5. Velocity
        self.record_price(spot_price)
        vel_err = self.check_velocity()
        if vel_err:
            failures.append(vel_err)

        if failures:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                self.trip_time = time.time()
                print(f"[CIRCUIT] TRIPPED after {self.failure_count} failures: {failures}")
            return False, "; ".join(failures)
        else:
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                print("[CIRCUIT] CLOSED: oracle signals healthy")
            elif time.time() - self.last_failure_time > 60:
                self.failure_count = max(0, self.failure_count - 1)  # Decay failures
            return True, "OK"
```
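The breaker above waits a fixed `recovery_timeout` before moving from OPEN to HALF_OPEN. If you want the wait to grow when the breaker keeps re-tripping, one possible sketch is a capped exponential backoff. The function name, the `consecutive_trips` counter, and the one-hour cap are assumptions for illustration; none of them exist in the class above.

```python
def backoff_recovery_timeout(
    base_seconds: float,
    consecutive_trips: int,
    max_seconds: float = 3600.0,
) -> float:
    """Double the recovery wait for each consecutive trip, capped at max_seconds."""
    if consecutive_trips < 1:
        raise ValueError("consecutive_trips must be >= 1")
    return min(base_seconds * 2 ** (consecutive_trips - 1), max_seconds)


for trips in range(1, 6):
    print(trips, backoff_recovery_timeout(300.0, trips))
# 1 300.0, 2 600.0, 3 1200.0, 4 2400.0, 5 3600.0 (capped)
```

To use it, an agent would track how many times the breaker has tripped without an intervening healthy CLOSED period and compare the elapsed time against this value instead of the fixed timeout.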
Purple Flea Trading API vs Oracle Feeds
A natural question for agents using Purple Flea's Trading API is: do I need to run my own oracle integration, or does Purple Flea handle pricing for me?
The answer depends on what you're doing:
| Use Case | Use Purple Flea API | Use External Oracle |
|---|---|---|
| Placing orders on 275 perp markets | Yes — Purple Flea provides mark prices | Optional for cross-reference |
| Valuing collateral held in wallet | Partial (Purple Flea wallet API returns balances) | Yes — use Chainlink/Pyth for USD valuation |
| Cross-exchange arbitrage signals | No | Yes — need independent price source |
| Liquidation monitoring of own positions | Yes — mark price from Purple Flea is authoritative | No |
| On-chain DeFi protocol interactions | No | Yes — must submit oracle VAA |
Purple Flea's Hyperliquid-backed trading engine uses its own mark price derived from a weighted average of major centralized exchanges — functionally similar to an oracle but not readable by external smart contracts. For pure trading on Purple Flea's 275 perpetual markets, the built-in pricing is sufficient and you do not need an external oracle integration.
Run a lightweight Chainlink or Pyth reader alongside your Purple Flea trading agent as an independent sanity check. If the Purple Flea mark price deviates more than 0.5% from your oracle reference, flag the situation and pause automated order submission until the discrepancy resolves. This protects against exchange-side anomalies that your agent might otherwise trade into.
Multi-Oracle Architecture for Production Agents
Production-grade agents should never rely on a single oracle. A robust multi-oracle stack:
- Primary: Pyth Network (highest frequency, <400ms)
- Secondary: Chainlink (established, battle-tested, heartbeat reference)
- Tertiary: Uniswap v3 TWAP (fully decentralized, manipulation-resistant)
- Fallback: CoinGecko/CoinMarketCap REST API (CEX aggregate, off-chain)
```python
class MultiOraclePriceEngine:
    """
    Aggregates multiple oracle sources into a single trusted price.
    Uses a plain median to resist outliers (the upper middle value when the
    number of sources is even).
    Relies on PythReader, ChainlinkReader and OracleCircuitBreaker defined earlier.
    """
    def __init__(self):
        self.pyth = PythReader(max_age_ms=3000)
        self.chainlink = ChainlinkReader(max_staleness_seconds=3600)
        # One breaker per asset, so price histories of different assets never mix
        self.circuit_breakers: dict[str, OracleCircuitBreaker] = {}

    async def get_trusted_price(self, asset: str) -> dict:
        """
        Returns a consensus price from multiple sources.
        Raises if circuit breaker is tripped or consensus fails.
        """
        prices = {}

        # Fetch all sources concurrently
        cl_pair = f"{asset.split('/')[0]}/USD"
        loop = asyncio.get_running_loop()
        pyth_result, cl_result = await asyncio.gather(
            self.pyth.fetch_latest_price(asset),
            # Chainlink is sync; run in executor
            loop.run_in_executor(None, self.chainlink.get_price, cl_pair),
            return_exceptions=True,
        )
        if not isinstance(pyth_result, Exception):
            prices["pyth"] = pyth_result["price"]
        if not isinstance(cl_result, Exception):
            prices["chainlink"] = cl_result["price"]
        if len(prices) == 0:
            raise RuntimeError("All oracle sources failed")

        # Consensus: median of the available sources
        price_values = list(prices.values())
        consensus = sorted(price_values)[len(price_values) // 2]

        # Circuit breaker validation
        breaker = self.circuit_breakers.setdefault(asset, OracleCircuitBreaker())
        allowed, reason = breaker.validate(
            spot_price=prices.get("pyth", consensus),
            pyth_price=prices.get("pyth"),
            chainlink_price=prices.get("chainlink"),
        )
        if not allowed:
            raise RuntimeError(f"Circuit breaker blocked: {reason}")

        return {
            "asset": asset,
            "consensus_price": consensus,
            "sources": prices,
            "source_count": len(prices),
            "circuit_state": breaker.state.value
        }
```
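The engine above takes the middle value of whatever sources responded. If the sources should count unequally, for instance trusting Pyth and Chainlink more than a REST fallback, a weighted median is one option. The sketch below is a generic implementation; the source names and weights are illustrative assumptions, not values from any provider.

```python
def weighted_median(prices: dict[str, float], weights: dict[str, float]) -> float:
    """Return the lowest price at which cumulative weight reaches half the total weight."""
    if not prices:
        raise ValueError("no prices supplied")
    # Sources missing from `weights` default to weight 1.0
    items = sorted((price, weights.get(name, 1.0)) for name, price in prices.items())
    half = sum(weight for _, weight in items) / 2
    cumulative = 0.0
    for price, weight in items:
        cumulative += weight
        if cumulative >= half:
            return price
    return items[-1][0]


prices = {"pyth": 3402.15, "chainlink": 3398.00, "twap": 3396.80, "cex": 3600.00}
weights = {"pyth": 3, "chainlink": 3, "twap": 2, "cex": 1}
print(weighted_median(prices, weights))  # 3398.0
```

The outlying fallback quote of 3600.00 carries too little weight to influence the result, while the three agreeing sources determine it.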
Oracle Monitoring Agent
Beyond reading oracles for trade execution, agents should run a dedicated monitoring service that alerts on anomalies. This is especially important for agents running unattended:
```python
#!/usr/bin/env python3
"""
Oracle health monitoring daemon.
Runs every 60 seconds, logs deviations, sends alerts on anomalies.
"""
import asyncio
import time
from datetime import datetime

import httpx

# MultiOraclePriceEngine is the class defined in the previous section

ALERT_WEBHOOK = "https://your-alert-endpoint.com/webhook"
MONITOR_PAIRS = ["BTC/USD", "ETH/USD", "SOL/USD"]
ALERT_COOLDOWN_SECONDS = 300


async def send_alert(message: str):
    """Post an alert; a failing webhook is logged instead of stopping the monitor."""
    try:
        async with httpx.AsyncClient() as client:
            await client.post(ALERT_WEBHOOK, json={"text": message}, timeout=5.0)
    except httpx.HTTPError as e:
        print(f"Alert delivery failed: {e}")


async def monitor_loop():
    engine = MultiOraclePriceEngine()
    alert_cooldowns = {}
    while True:
        for pair in MONITOR_PAIRS:
            try:
                result = await engine.get_trusted_price(pair)
                sources = result["sources"]
                if len(sources) > 1:
                    vals = list(sources.values())
                    spread_pct = (max(vals) - min(vals)) / min(vals) * 100
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] {pair}: ${result['consensus_price']:,.2f} "
                          f"| spread: {spread_pct:.3f}% | state: {result['circuit_state']}")
                    if spread_pct > 0.5:
                        cooldown_key = f"{pair}_spread"
                        last_alert = alert_cooldowns.get(cooldown_key, float("-inf"))
                        if time.monotonic() - last_alert > ALERT_COOLDOWN_SECONDS:
                            await send_alert(
                                f"ORACLE ALERT: {pair} cross-oracle spread {spread_pct:.3f}% "
                                f"| Chainlink: {sources.get('chainlink', 'N/A')} "
                                f"| Pyth: {sources.get('pyth', 'N/A')}"
                            )
                            alert_cooldowns[cooldown_key] = time.monotonic()
            except RuntimeError as e:
                print(f"ERROR for {pair}: {e}")
                await send_alert(f"ORACLE CRITICAL: {pair}: {e}")
        await asyncio.sleep(60)


if __name__ == "__main__":
    asyncio.run(monitor_loop())
```
Oracle Gas Cost Optimization
On Ethereum mainnet, updating a Pyth price feed costs approximately 50,000-80,000 gas per update (~$2-8 at typical gas prices). For high-frequency agents making hundreds of trades per day, this adds up quickly. Optimization strategies:
- Batch price updates: Update multiple price feeds in a single transaction using Pyth's updatePriceFeeds() with an array of VAAs
- Deploy on L2: Arbitrum and Optimism both support Chainlink and Pyth with gas costs 10-50x lower than mainnet
- Cache aggressively: Check if the on-chain price is already fresh enough before submitting a redundant update
- Use Purple Flea's off-chain engine: For pure trading, avoid on-chain oracle interactions entirely by using Purple Flea's REST API which handles pricing internally
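To make the cost argument concrete, the sketch below works through the gas arithmetic and the "cache aggressively" check. The 20 gwei gas price, $3,400 ETH price, and 200 updates per day are illustrative assumptions, and both helper names are hypothetical.

```python
def update_cost_usd(gas_used: int, gas_price_gwei: float, eth_price_usd: float) -> float:
    """Cost of one transaction in USD: gas * gas price (converted from gwei to ETH) * ETH price."""
    return gas_used * gas_price_gwei * 1e-9 * eth_price_usd


def needs_update(onchain_publish_time: float, now: float, max_age_seconds: float) -> bool:
    """Skip the update transaction when the on-chain price is still fresh enough."""
    return now - onchain_publish_time > max_age_seconds


per_update = update_cost_usd(65_000, 20, 3_400)
print(f"per update: ${per_update:.2f}")          # $4.42
print(f"200 per day: ${per_update * 200:,.2f}")  # $884.00

print(needs_update(onchain_publish_time=1_000, now=1_002, max_age_seconds=5))  # False
print(needs_update(onchain_publish_time=1_000, now=1_010, max_age_seconds=5))  # True
```

At these assumed prices, an agent that submits an update with every one of 200 daily trades spends several hundred dollars a day on oracle updates alone, so skipping redundant updates or moving to an L2 has a direct effect on strategy profitability.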
Conclusion
Oracle integration is the foundation that determines whether your AI agent trades on truth or on lies. The core principles:
- Never trust a single source: Run at least two independent oracles and validate they agree
- Implement staleness checks: An old price is often worse than no price
- Use circuit breakers: When oracles disagree, halt — don't guess
- Respect confidence intervals: Pyth's confidence window tells you how much the market disagrees with itself
- TWAP for risk, spot for execution: Use the appropriate price type for each decision
- Monitor continuously: Oracle health should be a first-class operational metric
For agents trading on Purple Flea's 275 perpetual markets, the built-in mark price engine provides a strong foundation — but layering in Chainlink or Pyth as an independent validation step creates defense-in-depth against adversarial conditions. Register your agent and start building with confidence.