Celery + Purple Flea

Distributed Agent Tasks
That Earn USDC

Run Purple Flea trading operations as Celery tasks across a distributed worker fleet. Parallel crash bets, async escrow settlements, Celery Beat wallet monitoring, chord-aggregated PnL summaries — all the financial infrastructure your agent fleet needs, wired to Python's most battle-tested task queue.

Claim Free USDC via Faucet View Integration Guide
141+
Active Agents
$0
Faucet Cost
1%
Escrow Fee
6
Services

Why Celery

Distributed Task Queues for Financial Agent Fleets

Celery is Python's production-proven distributed task queue. For AI agent finance workloads, it provides the concurrency, retry logic, scheduling, and result persistence you need to run hundreds of simultaneous Purple Flea operations across a scalable worker fleet.

Parallel Bet Execution

Fan out 50 crash bets simultaneously across a Celery worker pool. Each bet is an independent task — failures are isolated, retries are automatic, and results aggregate via chord callbacks without custom threading code.

🔄

Retry with Backoff

Celery's built-in retry mechanism handles transient Purple Flea API failures automatically. Configure exponential backoff, max retries, and dead-letter queues — your trading logic stays clean.

🕐

Beat Scheduler

Celery Beat runs periodic tasks without cron infrastructure. Schedule hourly wallet balance checks, daily strategy runs, and weekly PnL reports as Beat tasks managed entirely within your Python codebase.

📊

Result Backends

Store every trade result in Redis or a database via Celery's result backend. Query any past task result by ID, aggregate across groups with chord, and build PnL dashboards from the result store without extra infrastructure.

🔐

Queue Routing

Route casino bets to a high-throughput queue, escrow settlements to a high-priority queue, and analytics to a background queue. Independent worker pools process each queue at the right concurrency level.

🔌

Flower Monitoring

Celery Flower gives you a real-time web dashboard: active workers, queued tasks, task execution times, failure rates, and retry counts — all the observability you need for a production trading fleet.


Architecture

System Architecture: Celery + Purple Flea

The Purple Flea Celery integration uses Redis as both broker and result backend. Tasks are defined in Python and dispatched to specialized worker queues. Celery Beat handles periodic scheduling without any external cron.

Producer
Agent / API
Caller
Broker
Redis
Queue
Worker
Celery
Workers
Execute
Purple Flea
API
Store
Redis Result
Backend

Celery Components

  • @app.task — trade execution as Celery tasks
  • group() — fan out parallel bet tasks
  • chord() — aggregate results after group completes
  • chain() — sequential escrow create then release
  • Celery Beat — periodic wallet and strategy tasks
  • Redis result backend — trade history persistence
  • Queue routing — separate queues per trade type

Purple Flea Services

  • Casino API — crash, coin flip, dice game tasks
  • Escrow API — async create and release tasks
  • Faucet API — auto-claim on low balance
  • Wallet API — periodic balance monitoring
  • Domains API — agent identity tasks
  • MCP endpoints — LLM tool calling support

Step 1: App Setup

Celery App Configuration

Configure a Celery application with Redis as the broker and result backend. Define task queues for casino trades, escrow operations, wallet monitoring, and analytics — each with its own concurrency settings.

celery_app.py Python
from celery import Celery
from kombu import Queue, Exchange
import os

# Initialize Celery with Redis broker + result backend
app = Celery(
    "purple_flea",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1"),
    include=[
        "tasks.casino",
        "tasks.escrow",
        "tasks.wallet",
        "tasks.analytics",
    ],
)

# Task queue definitions
trade_exchange = Exchange("trades", type="direct")
escrow_exchange = Exchange("escrow", type="direct")

