Introduction
Markets never sleep. Bitcoin trades at 3 a.m. on a Tuesday, ETH perpetuals gap up on a Sunday morning, and macro events hit without warning at any hour. Human traders can't maintain 24/7 vigilance — but software can. A trading agent runs indefinitely, reacting to signals in seconds, executing orders with zero hesitation, and logging every decision without fatigue or emotion clouding the process.
Beyond the obvious uptime advantage, algorithmic agents enforce consistency. A human trader might override their own stop-loss in a moment of hope; an agent won't. The rules you write are the rules that run — every time, without exception. That discipline is worth more than any edge a discretionary trader thinks they have.
This tutorial builds a fully autonomous EMA crossover trading agent from scratch using Python and the Purple Flea Trading API. By the end you'll have a working agent that monitors BTC-PERP, identifies trend signals, manages a leveraged position, and enforces hard risk limits — all without touching a UI.
What We're Building
The agent we're building does the following on a one-minute loop:
- Fetches the last 100 one-minute candles for BTC-PERP from the Purple Flea Trading API
- Calculates a fast EMA-20 and a slow EMA-50 over closing prices
- Opens a long position when EMA-20 crosses above EMA-50 (golden cross)
- Opens a short position when EMA-20 crosses below EMA-50 (death cross)
- Applies 5x leverage with a hard 2% stop-loss on each position
- Closes any open position when the reverse signal fires
- Enforces a daily loss limit and a maximum drawdown halt
EMA crossover is deliberately simple — it's not the point. The point is the scaffolding: authenticated API calls, clean position management, layered risk controls, and a resilient main loop you can drop any strategy into.
Prerequisites
You'll need Python 3.9 or later, a Purple Flea API key (register free at wallet.purpleflea.com), and the following packages:
pip install purpleflea pandas numpy
Set your API key as an environment variable before running anything:
PURPLEFLEA_API_KEY=pf_sk_your_key_here
Step 1: Setup and Authentication
Start by initializing the Purple Flea client and confirming connectivity. A failed health check at startup is far better than a silent failure during a live trade.
import os
import time
import logging
import requests
API_KEY = os.environ["PURPLEFLEA_API_KEY"]
BASE_URL = "https://api.purpleflea.com"
HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
def check_health():
r = requests.get(f"{BASE_URL}/health", headers=HEADERS, timeout=10)
r.raise_for_status()
data = r.json()
assert data["status"] == "ok", f"Unhealthy API: {data}"
logging.info("API health check passed. Latency: %sms", data.get("latency_ms"))
check_health()
If check_health() raises, your API key is wrong, the network is down, or the service is degraded. Fix it before going further — never skip this step before starting a live loop.
Step 2: Fetch Market Data
The GET /trading/candles endpoint returns OHLCV data for any of the 275 perpetual markets. We'll request 100 one-minute candles — enough history for a stable EMA-50.
import pandas as pd
def get_price_history(symbol: str, periods: int = 100) -> pd.Series:
"""Return closing prices as a pandas Series, newest last."""
r = requests.get(
f"{BASE_URL}/trading/candles",
headers=HEADERS,
params={
"symbol": symbol,
"interval": "1m",
"limit": periods,
},
timeout=10,
)
r.raise_for_status()
candles = r.json()["candles"]
closes = [float(c["close"]) for c in candles]
return pd.Series(closes)
Step 3: Calculate EMAs
Pandas makes EMA calculation a one-liner. We compute both the fast (20-period) and slow (50-period) exponential moving averages and return only the most recent value of each, since that's all the signal logic needs.
def calculate_emas(prices: pd.Series) -> tuple[float, float]:
"""Return (ema_fast, ema_slow) — most recent values only."""
ema_fast = prices.ewm(span=20, adjust=False).mean().iloc[-1]
ema_slow = prices.ewm(span=50, adjust=False).mean().iloc[-1]
return float(ema_fast), float(ema_slow)
def calculate_previous_emas(prices: pd.Series) -> tuple[float, float]:
"""Return EMAs from one bar ago — needed to detect the crossover."""
ema_fast = prices.ewm(span=20, adjust=False).mean().iloc[-2]
ema_slow = prices.ewm(span=50, adjust=False).mean().iloc[-2]
return float(ema_fast), float(ema_slow)
Step 4: Signal Logic
A crossover signal requires comparing the current bar's EMA relationship against the previous bar's relationship. If EMA-20 was below EMA-50 one bar ago and is above EMA-50 now, a golden cross just occurred. The reverse is a death cross. If neither condition is true, the signal is "hold".
def get_signal(prices: pd.Series) -> str:
"""Return 'long', 'short', or 'hold'."""
fast_now, slow_now = calculate_emas(prices)
fast_prev, slow_prev = calculate_previous_emas(prices)
# Golden cross: fast crossed above slow
if fast_prev < slow_prev and fast_now > slow_now:
logging.info("SIGNAL: long (EMA20=%.2f, EMA50=%.2f)", fast_now, slow_now)
return "long"
# Death cross: fast crossed below slow
if fast_prev > slow_prev and fast_now < slow_now:
logging.info("SIGNAL: short (EMA20=%.2f, EMA50=%.2f)", fast_now, slow_now)
return "short"
return "hold"
Step 5: Position Management
The PositionManager class wraps the Purple Flea REST endpoints for opening and closing positions. It tracks the current position side and entry price so the rest of the agent can query state without extra API calls.
class PositionManager:
def __init__(self, symbol: str, leverage: int = 5):
self.symbol = symbol
self.leverage = leverage
self.current_side = None # "long" | "short" | None
self.entry_price = None
self.position_id = None
def open_long(self, size_usd: float):
return self._open("long", size_usd)
def open_short(self, size_usd: float):
return self._open("short", size_usd)
def _open(self, side: str, size_usd: float):
r = requests.post(
f"{BASE_URL}/trading/positions",
headers=HEADERS,
json={
"symbol": self.symbol,
"side": side,
"size_usd": size_usd,
"leverage": self.leverage,
"order_type": "market",
},
timeout=15,
)
r.raise_for_status()
result = r.json()
self.current_side = side
self.entry_price = float(result["fill_price"])
self.position_id = result["position_id"]
logging.info("Opened %s at %.2f (id=%s)", side, self.entry_price, self.position_id)
return result
def close_position(self):
if not self.position_id:
return
r = requests.delete(
f"{BASE_URL}/trading/positions/{self.position_id}",
headers=HEADERS,
timeout=15,
)
r.raise_for_status()
logging.info("Closed position %s", self.position_id)
self.current_side = None
self.entry_price = None
self.position_id = None
Step 6: Risk Controls
Risk management is not optional. Without it, a single bad trade or API anomaly can wipe the account. The RiskGuard class enforces four independent safeguards that run before every order.
class RiskGuard:
MAX_POSITION_PCT = 0.02 # 2% of portfolio per trade
STOP_LOSS_PCT = 0.02 # exit if unrealised loss exceeds 2%
MAX_DRAWDOWN_PCT = 0.20 # halt all trading if portfolio down 20%
DAILY_LOSS_LIMIT = 0.05 # halt if daily P&L down 5%
def __init__(self, starting_equity: float):
self.starting_equity = starting_equity
self.peak_equity = starting_equity
self.daily_start_equity = starting_equity
self.halted = False
def position_size_usd(self, equity: float) -> float:
"""Return maximum order size in USD."""
return equity * self.MAX_POSITION_PCT
def check_stop_loss(self, entry: float, current: float, side: str) -> bool:
"""Return True if stop-loss should trigger."""
if side == "long":
return (entry - current) / entry >= self.STOP_LOSS_PCT
else:
return (current - entry) / entry >= self.STOP_LOSS_PCT
def update_and_check(self, current_equity: float) -> bool:
"""Update equity state. Return True if trading should halt."""
if self.halted:
return True
self.peak_equity = max(self.peak_equity, current_equity)
drawdown = (self.peak_equity - current_equity) / self.peak_equity
daily_loss = (self.daily_start_equity - current_equity) / self.daily_start_equity
if drawdown >= self.MAX_DRAWDOWN_PCT or daily_loss >= self.DAILY_LOSS_LIMIT:
logging.critical("RISK HALT: drawdown=%.1f%% daily_loss=%.1f%%",
drawdown * 100, daily_loss * 100)
self.halted = True
return True
return False
Leverage warning: This agent uses 5x leverage. A 20% adverse move against a 5x position results in a 100% loss of margin. Always test with minimal size and paper-trade before deploying real capital. Never risk more than you can afford to lose.
Step 7: The Main Loop
The main loop ties everything together. It runs every 60 seconds: fetch prices, compute signal, consult risk guard, execute if appropriate, then sleep. Any exception is caught and logged — the loop continues rather than crashing.
import asyncio
SYMBOL = "BTC-PERP"
LOOP_INTERVAL = 60 # seconds
async def main():
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s")
check_health()
pm = PositionManager(SYMBOL, leverage=5)
# Fetch starting equity from wallet
equity_resp = requests.get(f"{BASE_URL}/wallet/balance", headers=HEADERS)
equity = float(equity_resp.json()["total_usd"])
rg = RiskGuard(starting_equity=equity)
logging.info("Agent started. Symbol=%s Equity=%.2f", SYMBOL, equity)
while True:
try:
# 1. Refresh equity
equity_resp = requests.get(f"{BASE_URL}/wallet/balance", headers=HEADERS)
equity = float(equity_resp.json()["total_usd"])
# 2. Check drawdown / daily loss halt
if rg.update_and_check(equity):
logging.warning("Trading halted by RiskGuard. Closing any open position.")
pm.close_position()
break
# 3. Check stop-loss on open position
if pm.current_side and pm.entry_price:
prices = get_price_history(SYMBOL, 1)
current_price = prices.iloc[-1]
if rg.check_stop_loss(pm.entry_price, current_price, pm.current_side):
logging.warning("Stop-loss triggered at %.2f", current_price)
pm.close_position()
# 4. Compute signal
prices = get_price_history(SYMBOL, 100)
signal = get_signal(prices)
# 5. Execute
if signal == "long" and pm.current_side != "long":
pm.close_position()
pm.open_long(rg.position_size_usd(equity))
elif signal == "short" and pm.current_side != "short":
pm.close_position()
pm.open_short(rg.position_size_usd(equity))
except Exception as e:
logging.error("Loop error: %s", e, exc_info=True)
await asyncio.sleep(LOOP_INTERVAL)
if __name__ == "__main__":
asyncio.run(main())
Step 8: Logging and Monitoring
Structured JSON logging makes it trivial to pipe output into log aggregators like Datadog, Loki, or CloudWatch. Add a webhook notification whenever a fill occurs so you're alerted on your phone even when the agent is running unattended.
import json, logging, os
from datetime import datetime, timezone
class JsonFormatter(logging.Formatter):
def format(self, record):
payload = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"msg": record.getMessage(),
}
return json.dumps(payload)
def setup_logging(log_file: str = "agent.log"):
handler = logging.FileHandler(log_file)
handler.setFormatter(JsonFormatter())
logging.getLogger().addHandler(handler)
logging.getLogger().setLevel(logging.INFO)
WEBHOOK_URL = os.environ.get("ALERT_WEBHOOK_URL")
def notify_fill(side: str, price: float, size_usd: float):
if not WEBHOOK_URL:
return
import requests
requests.post(WEBHOOK_URL, json={
"text": f"Fill: {side.upper()} ${size_usd:.0f} @ ${price:,.2f}"
}, timeout=5)
Deployment
For production deployment, run the agent as a managed process with PM2 or systemd so it restarts automatically on crash or reboot.
module.exports = {
apps: [{
name: "trading-agent",
script: "python",
args: "agent.py",
autorestart: true,
watch: false,
env: {
PURPLEFLEA_API_KEY: process.env.PURPLEFLEA_API_KEY,
ALERT_WEBHOOK_URL: process.env.ALERT_WEBHOOK_URL,
}
}]
}
Start with pm2 start ecosystem.config.cjs and monitor with pm2 logs trading-agent. You can also poll open positions directly via GET /trading/positions to confirm the agent's state from any HTTP client.
Results and Backtesting
Before running this agent with real funds, backtest it against historical data. Past performance is never indicative of future results, but backtesting will reveal edge cases in your signal logic, help you calibrate position sizes, and expose bugs in the risk guard that you don't want to find live.
Purple Flea provides historical candle data via the same endpoint with a historical flag:
r = requests.get(
f"{BASE_URL}/trading/candles",
headers=HEADERS,
params={
"symbol": "BTC-PERP",
"interval": "1m",
"limit": 10000,
"historical": "true",
"start": "2026-01-01T00:00:00Z",
}
)
historical_candles = r.json()["candles"]
Run your signal logic over the historical candles in a simulation loop. Track each hypothetical trade's entry/exit price, P&L, and drawdown. Only deploy live when the strategy shows a positive expectation across at least 200 trades in the backtest.
Conclusion
You've built a complete end-to-end autonomous trading agent: authenticated API connectivity, real-time market data, EMA crossover signal logic, 5x leveraged position management, and four independent risk controls — all wrapped in a resilient 60-second async loop with structured logging and webhook alerting.
The architecture is strategy-agnostic. Swap out the EMA logic for RSI divergence, funding rate arbitrage, order book imbalance, or an LLM-generated signal — the position management and risk layers stay exactly the same. That separation of concerns is what makes the agent maintainable as complexity grows.
The full Trading API reference documents every available endpoint including limit orders, take-profit targets, and funding rate queries. Ready to start? Register your account and get your API key in under a minute.