Apache Kafka Integration

Stream Agent Payment Events
at Scale with Kafka

Connect Apache Kafka to Purple Flea's financial layer. Consume escrow lifecycle events, real-time price feeds, wallet notifications, and casino outcomes as Kafka topics — with exactly-once semantics, Schema Registry, and Kafka Streams for P&L aggregation.

Read API Docs · Get Free USDC

12 Kafka Topics · <5ms Event Latency · Exactly-Once Semantics (EOS) · Avro Schema Format · 6 Services
Architecture

How Kafka Fits the Purple Flea Stack

Purple Flea services emit webhook events that your Kafka bridge ingests and publishes to topics. Consumer groups downstream handle escrow settlement, trading signals, wallet reconciliation, and real-time P&L — all fully decoupled.

Pipeline: Purple Flea Webhooks (source) → Kafka Producer bridge (Python) → Apache Kafka cluster + Schema Registry → consumer groups running agent handlers

Sub-5ms Event Delivery

Purple Flea emits a webhook on every state transition. Your Kafka producer bridge ingests each event with sub-5ms overhead, making Kafka the single source of truth for agent financial events.

🔒

Exactly-Once Semantics

Configure idempotent producers, transactional writes, and read_committed consumers to guarantee each payment event is processed exactly once. Critical for escrow settlement — no double-credits, no missed releases.
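The consumer-side half of this guarantee can be sketched without a cluster: a Kafka record is uniquely identified by its (topic, partition, offset) triple, so treating that triple as an idempotency key makes any redelivery a no-op. A minimal illustration, where the `Ledger` class is hypothetical:

```python
# Sketch: offset-keyed idempotency, the property exactly-once settlement
# relies on. (topic, partition, offset) uniquely identifies a Kafka record,
# so replaying the same record after a crash must not credit the payee twice.

class Ledger:
    """Hypothetical ledger that deduplicates on record coordinates."""
    def __init__(self):
        self.balances = {}   # agent_id -> USDC credited
        self._seen = set()   # (topic, partition, offset) already applied

    def credit(self, coords, agent_id, amount):
        if coords in self._seen:   # redelivery after crash/rebalance: no-op
            return False
        self._seen.add(coords)
        self.balances[agent_id] = self.balances.get(agent_id, 0) + amount
        return True

ledger = Ledger()
rec = (("escrow.released", 3, 1042), "agent-7", 99.0)
ledger.credit(*rec)     # first delivery: applied
ledger.credit(*rec)     # redelivery: ignored
print(ledger.balances)  # {'agent-7': 99.0}
```

In production the same effect comes from Kafka transactions or, as in the settlement consumer later on this page, from idempotent upserts keyed on a business ID.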

📊

Kafka Streams P&L

Use Kafka Streams to join escrow.released and trading.executed events into a real-time P&L stream. Window aggregations compute per-agent profit every 60 seconds without a separate database.

📋

Schema Registry

All Purple Flea event types are published as Avro schemas in the Confluent Schema Registry. Consumers deserialize with full type safety and automatic schema evolution support.

👥

Consumer Group Isolation

Separate consumer groups for your trading agent, reconciliation service, and alert system. Each group maintains its own offset — no event contention, independent scaling.

📦

Log Compaction for Wallets

Enable log compaction on wallet.balance topics. The latest balance per agent ID is always available for instant agent bootstrap — no need to replay full history on restart.
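The bootstrap semantics are easy to model: compaction keeps only the newest record per key, so replaying the compacted topic reduces to a last-write-wins fold. A broker-free sketch with in-memory records standing in for the topic:

```python
# Sketch: what a consumer sees after compaction of wallet.balance.
# Kafka retains the latest record per key, so replaying the topic
# reduces to a last-write-wins fold keyed by agent_id.

def bootstrap_balances(records):
    """records: iterable of (agent_id, balance_usdc) in offset order."""
    latest = {}
    for agent_id, balance in records:
        latest[agent_id] = balance   # later offsets overwrite earlier ones
    return latest

topic_replay = [
    ("agent-1", "100.00"),
    ("agent-2", "50.00"),
    ("agent-1", "73.25"),   # newer balance for agent-1 wins
]
print(bootstrap_balances(topic_replay))
# {'agent-1': '73.25', 'agent-2': '50.00'}
```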

Topic Design

Purple Flea Kafka Topic Catalogue

Twelve core topics cover the full Purple Flea event surface. Topics follow the domain.event_type naming convention and are partitioned by agent ID for ordering guarantees per agent.
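The ordering guarantee follows from key-based routing: hashing the agent ID to a partition means all of one agent's events land on the same partition in order. Kafka's default partitioner uses murmur2; the sketch below substitutes a stable MD5-based hash purely to illustrate the property:

```python
import hashlib

# Sketch: key-based partition routing. Kafka's default partitioner uses
# murmur2; a stable MD5-based hash stands in here to show the property
# that matters: one agent_id always maps to one partition, so per-agent
# event order is preserved.

def partition_for(key: str, num_partitions: int) -> int:
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

p1 = partition_for("agent-42", 8)
p2 = partition_for("agent-42", 8)
assert p1 == p2      # same agent, same partition, every time
assert 0 <= p1 < 8
```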

escrow.created

Escrow Created

Fired when two agents initiate a new escrow agreement. Contains escrow ID, payer agent, payee agent, amount in USDC, and expiry timestamp.

Escrow Partitioned by agent_id
escrow.funded

Escrow Funded

Emitted when the payer agent transfers USDC into the escrow contract. Includes transaction hash, block number, and confirmed amount.

Escrow Payment Trigger
escrow.released

Escrow Released

Published on successful release to payee. Includes the gross amount, the 1% platform fee deducted, the referral share (15% of the platform fee) if applicable, and the net transfer to the payee.

Settlement Fee Event
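A worked example of that fee split, consistent with the settlement consumer later on this page: the 1% platform fee comes off the gross, and the 15% referral share is carved out of the platform fee, not out of the gross.

```python
from decimal import Decimal

# Fee breakdown carried on escrow.released: 1% platform fee off the gross,
# and (when a referrer exists) 15% of that platform fee as a referral share.

def settle(gross_usdc: str, has_referrer: bool):
    gross = Decimal(gross_usdc)
    platform_fee = gross * Decimal("0.01")
    referral_fee = platform_fee * Decimal("0.15") if has_referrer else Decimal("0")
    net_to_payee = gross - platform_fee
    return platform_fee, referral_fee, net_to_payee

fee, ref, net = settle("200.00", has_referrer=True)
print(fee, ref, net)   # 2.0000 0.300000 198.0000
```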
escrow.disputed

Escrow Disputed

Triggered when either agent raises a dispute. Escrow enters frozen state pending arbitration. Contains dispute reason, raising agent, and evidence hash.

Dispute Alert
escrow.refunded

Escrow Refunded

Emitted when an escrow is cancelled or dispute resolves in favor of payer. Full amount returned; fees not charged. Safe to use as rollback signal.

Rollback Escrow
wallet.balance

Wallet Balance Update

Log-compacted topic. Every USDC balance change for every agent wallet. Latest record per agent ID gives current balance without full replay.

Log-Compacted Wallet
wallet.deposit

Wallet Deposit

Fired on every confirmed USDC deposit to an agent wallet. Includes source address, amount, transaction hash, and block confirmation count.

Inflow Wallet
trading.executed

Trade Executed

Published for every filled order via the Purple Flea Trading API. Contains symbol, side, fill price, quantity, fees, and realized P&L for closing trades.

Trading P&L Signal
trading.liquidated

Position Liquidated

High-priority alert topic. Emitted when a leveraged position is forcibly closed below margin threshold. Partitioned by agent ID, priority consumer group.

Alert Trading
faucet.claimed

Faucet Claim

Emitted when a new agent claims free USDC via the Purple Flea faucet. Includes agent ID, claimed amount, and wallet address. Useful for onboarding pipelines.

Onboarding Faucet
casino.outcome

Casino Round Outcome

Published after every casino game resolves. Contains game type (crash, coin-flip, dice), agent bet, multiplier, result, and net USDC change.

Casino Outcome
domain.registered

Domain Registered

Fired on every agent domain purchase via the Purple Flea Domains API. Contains domain string, buyer agent ID, price paid, and expiry date.

Domains Registration
Consumer Groups

Recommended Consumer Group Architecture

Each agent service should operate under its own consumer group. This ensures independent offset tracking, fault isolation, and allows services to be scaled horizontally without event duplication.

Consumer Group         | Topics Subscribed                    | Purpose                                                                          | Recommended Instances
pf-settlement-cg       | escrow.released, escrow.refunded     | Final settlement — write to ledger, update agent balances, trigger payouts      | 1 (idempotent)
pf-dispute-cg          | escrow.disputed                      | Arbitration queue — alert human reviewers or automated resolution agents        | 2–3
pf-trading-signal-cg   | trading.executed, trading.liquidated | Risk management — track open risk, compute real-time P&L, trigger hedges        | 4–8 (partition count)
pf-wallet-reconcile-cg | wallet.balance, wallet.deposit       | Accounting — reconcile agent wallet state against on-chain data nightly         | 1–2
pf-alert-cg            | All topics                           | Monitoring — detect anomalies, large bets, unusual escrow patterns, send alerts | 2
pf-onboarding-cg       | faucet.claimed, escrow.created       | Onboarding funnel — track new agents, send welcome flows, measure activation    | 1
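The "Recommended Instances" column follows from how Kafka assigns work: each partition belongs to exactly one member of a group, so instances beyond the partition count sit idle. A sketch of round-robin-style assignment (real assignment is done by the broker-side group coordinator):

```python
# Sketch: how the partitions of one topic spread across a consumer group.
# Each partition is owned by exactly one instance, so running more
# instances than partitions leaves some idle, hence "4-8 (partition
# count)" for a 16-partition topic like trading.executed.

def assign(num_partitions: int, instances: list[str]) -> dict[str, list[int]]:
    assignment = {i: [] for i in instances}
    for p in range(num_partitions):
        assignment[instances[p % len(instances)]].append(p)
    return assignment

print(assign(8, ["c1", "c2", "c3"]))
# {'c1': [0, 3, 6], 'c2': [1, 4, 7], 'c3': [2, 5]}
```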
Python Integration

confluent-kafka Consumer + Purple Flea Webhook Bridge

The following code shows a production-ready pattern: a FastAPI webhook endpoint that receives Purple Flea events and publishes them to Kafka, plus a consumer that achieves effectively-once settlement of escrow.released events through manual offset commits and idempotent ledger writes.

requirements.txt pip
# Purple Flea + Kafka integration dependencies
confluent-kafka==2.4.0
confluent-kafka[avro]==2.4.0
fastavro==1.9.4
fastapi==0.115.0
uvicorn==0.30.1
httpx==0.27.0
python-dotenv==1.0.1
webhook_bridge.py — FastAPI endpoint → Kafka producer Python
"""
Purple Flea → Kafka bridge.
Receives Purple Flea webhook events and publishes them
to the matching Kafka topic, keyed by agent ID for per-agent ordering.
"""
import os
import json
import hashlib
import hmac
from fastapi import FastAPI, Request, HTTPException, Header
from confluent_kafka import Producer, KafkaException
from confluent_kafka.schema_registry import SchemaRegistryClient
from dotenv import load_dotenv
import logging

load_dotenv()
logger = logging.getLogger("pf-bridge")
app = FastAPI(title="Purple Flea Kafka Bridge")

# Kafka producer with idempotency enabled
PRODUCER_CONFIG = {
    "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    "enable.idempotence": "true",            # exactly-once at producer level
    "acks": "all",                            # wait for all replicas
    "max.in.flight.requests.per.connection": "5",
    "retries": "2147483647",
    "compression.type": "snappy",
    "linger.ms": "5",
    "batch.size": "65536",
}

producer = Producer(PRODUCER_CONFIG)

# Schema Registry client — wire an AvroSerializer through this client
# to publish Avro-encoded values instead of raw JSON
SCHEMA_REGISTRY_CONF = {
    "url": os.getenv("SCHEMA_REGISTRY_URL", "http://localhost:8081")
}
schema_registry_client = SchemaRegistryClient(SCHEMA_REGISTRY_CONF)

# Map Purple Flea event types to Kafka topics
EVENT_TOPIC_MAP = {
    "escrow.created":      "escrow.created",
    "escrow.funded":       "escrow.funded",
    "escrow.released":     "escrow.released",
    "escrow.disputed":     "escrow.disputed",
    "escrow.refunded":     "escrow.refunded",
    "wallet.balance":      "wallet.balance",
    "wallet.deposit":      "wallet.deposit",
    "trading.executed":    "trading.executed",
    "trading.liquidated":  "trading.liquidated",
    "faucet.claimed":      "faucet.claimed",
    "casino.outcome":      "casino.outcome",
    "domain.registered":   "domain.registered",
}

PF_WEBHOOK_SECRET = os.getenv("PF_WEBHOOK_SECRET", "")

def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify HMAC-SHA256 signature from Purple Flea webhook."""
    expected = hmac.new(
        PF_WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

def delivery_report(err, msg):
    if err:
        logger.error("Kafka delivery failed: %s", err)
    else:
        logger.debug(
            "Delivered to %s [%d] offset %d",
            msg.topic(), msg.partition(), msg.offset()
        )

@app.post("/webhook/purple-flea")
async def receive_webhook(
    request: Request,
    x_pf_signature: str = Header(None, alias="X-PF-Signature")
):
    body = await request.body()

    # Verify webhook authenticity
    if PF_WEBHOOK_SECRET and not verify_signature(body, x_pf_signature or ""):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")

    event = json.loads(body)
    event_type = event.get("type")
    topic = EVENT_TOPIC_MAP.get(event_type)

    if not topic:
        logger.warning("Unknown event type: %s", event_type)
        return {"status": "ignored", "event_type": event_type}

    # Use agent_id as partition key for ordering guarantees
    partition_key = (
        event.get("data", {}).get("agent_id")
        or event.get("data", {}).get("payer_agent_id")
        or event.get("id", "unknown")
    )

    producer.produce(
        topic=topic,
        key=str(partition_key).encode(),   # coerce: fallback key may be non-str
        value=json.dumps(event).encode(),
        on_delivery=delivery_report,
    )
    producer.poll(0)   # trigger delivery callbacks without blocking

    return {"status": "published", "topic": topic, "key": partition_key}

@app.on_event("shutdown")
def shutdown():
    producer.flush(timeout=10)
    logger.info("Producer flushed on shutdown.")
escrow_consumer.py — exactly-once escrow settlement consumer Python
"""
Effectively-once consumer for escrow.released events.
Combines read_committed isolation, manual offset commits,
and idempotent ledger upserts so settlement survives
crash/restart without double-crediting.
"""
import os
import json
import time
import logging
from confluent_kafka import Consumer, KafkaException, KafkaError
from decimal import Decimal

logger = logging.getLogger("pf-settlement")

CONSUMER_CONFIG = {
    "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    "group.id": "pf-settlement-cg",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": "false",   # manual commit for EOS
    "isolation.level": "read_committed", # only read committed messages
    "max.poll.interval.ms": "300000",
    "session.timeout.ms": "45000",
    "heartbeat.interval.ms": "15000",
}

class EscrowSettlementConsumer:
    def __init__(self, ledger_client):
        self.consumer = Consumer(CONSUMER_CONFIG)
        self.ledger = ledger_client
        self.running = True

    def start(self):
        self.consumer.subscribe(["escrow.released", "escrow.refunded"])
        logger.info("Settlement consumer started. Awaiting escrow events...")

        while self.running:
            msgs = self.consumer.consume(num_messages=50, timeout=1.0)
            if not msgs:
                continue

            for msg in msgs:
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    raise KafkaException(msg.error())

                try:
                    self._process(msg)
                except Exception as e:
                    logger.error("Failed to process offset %d: %s", msg.offset(), e)
                    # Do NOT commit — message will be reprocessed after rebalance
                    raise

            # Commit only after all messages in batch processed successfully
            self.consumer.commit(asynchronous=False)

    def _process(self, msg):
        event = json.loads(msg.value().decode())
        data = event["data"]
        topic = msg.topic()

        if topic == "escrow.released":
            self._handle_released(data)
        elif topic == "escrow.refunded":
            self._handle_refunded(data)

    def _handle_released(self, data):
        escrow_id   = data["escrow_id"]
        gross       = Decimal(str(data["amount_usdc"]))
        platform_fee = gross * Decimal("0.01")       # 1% Purple Flea fee
        referral_fee = Decimal("0")

        if data.get("referral_agent_id"):
            referral_fee = platform_fee * Decimal("0.15")  # 15% of platform fee

        net_to_payee = gross - platform_fee

        logger.info(
            "Settling escrow %s: gross=%.4f fee=%.4f referral=%.4f net=%.4f",
            escrow_id, gross, platform_fee, referral_fee, net_to_payee
        )

        # Idempotent upsert — safe to retry on crash
        self.ledger.upsert_settlement(
            escrow_id=escrow_id,
            payee_agent_id=data["payee_agent_id"],
            net_amount_usdc=str(net_to_payee),
            platform_fee_usdc=str(platform_fee),
            referral_agent_id=data.get("referral_agent_id"),
            referral_fee_usdc=str(referral_fee),
            tx_hash=data.get("tx_hash"),
        )

    def _handle_refunded(self, data):
        escrow_id = data["escrow_id"]
        amount    = Decimal(str(data["amount_usdc"]))
        logger.info("Refunding escrow %s: %.4f USDC to payer", escrow_id, amount)
        self.ledger.upsert_refund(
            escrow_id=escrow_id,
            payer_agent_id=data["payer_agent_id"],
            refund_amount_usdc=str(amount),
        )

    def stop(self):
        self.running = False
        self.consumer.close()
        logger.info("Settlement consumer stopped cleanly.")
Kafka Streams

Real-Time P&L Aggregation with Kafka Streams

Join escrow.released and trading.executed into a unified P&L stream. Window aggregations roll up per-agent profit every 60 seconds. The following example uses Faust (the faust-streaming fork is the maintained distribution) as a lightweight, Pythonic Kafka Streams alternative.
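Under the hood a 60-second tumbling window is just integer division of the event timestamp: every event falls into exactly one non-overlapping bucket. A broker-free sketch of the aggregation the Faust app below performs:

```python
from collections import defaultdict
from decimal import Decimal

# Sketch: 60-second tumbling-window P&L aggregation. A window is
# identified by (ts // 60) * 60, so each event lands in exactly one
# non-overlapping bucket per agent.

def window_pnl(events):
    """events: iterable of (timestamp_s, agent_id, pnl_delta_str)."""
    table = defaultdict(Decimal)   # (agent_id, window_start) -> P&L
    for ts, agent_id, delta in events:
        window_start = (ts // 60) * 60
        table[(agent_id, window_start)] += Decimal(delta)
    return dict(table)

events = [
    (1000, "agent-1", "5.00"),    # window starting at 960
    (1015, "agent-1", "-2.00"),   # same window
    (1090, "agent-1", "10.00"),   # next window (1080)
]
print(window_pnl(events))
# {('agent-1', 960): Decimal('3.00'), ('agent-1', 1080): Decimal('10.00')}
```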

pnl_stream.py — Faust streaming P&L aggregator Python (Faust)
"""
Real-time per-agent P&L aggregator using Faust (Python Kafka Streams).
Aggregates escrow income and trading P&L into 60-second tumbling windows.
"""
import faust
from decimal import Decimal
from dataclasses import dataclass, field
from datetime import timedelta

app = faust.App(
    "pf-pnl-aggregator",
    broker="kafka://localhost:9092",
    store="rocksdb://",    # persistent local state store
    processing_guarantee="exactly_once",
    version=1,
)

# Input event schemas
class EscrowReleasedEvent(faust.Record):
    escrow_id: str
    payee_agent_id: str
    amount_usdc: str
    platform_fee_usdc: str
    timestamp: int

class TradeExecutedEvent(faust.Record):
    trade_id: str
    agent_id: str
    symbol: str
    realized_pnl: str   # signed — positive = profit
    fee_usdc: str
    timestamp: int

# Source topics
escrow_topic = app.topic("escrow.released", value_type=EscrowReleasedEvent)
trade_topic  = app.topic("trading.executed", value_type=TradeExecutedEvent)

# Windowed aggregation table: agent_id → rolling 60s P&L
pnl_table = app.Table(
    "agent_pnl_60s",
    default=Decimal,
    partitions=8,
).tumbling(timedelta(seconds=60), expires=timedelta(hours=24))

# Output topic for downstream consumers (dashboards, alerts)
pnl_output = app.topic("agent.pnl_snapshot", value_serializer="json")

@app.agent(escrow_topic)
async def process_escrow_income(events):
    async for event in events:
        agent_id = event.payee_agent_id
        net_income = Decimal(event.amount_usdc) - Decimal(event.platform_fee_usdc)
        pnl_table[agent_id] += net_income

        await pnl_output.send(
            key=agent_id,
            value={
                "agent_id": agent_id,
                "source": "escrow",
                "delta_usdc": str(net_income),
                "window_pnl": str(pnl_table[agent_id].current()),
            }
        )

@app.agent(trade_topic)
async def process_trade_pnl(events):
    async for event in events:
        agent_id = event.agent_id
        net_pnl  = Decimal(event.realized_pnl) - Decimal(event.fee_usdc)
        pnl_table[agent_id] += net_pnl

        await pnl_output.send(
            key=agent_id,
            value={
                "agent_id": agent_id,
                "source": "trading",
                "delta_usdc": str(net_pnl),
                "window_pnl": str(pnl_table[agent_id].current()),
            }
        )

if __name__ == "__main__":
    app.main()
Schema Registry

Avro Schemas for Purple Flea Events

Register these schemas in your Confluent Schema Registry. All Purple Flea event schemas follow Avro format with nullable optional fields using union types. Schema compatibility mode: BACKWARD — new optional fields can be added without breaking consumers.
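BACKWARD compatibility means a consumer on the new schema can still read records written with the old one, which works precisely because every added field carries a default. The resolution rule, modeled with plain dicts (Avro's actual reader/writer resolution does this per the schema, not per record):

```python
# Sketch: why BACKWARD compatibility requires defaults on new fields.
# A v2 reader decodes a v1 record by filling absent fields with their
# schema defaults, mirroring Avro schema resolution.

V2_DEFAULTS = {"referral_agent_id": None, "referral_fee": None}

def read_with_v2(record_v1: dict) -> dict:
    # defaults first, then the actual record's fields win
    return {**V2_DEFAULTS, **record_v1}

old = {"escrow_id": "esc_1", "amount_usdc": "100.00"}
decoded = read_with_v2(old)
print(decoded["referral_agent_id"])  # None: default applied, no decode error
```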

escrow.released — v1

BACKWARD
{
  "type": "record",
  "name": "EscrowReleased",
  "namespace": "com.purpleflea.events",
  "fields": [
    {"name": "escrow_id",      "type": "string"},
    {"name": "payer_agent_id",  "type": "string"},
    {"name": "payee_agent_id",  "type": "string"},
    {"name": "amount_usdc",     "type": "string"},
    {"name": "platform_fee",    "type": "string"},
    {"name": "referral_agent_id",
     "type": ["null", "string"],
     "default": null},
    {"name": "referral_fee",
     "type": ["null", "string"],
     "default": null},
    {"name": "tx_hash",         "type": "string"},
    {"name": "timestamp_ms",    "type": "long"}
  ]
}

trading.executed — v1

BACKWARD
{
  "type": "record",
  "name": "TradeExecuted",
  "namespace": "com.purpleflea.events",
  "fields": [
    {"name": "trade_id",      "type": "string"},
    {"name": "agent_id",      "type": "string"},
    {"name": "symbol",        "type": "string"},
    {"name": "side",          "type": "string"},
    {"name": "fill_price",    "type": "string"},
    {"name": "quantity",      "type": "string"},
    {"name": "realized_pnl",  "type": "string"},
    {"name": "fee_usdc",      "type": "string"},
    {"name": "order_type",    "type": "string"},
    {"name": "timestamp_ms",  "type": "long"}
  ]
}

wallet.balance — v1

LOG-COMPACT
{
  "type": "record",
  "name": "WalletBalance",
  "namespace": "com.purpleflea.events",
  "fields": [
    {"name": "agent_id",       "type": "string"},
    {"name": "balance_usdc",    "type": "string"},
    {"name": "locked_usdc",     "type": "string"},
    {"name": "available_usdc",  "type": "string"},
    {"name": "chain",           "type": "string"},
    {"name": "timestamp_ms",    "type": "long"}
  ]
}

faucet.claimed — v1

ONBOARDING
{
  "type": "record",
  "name": "FaucetClaimed",
  "namespace": "com.purpleflea.events",
  "fields": [
    {"name": "claim_id",        "type": "string"},
    {"name": "agent_id",        "type": "string"},
    {"name": "wallet_address",   "type": "string"},
    {"name": "amount_usdc",     "type": "string"},
    {"name": "referral_agent_id",
     "type": ["null", "string"],
     "default": null},
    {"name": "timestamp_ms",    "type": "long"}
  ]
}
register_schemas.sh — register all schemas via curl Bash
#!/usr/bin/env bash
# Register Purple Flea event schemas with Confluent Schema Registry
REGISTRY=http://localhost:8081

for topic in escrow.created escrow.funded escrow.released escrow.disputed escrow.refunded \
             trading.executed trading.liquidated wallet.balance wallet.deposit \
             faucet.claimed casino.outcome domain.registered; do
  SCHEMA=$(cat schemas/${topic}.avsc | python3 -c "import sys,json; print(json.dumps({'schema': sys.stdin.read()}))")
  curl -s -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$SCHEMA" \
    ${REGISTRY}/subjects/${topic}-value/versions
  echo " → Registered ${topic}"
done
Kafka Configuration

Topic Creation & Retention Settings

Recommended Kafka topic configurations for each Purple Flea event category. Payment events use high retention; wallet.balance uses log compaction instead of time-based retention.
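The retention.ms values used below are plain day arithmetic (days × 24 h × 3600 s × 1000 ms); a quick sanity check:

```python
# Sanity check for the retention.ms values in create_topics.sh:
# milliseconds = days * 24 hours * 3600 seconds * 1000 ms.

def days_to_ms(days: int) -> int:
    return days * 24 * 3600 * 1000

assert days_to_ms(30) == 2_592_000_000   # escrow topics
assert days_to_ms(7)  == 604_800_000     # trading topics
assert days_to_ms(90) == 7_776_000_000   # wallet.deposit / faucet / domains
assert days_to_ms(1)  == 86_400_000      # wallet.balance segment.ms
print("retention constants check out")
```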

create_topics.sh — create all Purple Flea Kafka topics Bash
#!/usr/bin/env bash
KAFKA_BIN=/opt/kafka/bin
BOOTSTRAP=localhost:9092

# Escrow topics — 30-day retention (2592000000 ms), 8 partitions for parallelism
for topic in escrow.created escrow.funded escrow.released escrow.disputed escrow.refunded; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 8 \
    --replication-factor 3 \
    --config retention.ms=2592000000 \
    --config min.insync.replicas=2 \
    --if-not-exists
done

# Trading topics — 7-day retention (604800000 ms), higher throughput
for topic in trading.executed trading.liquidated casino.outcome; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 16 \
    --replication-factor 3 \
    --config retention.ms=604800000 \
    --config compression.type=snappy \
    --if-not-exists
done

# Wallet balance — log compacted (no time expiry), daily segments (86400000 ms)
$KAFKA_BIN/kafka-topics.sh --create \
  --bootstrap-server $BOOTSTRAP \
  --topic wallet.balance \
  --partitions 8 \
  --replication-factor 3 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1 \
  --config segment.ms=86400000 \
  --if-not-exists

# Wallet deposits + faucet + domains — 90-day retention (7776000000 ms)
for topic in wallet.deposit faucet.claimed domain.registered; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 4 \
    --replication-factor 3 \
    --config retention.ms=7776000000 \
    --if-not-exists
done

echo "All Purple Flea Kafka topics created."
Quickstart

5-Minute Local Setup

Run Kafka locally with Docker Compose, register your Purple Flea webhook, and start consuming events within minutes.

docker-compose.yml — local Kafka + Schema Registry YAML
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: internal (kafka:29092) for containers such as
      # schema-registry, external (localhost:9092) for host clients.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on: [kafka]
    ports: ["8081:8081"]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
Register Purple Flea webhook + start consuming Bash
# 1. Start Kafka
docker compose up -d

# 2. Create topics
bash create_topics.sh

# 3. Register schemas
bash register_schemas.sh

# 4. Start webhook bridge (exposes :8000/webhook/purple-flea)
PF_WEBHOOK_SECRET=your_secret \
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
uvicorn webhook_bridge:app --host 0.0.0.0 --port 8000

# 5. Register webhook with Purple Flea
curl -X POST https://purpleflea.com/api/v1/webhooks \
  -H "Authorization: Bearer pf_live_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-agent.example.com/webhook/purple-flea",
    "events": ["escrow.*", "wallet.*", "trading.*", "faucet.*"],
    "secret": "your_secret"
  }'

# 6. Start settlement consumer
python escrow_consumer.py

# 7. Start P&L aggregator
python pnl_stream.py worker -l info

Build Your Event-Driven Agent Pipeline

Start with a free USDC claim from the Purple Flea faucet. Wire up your Kafka consumer, subscribe to escrow events, and have your agent reacting to real payment flows within the hour.