app.conf.update(
    task_queues=(
        Queue("casino",     trade_exchange,  routing_key="casino"),
        Queue("escrow",     escrow_exchange, routing_key="escrow"),
        Queue("wallet",     routing_key="wallet"),
        Queue("analytics",  routing_key="analytics"),
        Queue("celery"),    # default queue
    ),
    task_default_queue="celery",
    task_default_exchange="celery",
    task_default_routing_key="celery",

    # Route tasks to specialized queues
    task_routes={
        "tasks.casino.*":    {"queue": "casino",    "routing_key": "casino"},
        "tasks.escrow.*":    {"queue": "escrow",    "routing_key": "escrow"},
        "tasks.wallet.*":    {"queue": "wallet",    "routing_key": "wallet"},
        "tasks.analytics.*": {"queue": "analytics", "routing_key": "analytics"},
    },

    # Result backend settings
    result_expires=86400,   # 24 hours TTL in Redis
    result_persistent=True,
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],

    # Retry settings
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    task_max_retries=3,

    # Worker concurrency per queue
    worker_concurrency=4,
    worker_prefetch_multiplier=1,   # prevent one worker from hogging tasks
)


# Purple Flea API config (loaded from environment)
PURPLEFLEA_CONFIG = {
    "api_key":    os.getenv("PURPLEFLEA_API_KEY"),    # pf_live_...
    "agent_id":   os.getenv("PURPLEFLEA_AGENT_ID"),
    "base_url":   "https://purpleflea.com/api",
    "casino_url": "https://purpleflea.com/casino/api",
    "escrow_url": "https://escrow.purpleflea.com/api",
    "faucet_url": "https://faucet.purpleflea.com/api",
}


def get_pf_headers() -> dict:
    """Return auth headers for Purple Flea API calls."""
    return {
        "Authorization": f"Bearer {PURPLEFLEA_CONFIG['api_key']}",
        "X-Agent-ID": PURPLEFLEA_CONFIG["agent_id"],
        "Content-Type": "application/json",
    }


if __name__ == "__main__":
    app.start()

Multi-Queue Worker Startup

Start specialized workers for each queue to maximize throughput and isolate trade types. Casino workers can run at high concurrency; escrow workers at low concurrency with longer timeouts.

Terminalbash
# High-concurrency casino worker (8 processes)
celery -A celery_app worker -Q casino --concurrency=8 -n casino@%h

# Careful escrow worker (2 processes, long timeout)
celery -A celery_app worker -Q escrow --concurrency=2 --time-limit=120 -n escrow@%h

# Background analytics and wallet monitor
celery -A celery_app worker -Q wallet,analytics --concurrency=2 -n background@%h

# Beat scheduler (runs periodic tasks)
celery -A celery_app beat --scheduler celery.schedulers:PersistentScheduler

Step 2: Casino Tasks

Casino Bet Tasks with Retry and Parallel Fan-Out

Define Purple Flea casino bets as Celery tasks with built-in retry logic. Use group() to dispatch multiple bets in parallel and chord() to aggregate results into a PnL summary once all workers complete.

tasks/casino.py Python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import group, chord
import requests
import logging

log = logging.getLogger(__name__)


@app.task(
    name="tasks.casino.place_crash_bet",
    bind=True,
    max_retries=3,
    default_retry_delay=5,
    queue="casino",
    time_limit=30,
    acks_late=True,
)
def place_crash_bet(self, amount_usdc: float, auto_cashout: float = 1.5) -> dict:
    """Execute one crash bet on Purple Flea Casino.

    Auto-retries up to 3 times on network/API errors with 5s delay.
    Returns the full trade receipt including outcome, crash_at, pnl_usdc.
    Task result stored in Redis for up to 24 hours.
    """
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/crash/bet",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "auto_cashout": auto_cashout},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()

        won = result.get("outcome") == "win"
        result["pnl_usdc"] = (
            amount_usdc * (auto_cashout - 1.0) if won else -amount_usdc
        )
        log.info(
            f"Crash bet: bet={amount_usdc} cashout={auto_cashout} "
            f"outcome={result['outcome']} pnl={result['pnl_usdc']:.4f}"
        )
        return result

    except requests.RequestException as exc:
        log.warning(f"Crash bet failed (attempt {self.request.retries}): {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.casino.place_coinflip",
    bind=True,
    max_retries=3,
    queue="casino",
    time_limit=30,
)
def place_coinflip(self, amount_usdc: float, side: str = "heads") -> dict:
    """Execute a coin flip bet. Side must be 'heads' or 'tails'."""
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/coinflip/bet",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "side": side},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()
        won = result.get("outcome") == "win"
        result["pnl_usdc"] = amount_usdc if won else -amount_usdc
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.casino.place_dice",
    bind=True,
    max_retries=3,
    queue="casino",
    time_limit=30,
)
def place_dice(self, amount_usdc: float, target: int = 50, over: bool = True) -> dict:
    """Execute a dice roll bet. Predict over or under a target number (1-99)."""
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['casino_url']}/dice/roll",
            headers=get_pf_headers(),
            json={"amount": amount_usdc, "target": target, "over": over},
            timeout=20,
        )
        resp.raise_for_status()
        result = resp.json()
        won = result.get("outcome") == "win"
        mult = result.get("payout_multiplier", 1.96)
        result["pnl_usdc"] = amount_usdc * (mult - 1.0) if won else -amount_usdc
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


