CrewAI Perpetual Futures Trading: Build a Multi-Agent Trading System
Single-agent trading bots have a fundamental limitation: one agent cannot simultaneously analyze the market, assess risk, and execute orders with the discipline each task requires. CrewAI solves this by letting you define specialized agents with distinct roles, each doing one thing well, coordinating through a shared crew.
In this tutorial, you will build a three-agent perpetual futures trading system using CrewAI and the Purple Flea Trading API. By the end, you will have a production-ready crew with an Analyst agent reading market signals, a Risk Manager agent sizing positions, and an Execution agent placing and managing orders.
Architecture Overview
The crew follows a sequential task pipeline with a feedback loop:
Market Analyst Agent
Reads price data, funding rates, open interest, and sentiment signals from Purple Flea's market data endpoints. Produces a structured signal: direction (long/short/neutral), confidence (0 to 1), and key levels.
Risk Manager Agent
Takes the analyst's signal plus current portfolio state. Calculates position size using Kelly Criterion or fixed-fraction, checks drawdown limits, validates margin requirements, and outputs an approved trade spec or a rejection.
Execution Agent
Receives the approved trade spec and submits orders to the Purple Flea Trading API. Manages entry (limit vs. market), sets stop-loss and take-profit levels, and monitors the position until closure.
Installation
pip install crewai crewai-tools requests python-dotenv
# .env
OPENAI_API_KEY=sk-...
PURPLE_FLEA_API_KEY=pf_live_YOUR_KEY
PURPLE_FLEA_BASE=https://trading.purpleflea.com/api
Step 1: Define Purple Flea Trading Tools
CrewAI agents use tools to interact with external systems. We define tools for fetching market data and submitting orders.
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
import requests
import os
BASE = os.getenv("PURPLE_FLEA_BASE")
KEY = os.getenv("PURPLE_FLEA_API_KEY")
HEADERS = {"Authorization": f"Bearer {KEY}"}
class MarketDataInput(BaseModel):
symbol: str = Field(..., description="Trading pair, e.g. BTC-USD-PERP")
timeframe: str = Field(default="1h", description="Candle timeframe: 1m 5m 15m 1h 4h 1d")
class GetMarketDataTool(BaseTool):
name: str = "get_market_data"
description: str = "Fetch OHLCV candles, funding rate, and open interest for a perpetual market."
args_schema: type[BaseModel] = MarketDataInput
def _run(self, symbol: str, timeframe: str = "1h") -> str:
r = requests.get(
f"{BASE}/markets/{symbol}/candles",
params={"timeframe": timeframe, "limit": 100},
headers=HEADERS
)
data = r.json()
candles = data["candles"][-10:]
funding = data.get("funding_rate", 0)
oi = data.get("open_interest", 0)
latest = candles[-1]
return (
f"Symbol: {symbol}\n"
f"Latest close: {latest['close']}\n"
f"24h change: {data.get('change_24h', 'N/A')}%\n"
f"Funding rate: {funding:.4f}% per 8h\n"
f"Open interest: ${oi:,.0f}\n"
f"Last 10 closes: {[c['close'] for c in candles]}\n"
)
class PortfolioInput(BaseModel):
pass
class GetPortfolioTool(BaseTool):
name: str = "get_portfolio"
description: str = "Get current positions, available margin, and unrealized PnL."
args_schema: type[BaseModel] = PortfolioInput
def _run(self) -> str:
r = requests.get(f"{BASE}/portfolio", headers=HEADERS)
p = r.json()
positions = p.get("positions", [])
pos_summary = "\n".join(
f" {pos['symbol']}: {pos['side']} {pos['size']} @ {pos['entry_price']} (PnL: {pos['unrealized_pnl']:+.2f})"
for pos in positions
) or " No open positions"
return (
f"Available margin: ${p.get('available_margin', 0):,.2f}\n"
f"Total equity: ${p.get('total_equity', 0):,.2f}\n"
f"Current drawdown: {p.get('drawdown_pct', 0):.1f}%\n"
f"Open positions:\n{pos_summary}\n"
)
class OrderInput(BaseModel):
symbol: str
side: str = Field(..., description="'long' or 'short'")
size_usd: float = Field(..., description="Position size in USD")
leverage: float = Field(default=3.0)
order_type: str = Field(default="market")
stop_loss_pct: float = Field(default=2.0)
take_profit_pct: float = Field(default=6.0)
class PlaceOrderTool(BaseTool):
name: str = "place_order"
description: str = "Place a perpetual futures order with stop-loss and take-profit."
args_schema: type[BaseModel] = OrderInput
def _run(self, symbol, side, size_usd, leverage=3.0,
order_type="market", stop_loss_pct=2.0, take_profit_pct=6.0) -> str:
payload = {
"symbol": symbol, "side": side,
"size_usd": size_usd, "leverage": leverage,
"type": order_type,
"stop_loss_pct": stop_loss_pct,
"take_profit_pct": take_profit_pct
}
r = requests.post(f"{BASE}/orders", json=payload, headers=HEADERS)
order = r.json()
return f"Order placed: {order.get('order_id')} | Status: {order.get('status')} | Fill: {order.get('fill_price', 'pending')}"
Step 2: Define the Three Agents
from crewai import Agent
market_data_tool = GetMarketDataTool()
portfolio_tool = GetPortfolioTool()
order_tool = PlaceOrderTool()
# Agent 1: Market Analyst
analyst = Agent(
role="Crypto Market Analyst",
goal="Analyze perpetual futures market conditions and generate high-confidence directional signals.",
backstory="""You are a quantitative analyst specializing in perpetual futures markets.
You look for confluence between price action, funding rates, and open interest trends.
You only generate signals when multiple indicators align. Confidence is expressed as
a number from 0 to 1 and you always cite your reasoning.""",
tools=[market_data_tool],
verbose=True,
llm="gpt-4o",
max_iter=5
)
# Agent 2: Risk Manager
risk_manager = Agent(
role="Risk Manager",
goal="Evaluate trade signals against portfolio risk limits. Approve or reject with specific position sizing.",
backstory="""You are a disciplined risk manager. Core rules:
- Never risk more than 1% of total equity on a single trade
- Reject if drawdown exceeds 10%
- Maximum leverage: 5x
- Require analyst confidence >= 0.6 to approve
- Size positions using 25% Kelly Criterion
You always return APPROVE or REJECT with position size if approved.""",
tools=[portfolio_tool],
verbose=True,
llm="gpt-4o",
max_iter=3
)
# Agent 3: Trade Executor
executor = Agent(
role="Trade Executor",
goal="Execute approved trades via the Purple Flea Trading API with appropriate risk controls.",
backstory="""You are a precise trade executor. You receive approved trade specs and
execute them faithfully. Stop-loss is set at 2% from entry, take-profit at 6%
for a 3:1 risk/reward ratio. You always confirm the order ID and fill price.""",
tools=[order_tool],
verbose=True,
llm="gpt-4o",
max_iter=3
)
Step 3: Define Tasks
from crewai import Task
analyze_task = Task(
description="""Analyze the BTC-USD-PERP perpetual futures market.
1. Fetch the last 100 hourly candles
2. Identify the current trend (higher highs/lows or lower highs/lows)
3. Note the funding rate — high positive funding suggests overleveraged longs
4. Check open interest direction (rising OI with rising price = healthy trend)
5. Generate a signal: LONG, SHORT, or NEUTRAL
6. State confidence (0.0 to 1.0) and three bullet-point reasons
Output format:
SIGNAL: [LONG|SHORT|NEUTRAL]
CONFIDENCE: [0.0-1.0]
REASONS:
- reason 1
- reason 2
- reason 3
KEY_LEVELS: support=[X], resistance=[Y]""",
expected_output="A structured signal with direction, confidence score, and supporting reasons.",
agent=analyst
)
risk_task = Task(
description="""Review the analyst signal and current portfolio state.
1. Fetch current portfolio (equity, margin, drawdown, open positions)
2. If analyst confidence < 0.6, REJECT with reason
3. If current drawdown > 10%, REJECT with reason
4. If position already open in BTC-USD-PERP, REJECT to avoid doubling
5. Calculate position size:
equity * 0.01 / 0.02 * confidence * 0.25
(1% equity risk / 2% stop / 25% Kelly scaling)
6. Cap at $10,000 per position
7. Specify leverage (max 5x)
Output format:
DECISION: [APPROVE|REJECT]
REASON: [one sentence]
SIZE_USD: [amount, only if APPROVE]
LEVERAGE: [1-5, only if APPROVE]""",
expected_output="APPROVE or REJECT decision with position sizing if approved.",
agent=risk_manager,
context=[analyze_task]
)
execute_task = Task(
description="""Execute the trade if the risk manager approved.
1. Check risk manager's DECISION
2. If REJECT, report the rejection reason and stop
3. If APPROVE, place the order:
- Symbol: BTC-USD-PERP
- Side: match analyst's SIGNAL direction
- Size and leverage from risk manager output
- Stop-loss: 2% from entry
- Take-profit: 6% from entry (3:1 R/R)
- Order type: market
4. Report order ID, fill price, stop-loss level, take-profit level
Output format:
STATUS: [EXECUTED|REJECTED|ERROR]
ORDER_ID: [id or N/A]
FILL_PRICE: [price or N/A]
STOP_LOSS: [price]
TAKE_PROFIT: [price]
NOTE: [any relevant details]""",
expected_output="Execution report with order details or rejection notice.",
agent=executor,
context=[analyze_task, risk_task]
)
Step 4: Assemble and Run the Crew
from crewai import Crew, Process
trading_crew = Crew(
agents=[analyst, risk_manager, executor],
tasks=[analyze_task, risk_task, execute_task],
process=Process.sequential, # analyst -> risk manager -> executor
verbose=True
)
if __name__ == "__main__":
result = trading_crew.kickoff()
print("\n=== CREW RESULT ===")
print(result.raw)
Adding a Scheduler for Continuous Operation
To run the crew on a regular cadence — every 4 hours to align with major session opens — wrap the kickoff in a scheduler:
import schedule
import time
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("trading-crew")
def run_trading_cycle():
logger.info(f"Starting cycle at {datetime.utcnow().isoformat()}")
try:
result = trading_crew.kickoff()
logger.info(f"Cycle complete: {result.raw[:200]}")
except Exception as e:
logger.error(f"Cycle failed: {e}")
schedule.every(4).hours.do(run_trading_cycle)
run_trading_cycle() # run immediately on startup
while True:
schedule.run_pending()
time.sleep(60)
Risk Parameters Reference
| Parameter | Recommended Value | Rationale |
|---|---|---|
| Max risk per trade | 1% of equity | Allows 10 losses before hitting 10% drawdown |
| Stop-loss distance | 2% | Outside normal hourly BTC volatility |
| Take-profit distance | 6% | 3:1 minimum reward-to-risk ratio |
| Max leverage | 5x | Liquidation at 20% — well beyond typical stops |
| Max drawdown to halt | 10% | Pause trading and review at this level |
| Min analyst confidence | 0.60 | Filters out low-conviction signals |
| Kelly fraction | 25% | Fractional Kelly substantially reduces variance |
| Max position size | $10,000 | Liquidity ceiling for most markets |
Position Monitoring via API
After the crew places a trade, monitor it independently. The Purple Flea Trading API supports REST polling and webhooks:
# List open positions
curl https://trading.purpleflea.com/api/positions \
-H 'Authorization: Bearer pf_live_YOUR_KEY'
# Close a specific position
curl -X DELETE https://trading.purpleflea.com/api/positions/BTC-USD-PERP \
-H 'Authorization: Bearer pf_live_YOUR_KEY'
# Register a webhook for position events
curl -X POST https://trading.purpleflea.com/api/webhooks \
-H 'Authorization: Bearer pf_live_YOUR_KEY' \
-H 'Content-Type: application/json' \
-d '{"url":"https://your-agent.com/hooks","events":["position.opened","position.closed","position.stopped"]}'
Extending the Crew
Once the core three-agent crew is running reliably, common extensions include:
- Sentiment agent: Pulls social and on-chain flow data to feed additional context into the analyst's signal generation.
- Portfolio rebalancer: Periodically reviews all open positions and suggests trimming or adding based on updated signals.
- Reporting agent: Generates daily P&L summaries and publishes them to Telegram or a dashboard.
- Multi-asset analyst: Runs the analyst across ETH, SOL, and BTC in parallel using CrewAI's async task execution.
CrewAI version note: This tutorial targets CrewAI 0.80+. The BaseTool pattern shown here is stable across 0.70+. Earlier versions use slightly different tool registration syntax.
Start Building Your Trading Crew
Access Purple Flea's perpetual futures markets with real-time data, REST API, and WebSocket feeds. Designed for autonomous agents.
Open Trading API Read the Docs