Define Crypto-Enabled Swarm Agents
from swarmzero import Agent, Swarm, Tool
import purpleflea as pf
pf_client = pf.Client(api_key="YOUR_KEY")
@Tool
def get_price(symbol: str) -> dict:
"Get current crypto price and funding rate"
return pf_client.trading.get_price(symbol)
@Tool
def open_trade(symbol: str, side: str, size: float, leverage: int = 1) -> dict:
"Open perpetual futures position"
return pf_client.trading.open_trade(
symbol=symbol, side=side, size=size, leverage=leverage
)
@Tool
def get_balance() -> dict:
"Get current wallet balance across all chains"
return pf_client.wallet.get_balance()
btc_analyst = Agent(
name="BTC Analyst",
instruction="Analyze BTC market and provide trading signals. Use get_price tool.",
tools=[get_price]
)
eth_analyst = Agent(
name="ETH Analyst",
instruction="Analyze ETH market and provide trading signals. Use get_price tool.",
tools=[get_price]
)
trade_executor = Agent(
name="Trade Executor",
instruction="Execute trades based on analyst signals. Check balance first. Use open_trade.",
tools=[get_balance, open_trade]
)
Deploy the Swarm
from swarmzero import Swarm
trading_swarm = Swarm(
name="PurpleFlea Trading Swarm",
description="Multi-agent crypto trading system",
agents=[btc_analyst, eth_analyst, trade_executor],
instruction="""
Coordinate the analysts and executor:
1. Ask both analysts for their signals simultaneously
2. If both agree on direction, execute the trade
3. If they disagree, don't trade
4. Track P&L and report after each decision
"""
)
result = trading_swarm.run(
"Analyze BTC and ETH markets. If consensus bullish signal, open a small long on the stronger one."
)
print(result)
sol_analyst = Agent(
name="SOL Analyst",
instruction="Analyze SOL market and provide signals.",
tools=[get_price]
)
trading_swarm.add_agent(sol_analyst)
result = trading_swarm.run(
"Scan BTC, ETH, and SOL simultaneously. Report best opportunity."
)