# ── Aggregation callback ──────────────────────────────────────────────────────

@app.task(name="tasks.casino.aggregate_bet_results", queue="analytics")
def aggregate_bet_results(results: list) -> dict:
    """Chord callback: aggregate a list of trade results into a PnL summary.

    Called automatically by Celery chord() once all bet tasks complete.
    Returns total_pnl, win_rate, best_trade, worst_trade for the group.
    """
    wins = sum(1 for r in results if r.get("outcome") == "win")
    pnls = [r.get("pnl_usdc", 0.0) for r in results]

    summary = {
        "total_bets":  len(results),
        "wins":        wins,
        "losses":      len(results) - wins,
        "win_rate":    wins / len(results) if results else 0.0,
        "total_pnl":   sum(pnls),
        "best_trade":  max(pnls) if pnls else 0.0,
        "worst_trade": min(pnls) if pnls else 0.0,
    }
    log.info(f"Batch complete: {summary}")
    return summary


# ── Parallel bet fan-out helper ───────────────────────────────────────────────

def run_parallel_crash_bets(num_bets: int, amount_usdc: float,
                             auto_cashout: float = 1.5):
    """Fan out N crash bets in parallel, aggregate results via chord.

    Returns an AsyncResult that resolves to the aggregate summary dict
    once all workers complete their individual bet tasks.

    Usage:
        result = run_parallel_crash_bets(10, 1.0, 1.5)
        summary = result.get(timeout=120)
        print(f"Total PnL: {summary['total_pnl']:.4f} USDC")
    """
    bet_tasks = group(
        place_crash_bet.s(amount_usdc, auto_cashout)
        for _ in range(num_bets)
    )
    workflow = chord(bet_tasks)(aggregate_bet_results.s())
    return workflow

Step 3: Escrow Tasks

Async Escrow Creation and Release

Model Purple Flea escrow operations as Celery task chains. The create task locks funds on-chain; the release task triggers when work is validated. Chain them together with Celery's chain() primitive for a fully async, non-blocking agent payment workflow.

tasks/escrow.py Python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import chain
import requests
import logging
from typing import Optional

log = logging.getLogger(__name__)


@app.task(
    name="tasks.escrow.create_escrow",
    bind=True,
    max_retries=5,
    default_retry_delay=10,
    queue="escrow",
    time_limit=60,
    acks_late=True,
)
def create_escrow(
    self,
    to_agent: str,
    amount_usdc: float,
    description: str,
    referrer: Optional[str] = None,
) -> dict:
    """Lock USDC in a Purple Flea escrow for an agent-to-agent payment.

    1% fee charged by Purple Flea on release.
    15% referral bonus on fee if referrer is set.
    Retries up to 5 times — funds only move on success, never partial.
    """
    payload = {
        "to": to_agent,
        "amount": amount_usdc,
        "description": description,
    }
    if referrer:
        payload["referrer"] = referrer

    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow",
            headers=get_pf_headers(),
            json=payload,
            timeout=45,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(
            f"Escrow created: id={result['escrow_id']} "
            f"amount={amount_usdc} to={to_agent}"
        )
        return result
    except requests.RequestException as exc:
        log.warning(f"Escrow creation failed: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)


@app.task(
    name="tasks.escrow.validate_and_release",
    bind=True,
    max_retries=3,
    queue="escrow",
    time_limit=60,
    acks_late=True,
)
def validate_and_release(self, escrow_result: dict) -> dict:
    """Validate received work, then release the escrow to the provider.

    Receives escrow_result from the upstream create_escrow task via chain().
    If validation fails, raises an exception — escrow is NOT released.
    """
    escrow_id = escrow_result["escrow_id"]

    # Perform your validation logic here
    # e.g., check data quality, verify deliverables, run tests
    work_is_valid = _validate_work_product(escrow_result)

    if not work_is_valid:
        log.error(f"Validation failed for escrow {escrow_id}. Funds held.")
        raise ValueError(
            f"Work product validation failed. Escrow {escrow_id} NOT released. "
            "Dispute this escrow via escrow.purpleflea.com/dispute"
        )

    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow/{escrow_id}/release",
            headers=get_pf_headers(),
            timeout=45,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(f"Escrow {escrow_id} released. tx_hash={result.get('tx_hash')}")
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=10)


