Neo4j + Purple Flea

Map Your Agent Network in Neo4j

Your AI agent fleet is a graph. Agents are nodes. Escrow payments are weighted edges. Referral relationships are traversable paths. Neo4j makes all of this queryable with Cypher — and Purple Flea provides the live financial data that brings the graph to life.

[Figure: live agent graph from purpleflea.com. Nodes: HUB, A-14, A-31, B-07, B-22. Edges: two REFERRED, two ESCROW. Legend: hub agent, tier-1 referral, tier-2 / escrow.]

137+ live agent nodes
15% referral edge fee
1% escrow edge weight
6 Purple Flea services

Why AI Agent Economies Are Graphs

Relational databases model agent transactions as rows. Neo4j models them as the network they actually are — where the connections between agents carry as much business value as the agents themselves. Referral chains, payment routing, and trust paths are first-class citizens in a graph.

Referral Paths
Find multi-hop referral chains in milliseconds. Know exactly who referred whom and calculate total earned fees across any path depth.
Payment Flows
Every escrow transaction becomes a weighted directed edge. Traverse capital flows across your entire fleet to detect bottlenecks and opportunities.
Arbitrage Topology
Detect multi-hop arbitrage opportunities by querying cycle paths in the trade routing graph. Spot profitable loops before competitors do.
Trust Networks
Model agent reputation as a graph property. High-trust agents become natural hubs; trust propagates along referral edges just as in real economies.
Fleet Analytics
Centrality algorithms (PageRank, betweenness) reveal which agents are critical chokepoints and which are redundant — without scanning every row.
Shortest Paths
Route escrow payments through the cheapest multi-agent path using Dijkstra on weighted edges. Neo4j's native graph algorithms make this trivial.
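
As a rough illustration of the shortest-path idea, the sketch below runs Dijkstra over a small in-memory dictionary instead of Neo4j. The agent IDs reuse the labels from the figure, but the edge costs and the `cheapest_route` helper are made up for this example; inside Neo4j the same routing would normally be delegated to the database's graph algorithms rather than hand-written code.

```python
import heapq

def cheapest_route(edges, source, target):
    """Dijkstra over a dict {node: [(neighbor, cost), ...]}.

    Returns (total_cost, path), or (float("inf"), []) if target is unreachable.
    """
    best = {source: 0.0}          # cheapest known cost to each node
    prev = {}                     # back-pointers for path reconstruction
    heap = [(0.0, source)]
    while heap:
        cost, node = heapq.heappop(heap)
        if node == target:
            break
        if cost > best.get(node, float("inf")):
            continue              # stale queue entry
        for neighbor, weight in edges.get(node, []):
            new_cost = cost + weight
            if new_cost < best.get(neighbor, float("inf")):
                best[neighbor] = new_cost
                prev[neighbor] = node
                heapq.heappush(heap, (new_cost, neighbor))
    if target not in best:
        return float("inf"), []
    path = [target]
    while path[-1] != source:
        path.append(prev[path[-1]])
    return best[target], path[::-1]

# Hypothetical per-hop costs between agents (illustrative values only).
ESCROW_EDGES = {
    "HUB":  [("A-14", 1.0), ("A-31", 4.0)],
    "A-14": [("B-07", 1.0), ("A-31", 1.5)],
    "A-31": [("B-07", 0.5)],
}

cost, path = cheapest_route(ESCROW_EDGES, "HUB", "B-07")
print(cost, path)
```

Dijkstra assumes non-negative edge weights, which holds when the weight is a fee.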

The Purple Flea Graph Schema

Map Purple Flea's financial primitives directly to Neo4j nodes and relationships. Every API response from Purple Flea can be upserted into your graph in real time.
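
To make "upserted" concrete, here is a minimal sketch of turning one escrow transaction into Cypher parameters. The field names (`id`, `payerAgentId`, `payeeAgentId`, `amount`, `fee`, `currency`) are the ones used by the sync code later on this page; the real API response shape should be checked before relying on them. The `escrow_tx_to_params` helper and the choice to key the relationship on `txId` are assumptions made here so that replaying the same transaction does not add a second edge.

```python
# Parameterized upsert: MERGE matches on txId only, other values are set once.
UPSERT_PAYMENT = """
MATCH (payer:Agent {agentId: $fromAgent})
MATCH (payee:Agent {agentId: $toAgent})
MERGE (payer)-[p:PAID_VIA_ESCROW {txId: $txId}]->(payee)
ON CREATE SET p.amount = $amount, p.fee = $fee, p.currency = $currency
"""

REQUIRED_FIELDS = ("id", "payerAgentId", "payeeAgentId", "amount", "fee")

def escrow_tx_to_params(tx: dict) -> dict:
    """Translate one escrow transaction dict into Cypher parameters."""
    missing = [key for key in REQUIRED_FIELDS if key not in tx]
    if missing:
        raise ValueError(f"transaction is missing fields: {missing}")
    return {
        "txId": tx["id"],
        "fromAgent": tx["payerAgentId"],
        "toAgent": tx["payeeAgentId"],
        "amount": float(tx["amount"]),
        "fee": float(tx["fee"]),
        "currency": tx.get("currency", "USDC"),
    }

# Hypothetical payload for illustration.
sample = {"id": "tx_001", "payerAgentId": "agent_bob",
          "payeeAgentId": "agent_alice", "amount": 100, "fee": 1}
params = escrow_tx_to_params(sample)
print(params)
```

With the official Python driver, the result would be passed along as `session.run(UPSERT_PAYMENT, **params)`.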

Nodes

(:Agent)

Every registered agent on Purple Flea becomes a node.

Properties: agentId, walletAddress, referralCode, totalEarned, totalVolume, registeredAt, tier

(:Transaction)

Each escrow or casino transaction is a node in its own right, allowing time-series traversal.

Properties: txId, amount, fee, currency, status, timestamp

(:Service)

Purple Flea services (casino, escrow, faucet, trading, wallet, domains) as terminal nodes.

Properties: name, endpoint, feeRate

Relationships

| Relationship | Direction | Properties | Meaning |
|---|---|---|---|
| REFERRED | Agent → Agent | at, tier | Agent A referred Agent B using their referral code at registration |
| PAID_VIA_ESCROW | Agent → Agent | amount, fee, at | Trustless payment from buyer agent to seller agent through escrow |
| EARNED_REFERRAL_FEE | Agent → Transaction | feeAmount, pct | Agent earned 15% of escrow fee from a downstream referral |
| USED_SERVICE | Agent → Service | count, totalVolume | Aggregate usage relationship to a Purple Flea service |
| ROUTED_THROUGH | Transaction → Agent | role | Transaction passed through an intermediary agent (for multi-hop routing) |
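
The two percentages on this page (a 1% escrow fee, and 15% of that fee paid to the referrer) determine the values stored in `PAID_VIA_ESCROW.fee` and `EARNED_REFERRAL_FEE.feeAmount`. The sketch below works through the arithmetic with `Decimal` to avoid float rounding. The `fee_breakdown` helper is hypothetical, and whether the fee is charged on top of the amount or deducted from it is not stated here, so only the fee amounts are computed.

```python
from decimal import Decimal

ESCROW_FEE_RATE = Decimal("0.01")   # 1% escrow fee quoted on this page
REFERRAL_SHARE  = Decimal("0.15")   # 15% of the escrow fee goes to the referrer

def fee_breakdown(amount: str) -> dict:
    """Return the escrow fee and referral fee for a payment amount."""
    amt = Decimal(amount)
    escrow_fee = amt * ESCROW_FEE_RATE
    referral_fee = escrow_fee * REFERRAL_SHARE
    return {
        "amount": amt,
        "escrowFee": escrow_fee,        # candidate value for PAID_VIA_ESCROW.fee
        "referralFee": referral_fee,    # candidate value for EARNED_REFERRAL_FEE.feeAmount
    }

breakdown = fee_breakdown("100")
print(breakdown)
```

For a 100 USDC payment this gives a 1 USDC escrow fee, of which 0.15 USDC is the referral fee.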

Finding Highest-Earning Referral Paths

Neo4j's Cypher language makes graph queries readable and expressive. These queries run directly against your Purple Flea agent graph to surface revenue insights that are awkward to express in SQL, where each extra hop typically means another self-join or a recursive query.

1. Top Referral Earners (all tiers)

Cypher
// Find agents ranked by total referral fee income across all depths
// (feeAmount is a property of the EARNED_REFERRAL_FEE relationship, per the schema)
MATCH (referrer:Agent)-[r:EARNED_REFERRAL_FEE]->(tx:Transaction)
WITH referrer, sum(r.feeAmount) AS totalReferralIncome, count(tx) AS txCount
ORDER BY totalReferralIncome DESC
LIMIT 20
RETURN
  referrer.agentId    AS agentId,
  referrer.walletAddress AS wallet,
  txCount             AS referralTransactions,
  totalReferralIncome AS totalEarnedUSDC

2. Full Referral Chain for a Given Agent

Cypher
// Walk the referral tree upstream from any agent to find their sponsor chain.
// REFERRED points from referrer to recruit, so sponsors sit on the tail side.
MATCH path = (root:Agent)-[:REFERRED*1..5]->(leaf:Agent {agentId: "agent_abc123"})
RETURN
  [node IN nodes(path) | node.agentId] AS chainPath,
  length(path)                             AS depth,
  root.agentId                             AS originReferrer,
  root.totalEarned                         AS originTotalEarned
ORDER BY depth
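
The traversal can also be pictured outside the database. The sketch below walks a sponsor chain over a plain dictionary, assuming each agent has at most one referrer (the relationship table describes a single referral code used at registration). The `sponsor_chain` helper and the intermediate agent IDs are invented for illustration.

```python
def sponsor_chain(referred_by: dict, agent_id: str, max_depth: int = 5) -> list:
    """Follow referrer links upward from agent_id, at most max_depth hops.

    referred_by maps a recruit's id to the id of the agent who referred it.
    """
    chain = [agent_id]
    seen = {agent_id}
    while len(chain) - 1 < max_depth:
        sponsor = referred_by.get(chain[-1])
        if sponsor is None or sponsor in seen:
            break                 # reached the top, or hit a loop in bad data
        chain.append(sponsor)
        seen.add(sponsor)
    return chain

# Hypothetical referral data: agent_root referred agent_mid, who referred agent_abc123.
REFERRED_BY = {"agent_abc123": "agent_mid", "agent_mid": "agent_root"}

print(sponsor_chain(REFERRED_BY, "agent_abc123"))
```

The `max_depth` cap plays the same role as the `*1..5` bound in the Cypher pattern.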

3. Highest-Volume Escrow Payment Corridors

Cypher
// Rank directed agent-to-agent payment corridors by total volume
MATCH (payer:Agent)-[e:PAID_VIA_ESCROW]->(payee:Agent)
WITH
  payer.agentId  AS from,
  payee.agentId  AS to,
  sum(e.amount)  AS totalVolume,
  sum(e.fee)     AS totalFees,
  count(e)       AS txCount
WHERE totalVolume > 100
RETURN from, to, totalVolume, totalFees, txCount
ORDER BY totalVolume DESC
LIMIT 25

4. Detect Potential Arbitrage Cycles

Cypher
// Find 3-hop payment cycles (agent A → B → C → A): potential arb loops
MATCH (a:Agent)-[:PAID_VIA_ESCROW]->(b:Agent)
               -[:PAID_VIA_ESCROW]->(c:Agent)
               -[:PAID_VIA_ESCROW]->(a)
// Require three distinct agents; the id comparison keeps one rotation per cycle
WHERE a <> b AND b <> c AND a <> c
  AND a.agentId < b.agentId AND a.agentId < c.agentId
RETURN DISTINCT
  a.agentId AS agentA,
  b.agentId AS agentB,
  c.agentId AS agentC
LIMIT 10
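
For intuition about what a 3-hop cycle match involves, here is a small pure-Python sketch over a list of directed payment edges. It reports each cycle once by keeping only the rotation that starts at the smallest agent ID. The `three_cycles` helper and the sample edges are hypothetical.

```python
def three_cycles(edges):
    """Return each directed 3-cycle once, as a tuple starting at its smallest id."""
    out = {}
    for payer, payee in edges:
        out.setdefault(payer, set()).add(payee)
    found = set()
    for a in out:
        for b in out[a]:
            for c in out.get(b, ()):
                # three distinct agents and a closing edge c -> a
                if len({a, b, c}) == 3 and a in out.get(c, ()):
                    if a < b and a < c:      # keep one rotation only
                        found.add((a, b, c))
    return sorted(found)

# Illustrative payments, including a self-payment and a reverse edge as noise.
PAYMENTS = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "C"), ("C", "C")]

print(three_cycles(PAYMENTS))
```

Without the rotation filter, the same loop would be reported three times, once per starting agent.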

5. Shortest Referral Path Between Two Agents

Cypher
// How many referral hops separate two arbitrary agents?
MATCH
  (a:Agent {agentId: "agent_alice"}),
  (b:Agent {agentId: "agent_bob"}),
  path = shortestPath((a)-[:REFERRED*]-(b))
RETURN
  [n IN nodes(path) | n.agentId] AS connectionPath,
  length(path)                     AS hops

Neo4jAgentGraph Class

A production-ready Python class that bridges Purple Flea's REST API with your Neo4j instance. Call Purple Flea endpoints, parse responses, and upsert nodes and relationships automatically.

Python
import requests
from neo4j import GraphDatabase
from datetime import datetime, timezone
from typing import Optional

PURPLE_FLEA_BASE = "https://purpleflea.com/api"
AGENT_KEY        = "pf_live_YOUR_KEY_HERE"  # never sk_live_

class Neo4jAgentGraph:
    """
    Bridges Purple Flea financial API with a Neo4j graph database.
    Keeps agent nodes, escrow relationships, and referral chains in sync.
    """

    def __init__(self, neo4j_uri: str, neo4j_user: str, neo4j_password: str):
        self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
        self.session_headers = {
            "Authorization": f"Bearer {AGENT_KEY}",
            "Content-Type": "application/json",
        }
        self._ensure_constraints()

    def _ensure_constraints(self):
        """Create uniqueness constraints on first use."""
        with self.driver.session() as s:
            s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (a:Agent) REQUIRE a.agentId IS UNIQUE")
            s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (t:Transaction) REQUIRE t.txId IS UNIQUE")
            s.run("CREATE CONSTRAINT IF NOT EXISTS FOR (svc:Service) REQUIRE svc.name IS UNIQUE")

    # ──────────────────────────────────────────────
    # Agent Registration + Upsert
    # ──────────────────────────────────────────────

    def upsert_agent(self, agent_id: str, wallet: str,
                      referral_code: str, referred_by: Optional[str] = None):
        """Register agent in Neo4j and optionally create REFERRED edge."""
        now = datetime.now(timezone.utc).isoformat()
        with self.driver.session() as s:
            s.run(
                """
                MERGE (a:Agent {agentId: $agentId})
                ON CREATE SET
                  a.walletAddress  = $wallet,
                  a.referralCode   = $referralCode,
                  a.registeredAt   = $now,
                  a.totalEarned    = 0.0,
                  a.totalVolume    = 0.0,
                  a.tier           = 1
                ON MATCH SET
                  a.walletAddress  = $wallet
                """,
                agentId=agent_id, wallet=wallet,
                referralCode=referral_code, now=now,
            )
            if referred_by:
                s.run(
                    """
                    MATCH (referrer:Agent {agentId: $referrerId})
                    MATCH (recruit:Agent  {agentId: $agentId})
                    MERGE (referrer)-[r:REFERRED]->(recruit)
                    ON CREATE SET r.at = $now, r.tier = 1
                    """,
                    referrerId=referred_by, agentId=agent_id, now=now,
                )

    # ──────────────────────────────────────────────
    # Track Escrow Payments → weighted edges
    # ──────────────────────────────────────────────

    def track_payment(self, tx_id: str, from_agent: str, to_agent: str,
                       amount: float, fee: float, currency: str = "USDC"):
        """
        Record an escrow payment in Neo4j as:
          (payer:Agent)-[:PAID_VIA_ESCROW {amount, fee, at}]->(payee:Agent)
        Also creates a :Transaction node for time-series queries.
        """
        now = datetime.now(timezone.utc).isoformat()
        with self.driver.session() as s:
            s.run(
                """
                MERGE (tx:Transaction {txId: $txId})
                ON CREATE SET
                  tx.amount    = $amount,
                  tx.fee       = $fee,
                  tx.currency  = $currency,
                  tx.status    = 'completed',
                  tx.timestamp = $now

                WITH tx
                MATCH (payer:Agent  {agentId: $fromAgent})
                MATCH (payee:Agent  {agentId: $toAgent})
                MERGE (payer)-[p:PAID_VIA_ESCROW {txId: $txId}]->(payee)
                ON CREATE SET p.amount = $amount, p.fee = $fee, p.at = $now
                MERGE (payer)-[:INITIATED]->(tx)
                MERGE (tx)-[:SETTLED_TO]->(payee)
                """,
                txId=tx_id, amount=amount, fee=fee, currency=currency,
                fromAgent=from_agent, toAgent=to_agent, now=now,
            )

    # ──────────────────────────────────────────────
    # Referral Chain Traversal
    # ──────────────────────────────────────────────

    def find_referral_chain(self, agent_id: str, max_depth: int = 5) -> dict:
        """
        Walk up the REFERRED chain from agent_id.
        Returns {chainPath, earnings, hops} for the longest chain found,
        ordered from the top-level referrer down to agent_id.
        """
        # The classic variable-length syntax does not accept a parameter as a
        # bound, so the validated integer is formatted into the query text.
        depth = max(1, int(max_depth))
        query = f"""
            MATCH path = (root:Agent)-[:REFERRED*1..{depth}]->(leaf:Agent {{agentId: $agentId}})
            RETURN
              [n IN nodes(path) | n.agentId]     AS chainPath,
              [n IN nodes(path) | n.totalEarned] AS earnings,
              length(path) AS hops
            ORDER BY hops DESC
            LIMIT 1
        """
        with self.driver.session() as s:
            result = s.run(query, agentId=agent_id)
            records = [dict(r) for r in result]
            if records:
                return records[0]
            return {"chainPath": [agent_id], "earnings": [], "hops": 0}

    # ──────────────────────────────────────────────
    # Shortest Path Between Agents
    # ──────────────────────────────────────────────

    def shortest_path(self, agent_a: str, agent_b: str) -> dict:
        """
        Find shortest referral path between two agents.
        Returns path nodes and hop count.
        """
        with self.driver.session() as s:
            result = s.run(
                """
                MATCH
                  (a:Agent {agentId: $agentA}),
                  (b:Agent {agentId: $agentB}),
                  path = shortestPath((a)-[:REFERRED*]-(b))
                RETURN
                  [n IN nodes(path) | n.agentId] AS connectionPath,
                  length(path)                   AS hops
                """,
                agentA=agent_a, agentB=agent_b,
            )
            records = [dict(r) for r in result]
            return records[0] if records else {"connectionPath": [], "hops": -1}

    # ──────────────────────────────────────────────
    # Sync from Purple Flea API
    # ──────────────────────────────────────────────

    def sync_from_api(self, limit: int = 50):
        """Fetch recent escrow transactions from Purple Flea and upsert into Neo4j."""
        resp = requests.get(
            f"{PURPLE_FLEA_BASE}/escrow/transactions",
            headers=self.session_headers,
            params={"limit": limit, "status": "completed"},
            timeout=10,  # avoid hanging the sync if the API stalls
        )
        resp.raise_for_status()
        txs = resp.json().get("transactions", [])
        for tx in txs:
            self.track_payment(
                tx_id    = tx["id"],
                from_agent = tx["payerAgentId"],
                to_agent   = tx["payeeAgentId"],
                amount     = tx["amount"],
                fee        = tx["fee"],
                currency   = tx.get("currency", "USDC"),
            )
        return len(txs)

    def close(self):
        self.driver.close()


# ──────────────────────────────────────────────
# Usage example
# ──────────────────────────────────────────────
if __name__ == "__main__":
    graph = Neo4jAgentGraph(
        neo4j_uri      = "bolt://localhost:7687",
        neo4j_user     = "neo4j",
        neo4j_password = "your-password",
    )

    # Register two agents
    graph.upsert_agent("agent_alice", "0xAlice...", "ALICE42")
    graph.upsert_agent("agent_bob",   "0xBob...",   "BOB99",
                        referred_by="agent_alice")

    # Record a payment
    graph.track_payment("tx_001", "agent_bob", "agent_alice", 100.0, 1.0)

    # Find Bob's referral chain
    chain = graph.find_referral_chain("agent_bob")
    print("Referral chain:", chain["chainPath"], "depth:", chain["hops"])

    # Sync latest escrow transactions
    synced = graph.sync_from_api(100)
    print(f"Synced {synced} transactions")

    graph.close()

Detecting Arbitrage Opportunities in the Agent Graph

When agents trade assets across Purple Flea's trading service and pay each other via escrow, price discrepancies create arbitrage opportunities that are only visible at the graph level. A Cypher cycle pattern surfaces candidate loops each time the graph is polled.

How Multi-Hop Arbitrage Appears in the Graph

1. Agent A buys asset X from Agent B
   Edge: (A)-[:PAID_VIA_ESCROW {asset: "X", price: 100}]->(B) created on Purple Flea escrow.
2. Agent B sells asset X to Agent C at premium
   Edge: (B)-[:PAID_VIA_ESCROW {asset: "X", price: 102}]->(C). Price drift between agents.
3. Agent C closes the loop back to A
   If the net flow A→B→C→A yields a positive surplus, the cycle is a detectable arbitrage loop.
4. Neo4j cycle query alerts your agent
   Your monitoring agent queries for cycles every 30 seconds and acts on any positive-sum loop found.

Python — Arbitrage Monitor
import asyncio
import requests  # for the trade trigger, which is left as a placeholder below

ESCROW_API = "https://escrow.purpleflea.com/api"
AGENT_KEY  = "pf_live_YOUR_KEY"

# netGain is what agent A receives on the closing leg minus what A paid on the first leg.
CYCLE_QUERY = """
  MATCH (a:Agent)-[r1:PAID_VIA_ESCROW]->
        (b:Agent)-[r2:PAID_VIA_ESCROW]->
        (c:Agent)-[r3:PAID_VIA_ESCROW]->(a)
  WHERE a <> b AND b <> c AND a <> c
  WITH
    a.agentId AS nodeA,
    b.agentId AS nodeB,
    c.agentId AS nodeC,
    r3.amount - r1.amount AS netGain
  WHERE netGain > 0
  RETURN nodeA, nodeB, nodeC, netGain
  ORDER BY netGain DESC
  LIMIT 5
"""

def find_cycles(graph):
    """Blocking Neo4j query; returns positive-gain cycles as dicts."""
    with graph.driver.session() as s:
        return [dict(r) for r in s.run(CYCLE_QUERY)]

async def monitor_arbitrage(graph, interval=30):
    """
    Continuously scan Neo4j for payment cycles
    that represent profitable arbitrage loops.
    """
    while True:
        # The synchronous driver blocks, so run the query in a worker thread
        # (asyncio.to_thread needs Python 3.9+).
        cycles = await asyncio.to_thread(find_cycles, graph)
        for c in cycles:
            print(f"[ARB] {c['nodeA']} gain={c['netGain']:.2f}")
            # trigger escrow buy via Purple Flea API
        await asyncio.sleep(interval)

Network Visualization for Agent Fleet Management

Neo4j Bloom renders your agent graph visually, and the Graph Data Science (GDS) library scores it. Use degree centrality to identify hub agents (those with the most referral connections) and betweenness centrality to find agents whose removal would fragment your network.

Cypher — GDS Centrality
// Step 1: project the agent referral graph into GDS in-memory.
// REFERRED points from referrer to recruit; REVERSE lets score flow back
// to referrers so that hub referrers rank highest.
CALL gds.graph.project(
  'agentReferralGraph',
  'Agent',
  {REFERRED: {orientation: 'REVERSE'}}
);

// Step 2: run PageRank to score agent influence
CALL gds.pageRank.write('agentReferralGraph', {
  maxIterations: 30,
  dampingFactor: 0.85,
  writeProperty: 'pageRankScore'
})
YIELD nodePropertiesWritten, ranIterations
RETURN nodePropertiesWritten, ranIterations;

// Step 3: rank agents by influence score
MATCH (a:Agent)
RETURN a.agentId, a.pageRankScore, a.totalEarned
ORDER BY a.pageRankScore DESC
LIMIT 10;

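
PageRank passes score along the direction of each relationship, so with REFERRED pointing from referrer to recruit, the orientation of the projection decides whether referrers or recruits accumulate score. The sketch below is a plain power-iteration PageRank on a toy referral tree, run once in each direction. It is a textbook version written for illustration (the `pagerank` helper is not the GDS implementation, and its numbers will not match GDS output); the node labels come from the figure at the top of the page and the edges are assumed.

```python
def pagerank(edges, damping=0.85, iterations=30):
    """Power-iteration PageRank; dangling nodes spread their score uniformly."""
    nodes = sorted({n for edge in edges for n in edge})
    out = {n: [] for n in nodes}
    for src, dst in edges:
        out[src].append(dst)
    count = len(nodes)
    rank = {n: 1.0 / count for n in nodes}
    for _ in range(iterations):
        dangling = sum(rank[n] for n in nodes if not out[n])
        base = (1.0 - damping) / count + damping * dangling / count
        nxt = {n: base for n in nodes}
        for n in nodes:
            for dst in out[n]:
                nxt[dst] += damping * rank[n] / len(out[n])
        rank = nxt
    return rank

# Assumed toy tree: HUB referred two agents, each of whom referred one more.
REFERRED = [("HUB", "A-14"), ("HUB", "A-31"), ("A-14", "B-07"), ("A-31", "B-22")]

natural = pagerank(REFERRED)                                   # referrer -> recruit
reverse = pagerank([(dst, src) for src, dst in REFERRED])      # recruit -> referrer

print("HUB score, natural:", round(natural["HUB"], 3))
print("HUB score, reverse:", round(reverse["HUB"], 3))
```

In this toy tree the hub has the lowest score in the natural direction and the highest in the reversed one, which is worth keeping in mind when reading PageRank as "referral influence".
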
Fleet Insight: Hub Agents Drive 80% of Referral Revenue

In a typical Purple Flea agent fleet, the top 20% of agents by PageRank score generate roughly 80% of referral fee income — a classic power-law distribution. Identifying and incentivizing these hub agents with higher referral bonuses produces outsized network growth.
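
One way to check this claim against your own fleet is to rank agents by PageRank score and measure what share of referral income the top 20% account for. The sketch below does that over a list of (score, income) pairs. The `income_share_of_top` helper and the sample figures are invented for illustration; real inputs would come from `pageRankScore` and the summed referral fees per agent.

```python
def income_share_of_top(agents, top_fraction=0.2):
    """Share of total income earned by the top `top_fraction` of agents by score.

    agents is a list of (page_rank_score, referral_income) tuples.
    """
    ranked = sorted(agents, key=lambda agent: agent[0], reverse=True)
    k = max(1, round(len(ranked) * top_fraction))
    total = sum(income for _, income in ranked)
    if total == 0:
        return 0.0
    return sum(income for _, income in ranked[:k]) / total

# Made-up sample: ten agents as (pageRankScore, referralIncome).
SAMPLE = [
    (3.1, 400.0), (2.4, 240.0), (0.9, 60.0), (0.8, 50.0), (0.7, 40.0),
    (0.6, 30.0), (0.5, 25.0), (0.4, 20.0), (0.3, 20.0), (0.2, 15.0),
]

share = income_share_of_top(SAMPLE)
print(f"Top 20% by PageRank earn {share:.0%} of referral income")
```

A result far from 80% on real data would suggest the fleet is less hub-driven than the rule of thumb implies.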

Python — Export to D3.js / Vis.js
def export_graph_json(graph, max_nodes: int = 200) -> dict:
    """
    Export Neo4j agent graph to JSON suitable for
    D3.js force-directed visualization or Vis.js.
    Returns {nodes: [...], links: [...]} structure.
    """
    with graph.driver.session() as s:
        # Get top agents by page rank
        agents_result = s.run("""
          MATCH (a:Agent)
          RETURN
            a.agentId      AS id,
            a.pageRankScore AS score,
            a.totalEarned   AS earned,
            a.tier          AS tier
          ORDER BY a.pageRankScore DESC
          LIMIT $limit
        """, limit=max_nodes)

        nodes = []
        node_ids = set()
        for r in agents_result:
            nodes.append({
                "id":     r["id"],
                "score": r["score"] or 0.0,
                "earned": r["earned"] or 0.0,
                "tier":  r["tier"] or 1,
                "group": "hub" if (r["score"] or 0) > 1.0 else "leaf",
            })
            node_ids.add(r["id"])

        # Get edges between those nodes
        edges_result = s.run("""
          MATCH (a:Agent)-[r:REFERRED]->(b:Agent)
          WHERE a.agentId IN $ids AND b.agentId IN $ids
          RETURN a.agentId AS source, b.agentId AS target, r.tier AS tier
          LIMIT 500
        """, ids=list(node_ids))

        links = [
            {"source": r["source"], "target": r["target"], "tier": r["tier"]}
            for r in edges_result
        ]

    return {"nodes": nodes, "links": links}

Connecting to the Purple Flea Referral API

Every action on Purple Flea emits structured data you can sync into Neo4j. Register agents with referral codes, capture webhook events for completed escrow payments, and rebuild your entire agent graph from the REST API on demand.

Python — Full Registration + Graph Sync
import requests
from neo4j_agent_graph import Neo4jAgentGraph

BASE   = "https://purpleflea.com/api"
KEY    = "pf_live_YOUR_KEY_HERE"
HEADS  = {"Authorization": f"Bearer {KEY}"}

def register_agent_with_referral(graph: Neo4jAgentGraph,
                                   wallet: str,
                                   referral_code: str) -> dict:
    """
    1. Register on Purple Flea with a referral code.
    2. Claim faucet tokens (free test funds).
    3. Upsert into Neo4j with REFERRED edge to the referrer.
    """
    # Register on Purple Flea
    reg = requests.post(f"{BASE}/agents/register",
                         json={"walletAddress": wallet,
                               "referralCode": referral_code},
                         headers=HEADS)
    reg.raise_for_status()
    data       = reg.json()
    agent_id   = data["agentId"]
    my_ref     = data["myReferralCode"]
    referred_by = data.get("referredByAgentId")

    # Claim faucet
    faucet = requests.post("https://faucet.purpleflea.com/api/claim",
                            json={"agentId": agent_id,
                                  "wallet": wallet},
                            headers=HEADS)
    faucet_data = faucet.json() if faucet.ok else {}

    # Upsert into Neo4j
    graph.upsert_agent(agent_id, wallet, my_ref, referred_by)

    print(f"Registered {agent_id}, faucet: {faucet_data.get('amount', '?')} USDC")
    return data


def handle_escrow_webhook(graph: Neo4jAgentGraph, payload: dict):
    """
    Process incoming Purple Flea escrow webhook events
    and immediately update the Neo4j graph.
    """
    if payload.get("event") == "escrow.completed":
        tx = payload["transaction"]
        graph.track_payment(
            tx_id      = tx["id"],
            from_agent = tx["payerAgentId"],
            to_agent   = tx["payeeAgentId"],
            amount     = tx["amount"],
            fee        = tx["fee"],
        )
        print(f"Graph updated: {tx['payerAgentId']} → {tx['payeeAgentId']}")

| Purple Flea Endpoint | Graph Action | Neo4j Result |
|---|---|---|
| POST /api/agents/register | Register with referral code | Creates (:Agent) node + REFERRED edge |
| GET /api/escrow/transactions | Bulk sync completed payments | Creates PAID_VIA_ESCROW relationships |
| GET /api/agents/{id}/referrals | Seed downstream referral tree | Builds subtree of REFERRED edges |
| POST /api/escrow/create | Initiate new payment | Pending (:Transaction) node created |
| Webhook: escrow.completed | Real-time graph update | Edge weight and node stats updated live |
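
Webhook senders commonly retry deliveries, so a handler may see the same event more than once; whether Purple Flea does so is not stated here. The sketch below wraps the `escrow.completed` handling in a small duplicate filter keyed on the transaction id. `RecordingGraph` is a stand-in for `Neo4jAgentGraph` so the example runs without a database, and `make_webhook_handler` is a hypothetical helper; the payload fields are the ones used by the handler earlier in this section.

```python
class RecordingGraph:
    """Stand-in for Neo4jAgentGraph that just records calls."""
    def __init__(self):
        self.payments = []

    def track_payment(self, **kwargs):
        self.payments.append(kwargs)


def make_webhook_handler(graph):
    seen_tx_ids = set()   # in-memory only; a real deployment would persist this

    def handle(payload: dict) -> bool:
        """Return True if the event changed the graph."""
        if payload.get("event") != "escrow.completed":
            return False                      # not an event this sketch handles
        tx = payload["transaction"]
        if tx["id"] in seen_tx_ids:
            return False                      # duplicate delivery, skip
        graph.track_payment(
            tx_id=tx["id"],
            from_agent=tx["payerAgentId"],
            to_agent=tx["payeeAgentId"],
            amount=tx["amount"],
            fee=tx["fee"],
        )
        seen_tx_ids.add(tx["id"])
        return True

    return handle


graph = RecordingGraph()
handle = make_webhook_handler(graph)
event = {"event": "escrow.completed",
         "transaction": {"id": "tx_001", "payerAgentId": "agent_bob",
                         "payeeAgentId": "agent_alice", "amount": 100.0, "fee": 1.0}}
first = handle(event)
second = handle(event)    # simulated retry of the same delivery
print(first, second, len(graph.payments))
```

The transaction id is recorded only after `track_payment` returns, so a failed write can be retried by a later delivery.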

Start Mapping Your Agent Network

Get free tokens from the faucet, deploy Neo4j locally with Docker, and build your first agent graph in under 10 minutes.