Connect Apache Kafka to Purple Flea's financial layer. Consume escrow lifecycle events, real-time price feeds, wallet notifications, and casino outcomes as Kafka topics — with exactly-once semantics, Schema Registry, and Kafka Streams for P&L aggregation.
Purple Flea services emit webhook events that your Kafka bridge ingests and publishes to topics. Consumer groups downstream handle escrow settlement, trading signals, wallet reconciliation, and real-time P&L — all fully decoupled.
Purple Flea emits webhook events synchronously on every state transition. Your Kafka producer bridge ingests them with near-zero latency, making Kafka the single source of truth for agent financial events.
Configure idempotent producers and transactional consumers to guarantee each payment event is processed exactly once. Critical for escrow settlement — no double-credits, no missed releases.
Use Kafka Streams to join escrow.released and trading.executed events into a real-time P&L stream. Window aggregations compute per-agent profit every 60 seconds without a separate database.
All Purple Flea event types are published as Avro schemas in the Confluent Schema Registry. Consumers deserialize with full type safety and automatic schema evolution support.
Separate consumer groups for your trading agent, reconciliation service, and alert system. Each group maintains its own offset — no event contention, independent scaling.
Enable log compaction on wallet.balance topics. The latest balance per agent ID is always available for instant agent bootstrap — no need to replay full history on restart.
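The bootstrap step that log compaction enables can be sketched without a broker. Compaction keeps at least the latest record per key but may still hold older records in the active segment, so a restarting agent folds whatever it reads into a latest-value-per-key map. The record shape below (agent ID, balance string, `None` as a tombstone) is an assumption for illustration, not the actual wallet.balance payload.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical record shape: (agent_id, balance_usdc) in log order.
# A value of None stands for a tombstone (key deleted).
Record = Tuple[str, Optional[str]]


def bootstrap_balances(records: Iterable[Record]) -> Dict[str, str]:
    """Fold a keyed log into its latest-value-per-key view."""
    latest: Dict[str, str] = {}
    for agent_id, balance in records:
        if balance is None:
            latest.pop(agent_id, None)  # tombstone removes the key
        else:
            latest[agent_id] = balance  # later records overwrite earlier ones
    return latest


log = [
    ("agent-1", "10.00"),
    ("agent-2", "5.00"),
    ("agent-1", "7.50"),  # supersedes the first agent-1 record
    ("agent-2", None),    # tombstone for agent-2
]
balances = bootstrap_balances(log)
```

In a real consumer the same fold would run over messages read from the beginning of the topic up to the current end offset.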
Twelve core topics cover the full Purple Flea event surface. Topics follow the domain.event_type naming convention and are partitioned by agent ID for ordering guarantees per agent.
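Why keying by agent ID gives per-agent ordering can be shown with a toy partitioner: the same key always maps to the same partition, and Kafka preserves order within a partition. The CRC32-modulo function below is an illustrative stand-in; the hash your Kafka client actually uses depends on its partitioner setting, and partition_for is a hypothetical helper.

```python
import zlib


def partition_for(agent_id: str, num_partitions: int) -> int:
    """Illustrative key-to-partition mapping (not the client's exact hash)."""
    return zlib.crc32(agent_id.encode("utf-8")) % num_partitions


# Every event for one agent lands on one partition, so its events stay ordered.
first = partition_for("agent-42", 8)
second = partition_for("agent-42", 8)
```

Changing the partition count changes the mapping, which is one reason to size partitions up front for topics where ordering matters.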
Fired when two agents initiate a new escrow agreement. Contains escrow ID, payer agent, payee agent, amount in USDC, and expiry timestamp.
Emitted when the payer agent transfers USDC into the escrow contract. Includes transaction hash, block number, and confirmed amount.
Published on successful release to the payee. Includes the gross amount, the 1% platform fee deducted, the referral fee if applicable (15% of the platform fee), and the net transfer to the payee.
Triggered when either agent raises a dispute. Escrow enters frozen state pending arbitration. Contains dispute reason, raising agent, and evidence hash.
Emitted when an escrow is cancelled or dispute resolves in favor of payer. Full amount returned; fees not charged. Safe to use as rollback signal.
Log-compacted topic. Every USDC balance change for every agent wallet. Latest record per agent ID gives current balance without full replay.
Fired on every confirmed USDC deposit to an agent wallet. Includes source address, amount, transaction hash, and block confirmation count.
Published for every filled order via the Purple Flea Trading API. Contains symbol, side, fill price, quantity, fees, and realized P&L for closing trades.
High-priority alert topic. Emitted when a leveraged position is forcibly closed after falling below its margin threshold. Partitioned by agent ID; consume it with a dedicated high-priority consumer group.
Emitted when a new agent claims free USDC via the Purple Flea faucet. Includes agent ID, claimed amount, and wallet address. Useful for onboarding pipelines.
Published after every casino game resolves. Contains game type (crash, coin-flip, dice), agent bet, multiplier, result, and net USDC change.
Fired on every agent domain purchase via the Purple Flea Domains API. Contains domain string, buyer agent ID, price paid, and expiry date.
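The fee split carried by escrow.released can be worked through with exact decimal arithmetic. This sketch follows the interpretation used by the settlement consumer in this guide: the platform fee is 1% of the gross amount, and the referral fee is 15% of that platform fee (paid out of the fee, not out of the payee's share). The function name and return keys are hypothetical.

```python
from decimal import Decimal

PLATFORM_FEE_RATE = Decimal("0.01")  # 1% of gross
REFERRAL_SHARE = Decimal("0.15")     # 15% of the platform fee (assumed split)


def settle(gross_usdc: str, has_referrer: bool) -> dict:
    """Split a released escrow into fee, referral and payee amounts."""
    gross = Decimal(gross_usdc)
    platform_fee = gross * PLATFORM_FEE_RATE
    referral_fee = platform_fee * REFERRAL_SHARE if has_referrer else Decimal("0")
    return {
        "platform_fee": platform_fee,
        "referral_fee": referral_fee,
        "platform_keeps": platform_fee - referral_fee,
        "net_to_payee": gross - platform_fee,
    }


result = settle("250.00", has_referrer=True)
```

For a 250.00 USDC escrow with a referrer, this yields a 2.50 platform fee, of which 0.375 goes to the referrer, and 247.50 to the payee.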
Each agent service should operate under its own consumer group. This ensures independent offset tracking, fault isolation, and allows services to be scaled horizontally without event duplication.
| Consumer Group | Topics Subscribed | Purpose | Recommended Instances |
|---|---|---|---|
| pf-settlement-cg | escrow.released, escrow.refunded | Final settlement: write to ledger, update agent balances, trigger payouts | 1 (idempotent) |
| pf-dispute-cg | escrow.disputed | Arbitration queue: alert human reviewers or automated resolution agents | 2–3 |
| pf-trading-signal-cg | trading.executed, trading.liquidated | Risk management: track open risk, compute real-time P&L, trigger hedges | 4–8 (partition count) |
| pf-wallet-reconcile-cg | wallet.balance, wallet.deposit | Accounting: reconcile agent wallet state against on-chain data nightly | 1–2 |
| pf-alert-cg | All topics | Monitoring: detect anomalies, large bets, unusual escrow patterns, send alerts | 2 |
| pf-onboarding-cg | faucet.claimed, escrow.created | Onboarding funnel: track new agents, send welcome flows, measure activation | 1 |
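One way to keep the groups above consistent across services is to derive each service's consumer configuration from a single mapping. The sketch below only builds plain dictionaries; GROUP_TOPICS and consumer_config are hypothetical names, and the config keys are the ones used by the settlement consumer in this guide. The pf-alert-cg group is left out because it subscribes to every topic.

```python
import os

# Group -> topics, copied from the consumer group table.
GROUP_TOPICS = {
    "pf-settlement-cg": ["escrow.released", "escrow.refunded"],
    "pf-dispute-cg": ["escrow.disputed"],
    "pf-trading-signal-cg": ["trading.executed", "trading.liquidated"],
    "pf-wallet-reconcile-cg": ["wallet.balance", "wallet.deposit"],
    "pf-onboarding-cg": ["faucet.claimed", "escrow.created"],
}


def consumer_config(group_id: str) -> dict:
    """Build a consumer config dict for one of the known groups."""
    if group_id not in GROUP_TOPICS:
        raise KeyError(f"unknown consumer group: {group_id}")
    return {
        "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
        "group.id": group_id,           # each service tracks its own offsets
        "auto.offset.reset": "earliest",
        "enable.auto.commit": "false",  # commit manually after processing
    }


config = consumer_config("pf-dispute-cg")
topics = GROUP_TOPICS["pf-dispute-cg"]
```

The resulting dictionary can be passed to a confluent-kafka Consumer, and the topic list to its subscribe call.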
The following code shows a production-oriented pattern: a FastAPI webhook endpoint that receives Purple Flea events and publishes them to Kafka, plus a consumer that processes escrow.released events so that each settlement takes effect exactly once (manual offset commits plus idempotent ledger writes).
# Purple Flea + Kafka integration dependencies
confluent-kafka==2.4.0
confluent-kafka[avro]==2.4.0
fastavro==1.9.4
fastapi==0.115.0
uvicorn==0.30.1
httpx==0.27.0
python-dotenv==1.0.1
"""
Purple Flea → Kafka bridge.

Receives Purple Flea webhook events and publishes them to the matching
Kafka topic. Events are published as JSON in this example; the Schema
Registry client is created so an Avro serializer can be wired in per topic.
"""
import hashlib
import hmac
import json
import logging
import os

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request

load_dotenv()
logger = logging.getLogger("pf-bridge")
app = FastAPI(title="Purple Flea Kafka Bridge")

# Kafka producer with idempotency enabled
PRODUCER_CONFIG = {
    "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    "enable.idempotence": "true",  # no duplicates from producer retries
    "acks": "all",                 # wait for all in-sync replicas
    "max.in.flight.requests.per.connection": "5",
    "retries": "2147483647",
    "compression.type": "snappy",
    "linger.ms": "5",
    "batch.size": "65536",
}
producer = Producer(PRODUCER_CONFIG)

SCHEMA_REGISTRY_CONF = {
    "url": os.getenv("SCHEMA_REGISTRY_URL", "http://localhost:8081")
}
schema_registry_client = SchemaRegistryClient(SCHEMA_REGISTRY_CONF)

# Map Purple Flea event types to Kafka topics
EVENT_TOPIC_MAP = {
    "escrow.created": "escrow.created",
    "escrow.funded": "escrow.funded",
    "escrow.released": "escrow.released",
    "escrow.disputed": "escrow.disputed",
    "escrow.refunded": "escrow.refunded",
    "wallet.balance": "wallet.balance",
    "wallet.deposit": "wallet.deposit",
    "trading.executed": "trading.executed",
    "trading.liquidated": "trading.liquidated",
    "faucet.claimed": "faucet.claimed",
    "casino.outcome": "casino.outcome",
    "domain.registered": "domain.registered",
}

PF_WEBHOOK_SECRET = os.getenv("PF_WEBHOOK_SECRET", "")


def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify HMAC-SHA256 signature from Purple Flea webhook."""
    expected = hmac.new(
        PF_WEBHOOK_SECRET.encode(), payload, hashlib.sha256
    ).hexdigest()
    # Compare as bytes so a non-ASCII header value cannot raise TypeError
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())


def delivery_report(err, msg):
    if err:
        logger.error("Kafka delivery failed: %s", err)
    else:
        logger.debug(
            "Delivered to %s [%d] offset %d",
            msg.topic(), msg.partition(), msg.offset()
        )


@app.post("/webhook/purple-flea")
async def receive_webhook(
    request: Request,
    x_pf_signature: str = Header(None, alias="X-PF-Signature")
):
    body = await request.body()

    # Verify webhook authenticity (skipped only when no secret is configured)
    if PF_WEBHOOK_SECRET and not verify_signature(body, x_pf_signature or ""):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")

    try:
        event = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Body is not valid JSON")

    event_type = event.get("type")
    topic = EVENT_TOPIC_MAP.get(event_type)
    if not topic:
        logger.warning("Unknown event type: %s", event_type)
        return {"status": "ignored", "event_type": event_type}

    # Use agent_id as partition key for ordering guarantees
    partition_key = str(
        event.get("data", {}).get("agent_id")
        or event.get("data", {}).get("payer_agent_id")
        or event.get("id", "unknown")
    )

    try:
        producer.produce(
            topic=topic,
            key=partition_key.encode(),
            value=json.dumps(event).encode(),
            on_delivery=delivery_report,
        )
    except BufferError:
        # Local producer queue is full; ask the sender to retry later
        raise HTTPException(status_code=503, detail="Producer queue full")
    producer.poll(0)  # trigger delivery callbacks without blocking

    return {"status": "published", "topic": topic, "key": partition_key}


@app.on_event("shutdown")
def shutdown():
    producer.flush(timeout=10)
    logger.info("Producer flushed on shutdown.")
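To exercise the bridge locally before registering a real webhook, you can sign a test body the same way the bridge's verify_signature expects: an HMAC-SHA256 hex digest of the raw body, prefixed with sha256=. The sketch below assumes that header format from the bridge code; the sample event fields and the sign_payload helper are hypothetical.

```python
import hashlib
import hmac
import json


def sign_payload(secret: str, body: bytes) -> str:
    """Build the X-PF-Signature value the bridge checks against."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


# Hypothetical sample event; real payload fields may differ.
body = json.dumps({
    "id": "evt_test_1",
    "type": "escrow.released",
    "data": {"escrow_id": "esc_1", "payee_agent_id": "agent-2", "amount_usdc": "100.00"},
}).encode()

header_value = sign_payload("your_secret", body)
```

Send body as the raw request body with header_value in the X-PF-Signature header. The signature covers the exact bytes, so re-serializing the JSON before sending would invalidate it.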

"""
Settlement consumer for escrow.released and escrow.refunded events.

Offsets are committed manually, only after the ledger write succeeds, so a
crash causes redelivery rather than loss. The ledger upsert is keyed by
escrow ID, which makes redelivered events harmless: each settlement takes
effect exactly once.
"""
import json
import logging
import os
from decimal import Decimal

from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger("pf-settlement")

CONSUMER_CONFIG = {
    "bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
    "group.id": "pf-settlement-cg",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": "false",         # commit manually after processing
    "isolation.level": "read_committed",   # only read committed messages
    "max.poll.interval.ms": "300000",
    "session.timeout.ms": "45000",
    "heartbeat.interval.ms": "15000",
}


class EscrowSettlementConsumer:
    def __init__(self, ledger_client):
        self.consumer = Consumer(CONSUMER_CONFIG)
        self.ledger = ledger_client
        self.running = True

    def start(self):
        self.consumer.subscribe(["escrow.released", "escrow.refunded"])
        logger.info("Settlement consumer started. Awaiting escrow events...")
        try:
            while self.running:
                msgs = self.consumer.consume(num_messages=50, timeout=1.0)
                if not msgs:
                    continue
                for msg in msgs:
                    if msg.error():
                        if msg.error().code() == KafkaError._PARTITION_EOF:
                            continue
                        raise KafkaException(msg.error())
                    try:
                        self._process(msg)
                    except Exception as e:
                        logger.error("Failed to process offset %d: %s", msg.offset(), e)
                        # Do NOT commit: the batch is redelivered after restart
                        raise
                # Commit only after every message in the batch succeeded
                self.consumer.commit(asynchronous=False)
        finally:
            # Close here so the consumer is never closed mid-poll
            self.consumer.close()
            logger.info("Settlement consumer stopped cleanly.")

    def _process(self, msg):
        event = json.loads(msg.value().decode())
        data = event["data"]
        topic = msg.topic()
        if topic == "escrow.released":
            self._handle_released(data)
        elif topic == "escrow.refunded":
            self._handle_refunded(data)

    def _handle_released(self, data):
        escrow_id = data["escrow_id"]
        gross = Decimal(str(data["amount_usdc"]))
        platform_fee = gross * Decimal("0.01")  # 1% Purple Flea fee
        referral_fee = Decimal("0")
        if data.get("referral_agent_id"):
            referral_fee = platform_fee * Decimal("0.15")  # 15% of platform fee
        net_to_payee = gross - platform_fee

        logger.info(
            "Settling escrow %s: gross=%.4f fee=%.4f referral=%.4f net=%.4f",
            escrow_id, gross, platform_fee, referral_fee, net_to_payee
        )

        # Idempotent upsert keyed by escrow ID: safe to retry after a crash
        self.ledger.upsert_settlement(
            escrow_id=escrow_id,
            payee_agent_id=data["payee_agent_id"],
            net_amount_usdc=str(net_to_payee),
            platform_fee_usdc=str(platform_fee),
            referral_agent_id=data.get("referral_agent_id"),
            referral_fee_usdc=str(referral_fee),
            tx_hash=data.get("tx_hash"),
        )

    def _handle_refunded(self, data):
        escrow_id = data["escrow_id"]
        amount = Decimal(str(data["amount_usdc"]))
        logger.info("Refunding escrow %s: %.4f USDC to payer", escrow_id, amount)
        self.ledger.upsert_refund(
            escrow_id=escrow_id,
            payer_agent_id=data["payer_agent_id"],
            refund_amount_usdc=str(amount),
        )

    def stop(self):
        # Only flip the flag; start() closes the consumer when the loop exits
        self.running = False
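The settlement consumer depends on a ledger client whose upsert calls are idempotent, but the guide does not define one. A minimal sketch using SQLite from the standard library is shown below: the escrow ID is the primary key, so a redelivered event is ignored instead of being credited twice. SqliteLedger and its table layout are hypothetical; only the method names and keyword arguments mirror the calls made by the consumer.

```python
import sqlite3


class SqliteLedger:
    """Hypothetical ledger client with idempotent, escrow-keyed writes."""

    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS settlements ("
            " escrow_id TEXT PRIMARY KEY,"
            " kind TEXT NOT NULL,"
            " agent_id TEXT NOT NULL,"
            " amount_usdc TEXT NOT NULL)"
        )

    def _upsert(self, escrow_id, kind, agent_id, amount_usdc) -> bool:
        # INSERT OR IGNORE leaves an existing row untouched on replay
        with self.db:
            cur = self.db.execute(
                "INSERT OR IGNORE INTO settlements VALUES (?, ?, ?, ?)",
                (escrow_id, kind, agent_id, amount_usdc),
            )
        return cur.rowcount == 1  # True only the first time

    def upsert_settlement(self, escrow_id, payee_agent_id, net_amount_usdc, **extra):
        return self._upsert(escrow_id, "released", payee_agent_id, net_amount_usdc)

    def upsert_refund(self, escrow_id, payer_agent_id, refund_amount_usdc, **extra):
        return self._upsert(escrow_id, "refunded", payer_agent_id, refund_amount_usdc)


ledger = SqliteLedger()
first = ledger.upsert_settlement(
    escrow_id="esc_1", payee_agent_id="agent-2",
    net_amount_usdc="99.00", platform_fee_usdc="1.00",
)
replay = ledger.upsert_settlement(
    escrow_id="esc_1", payee_agent_id="agent-2",
    net_amount_usdc="99.00", platform_fee_usdc="1.00",
)
row_count = ledger.db.execute("SELECT COUNT(*) FROM settlements").fetchone()[0]
```

A production ledger would also store the fee and transaction hash fields, which this sketch accepts through extra and discards.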
Combine escrow.released and trading.executed into a unified P&L stream. Window aggregations roll up per-agent profit every 60 seconds. The following example uses Faust, a Python stream processing library, as a lightweight alternative to Kafka Streams.
"""
Real-time per-agent P&L aggregator using Faust (Python stream processing).
Aggregates escrow income and trading P&L into 60-second tumbling windows.
"""
from datetime import timedelta
from decimal import Decimal

import faust

app = faust.App(
    "pf-pnl-aggregator",
    broker="kafka://localhost:9092",
    store="rocksdb://",  # persistent local state store
    processing_guarantee="exactly_once",
    version=1,
)


# Input event schemas. Field names must match the JSON payload on each topic;
# adjust them if your events use the Avro field names (platform_fee, timestamp_ms).
class EscrowReleasedEvent(faust.Record):
    escrow_id: str
    payee_agent_id: str
    amount_usdc: str
    platform_fee_usdc: str
    timestamp: int


class TradeExecutedEvent(faust.Record):
    trade_id: str
    agent_id: str
    symbol: str
    realized_pnl: str  # signed: positive = profit
    fee_usdc: str
    timestamp: int


# Source topics
escrow_topic = app.topic("escrow.released", value_type=EscrowReleasedEvent)
trade_topic = app.topic("trading.executed", value_type=TradeExecutedEvent)

# Windowed aggregation table: agent_id → rolling 60s P&L.
# The partition count should match the source topics feeding this table.
pnl_table = app.Table(
    "agent_pnl_60s",
    default=Decimal,
    partitions=8,
).tumbling(timedelta(seconds=60), expires=timedelta(hours=24))

# Output topic for downstream consumers (dashboards, alerts)
pnl_output = app.topic("agent.pnl_snapshot", value_serializer="json")


@app.agent(escrow_topic)
async def process_escrow_income(events):
    async for event in events:
        agent_id = event.payee_agent_id
        net_income = Decimal(event.amount_usdc) - Decimal(event.platform_fee_usdc)
        pnl_table[agent_id] += net_income
        await pnl_output.send(
            key=agent_id,
            value={
                "agent_id": agent_id,
                "source": "escrow",
                "delta_usdc": str(net_income),
                "window_pnl": str(pnl_table[agent_id].current()),
            }
        )


@app.agent(trade_topic)
async def process_trade_pnl(events):
    async for event in events:
        agent_id = event.agent_id
        net_pnl = Decimal(event.realized_pnl) - Decimal(event.fee_usdc)
        pnl_table[agent_id] += net_pnl
        await pnl_output.send(
            key=agent_id,
            value={
                "agent_id": agent_id,
                "source": "trading",
                "delta_usdc": str(net_pnl),
                "window_pnl": str(pnl_table[agent_id].current()),
            }
        )


if __name__ == "__main__":
    app.main()
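The windowing behind the 60-second roll-up is simple arithmetic: a tumbling window assigns each event to the fixed, non-overlapping bucket that contains its timestamp. The sketch below reproduces that bucketing in plain Python, independent of any stream processor; the event tuples and function names are made up for illustration.

```python
from collections import defaultdict
from decimal import Decimal

WINDOW_MS = 60_000  # 60-second tumbling windows


def window_start(timestamp_ms: int) -> int:
    """Start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % WINDOW_MS)


def aggregate(events):
    """Sum P&L deltas per (agent_id, window_start)."""
    totals = defaultdict(Decimal)  # Decimal() is zero
    for agent_id, timestamp_ms, delta_usdc in events:
        totals[(agent_id, window_start(timestamp_ms))] += Decimal(delta_usdc)
    return dict(totals)


events = [
    ("agent-1", 1_000, "5.00"),    # window starting at 0
    ("agent-1", 59_999, "-2.00"),  # still the window starting at 0
    ("agent-1", 60_000, "1.50"),   # first event of the next window
]
totals = aggregate(events)
```

The first two events share a window and net to 3.00, while the third opens the window starting at 60000 ms.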
Register these schemas in your Confluent Schema Registry. All Purple Flea event schemas follow Avro format with nullable optional fields using union types. Schema compatibility mode: BACKWARD — new optional fields can be added without breaking consumers.
{
"type": "record",
"name": "EscrowReleased",
"namespace": "com.purpleflea.events",
"fields": [
{"name": "escrow_id", "type": "string"},
{"name": "payer_agent_id", "type": "string"},
{"name": "payee_agent_id", "type": "string"},
{"name": "amount_usdc", "type": "string"},
{"name": "platform_fee", "type": "string"},
{"name": "referral_agent_id",
"type": ["null", "string"],
"default": null},
{"name": "referral_fee",
"type": ["null", "string"],
"default": null},
{"name": "tx_hash", "type": "string"},
{"name": "timestamp_ms", "type": "long"}
]
}
{
"type": "record",
"name": "TradeExecuted",
"namespace": "com.purpleflea.events",
"fields": [
{"name": "trade_id", "type": "string"},
{"name": "agent_id", "type": "string"},
{"name": "symbol", "type": "string"},
{"name": "side", "type": "string"},
{"name": "fill_price", "type": "string"},
{"name": "quantity", "type": "string"},
{"name": "realized_pnl", "type": "string"},
{"name": "fee_usdc", "type": "string"},
{"name": "order_type", "type": "string"},
{"name": "timestamp_ms", "type": "long"}
]
}
{
"type": "record",
"name": "WalletBalance",
"namespace": "com.purpleflea.events",
"fields": [
{"name": "agent_id", "type": "string"},
{"name": "balance_usdc", "type": "string"},
{"name": "locked_usdc", "type": "string"},
{"name": "available_usdc", "type": "string"},
{"name": "chain", "type": "string"},
{"name": "timestamp_ms", "type": "long"}
]
}
{
"type": "record",
"name": "FaucetClaimed",
"namespace": "com.purpleflea.events",
"fields": [
{"name": "claim_id", "type": "string"},
{"name": "agent_id", "type": "string"},
{"name": "wallet_address", "type": "string"},
{"name": "amount_usdc", "type": "string"},
{"name": "referral_agent_id",
"type": ["null", "string"],
"default": null},
{"name": "timestamp_ms", "type": "long"}
]
}
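Under BACKWARD compatibility, a new schema version must be able to read data written with the previous one, which is why added fields need a default. The sketch below checks only that single rule on parsed schema dictionaries as a pre-flight before registering; the Schema Registry performs the full compatibility check. The function name and the memo field are hypothetical.

```python
def added_fields_without_default(old_schema: dict, new_schema: dict) -> list:
    """Names of fields added in new_schema that lack a default value."""
    old_names = {f["name"] for f in old_schema["fields"]}
    return [
        f["name"]
        for f in new_schema["fields"]
        if f["name"] not in old_names and "default" not in f
    ]


old = {"type": "record", "name": "FaucetClaimed", "fields": [
    {"name": "claim_id", "type": "string"},
    {"name": "agent_id", "type": "string"},
]}

# Adds a nullable field with a default: old records remain readable.
compatible = {"type": "record", "name": "FaucetClaimed", "fields": old["fields"] + [
    {"name": "referral_agent_id", "type": ["null", "string"], "default": None},
]}

# Adds a required field without a default: old records cannot be read.
breaking = {"type": "record", "name": "FaucetClaimed", "fields": old["fields"] + [
    {"name": "memo", "type": "string"},
]}

ok_result = added_fields_without_default(old, compatible)
bad_result = added_fields_without_default(old, breaking)
```

An empty result means the change passes this one rule; any listed field needs a default before the new version is registered.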
#!/usr/bin/env bash
# Register Purple Flea event schemas with Confluent Schema Registry.
# Expects one schemas/<topic>.avsc file per topic listed below.
set -euo pipefail

REGISTRY=http://localhost:8081

for topic in escrow.released escrow.created escrow.funded escrow.disputed \
             trading.executed trading.liquidated wallet.balance faucet.claimed; do
  # Wrap the Avro schema as the JSON string the registry API expects
  SCHEMA=$(python3 -c "import sys,json; print(json.dumps({'schema': sys.stdin.read()}))" \
           < "schemas/${topic}.avsc")
  curl -sf -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$SCHEMA" \
    "${REGISTRY}/subjects/${topic}-value/versions"
  echo " → Registered ${topic}"
done
Recommended Kafka topic configurations for each Purple Flea event category. Payment events use high retention; wallet.balance uses log compaction instead of time-based retention.
#!/usr/bin/env bash
KAFKA_BIN=/opt/kafka/bin
BOOTSTRAP=localhost:9092
# Defaults suit a 3-broker cluster. Override for single-broker dev clusters,
# e.g. REPLICATION_FACTOR=1 MIN_ISR=1
REPLICATION_FACTOR=${REPLICATION_FACTOR:-3}
MIN_ISR=${MIN_ISR:-2}

# Escrow topics: 30-day retention (2592000000 ms), 8 partitions for parallelism
for topic in escrow.created escrow.funded escrow.released escrow.disputed escrow.refunded; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 8 \
    --replication-factor $REPLICATION_FACTOR \
    --config retention.ms=2592000000 \
    --config min.insync.replicas=$MIN_ISR \
    --if-not-exists
done

# Trading topics: 7-day retention (604800000 ms), higher throughput
for topic in trading.executed trading.liquidated casino.outcome; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 16 \
    --replication-factor $REPLICATION_FACTOR \
    --config retention.ms=604800000 \
    --config compression.type=snappy \
    --if-not-exists
done

# Wallet balance: log compacted, no time expiry.
# segment.ms=86400000 rolls segments daily so they become eligible for compaction.
$KAFKA_BIN/kafka-topics.sh --create \
  --bootstrap-server $BOOTSTRAP \
  --topic wallet.balance \
  --partitions 8 \
  --replication-factor $REPLICATION_FACTOR \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.1 \
  --config segment.ms=86400000 \
  --if-not-exists

# Wallet deposits + faucet + domains: 90-day retention (7776000000 ms)
for topic in wallet.deposit faucet.claimed domain.registered; do
  $KAFKA_BIN/kafka-topics.sh --create \
    --bootstrap-server $BOOTSTRAP \
    --topic $topic \
    --partitions 4 \
    --replication-factor $REPLICATION_FACTOR \
    --config retention.ms=7776000000 \
    --if-not-exists
done

echo "All Purple Flea Kafka topics created."
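The retention values in the topic script are day counts expressed in milliseconds. A small helper makes them easy to verify or adjust; retention_ms is a hypothetical name.

```python
from datetime import timedelta


def retention_ms(days: int) -> int:
    """Convert a retention period in days to a retention.ms value."""
    return timedelta(days=days) // timedelta(milliseconds=1)


escrow_retention = retention_ms(30)   # escrow topics
trading_retention = retention_ms(7)   # trading and casino topics
deposit_retention = retention_ms(90)  # deposits, faucet, domains
daily_segment = retention_ms(1)       # segment.ms for wallet.balance
```

These evaluate to 2592000000, 604800000, 7776000000 and 86400000, matching the values passed to kafka-topics.sh.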
Run Kafka locally with Docker Compose, register your Purple Flea webhook, and start consuming events within minutes.
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for other containers, localhost:9092 for the host
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on: [kafka]
    ports: ["8081:8081"]
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
# 1. Start Kafka
docker compose up -d

# 2. Create topics
#    (on the single-broker stack above, set the replication factor and
#    min.insync.replicas to 1 in create_topics.sh first)
bash create_topics.sh

# 3. Register schemas
bash register_schemas.sh

# 4. Start webhook bridge (exposes :8000/webhook/purple-flea)
#    Steps 4, 6 and 7 run in the foreground; use a separate terminal for each.
PF_WEBHOOK_SECRET=your_secret \
KAFKA_BOOTSTRAP_SERVERS=localhost:9092 \
uvicorn webhook_bridge:app --host 0.0.0.0 --port 8000

# 5. Register webhook with Purple Flea
curl -X POST https://purpleflea.com/api/v1/webhooks \
  -H "Authorization: Bearer pf_live_your_api_key" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-agent.example.com/webhook/purple-flea",
    "events": ["escrow.*", "wallet.*", "trading.*", "faucet.*"],
    "secret": "your_secret"
  }'

# 6. Start settlement consumer
python escrow_consumer.py

# 7. Start P&L aggregator
python pnl_stream.py worker -l info