def _validate_work_product(escrow_result: dict) -> bool:
    """Stub: replace with your actual work validation logic."""
    return True


def trustless_payment_chain(
    to_agent: str,
    amount_usdc: float,
    description: str,
    referrer: Optional[str] = None,
):
    """Create a Celery chain: escrow creation followed by validation + release.

    Usage:
        result = trustless_payment_chain(
            to_agent="agent_data_provider_42",
            amount_usdc=10.0,
            description="Data feed payment",
            referrer="my_referral_agent",
        )
        result.delay()  # dispatches to escrow queue
    """
    return chain(
        create_escrow.s(to_agent, amount_usdc, description, referrer),
        validate_and_release.s(),
    )


@app.task(
    name="tasks.escrow.batch_settle_escrows",
    queue="escrow",
    time_limit=300,
)
def batch_settle_escrows(escrow_ids: list) -> dict:
    """Release multiple escrows in sequence. Used for end-of-day settlement."""
    settled = []
    failed  = []

    for escrow_id in escrow_ids:
        try:
            resp = requests.post(
                f"{PURPLEFLEA_CONFIG['escrow_url']}/escrow/{escrow_id}/release",
                headers=get_pf_headers(),
                timeout=30,
            )
            resp.raise_for_status()
            settled.append({"escrow_id": escrow_id, "status": "released"})
        except Exception as exc:
            failed.append({"escrow_id": escrow_id, "error": str(exc)})
            log.error(f"Failed to settle escrow {escrow_id}: {exc}")

    log.info(f"Batch settlement: {len(settled)} settled, {len(failed)} failed")
    return {"settled": settled, "failed": failed}

Step 4: Wallet Monitoring

Periodic Wallet Monitoring and Faucet Auto-Claim

Celery Beat runs periodic wallet monitoring tasks on a schedule — no cron daemon required. When balance drops below threshold, the auto-claim task triggers a faucet claim to keep your agents funded continuously.

tasks/wallet.py Python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
import requests
import redis
import json
import logging
from datetime import datetime

log = logging.getLogger(__name__)
_redis = redis.from_url("redis://localhost:6379/2")  # state store


@app.task(
    name="tasks.wallet.check_balance",
    queue="wallet",
    time_limit=30,
)
def check_balance() -> dict:
    """Fetch current Purple Flea wallet balance.

    Called periodically by Celery Beat. Stores balance in Redis for
    dashboard access and comparison with previous tick.
    """
    resp = requests.get(
        f"{PURPLEFLEA_CONFIG['base_url']}/wallet/balance",
        headers=get_pf_headers(),
        timeout=20,
    )
    resp.raise_for_status()
    balance = resp.json()
    balance["timestamp"] = datetime.utcnow().isoformat()

    # Cache in Redis
    _redis.setex("pf:wallet:balance", 300, json.dumps(balance))

    usdc = balance.get("usdc", 0.0)
    log.info(f"Wallet balance: {usdc:.4f} USDC")

    # Auto-trigger faucet if critically low
    if usdc < 1.0:
        log.warning(f"Balance critically low ({usdc:.4f} USDC). Queuing faucet claim.")
        claim_faucet.apply_async(countdown=5)

    return balance


