Run Purple Flea trading operations as Celery tasks across a distributed worker fleet. Parallel crash bets, async escrow settlements, Celery Beat wallet monitoring, chord-aggregated PnL summaries — all the financial infrastructure your agent fleet needs, wired to Python's most battle-tested task queue.
Celery is Python's production-proven distributed task queue. For AI agent finance workloads, it provides the concurrency, retry logic, scheduling, and result persistence you need to run hundreds of simultaneous Purple Flea operations across a scalable worker fleet.
Fan out 50 crash bets simultaneously across a Celery worker pool. Each bet is an independent task — failures are isolated, retries are automatic, and results aggregate via chord callbacks without custom threading code.
Celery's built-in retry mechanism handles transient Purple Flea API failures automatically. Configure exponential backoff, max retries, and dead-letter queues — your trading logic stays clean.
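As a sketch of that pattern (the endpoint URL and task body here are illustrative, not the integration's actual module), Celery's declarative retry options handle backoff without any try/except boilerplate:

```python
import requests
from celery import Celery

app = Celery("purple_flea", broker="redis://localhost:6379/0")

@app.task(
    autoretry_for=(requests.RequestException,),  # retry on any transient HTTP error
    retry_backoff=True,        # exponential backoff: 1s, 2s, 4s, ...
    retry_backoff_max=60,      # cap the delay at 60s
    retry_jitter=True,         # randomize delays to avoid thundering herds
    retry_kwargs={"max_retries": 3},
)
def place_bet(amount_usdc: float) -> dict:
    # Illustrative endpoint; see the casino tasks later in this guide
    resp = requests.post(
        "https://purpleflea.com/casino/api/crash/bet",
        json={"amount": amount_usdc},
        timeout=20,
    )
    resp.raise_for_status()
    return resp.json()
```

The task modules later in this guide use explicit self.retry() calls instead, which adds per-attempt logging; both approaches sit on the same retry machinery.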
Celery Beat runs periodic tasks without cron infrastructure. Schedule hourly wallet balance checks, daily strategy runs, and weekly PnL reports as Beat tasks managed entirely within your Python codebase.
Store every trade result in Redis or a database via Celery's result backend. Query any past task result by ID, aggregate across groups with chord, and build PnL dashboards from the result store without extra infrastructure.
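For example, retrieving a past trade by ID takes a few lines; a minimal sketch, assuming the app configured in the next section with its 24-hour result_expires window (the task ID shown is a placeholder):

```python
from celery.result import AsyncResult
from celery_app import app

# Task ID is a placeholder; use the ID returned by .delay() / .apply_async()
res = AsyncResult("d9f3c1e2-0000-0000-0000-000000000000", app=app)
if res.successful():
    receipt = res.get()   # e.g. {"outcome": "win", "pnl_usdc": 0.5, ...}
    print(receipt["pnl_usdc"])
else:
    print(res.state)      # PENDING / STARTED / RETRY / FAILURE
```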
Route casino bets to a high-throughput queue, escrow settlements to a high-priority queue, and analytics to a background queue. Independent worker pools process each queue at the right concurrency level.
Celery Flower gives you a real-time web dashboard: active workers, queued tasks, task execution times, failure rates, and retry counts — all the observability you need for a production trading fleet.
The Purple Flea Celery integration uses Redis as both broker and result backend. Tasks are defined in Python and dispatched to specialized worker queues. Celery Beat handles periodic scheduling without any external cron.
Configure a Celery application with Redis as the broker and result backend. Define task queues for casino trades, escrow operations, wallet monitoring, and analytics — each with its own concurrency settings.
```python
from celery import Celery
from kombu import Queue, Exchange
import os

# Initialize Celery with Redis broker + result backend
app = Celery(
    "purple_flea",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=[
        "tasks.casino",
        "tasks.escrow",
        "tasks.wallet",
        "tasks.analytics",
    ],
)

# Task queue definitions
trade_exchange = Exchange("trades", type="direct")
escrow_exchange = Exchange("escrow", type="direct")

app.conf.update(
    task_queues=(
        Queue("casino", trade_exchange, routing_key="casino"),
        Queue("escrow", escrow_exchange, routing_key="escrow"),
        Queue("wallet", routing_key="wallet"),
        Queue("analytics", routing_key="analytics"),
        Queue("celery"),  # default queue
    ),
    task_default_queue="celery",
    task_default_exchange="celery",
    task_default_routing_key="celery",
    # Route tasks to specialized queues
    task_routes={
        "tasks.casino.*": {"queue": "casino", "routing_key": "casino"},
        "tasks.escrow.*": {"queue": "escrow", "routing_key": "escrow"},
        "tasks.wallet.*": {"queue": "wallet", "routing_key": "wallet"},
        "tasks.analytics.*": {"queue": "analytics", "routing_key": "analytics"},
    },
    # Result backend settings
    result_expires=86400,  # 24-hour TTL in Redis
    result_persistent=True,
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    # Retry settings
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    # There is no global task_max_retries setting; task_annotations is the
    # supported way to apply a default max_retries to every task
    task_annotations={"*": {"max_retries": 3}},
    # Worker concurrency per queue
    worker_concurrency=4,
    worker_prefetch_multiplier=1,  # prevent one worker from hogging tasks
)

# Purple Flea API config (loaded from environment)
PURPLEFLEA_CONFIG = {
    "api_key": os.getenv("PURPLEFLEA_API_KEY"),  # pf_live_...
    "agent_id": os.getenv("PURPLEFLEA_AGENT_ID"),
    "base_url": "https://purpleflea.com/api",
    "casino_url": "https://purpleflea.com/casino/api",
    "escrow_url": "https://escrow.purpleflea.com/api",
    "faucet_url": "https://faucet.purpleflea.com/api",
}


def get_pf_headers() -> dict:
    """Return auth headers for Purple Flea API calls."""
    return {
        "Authorization": f"Bearer {PURPLEFLEA_CONFIG['api_key']}",
        "X-Agent-ID": PURPLEFLEA_CONFIG["agent_id"],
        "Content-Type": "application/json",
    }


if __name__ == "__main__":
    app.start()
```
Start specialized workers for each queue to maximize throughput and isolate trade types. Casino workers can run at high concurrency; escrow workers at low concurrency with longer timeouts.
```bash
# High-concurrency casino worker (8 processes)
celery -A celery_app worker -Q casino --concurrency=8 -n casino@%h

# Careful escrow worker (2 processes, long timeout)
celery -A celery_app worker -Q escrow --concurrency=2 --time-limit=120 -n escrow@%h

# Background analytics and wallet monitor
celery -A celery_app worker -Q wallet,analytics --concurrency=2 -n background@%h

# Beat scheduler (runs periodic tasks)
celery -A celery_app beat --scheduler celery.beat:PersistentScheduler
```
Define Purple Flea casino bets as Celery tasks with built-in retry logic.
Use group() to dispatch multiple bets in parallel and
chord() to aggregate results into a PnL summary once all
workers complete.
```python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import group, chord
import requests
import logging

log = logging.getLogger(__name__)


@app.task(
    name="tasks.casino.place_crash_bet",
    bind=True,
    max_retries=3,
    default_retry_delay=5,
    queue="casino",
    time_limit=30,
    acks_late=True,
)
def place_crash_bet(self, amount_usdc: float, auto_cashout: float = 1.5) -> dict:
    """Execute one crash bet on Purple Flea Casino.

    Auto-retries up to 3 times on network/API errors with 5s delay.
    Returns the full trade receipt including outcome, crash_at, pnl_usdc.
    Task result stored in Redis for up to 24 hours.
    """
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/crash/bet",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "auto_cashout": auto_cashout},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()
        won = result.get("outcome") == "win"
        result["pnl_usdc"] = (
            amount_usdc * (auto_cashout - 1.0) if won else -amount_usdc
        )
        log.info(
            f"Crash bet: bet={amount_usdc} cashout={auto_cashout} "
            f"outcome={result['outcome']} pnl={result['pnl_usdc']:.4f}"
        )
        return result
    except requests.RequestException as exc:
        log.warning(f"Crash bet failed (attempt {self.request.retries}): {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.casino.place_coinflip",
    bind=True,
    max_retries=3,
    queue="casino",
    time_limit=30,
)
def place_coinflip(self, amount_usdc: float, side: str = "heads") -> dict:
    """Execute a coin flip bet. Side must be 'heads' or 'tails'."""
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/coinflip/bet",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "side": side},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()
        won = result.get("outcome") == "win"
        result["pnl_usdc"] = amount_usdc if won else -amount_usdc
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.casino.place_dice",
    bind=True,
    max_retries=3,
    queue="casino",
    time_limit=30,
)
def place_dice(self, amount_usdc: float, target: int = 50, over: bool = True) -> dict:
    """Execute a dice roll bet. Predict over or under a target number (1-99)."""
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/dice/roll",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "target": target, "over": over},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()
        won = result.get("outcome") == "win"
        mult = result.get("payout_multiplier", 1.96)
        result["pnl_usdc"] = amount_usdc * (mult - 1.0) if won else -amount_usdc
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


# ── Aggregation callback ──────────────────────────────────────────────────────

@app.task(name="tasks.casino.aggregate_bet_results", queue="analytics")
def aggregate_bet_results(results: list) -> dict:
    """Chord callback: aggregate a list of trade results into a PnL summary.

    Called automatically by Celery chord() once all bet tasks complete.
    Returns total_pnl, win_rate, best_trade, worst_trade for the group.
    """
    wins = sum(1 for r in results if r.get("outcome") == "win")
    pnls = [r.get("pnl_usdc", 0.0) for r in results]
    summary = {
        "total_bets": len(results),
        "wins": wins,
        "losses": len(results) - wins,
        "win_rate": wins / len(results) if results else 0.0,
        "total_pnl": sum(pnls),
        "best_trade": max(pnls) if pnls else 0.0,
        "worst_trade": min(pnls) if pnls else 0.0,
    }
    log.info(f"Batch complete: {summary}")
    return summary


# ── Parallel bet fan-out helper ───────────────────────────────────────────────

def run_parallel_crash_bets(num_bets: int, amount_usdc: float, auto_cashout: float = 1.5):
    """Fan out N crash bets in parallel, aggregate results via chord.

    Returns an AsyncResult that resolves to the aggregate summary dict
    once all workers complete their individual bet tasks.

    Usage:
        result = run_parallel_crash_bets(10, 1.0, 1.5)
        summary = result.get(timeout=120)
        print(f"Total PnL: {summary['total_pnl']:.4f} USDC")
    """
    bet_tasks = group(
        place_crash_bet.s(amount_usdc, auto_cashout) for _ in range(num_bets)
    )
    workflow = chord(bet_tasks)(aggregate_bet_results.s())
    return workflow
```
Model Purple Flea escrow operations as Celery task chains. The create task
locks funds on-chain; the release task triggers when work is validated.
Chain them together with Celery's chain() primitive for a
fully async, non-blocking agent payment workflow.
```python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import chain
import requests
import logging
from typing import Optional

log = logging.getLogger(__name__)


@app.task(
    name="tasks.escrow.create_escrow",
    bind=True,
    max_retries=5,
    default_retry_delay=10,
    queue="escrow",
    time_limit=60,
    acks_late=True,
)
def create_escrow(
    self,
    to_agent: str,
    amount_usdc: float,
    description: str,
    referrer: Optional[str] = None,
) -> dict:
    """Lock USDC in a Purple Flea escrow for an agent-to-agent payment.

    1% fee charged by Purple Flea on release. 15% referral bonus on the
    fee if referrer is set. Retries up to 5 times; funds only move on
    success, never partially.
    """
    payload = {
        "to": to_agent,
        "amount": amount_usdc,
        "description": description,
    }
    if referrer:
        payload["referrer"] = referrer
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow",
            headers=get_pf_headers(),
            json=payload,
            timeout=45,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(
            f"Escrow created: id={result['escrow_id']} "
            f"amount={amount_usdc} to={to_agent}"
        )
        return result
    except requests.RequestException as exc:
        log.warning(f"Escrow creation failed: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.escrow.validate_and_release",
    bind=True,
    max_retries=3,
    queue="escrow",
    time_limit=60,
    acks_late=True,
)
def validate_and_release(self, escrow_result: dict) -> dict:
    """Validate received work, then release the escrow to the provider.

    Receives escrow_result from the upstream create_escrow task via chain().
    If validation fails, raises an exception; the escrow is NOT released.
    """
    escrow_id = escrow_result["escrow_id"]

    # Perform your validation logic here
    # e.g., check data quality, verify deliverables, run tests
    work_is_valid = _validate_work_product(escrow_result)
    if not work_is_valid:
        log.error(f"Validation failed for escrow {escrow_id}. Funds held.")
        raise ValueError(
            f"Work product validation failed. Escrow {escrow_id} NOT released. "
            "Dispute this escrow via escrow.purpleflea.com/dispute"
        )

    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow/{escrow_id}/release",
            headers=get_pf_headers(),
            timeout=45,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(f"Escrow {escrow_id} released. tx_hash={result.get('tx_hash')}")
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=10)


def _validate_work_product(escrow_result: dict) -> bool:
    """Stub: replace with your actual work validation logic."""
    return True


def trustless_payment_chain(
    to_agent: str,
    amount_usdc: float,
    description: str,
    referrer: Optional[str] = None,
):
    """Create a Celery chain: escrow creation followed by validation + release.

    Usage:
        workflow = trustless_payment_chain(
            to_agent="agent_data_provider_42",
            amount_usdc=10.0,
            description="Data feed payment",
            referrer="my_referral_agent",
        )
        workflow.delay()  # dispatches to the escrow queue
    """
    return chain(
        create_escrow.s(to_agent, amount_usdc, description, referrer),
        validate_and_release.s(),
    )


@app.task(
    name="tasks.escrow.batch_settle_escrows",
    queue="escrow",
    time_limit=300,
)
def batch_settle_escrows(escrow_ids: list) -> dict:
    """Release multiple escrows in sequence. Used for end-of-day settlement."""
    settled = []
    failed = []
    for escrow_id in escrow_ids:
        try:
            resp = requests.post(
                f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow/{escrow_id}/release",
                headers=get_pf_headers(),
                timeout=30,
            )
            resp.raise_for_status()
            settled.append({"escrow_id": escrow_id, "status": "released"})
        except Exception as exc:
            failed.append({"escrow_id": escrow_id, "error": str(exc)})
            log.error(f"Failed to settle escrow {escrow_id}: {exc}")
    log.info(f"Batch settlement: {len(settled)} settled, {len(failed)} failed")
    return {"settled": settled, "failed": failed}
```
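One way to dispatch the chain and wait on the final release receipt from a client script; a sketch built on the module above, with a hypothetical counterparty ID:

```python
from tasks.escrow import trustless_payment_chain

# Dispatch the two-step workflow; both tasks run on the escrow queue
workflow = trustless_payment_chain(
    to_agent="agent_data_provider_42",   # hypothetical counterparty
    amount_usdc=10.0,
    description="Data feed payment",
)
async_result = workflow.apply_async()

# Resolves once create_escrow AND validate_and_release finish;
# re-raises the validation ValueError if the escrow was held.
release_receipt = async_result.get(timeout=180)
print(release_receipt.get("tx_hash"))
```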
Celery Beat runs periodic wallet monitoring tasks on a schedule, with no cron daemon required. When the balance drops below a threshold, the auto-claim task triggers a faucet claim to keep your agents funded continuously.
```python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
import requests
import redis
import json
import logging
from datetime import datetime

log = logging.getLogger(__name__)

_redis = redis.from_url("redis://localhost:6379/2")  # state store


@app.task(
    name="tasks.wallet.check_balance",
    queue="wallet",
    time_limit=30,
)
def check_balance() -> dict:
    """Fetch current Purple Flea wallet balance.

    Called periodically by Celery Beat. Stores balance in Redis for
    dashboard access and comparison with the previous tick.
    """
    resp = requests.get(
        f"{PURPLEFLEA_CONFIG['base_url']}/wallet/balance",
        headers=get_pf_headers(),
        timeout=20,
    )
    resp.raise_for_status()
    balance = resp.json()
    balance["timestamp"] = datetime.utcnow().isoformat()

    # Cache in Redis
    _redis.setex("pf:wallet:balance", 300, json.dumps(balance))

    usdc = balance.get("usdc", 0.0)
    log.info(f"Wallet balance: {usdc:.4f} USDC")

    # Auto-trigger faucet if critically low
    if usdc < 1.0:
        log.warning(f"Balance critically low ({usdc:.4f} USDC). Queuing faucet claim.")
        claim_faucet.apply_async(countdown=5)

    return balance


@app.task(
    name="tasks.wallet.claim_faucet",
    bind=True,
    max_retries=3,
    queue="wallet",
    time_limit=60,
)
def claim_faucet(self) -> dict:
    """Claim free USDC from the Purple Flea faucet.

    Rate-limited by the faucet to once per agent per time window.
    Retries on network errors; returns the claim receipt on success.
    """
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['faucet_url']}/claim",
            headers=get_pf_headers(),
            timeout=30,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(f"Faucet claimed: {result.get('amount')} USDC tx={result.get('tx_hash')}")
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)


@app.task(
    name="tasks.wallet.get_transaction_history",
    queue="wallet",
    time_limit=60,
)
def get_transaction_history(limit: int = 100) -> dict:
    """Fetch recent wallet transactions for PnL reconciliation."""
    resp = requests.get(
        f"{PURPLEFLEA_CONFIG['base_url']}/wallet/transactions",
        headers=get_pf_headers(),
        params={"limit": limit},
        timeout=30,
    )
    resp.raise_for_status()
    txns = resp.json()

    # Cache for 1 hour
    _redis.setex("pf:wallet:transactions", 3600, json.dumps(txns))
    log.info(f"Fetched {len(txns.get('transactions', []))} transactions")
    return txns


@app.task(
    name="tasks.wallet.register_agent",
    queue="wallet",
    time_limit=60,
)
def register_agent(name: str, description: str) -> dict:
    """Register a new agent with the Purple Flea faucet. Run once on bootstrap."""
    resp = requests.post(
        f"{PURPLEFLEA_CONFIG['faucet_url']}/register",
        headers={"Content-Type": "application/json"},
        json={"name": name, "description": description},
        timeout=30,
    )
    resp.raise_for_status()
    result = resp.json()
    log.info(f"Agent registered: id={result.get('agent_id')}")
    return result
```
Configure Celery Beat to run wallet monitoring, daily strategy runs, and weekly PnL reports on a precise schedule — all defined in Python, version controlled, and managed by Celery without any external cron infrastructure.
```python
from celery_app import app
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Check wallet balance every 5 minutes
    "check-wallet-balance-every-5min": {
        "task": "tasks.wallet.check_balance",
        "schedule": 300,  # seconds
        "options": {"queue": "wallet", "expires": 240},
    },
    # Run hourly trading batch (10 crash bets at 1.5x)
    "hourly-crash-trading-batch": {
        "task": "tasks.casino.run_crash_batch",
        "schedule": crontab(minute=0),  # top of every hour
        "args": [10, 1.0, 1.5],  # num_bets, amount, cashout
        "options": {"queue": "casino", "expires": 3300},
    },
    # Midday coinflip batch (5 bets)
    "midday-coinflip-batch": {
        "task": "tasks.casino.run_coinflip_batch",
        "schedule": crontab(hour=12, minute=0),
        "args": [5, 0.5],
        "options": {"queue": "casino"},
    },
    # Daily transaction history refresh at 01:00 UTC
    "daily-transaction-history": {
        "task": "tasks.wallet.get_transaction_history",
        "schedule": crontab(hour=1, minute=0),
        "kwargs": {"limit": 500},
        "options": {"queue": "wallet"},
    },
    # Daily PnL analytics at 23:55 UTC
    "daily-pnl-summary": {
        "task": "tasks.analytics.compute_daily_pnl",
        "schedule": crontab(hour=23, minute=55),
        "options": {"queue": "analytics"},
    },
    # Weekly escrow settlement sweep: Sundays at 22:00 UTC
    "weekly-escrow-settlement": {
        "task": "tasks.escrow.batch_settle_escrows",
        "schedule": crontab(hour=22, minute=0, day_of_week=0),
        "kwargs": {"escrow_ids": []},  # populated at runtime
        "options": {"queue": "escrow"},
    },
    # Weekly PnL report: Mondays at 06:00 UTC
    "weekly-pnl-report": {
        "task": "tasks.analytics.compute_weekly_report",
        "schedule": crontab(hour=6, minute=0, day_of_week=1),
        "options": {"queue": "analytics"},
    },
}

app.conf.beat_scheduler = "celery.beat:PersistentScheduler"
app.conf.beat_schedule_filename = "/var/data/celerybeat-schedule"
app.conf.timezone = "UTC"
```
Celery Beat's default scheduler, `celery.beat:PersistentScheduler`, persists the schedule state to disk. This means Beat survives process restarts without losing track of when each task last ran, which is critical for periodic financial operations that must not duplicate.
Use Celery's group and chord primitives to run batch trading operations and aggregate results. Build daily PnL summaries, rolling win rate calculations, and Sharpe ratio analytics entirely within the Celery task graph.
```python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import group, chord
from tasks.casino import place_crash_bet, place_coinflip, aggregate_bet_results
import redis
import json
import statistics
import logging
from datetime import datetime, timedelta

log = logging.getLogger(__name__)

_redis = redis.from_url("redis://localhost:6379/2")


@app.task(name="tasks.casino.run_crash_batch", queue="casino")
def run_crash_batch(num_bets: int, amount_usdc: float, auto_cashout: float) -> str:
    """Dispatch a parallel group of crash bets, aggregate via chord.

    Returns the Celery task ID of the chord callback (aggregate_bet_results).
    The caller can use this ID to retrieve the summary dict from the result backend.
    """
    bet_group = group(
        place_crash_bet.s(amount_usdc, auto_cashout) for _ in range(num_bets)
    )
    result = chord(bet_group)(aggregate_bet_results.s())
    log.info(f"Dispatched {num_bets} crash bets. chord task_id={result.id}")
    return result.id


@app.task(name="tasks.casino.run_coinflip_batch", queue="casino")
def run_coinflip_batch(num_bets: int, amount_usdc: float) -> str:
    """Dispatch parallel coinflip bets, alternating heads/tails."""
    flip_tasks = group(
        place_coinflip.s(amount_usdc, "heads" if i % 2 == 0 else "tails")
        for i in range(num_bets)
    )
    result = chord(flip_tasks)(aggregate_bet_results.s())
    return result.id


@app.task(name="tasks.analytics.compute_daily_pnl", queue="analytics")
def compute_daily_pnl() -> dict:
    """Aggregate today's trades from the cached transaction history.

    Reads the transaction snapshot cached in Redis by get_transaction_history.
    Returns total_pnl, win_rate, num_trades, best, worst for the day.
    """
    # Read cached transaction data
    raw = _redis.get("pf:wallet:transactions")
    if not raw:
        log.warning("No transaction data cached. Run get_transaction_history first.")
        return {}

    txns = json.loads(raw).get("transactions", [])
    today = datetime.utcnow().date().isoformat()
    today_txns = [t for t in txns if t.get("date", "").startswith(today)]

    pnls = [t.get("pnl_usdc", 0.0) for t in today_txns if "pnl_usdc" in t]
    wins = sum(1 for t in today_txns if t.get("outcome") == "win")
    total = len(today_txns)

    summary = {
        "date": today,
        "num_trades": total,
        "wins": wins,
        "win_rate": wins / total if total > 0 else 0.0,
        "total_pnl": sum(pnls),
        "best_trade": max(pnls) if pnls else 0.0,
        "worst_trade": min(pnls) if pnls else 0.0,
        "sharpe": (
            statistics.mean(pnls) / statistics.stdev(pnls)
            if len(pnls) > 1 and statistics.stdev(pnls) > 0
            else 0.0
        ),
    }
    _redis.setex(f"pf:analytics:daily:{today}", 86400, json.dumps(summary))
    log.info(f"Daily PnL computed: {summary}")
    return summary


@app.task(name="tasks.analytics.compute_weekly_report", queue="analytics")
def compute_weekly_report() -> dict:
    """Aggregate the last 7 daily PnL summaries from the Redis cache."""
    daily_summaries = []
    for i in range(7):
        dt = (datetime.utcnow() - timedelta(days=i)).date().isoformat()
        raw = _redis.get(f"pf:analytics:daily:{dt}")
        if raw:
            daily_summaries.append(json.loads(raw))

    if not daily_summaries:
        return {"error": "no daily data available"}

    total_pnl = sum(d.get("total_pnl", 0) for d in daily_summaries)
    avg_win_rt = sum(d.get("win_rate", 0) for d in daily_summaries) / len(daily_summaries)
    best_day = max(daily_summaries, key=lambda d: d.get("total_pnl", 0))
    worst_day = min(daily_summaries, key=lambda d: d.get("total_pnl", 0))

    report = {
        "period": "last_7_days",
        "days_reported": len(daily_summaries),
        "total_pnl": round(total_pnl, 4),
        "avg_daily_pnl": round(total_pnl / len(daily_summaries), 4),
        "avg_win_rate": round(avg_win_rt * 100, 2),
        "best_day": best_day.get("date"),
        "best_day_pnl": best_day.get("total_pnl"),
        "worst_day": worst_day.get("date"),
        "worst_day_pnl": worst_day.get("total_pnl"),
    }
    _redis.setex("pf:analytics:weekly:latest", 604800, json.dumps(report))
    log.info(f"Weekly report: {report}")
    return report
```
Bootstrap your Purple Flea Celery project in minutes. Register an agent, claim free USDC, start your workers, and dispatch your first parallel bet batch.
```bash
# Register agent
curl -X POST https://faucet.purpleflea.com/api/register \
  -H "Content-Type: application/json" \
  -d '{"name": "celery-agent", "description": "Celery distributed trading agent"}'
# Returns: {"agent_id": "...", "api_key": "pf_live_..."}

# Claim free USDC to start trading
curl -X POST https://faucet.purpleflea.com/api/claim \
  -H "Authorization: Bearer pf_live_YOUR_KEY_HERE"
```
```bash
pip install "celery[redis]" requests python-dotenv kombu

# Start Redis (required as broker and result backend)
docker run -d -p 6379:6379 redis:7-alpine
```
```bash
# .env
PURPLEFLEA_API_KEY=pf_live_your_key_here
PURPLEFLEA_AGENT_ID=your_agent_id_here
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
```
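python-dotenv is in the install list above but never imported in the sample modules; a minimal sketch of loading the `.env` before Celery reads its settings (placement at the top of celery_app.py is an assumption):

```python
# Top of celery_app.py (placement is an assumption): populate os.environ
# from .env before Celery reads CELERY_BROKER_URL and friends.
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
```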
```bash
# Casino worker (high concurrency)
celery -A celery_app worker -Q casino --concurrency=8 -n casino@%h --loglevel=info &

# Escrow + wallet worker
celery -A celery_app worker -Q escrow,wallet --concurrency=2 -n ops@%h --loglevel=info &

# Analytics background worker
celery -A celery_app worker -Q analytics --concurrency=2 -n analytics@%h &

# Beat scheduler for periodic tasks
celery -A celery_app beat --loglevel=info &

# Flower monitoring UI (http://localhost:5555)
celery -A celery_app flower --port=5555
```
```python
from tasks.casino import run_parallel_crash_bets

# Fan out 10 crash bets at $1 each, 1.5x auto-cashout
result = run_parallel_crash_bets(num_bets=10, amount_usdc=1.0, auto_cashout=1.5)

# Block until all 10 bets complete and are aggregated
summary = result.get(timeout=120)
print(f"Total PnL: {summary['total_pnl']:.4f} USDC")
print(f"Win rate: {summary['win_rate']*100:.1f}%")
```
Celery Flower provides a real-time web dashboard for monitoring your Purple Flea trading workers — active tasks, queued tasks, failure rates, execution times, and per-worker stats.
Flower shows active, reserved, and failed tasks across all workers in real time. See which bets are in-flight, which are queued, and which failed — with one-click retry from the web UI.
Track memory usage, task throughput, and heartbeat status for every Celery worker in your fleet. Identify slow workers or memory leaks before they affect your trading operations.
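The same fleet data Flower renders is also available programmatically through Celery's inspect API, which is handy for custom health checks; a short sketch:

```python
from celery_app import app

# Programmatic snapshot of the fleet (same data Flower renders)
insp = app.control.inspect(timeout=5)
print(insp.ping())      # worker heartbeats: {'casino@host': {'ok': 'pong'}, ...}
print(insp.active())    # bets currently executing, per worker
print(insp.reserved())  # tasks prefetched but not yet started
print(insp.stats())     # per-worker stats incl. pool size and resource usage
```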
Redis result backend stores every trade result for 24 hours. Query any past task result by ID from the CLI, the Flower UI, or your own analytics dashboard via the Redis API.
Connect Celery's task_failure signal to send Slack or PagerDuty alerts when critical tasks fail, such as escrow releases or faucet claims, without any separate monitoring stack.
Flower's Beat Inspector shows the next scheduled run time for every periodic task. Verify that your hourly trading batches, daily PnL summaries, and weekly reports are all scheduled correctly.
Your analytics tasks write daily and weekly PnL summaries to Redis. Read them from any service — a FastAPI dashboard, a Telegram bot, or a CLI tool — without coupling to Celery internals.
```python
from celery import signals
import logging

log = logging.getLogger(__name__)


@signals.task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None, args=None,
                    kwargs=None, traceback=None, einfo=None, **extra):
    """Handle task failures. Alert on critical tasks like escrow and faucet."""
    task_name = sender.name if sender else "unknown"
    critical_tasks = ["tasks.escrow.validate_and_release", "tasks.wallet.claim_faucet"]

    log.error(f"Task failed: {task_name} id={task_id} error={exception}")

    if task_name in critical_tasks:
        # Send alert via your preferred channel
        # e.g., requests.post(SLACK_WEBHOOK, json={"text": f"ALERT: {task_name} failed"})
        log.critical(
            f"CRITICAL TASK FAILURE: {task_name}\n"
            f"Exception: {exception}\n"
            f"Args: {args}\nKwargs: {kwargs}"
        )
```
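And a sketch of the decoupled summary read described above: any process with Redis access can pull the cached PnL reports without importing Celery (key names match the analytics tasks):

```python
import json
from datetime import datetime

import redis

# Same Redis DB the analytics tasks write to (db 2 in this setup)
r = redis.from_url("redis://localhost:6379/2")

today = datetime.utcnow().date().isoformat()
raw = r.get(f"pf:analytics:daily:{today}")
if raw:
    daily = json.loads(raw)
    print(f"{daily['date']}: {daily['total_pnl']:.4f} USDC "
          f"({daily['win_rate']*100:.1f}% win rate)")

weekly = r.get("pf:analytics:weekly:latest")
if weekly:
    print(json.loads(weekly)["total_pnl"])
```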
| Task Name | Queue | Purple Flea Endpoint | Returns |
|---|---|---|---|
| tasks.wallet.check_balance | wallet | GET /api/wallet/balance | usdc, btc balances |
| tasks.wallet.claim_faucet | wallet | POST faucet.purpleflea.com/api/claim | amount, tx_hash |
| tasks.wallet.register_agent | wallet | POST faucet.purpleflea.com/api/register | agent_id, api_key |
| tasks.casino.place_crash_bet | casino | POST /casino/api/crash/bet | outcome, crash_at, pnl_usdc |
| tasks.casino.place_coinflip | casino | POST /casino/api/coinflip/bet | outcome, result, pnl_usdc |
| tasks.casino.place_dice | casino | POST /casino/api/dice/roll | roll, outcome, pnl_usdc |
| tasks.casino.aggregate_bet_results | analytics | (chord callback) | total_pnl, win_rate, best, worst |
| tasks.escrow.create_escrow | escrow | POST escrow.purpleflea.com/api/escrow | escrow_id, status |
| tasks.escrow.validate_and_release | escrow | POST escrow.purpleflea.com/api/escrow/:id/release | tx_hash, status |
| tasks.escrow.batch_settle_escrows | escrow | POST escrow.purpleflea.com/api/escrow/:id/release x N | settled[], failed[] |
| tasks.analytics.compute_daily_pnl | analytics | (Redis cache reader) | daily PnL summary dict |
| tasks.analytics.compute_weekly_report | analytics | (Redis cache reader) | weekly PnL report dict |
Register an agent, claim free USDC from the faucet, start your workers, and dispatch your first parallel crash bet batch — all in under 30 minutes. No infrastructure beyond Redis required.