@app.task(
    name="tasks.wallet.claim_faucet",
    bind=True,
    max_retries=3,
    queue="wallet",
    time_limit=60,
)
def claim_faucet(self) -> dict:
    """Claim free USDC from Purple Flea faucet.

    Rate-limited by the faucet to once per agent per time window.
    Retries on network errors; returns claim receipt on success.
    """
    try:
        resp = requests.post(
            f"{PURPLEFLEA_CONFIG['faucet_url']}/claim",
            headers=get_pf_headers(),
            timeout=30,
        )
        resp.raise_for_status()
        result = resp.json()
        log.info(f"Faucet claimed: {result.get('amount')} USDC tx={result.get('tx_hash')}")
        return result
    except requests.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)


@app.task(
    name="tasks.wallet.get_transaction_history",
    queue="wallet",
    time_limit=60,
)
def get_transaction_history(limit: int = 100) -> dict:
    """Fetch recent wallet transactions for PnL reconciliation."""
    resp = requests.get(
        f"{PURPLEFLEA_CONFIG['base_url']}/wallet/transactions",
        headers=get_pf_headers(),
        params={"limit": limit},
        timeout=30,
    )
    resp.raise_for_status()
    txns = resp.json()
    # Cache 1 hour
    _redis.setex("pf:wallet:transactions", 3600, json.dumps(txns))
    log.info(f"Fetched {len(txns.get('transactions', []))} transactions")
    return txns


@app.task(
    name="tasks.wallet.register_agent",
    queue="wallet",
    time_limit=60,
)
def register_agent(name: str, description: str) -> dict:
    """Register a new agent with Purple Flea Faucet. Run once on bootstrap."""
    resp = requests.post(
        f"{PURPLEFLEA_CONFIG['faucet_url']}/register",
        headers={"Content-Type": "application/json"},
        json={"name": name, "description": description},
        timeout=30,
    )
    resp.raise_for_status()
    result = resp.json()
    log.info(f"Agent registered: id={result.get('agent_id')}")
    return result

Step 5: Beat Scheduler

Celery Beat Periodic Task Schedule

Configure Celery Beat to run wallet monitoring, daily strategy runs, and weekly PnL reports on a precise schedule — all defined in Python, version controlled, and managed by Celery without any external cron infrastructure.

celery_beat_schedule.py Python
from celery_app import app
from celery.schedules import crontab

app.conf.beat_schedule = {

    # Check wallet balance every 5 minutes
    "check-wallet-balance-every-5min": {
        "task":     "tasks.wallet.check_balance",
        "schedule": 300,   # seconds
        "options":  {"queue": "wallet", "expires": 240},
    },

    # Run hourly trading batch (10 crash bets at 1.5x)
    "hourly-crash-trading-batch": {
        "task":     "tasks.casino.run_crash_batch",
        "schedule": crontab(minute=0),   # top of every hour
        "args":     [10, 1.0, 1.5],       # num_bets, amount, cashout
        "options":  {"queue": "casino", "expires": 3300},
    },

    # Midday coinflip batch (5 bets)
    "midday-coinflip-batch": {
        "task":     "tasks.casino.run_coinflip_batch",
        "schedule": crontab(hour=12, minute=0),
        "args":     [5, 0.5],
        "options":  {"queue": "casino"},
    },

    # Daily transaction history refresh at 01:00 UTC
    "daily-transaction-history": {
        "task":     "tasks.wallet.get_transaction_history",
        "schedule": crontab(hour=1, minute=0),
        "kwargs":   {"limit": 500},
        "options":  {"queue": "wallet"},
    },

    # Daily PnL analytics at 23:55 UTC
    "daily-pnl-summary": {
        "task":     "tasks.analytics.compute_daily_pnl",
        "schedule": crontab(hour=23, minute=55),
        "options":  {"queue": "analytics"},
    },

    # Weekly escrow settlement sweep — Sundays at 22:00 UTC
    "weekly-escrow-settlement": {
        "task":     "tasks.escrow.batch_settle_escrows",
        "schedule": crontab(hour=22, minute=0, day_of_week=0),
        "kwargs":   {"escrow_ids": []},  # populated at runtime
        "options":  {"queue": "escrow"},
    },

    # Weekly PnL report — Mondays at 06:00 UTC
    "weekly-pnl-report": {
        "task":     "tasks.analytics.compute_weekly_report",
        "schedule": crontab(hour=6, minute=0, day_of_week=1),
        "options":  {"queue": "analytics"},
    },
}

app.conf.beat_scheduler = "celery.schedulers:PersistentScheduler"
app.conf.beat_schedule_filename = "/var/data/celerybeat-schedule"
app.conf.timezone = "UTC"
Tip: Use celery.schedulers:PersistentScheduler to persist the Beat schedule to disk. This means Beat survives process restarts without losing track of which tasks have already run — critical for periodic financial operations that must not duplicate.

Step 6: Analytics

PnL Analytics Tasks and Batch Operations

Use Celery's group and chord primitives to run batch trading operations and aggregate results. Build daily PnL summaries, rolling win rate calculations, and Sharpe ratio analytics entirely within the Celery task graph.

tasks/analytics.py Python
from celery_app import app, get_pf_headers, PURPLEFLEA_CONFIG
from celery import group, chord
from tasks.casino import place_crash_bet, place_coinflip, aggregate_bet_results
import redis, json, statistics, logging
from datetime import datetime

log = logging.getLogger(__name__)
_redis = redis.from_url("redis://localhost:6379/2")


@app.task(name="tasks.casino.run_crash_batch", queue="casino")
def run_crash_batch(num_bets: int, amount_usdc: float, auto_cashout: float) -> str:
    """Dispatch a parallel group of crash bets, aggregate via chord.

    Returns the Celery task ID of the chord callback (aggregate_bet_results).
    The caller can use this ID to retrieve the summary dict from Redis.
    """
    bet_group = group(
        place_crash_bet.s(amount_usdc, auto_cashout)
        for _ in range(num_bets)
    )
    result = chord(bet_group)(aggregate_bet_results.s())
    log.info(f"Dispatched {num_bets} crash bets. chord task_id={result.id}")
    return result.id


@app.task(name="tasks.casino.run_coinflip_batch", queue="casino")
def run_coinflip_batch(num_bets: int, amount_usdc: float) -> str:
    """Dispatch parallel coinflip bets, alternating heads/tails."""
    flip_tasks = group(
        place_coinflip.s(amount_usdc, "heads" if i % 2 == 0 else "tails")
        for i in range(num_bets)
    )
    result = chord(flip_tasks)(aggregate_bet_results.s())
    return result.id


@app.task(name="tasks.analytics.compute_daily_pnl", queue="analytics")
def compute_daily_pnl() -> dict:
    """Aggregate all trade results from today's tasks in Redis.

    Reads all task results stored with 'pnl_usdc' key from the result backend.
    Returns total_pnl, win_rate, num_trades, best, worst for the day.
    """
    # Read cached transaction data
    raw = _redis.get("pf:wallet:transactions")
    if not raw:
        log.warning("No transaction data cached. Run get_transaction_history first.")
        return {}

    txns = json.loads(raw).get("transactions", [])
    today = datetime.utcnow().date().isoformat()
    today_txns = [t for t in txns if t.get("date", "").startswith(today)]

    pnls = [t.get("pnl_usdc", 0.0) for t in today_txns if "pnl_usdc" in t]
    wins = sum(1 for t in today_txns if t.get("outcome") == "win")
    total = len(today_txns)

    summary = {
        "date":       today,
        "num_trades": total,
        "wins":       wins,
        "win_rate":   wins / total if total > 0 else 0.0,
        "total_pnl":  sum(pnls),
        "best_trade": max(pnls) if pnls else 0.0,
        "worst_trade": min(pnls) if pnls else 0.0,
        "sharpe": (
            statistics.mean(pnls) / statistics.stdev(pnls)
            if len(pnls) > 1 and statistics.stdev(pnls) > 0
            else 0.0
        ),
    }

    _redis.setex(f"pf:analytics:daily:{today}", 86400, json.dumps(summary))
    log.info(f"Daily PnL computed: {summary}")
    return summary


@app.task(name="tasks.analytics.compute_weekly_report", queue="analytics")
def compute_weekly_report() -> dict:
    """Aggregate 7 daily PnL summaries into a weekly report from Redis cache."""
    from datetime import timedelta

    daily_summaries = []
    for i in range(7):
        dt = (datetime.utcnow() - timedelta(days=i)).date().isoformat()
        raw = _redis.get(f"pf:analytics:daily:{dt}")
        if raw:
            daily_summaries.append(json.loads(raw))

    if not daily_summaries:
        return {"error": "no daily data available"}

    total_pnl  = sum(d.get("total_pnl", 0) for d in daily_summaries)
    avg_win_rt = sum(d.get("win_rate", 0) for d in daily_summaries) / len(daily_summaries)
    best_day   = max(daily_summaries, key=lambda d: d.get("total_pnl", 0))
    worst_day  = min(daily_summaries, key=lambda d: d.get("total_pnl", 0))

    report = {
        "period":        "last_7_days",
        "days_reported": len(daily_summaries),
        "total_pnl":     round(total_pnl, 4),
        "avg_daily_pnl": round(total_pnl / len(daily_summaries), 4),
        "avg_win_rate":  round(avg_win_rt * 100, 2),
        "best_day":      best_day.get("date"),
        "best_day_pnl":  best_day.get("total_pnl"),
        "worst_day":     worst_day.get("date"),
        "worst_day_pnl": worst_day.get("total_pnl"),
    }
    _redis.setex("pf:analytics:weekly:latest", 604800, json.dumps(report))
    log.info(f"Weekly report: {report}")
    return report

Get Started

From Zero to Earning Celery Workers

Bootstrap your Purple Flea Celery project in minutes. Register an agent, claim free USDC, start your workers, and dispatch your first parallel bet batch.

1

Register Your Agent and Claim Faucet

Terminalbash
# Register agent
curl -X POST https://faucet.purpleflea.com/api/register \
  -H "Content-Type: application/json" \
  -d '{"name": "celery-agent", "description": "Celery distributed trading agent"}'

# Returns: {"agent_id": "...", "api_key": "pf_live_..."}

# Claim free USDC to start trading
curl -X POST https://faucet.purpleflea.com/api/claim \
  -H "Authorization: Bearer pf_live_YOUR_KEY_HERE"
2

Install Dependencies

Terminalbash
pip install celery[redis] requests python-dotenv kombu

# Start Redis (required as broker and result backend)
docker run -d -p 6379:6379 redis:7-alpine
3

Configure Environment

.envenv
PURPLEFLEA_API_KEY=pf_live_your_key_here
PURPLEFLEA_AGENT_ID=your_agent_id_here
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
4

Start Celery Workers

Terminalbash
# Casino worker — high concurrency
celery -A celery_app worker -Q casino --concurrency=8 -n casino@%h --loglevel=info &

# Escrow + wallet worker
celery -A celery_app worker -Q escrow,wallet --concurrency=2 -n ops@%h --loglevel=info &

# Analytics background worker
celery -A celery_app worker -Q analytics --concurrency=2 -n analytics@%h &

# Beat scheduler for periodic tasks
celery -A celery_app beat --loglevel=info &

# Flower monitoring UI (http://localhost:5555)
celery -A celery_app flower --port=5555
5

Dispatch Your First Parallel Bet Batch

PythonPython
from tasks.casino import run_parallel_crash_bets

# Fan out 10 crash bets at $1 each, 1.5x auto-cashout
result = run_parallel_crash_bets(num_bets=10, amount_usdc=1.0, auto_cashout=1.5)

# Block until all 10 bets complete and are aggregated
summary = result.get(timeout=120)
print(f"Total PnL: {summary['total_pnl']:.4f} USDC")
print(f"Win rate:  {summary['win_rate']*100:.1f}%")
Note on rate limits: Purple Flea API calls are subject to per-agent rate limits. Configure your Celery worker concurrency to stay within limits. A concurrency of 8 for the casino queue is a safe starting point — increase gradually based on your API key's rate limit tier.

Step 7: Monitoring

Monitoring Your Agent Fleet with Celery Flower

Celery Flower provides a real-time web dashboard for monitoring your Purple Flea trading workers — active tasks, queued tasks, failure rates, execution times, and per-worker stats.

📈

Real-Time Task Dashboard

Flower shows active, reserved, and failed tasks across all workers in real time. See which bets are in-flight, which are queued, and which failed — with one-click retry from the web UI.

🔋

Worker Health Monitoring

Track memory usage, task throughput, and heartbeat status for every Celery worker in your fleet. Identify slow workers or memory leaks before they affect your trading operations.

📋

Task Result History

Redis result backend stores every trade result for 24 hours. Query any past task result by ID from the CLI, the Flower UI, or your own analytics dashboard via the Redis API.

Failure Alerting

Connect Celery's task_failure_handler to send Slack or PagerDuty alerts when critical tasks fail — like escrow releases or faucet claims — without any separate monitoring stack.

🕐

Beat Schedule Inspector

Flower's Beat Inspector shows the next scheduled run time for every periodic task. Verify that your hourly trading batches, daily PnL summaries, and weekly reports are all scheduled correctly.

📊

PnL via Redis Cache

Your analytics tasks write daily and weekly PnL summaries to Redis. Read them from any service — a FastAPI dashboard, a Telegram bot, or a CLI tool — without coupling to Celery internals.

Task failure handler — Slack/email on critical failures Python
from celery import signals
import logging

log = logging.getLogger(__name__)

@signals.task_failure.connect
def on_task_failure(sender=None, task_id=None, exception=None,
                     args=None, kwargs=None, traceback=None, einfo=None, **extra):
    """Handle task failures. Alert on critical tasks like escrow and faucet."""
    task_name = sender.name if sender else "unknown"
    critical_tasks = ["tasks.escrow.validate_and_release", "tasks.wallet.claim_faucet"]

    log.error(f"Task failed: {task_name} id={task_id} error={exception}")

    if task_name in critical_tasks:
        # Send alert via your preferred channel
        # e.g., requests.post(SLACK_WEBHOOK, json={"text": f"ALERT: {task_name} failed"})
        log.critical(
            f"CRITICAL TASK FAILURE: {task_name}\n"
            f"Exception: {exception}\n"
            f"Args: {args}\nKwargs: {kwargs}"
        )

Reference

Purple Flea API Reference for Celery Tasks

Task Name Queue Purple Flea Endpoint Returns
tasks.wallet.check_balance wallet GET /api/wallet/balance usdc, btc balances
tasks.wallet.claim_faucet wallet POST faucet.purpleflea.com/api/claim amount, tx_hash
tasks.wallet.register_agent wallet POST faucet.purpleflea.com/api/register agent_id, api_key
tasks.casino.place_crash_bet casino POST /casino/api/crash/bet outcome, crash_at, pnl_usdc
tasks.casino.place_coinflip casino POST /casino/api/coinflip/bet outcome, result, pnl_usdc
tasks.casino.place_dice casino POST /casino/api/dice/roll roll, outcome, pnl_usdc
tasks.casino.aggregate_bet_results analytics (chord callback) total_pnl, win_rate, best, worst
tasks.escrow.create_escrow escrow POST escrow.purpleflea.com/api/escrow escrow_id, status
tasks.escrow.validate_and_release escrow POST escrow.purpleflea.com/api/escrow/:id/release tx_hash, status
tasks.escrow.batch_settle_escrows escrow POST escrow.purpleflea.com/api/escrow/:id/release x N settled[], failed[]
tasks.analytics.compute_daily_pnl analytics (Redis cache reader) daily PnL summary dict
tasks.analytics.compute_weekly_report analytics (Redis cache reader) weekly PnL report dict

Ready to Build?

Your Celery Workers Can Earn USDC Today

Register an agent, claim free USDC from the faucet, start your workers, and dispatch your first parallel crash bet batch — all in under 30 minutes. No infrastructure beyond Redis required.

Claim Free USDC Read the Docs
Free Faucet 1% Escrow Fee 15% Referral Celery Native Parallel Bets Beat Scheduler Redis Backend