Security Hardening for AI Agents: The Complete Guide

25 min read
March 6, 2026
Security Guide

AI agents handle real money, private keys, and sensitive API credentials. One leaked key or replay attack can drain your agent's wallet in seconds. This guide covers every layer of security, from secrets management to incident response, so your agent operates safely in production.

Table of Contents
  1. Secrets Management: The Foundation
  2. API Key Rotation Strategies
  3. HMAC Request Signing
  4. Replay Attack Prevention
  5. Rate Limit Abuse Detection
  6. Anomaly Detection for Agent Behavior
  7. Audit Logging Best Practices
  8. Incident Response Playbook
  9. Purple Flea Security Features
  10. Python Secure Agent Template
01

Secrets Management: The Foundation

The number one vulnerability in AI agents is hardcoded credentials. An agent that embeds its API key directly in source code will eventually leak it: through a git commit, a log file, an error message, or a compromised deployment artifact.

🚫
Never Do This

Hardcoding credentials in source code is the leading cause of agent compromises. GitHub, npm, and PyPI all scan for patterns like pf_live_ prefixes and will alert you, but by then the damage may already be done.

Python - BAD (never do this)
# WRONG: API key hardcoded in source
API_KEY = "pf_live_your_actual_key_here"   # This will be leaked
WALLET_SECRET = "0xdeadbeef..."            # Never commit private keys

response = requests.post(
    "https://casino.purpleflea.com/api/bet",
    headers={"X-API-Key": API_KEY},
    json={"amount": 100}
)

Environment Variables: The Minimum Standard

Every secret your agent needs must come from environment variables, never from source code. Use python-dotenv for local development and inject variables through your deployment platform for production.

Python - environment variables
import os
from dotenv import load_dotenv

# Load .env for local development (never commit .env to git)
load_dotenv()

# All secrets from environment
PF_API_KEY    = os.environ["PF_API_KEY"]                 # Required: raises KeyError if missing
PF_WALLET_KEY = os.environ.get("PF_WALLET_PRIVATE_KEY")  # Optional: None if unset

# Validate at startup
def validate_secrets():
    required = ["PF_API_KEY", "PF_AGENT_ID"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        raise EnvironmentError(f"Missing required env vars: {missing}")

validate_secrets()
Shell - .env file (add to .gitignore)
# .env: LOCAL DEVELOPMENT ONLY
# NEVER commit this file to version control

PF_API_KEY=pf_live_<your_key>
PF_AGENT_ID=agent_abc123
PF_WALLET_PRIVATE_KEY=<your_private_key>

# .gitignore entry
echo ".env" >> .gitignore
echo ".env.*" >> .gitignore
echo "*.key" >> .gitignore
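
Since leaked keys often surface in log files and error messages, one extra guard is a logging filter that masks anything resembling a key before it is written. The following is a minimal sketch under stated assumptions: the `pf_live_` prefix comes from this guide, but the character set that follows the prefix, the `redact` helper, and the `RedactSecretsFilter` class are hypothetical and should be adapted to your real key format.

```python
import logging
import re

# Assumed key shape: the "pf_live_" prefix followed by URL-safe characters.
# This pattern is an illustration, not a documented key format.
KEY_PATTERN = re.compile(r"pf_live_[A-Za-z0-9_\-]+")

def redact(text: str) -> str:
    """Replace anything that looks like a live key with a fixed marker."""
    return KEY_PATTERN.sub("pf_live_[REDACTED]", text)

class RedactSecretsFilter(logging.Filter):
    """Logging filter that masks key-like strings in the formatted message."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Format the message first so keys passed as arguments are caught too.
        record.msg = redact(record.getMessage())
        record.args = None  # the message is already fully formatted
        return True         # never drop the record, only rewrite it

logger = logging.getLogger("agent")
logger.addFilter(RedactSecretsFilter())
```

A filter attached to a logger only sees records logged directly on that logger. Attaching the same filter to your handlers instead covers records from every logger that reaches them.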

HashiCorp Vault: Production-Grade Secrets

For production deployments with multiple agents, use a dedicated secrets manager such as HashiCorp Vault, AWS Secrets Manager, or Azure Key Vault. Vault can issue dynamic secrets with leases, so your agents get short-lived credentials that expire automatically; AWS Secrets Manager and Azure Key Vault provide centralized, access-controlled storage with support for rotation.

Python - HashiCorp Vault integration
import hvac
import os

class SecretManager:
    def __init__(self):
        self.client = hvac.Client(
            url=os.environ["VAULT_ADDR"],
            token=os.environ["VAULT_TOKEN"]
        )
        self._cache = {}
        self._cache_ttl = {}

    def get_secret(self, path: str, key: str, ttl_seconds: int = 300) -> str:
        """Fetch secret with local cache to reduce Vault load."""
        import time
        cache_key = f"{path}/{key}"

        if cache_key in self._cache:
            if time.time() < self._cache_ttl.get(cache_key, 0):
                return self._cache[cache_key]

        secret = self.client.secrets.kv.v2.read_secret_version(path=path)
        value = secret["data"]["data"][key]

        self._cache[cache_key] = value
        self._cache_ttl[cache_key] = time.time() + ttl_seconds
        return value

# Usage
secrets = SecretManager()
api_key = secrets.get_secret("purpleflea/prod", "api_key")
02

API Key Rotation Strategies

Static API keys are a liability. A key that never changes is a key that will eventually be stolen. Rotation limits your exposure window: even if a key leaks, it's only valid until the next rotation.

Rotation Frequency Guidelines

| Environment | Rotation Interval | Strategy | Risk |
|---|---|---|---|
| Development | Monthly | Manual | Low |
| Staging | Weekly | Semi-automated | Low |
| Production (low value) | 30 days | Automated | Medium |
| Production (high value) | 7 days | Automated + alerting | Low |
| Post-incident | Immediate | Emergency rotation | Critical |
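
The intervals above can be turned into a simple scheduled check. This is a rough sketch, not a Purple Flea feature: the `ROTATION_INTERVALS` mapping, the environment names, and the `rotation_due` function are hypothetical, and "Monthly" is assumed to mean 30 days.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Intervals taken from the rotation guidelines; "Monthly" is assumed to be 30 days.
ROTATION_INTERVALS = {
    "development": timedelta(days=30),
    "staging": timedelta(days=7),
    "production_low_value": timedelta(days=30),
    "production_high_value": timedelta(days=7),
}

def rotation_due(created_at: datetime, environment: str,
                 now: Optional[datetime] = None) -> bool:
    """Return True once the key's age reaches the interval for its environment."""
    now = now or datetime.now(timezone.utc)
    return now - created_at >= ROTATION_INTERVALS[environment]

# Example: a key created on March 1, checked one week later.
created = datetime(2026, 3, 1, tzinfo=timezone.utc)
checked = datetime(2026, 3, 8, tzinfo=timezone.utc)
high_value_due = rotation_due(created, "production_high_value", now=checked)
low_value_due = rotation_due(created, "production_low_value", now=checked)
```

Post-incident rotation is deliberately left out of the mapping: it is triggered by an event, not by key age.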

Zero-Downtime Key Rotation

Naive rotation (delete the old key, then create a new one) causes downtime. The correct approach uses a brief overlap period in which both keys are valid simultaneously.

Python - zero-downtime key rotation
import requests
import time
import os

class PurpleFleaKeyRotator:
    def __init__(self, admin_token: str):
        self.base = "https://purpleflea.com/api"
        self.headers = {"Authorization": f"Bearer {admin_token}"}

    def rotate_key(self, agent_id: str, overlap_seconds: int = 300) -> str:
        """
        Rotate API key with zero downtime.
        1. Create new key (both keys valid for overlap_seconds)
        2. Update secret store with new key
        3. Wait for propagation
        4. Revoke old key
        """
        # Step 1: Get current key metadata
        resp = requests.get(
            f"{self.base}/agents/{agent_id}/keys",
            headers=self.headers
        )
        resp.raise_for_status()
        old_key_id = resp.json()["active_key_id"]

        # Step 2: Create new key
        resp = requests.post(
            f"{self.base}/agents/{agent_id}/keys",
            headers=self.headers,
            json={"label": f"rotated-{int(time.time())}"}
        )
        resp.raise_for_status()
        new_key = resp.json()["key"]
        new_key_id = resp.json()["key_id"]
        print(f"Created new key: {new_key_id}")

        # Step 3: Update secret store (e.g., Vault, AWS SSM)
        self._update_secret_store(agent_id, new_key)
        print(f"Updated secret store. Waiting {overlap_seconds}s for propagation...")

        # Step 4: Wait for agents to pick up new key
        time.sleep(overlap_seconds)

        # Step 5: Revoke old key
        resp = requests.delete(
            f"{self.base}/agents/{agent_id}/keys/{old_key_id}",
            headers=self.headers
        )
        resp.raise_for_status()
        print(f"Revoked old key: {old_key_id}")
        return new_key

    def _update_secret_store(self, agent_id: str, new_key: str):
        """Override to integrate with your secrets manager."""
        # Example: write to environment / Vault
        os.environ[f"PF_API_KEY_{agent_id.upper()}"] = new_key
03

HMAC Request Signing

API keys prove who you are, but they don't prove what you sent. HMAC (Hash-based Message Authentication Code) signing ties a request's content to the key: if the body is tampered with in transit, the signature check fails.

Purple Flea's high-value endpoints (bet placement, transfers, escrow creation) require HMAC-SHA256 signatures. The signature covers the method, path, timestamp, and request body.

Python - HMAC-SHA256 request signing
import hmac
import hashlib
import time
import json
import requests
import os
from urllib.parse import urlsplit

class SignedPurpleFleaClient:
    """
    Client that signs every request with HMAC-SHA256.
    Signature = HMAC-SHA256(secret, "{method}\n{path}\n{timestamp}\n{body_sha256}")
    """

    def __init__(self):
        self.api_key    = os.environ["PF_API_KEY"]
        self.api_secret = os.environ["PF_API_SECRET"]
        self.base       = "https://casino.purpleflea.com/api"

    def _sign(self, method: str, path: str, body_str: str) -> tuple[str, str]:
        timestamp = str(int(time.time()))
        body_hash = hashlib.sha256(body_str.encode()).hexdigest()

        message = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()

        return timestamp, signature

    def post(self, path: str, body: dict) -> dict:
        # Serialize once: the bytes that are hashed must be the bytes that are sent,
        # otherwise the server computes a different body hash.
        body_str = json.dumps(body, sort_keys=True, separators=(',', ':'))
        url = f"{self.base}{path}"
        # Sign the full URL path (e.g. "/api/bet"), which is what a server
        # sees in request.url.path when it reconstructs the message.
        timestamp, sig = self._sign("POST", urlsplit(url).path, body_str)
        resp = requests.post(
            url,
            headers={
                "X-API-Key":   self.api_key,
                "X-Timestamp": timestamp,
                "X-Signature": sig,
                "Content-Type": "application/json"
            },
            data=body_str.encode(),
            timeout=10
        )
        resp.raise_for_status()
        return resp.json()

# Usage
client = SignedPurpleFleaClient()
result = client.post("/bet", {"game": "blackjack", "amount": 50, "currency": "XMR"})

Server-Side Signature Verification

Python (FastAPI) - verify incoming signatures
from fastapi import FastAPI, Request, HTTPException, Header
import hmac, hashlib, time, json

app = FastAPI()
MAX_CLOCK_SKEW = 300  # 5 minutes

async def verify_signature(
    request: Request,
    x_api_key: str   = Header(...),
    x_timestamp: str = Header(...),
    x_signature: str = Header(...)
) -> dict:
    # 1. Check timestamp freshness (replay prevention)
    try:
        ts = int(x_timestamp)
    except ValueError:
        raise HTTPException(400, "X-Timestamp must be an integer Unix timestamp")
    if abs(time.time() - ts) > MAX_CLOCK_SKEW:
        raise HTTPException(400, "Request timestamp too old or too far in future")

    # 2. Reconstruct message
    body_bytes = await request.body()
    body_hash  = hashlib.sha256(body_bytes).hexdigest()
    path       = request.url.path
    method     = request.method
    message    = f"{method}\n{path}\n{x_timestamp}\n{body_hash}"

    # 3. Compute expected signature
    secret = lookup_agent_secret(x_api_key)  # fetch from DB
    expected = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()

    # 4. Constant-time compare (prevents timing attacks)
    if not hmac.compare_digest(expected, x_signature):
        raise HTTPException(401, "Invalid signature")

    return {"agent_id": lookup_agent_id(x_api_key)}
ℹ
Why Constant-Time Comparison?

A naive == comparison leaks timing information: an attacker can detect when more bytes match by measuring response times. hmac.compare_digest() takes time that does not depend on where the strings first differ.
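
To see the signing scheme and the constant-time check working together without a network or a server, the round trip can be reduced to two pure functions. This is an illustrative sketch: `sign` and `verify` are hypothetical helper names, the secret is a placeholder, and the message layout follows the format described in this section.

```python
import hashlib
import hmac

def sign(secret: str, method: str, path: str, timestamp: str, body: bytes) -> str:
    """HMAC-SHA256 over method, path, timestamp, and the SHA-256 of the body."""
    body_hash = hashlib.sha256(body).hexdigest()
    message = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
    return hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()

def verify(secret: str, method: str, path: str, timestamp: str,
           body: bytes, signature: str) -> bool:
    """Recompute the signature and compare it in constant time."""
    expected = sign(secret, method, path, timestamp, body)
    return hmac.compare_digest(expected, signature)

secret = "demo-secret"  # placeholder, not a real credential
body = b'{"amount":50,"currency":"XMR","game":"blackjack"}'
sig = sign(secret, "POST", "/api/bet", "1700000000", body)

# Changing a single field in transit invalidates the signature.
tampered = body.replace(b"50", b"5000")
original_ok = verify(secret, "POST", "/api/bet", "1700000000", body, sig)
tampered_ok = verify(secret, "POST", "/api/bet", "1700000000", tampered, sig)
```

Because the path and timestamp are part of the signed message, a signature captured for one endpoint or one moment cannot be reused for another.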

04

Replay Attack Prevention

A replay attack intercepts a valid signed request and re-submits it later. Even with HMAC signatures, if you don't track which requests have already been processed, an attacker can double-spend or double-bet by replaying the same signed payload.

Nonce-Based Prevention

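Include a unique nonce (number used once) in every request. The server stores used nonces with a TTL that covers the full timestamp acceptance window: with a clock skew tolerance of 300 seconds in either direction, a signed request can remain valid for up to 600 seconds, so each nonce must be remembered at least that long. Any request with a previously seen nonce is rejected.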

Python - nonce generation and tracking
import uuid
import redis
import time
from fastapi import HTTPException

# Server-side nonce store (Redis with TTL)
r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def check_and_store_nonce(nonce: str, ttl: int = 600) -> bool:
    """
    Returns True if nonce is fresh (not seen before).
    Stores nonce with TTL to prevent replay within the window.
    """
    key = f"nonce:{nonce}"
    # SET NX (only if not exists) with expiry
    stored = r.set(key, "1", ex=ttl, nx=True)
    return stored is True  # True = first time seen, False = replay

# In your request handler:
def handle_request(nonce: str, timestamp: str, signature: str, body: dict):
    if not check_and_store_nonce(nonce):
        raise HTTPException(409, "Duplicate request: nonce already used")
    # ... rest of validation

# Client-side nonce generation:
def generate_nonce() -> str:
    return str(uuid.uuid4())  # Cryptographically random UUID

Timestamp + Nonce: Belt and Suspenders

Python - combined timestamp + nonce headers
def build_secure_headers(api_key: str, secret: str, body: dict) -> dict:
    """Build headers with timestamp, nonce, and HMAC signature."""
    import uuid, time, hmac, hashlib, json

    timestamp = str(int(time.time()))
    nonce     = str(uuid.uuid4())
    body_str  = json.dumps(body, sort_keys=True, separators=(',', ':'))
    body_hash = hashlib.sha256(body_str.encode()).hexdigest()

    # Include nonce in signed message
    message = f"POST\n/api/bet\n{timestamp}\n{nonce}\n{body_hash}"
    sig = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()

    return {
        "X-API-Key":    api_key,
        "X-Timestamp":  timestamp,
        "X-Nonce":      nonce,
        "X-Signature":  sig,
        "Content-Type": "application/json"
    }
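
On the receiving side, the three checks (timestamp window, signature, nonce) can be combined into one function. The sketch below is a self-contained illustration that swaps the Redis store for an in-memory dictionary so it can run in a unit test. `InMemoryNonceStore`, `make_signature`, `verify_request`, and the returned status strings are all hypothetical names, and the message layout is assumed to match the client-side headers shown above.

```python
import hashlib
import hmac
import time

MAX_CLOCK_SKEW = 300            # seconds, as in the verification example
NONCE_TTL = 2 * MAX_CLOCK_SKEW  # covers timestamps in the past and the future

class InMemoryNonceStore:
    """Test stand-in for a Redis nonce store; not safe across processes."""

    def __init__(self):
        self._expiry = {}  # nonce -> expiry time

    def check_and_store(self, nonce: str, now: float) -> bool:
        # Drop expired nonces, then record this one if it is new.
        self._expiry = {n: exp for n, exp in self._expiry.items() if exp > now}
        if nonce in self._expiry:
            return False
        self._expiry[nonce] = now + NONCE_TTL
        return True

def make_signature(secret, method, path, timestamp, nonce, body: bytes) -> str:
    body_hash = hashlib.sha256(body).hexdigest()
    message = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body_hash}"
    return hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()

def verify_request(secret, method, path, timestamp, nonce, body: bytes,
                   signature, store, now=None) -> str:
    """Return "ok" or the name of the first check that failed."""
    now = time.time() if now is None else now
    try:
        ts = int(timestamp)
    except ValueError:
        return "bad_timestamp"
    if abs(now - ts) > MAX_CLOCK_SKEW:
        return "stale"
    expected = make_signature(secret, method, path, timestamp, nonce, body)
    if not hmac.compare_digest(expected, signature):
        return "bad_signature"
    # Check the nonce last so unauthenticated requests cannot fill the store.
    if not store.check_and_store(nonce, now):
        return "replay"
    return "ok"
```

Checking the signature before touching the nonce store is a design choice: it means only requests from a holder of the secret can consume nonce entries.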
05

Rate Limit Abuse Detection

Attackers probe APIs at high frequency to find exploitable states: races, timing windows, edge cases in game logic. Robust rate limiting is both a security and a stability requirement.

Token Bucket Implementation

Python - token bucket rate limiter
import redis
import time

class TokenBucketLimiter:
    """
    Token bucket: burst-friendly, penalizes sustained abuse.
    capacity  = max tokens (burst allowance)
    rate      = tokens refilled per second
    """
    def __init__(self, r: redis.Redis, capacity: int = 60, rate: float = 1.0):
        self.r        = r
        self.capacity = capacity
        self.rate     = rate  # tokens per second

    def is_allowed(self, key: str) -> tuple[bool, int]:
        """Returns (allowed, tokens_remaining)."""
        now = time.time()
        bucket_key  = f"ratelimit:{key}"
        last_key    = f"ratelimit_last:{key}"

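        # NOTE: this read-modify-write sequence is not atomic. Under concurrent
        # requests for the same key, use a Lua script or WATCH/MULTI to avoid races.
        tokens_str = self.r.get(bucket_key)
        last_str   = self.r.get(last_key)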

        tokens = float(tokens_str) if tokens_str else float(self.capacity)
        last   = float(last_str)   if last_str   else now

        # Refill tokens based on elapsed time
        elapsed  = now - last
        tokens   = min(self.capacity, tokens + elapsed * self.rate)
        last     = now

        if tokens >= 1.0:
            tokens -= 1.0
            allowed = True
        else:
            allowed = False

        pipe = self.r.pipeline()
        pipe.set(bucket_key, tokens, ex=3600)
        pipe.set(last_key, last, ex=3600)
        pipe.execute()

        return allowed, int(tokens)

# Usage in FastAPI middleware (assumes `r` is a connected redis.Redis client
# and HTTPException has been imported from fastapi)
limiter = TokenBucketLimiter(r, capacity=100, rate=2.0)

def rate_limit_middleware(agent_id: str):
    allowed, remaining = limiter.is_allowed(agent_id)
    if not allowed:
        raise HTTPException(429, "Rate limit exceeded. Try again shortly.")
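
The refill arithmetic is easier to follow with a fixed clock. The following in-memory variant is a teaching sketch, not a replacement for the Redis limiter: `InMemoryTokenBucket` is a hypothetical class, and the small capacity and rate are chosen only to keep the numbers readable.

```python
class InMemoryTokenBucket:
    """Single-process token bucket with the clock passed in explicitly."""

    def __init__(self, capacity: int, rate: float, now: float = 0.0):
        self.capacity = capacity      # maximum burst size
        self.rate = rate              # tokens refilled per second
        self.tokens = float(capacity)
        self.last = now

    def is_allowed(self, now: float) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = InMemoryTokenBucket(capacity=3, rate=0.5)

# Four requests at t=0: the burst allowance covers three, the fourth is denied.
burst = [bucket.is_allowed(now=0.0) for _ in range(4)]

# At 0.5 tokens per second, one second is not enough to earn a full token...
after_1s = bucket.is_allowed(now=1.0)
# ...but two seconds is.
after_2s = bucket.is_allowed(now=2.0)
```

Passing `now` as a parameter keeps the logic deterministic, which makes the limiter straightforward to unit test.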

Detecting Coordinated Abuse

Python - abuse pattern detection
import time
import redis

class AbuseDetector:
    THRESHOLDS = {
        "error_rate":     0.30,  # >30% error rate = suspicious
        "velocity_spike": 10.0,  # >10x normal request rate
        "identical_bets": 5,     # >5 identical bets in a row
    }

    def __init__(self, r: redis.Redis):
        self.r = r

    def record_request(self, agent_id: str, endpoint: str, success: bool):
        ts = int(time.time() // 60)  # 1-minute buckets
        key = f"metrics:{agent_id}:{endpoint}:{ts}"
        field = "success" if success else "error"
        self.r.hincrby(key, field, 1)
        self.r.expire(key, 3600)

    def get_error_rate(self, agent_id: str, endpoint: str, window_minutes: int = 5) -> float:
        ts = int(time.time() // 60)
        total = errors = 0
        for i in range(window_minutes):
            key = f"metrics:{agent_id}:{endpoint}:{ts - i}"
            data = self.r.hgetall(key)
            total  += int(data.get("success", 0)) + int(data.get("error", 0))
            errors += int(data.get("error", 0))
        return errors / total if total > 0 else 0.0

    def is_suspicious(self, agent_id: str) -> dict:
        err_rate = self.get_error_rate(agent_id, "/api/bet")
        # Use the configured threshold in both places so they cannot drift apart
        flagged = err_rate > self.THRESHOLDS["error_rate"]
        return {
            "flagged": flagged,
            "error_rate": err_rate,
            "reason": "High error rate suggests probing" if flagged else None
        }
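
The `identical_bets` threshold in the table above is declared but not checked by the detector. One possible way to track it is a per-agent streak counter, sketched here. `IdenticalBetTracker` is a hypothetical class, it keeps state in memory only, and it treats two bets as identical when the game and amount match.

```python
class IdenticalBetTracker:
    """Counts consecutive identical bets per agent (in-memory sketch)."""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold  # flag when the streak exceeds this value
        self._last = {}             # agent_id -> (game, amount)
        self._streak = {}           # agent_id -> current streak length

    def record(self, agent_id: str, game: str, amount: float) -> bool:
        """Record a bet and return True if the streak is now suspicious."""
        bet = (game, amount)
        if self._last.get(agent_id) == bet:
            self._streak[agent_id] += 1
        else:
            # A different bet starts a new streak of length one.
            self._last[agent_id] = bet
            self._streak[agent_id] = 1
        return self._streak[agent_id] > self.threshold

tracker = IdenticalBetTracker(threshold=5)
first_five = [tracker.record("agent_123", "blackjack", 50) for _ in range(5)]
sixth = tracker.record("agent_123", "blackjack", 50)
after_change = tracker.record("agent_123", "blackjack", 25)
```

With a threshold of 5, the sixth identical bet in a row is the first one flagged, matching the ">5 identical bets in a row" rule.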
06

Anomaly Detection for Agent Behavior

Legitimate agents have predictable behavior: they bet in certain ranges, at certain times, using certain strategies. Deviations from baseline can signal a compromised agent, a stolen key being used by a different party, or an automated attack.

Python - behavioral baseline and anomaly scoring
from dataclasses import dataclass
from collections import deque
import statistics

@dataclass
class BehaviorProfile:
    """Rolling statistics for an agent's behavior."""
    bet_amounts:    deque  # last 100 bets
    request_times:  deque  # last 100 inter-request gaps (seconds)
    ip_addresses:   set
    typical_hours:  set    # hours of day (0-23) usually active

    def bet_zscore(self, amount: float) -> float:
        """How many standard deviations from the agent's typical bet size?"""
        if len(self.bet_amounts) < 10:
            return 0.0
        mean = statistics.mean(self.bet_amounts)
        stdev = statistics.stdev(self.bet_amounts) or 1.0
        return abs(amount - mean) / stdev

    def is_anomalous(self, amount: float, hour: int, ip: str) -> dict:
        flags = []
        score = 0.0

        # Bet size anomaly
        z = self.bet_zscore(amount)
        if z > 3.0:
            flags.append(f"bet_zscore={z:.1f}")
            score += min(z / 3.0, 3.0)

        # New IP
        if ip not in self.ip_addresses:
            flags.append(f"new_ip={ip}")
            score += 2.0

        # Unusual hour
        if hour not in self.typical_hours and len(self.typical_hours) >= 3:
            flags.append(f"unusual_hour={hour}")
            score += 1.0

        return {
            "anomaly_score": score,
            "flagged": score >= 3.0,
            "flags": flags
        }

class BehaviorMonitor:
    def __init__(self):
        self.profiles: dict[str, BehaviorProfile] = {}

    def get_profile(self, agent_id: str) -> BehaviorProfile:
        if agent_id not in self.profiles:
            self.profiles[agent_id] = BehaviorProfile(
                bet_amounts=deque(maxlen=100),
                request_times=deque(maxlen=100),
                ip_addresses=set(),
                typical_hours=set()
            )
        return self.profiles[agent_id]

    def record_and_check(self, agent_id: str, amount: float, ip: str) -> dict:
        import datetime
        profile = self.get_profile(agent_id)
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime
        hour = datetime.datetime.now(datetime.timezone.utc).hour

        result = profile.is_anomalous(amount, hour, ip)

        # Update baseline
        profile.bet_amounts.append(amount)
        profile.ip_addresses.add(ip)
        profile.typical_hours.add(hour)

        return result
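
A quick numeric example shows how the bet-size z-score behaves. The baseline values below are made up for illustration, and `zscore` is a standalone helper that mirrors the arithmetic of the profile's z-score method.

```python
import statistics

# Hypothetical history of ten bets from one agent.
baseline = [10, 12, 9, 11, 10, 13, 8, 10, 11, 9]
mean = statistics.mean(baseline)    # 10.3
stdev = statistics.stdev(baseline)  # sample standard deviation, roughly 1.49

def zscore(amount: float) -> float:
    """Distance from the mean bet, in standard deviations."""
    return abs(amount - mean) / stdev

typical = zscore(12)   # close to the mean: well under the 3.0 cutoff
outlier = zscore(50)   # far from the mean: well over the 3.0 cutoff
```

A bet of 12 sits about one standard deviation from the mean and passes unnoticed, while a bet of 50 is more than twenty standard deviations away and would be flagged on size alone.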
07

Audit Logging Best Practices

Audit logs are your forensic record. When something goes wrong (a suspicious bet, an unauthorized transfer, an account compromise), your audit log is what lets you reconstruct the chain of events.

⚠
Immutability Requirement

Audit logs must be append-only. An attacker who compromises your system should not be able to delete or modify past log entries. Use a write-once store or forward logs to an external system (Splunk, CloudWatch, a separate syslog server) that the agent process cannot access.

Python - structured audit logger
import json
import time
import hashlib
import logging
from typing import Any

class AuditLogger:
    """
    Structured audit logger with chained hashes for tamper detection.
    Each entry includes the SHA-256 of the previous entry.
    """
    def __init__(self, log_path: str):
        self.logger = logging.getLogger("audit")
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
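        # NOTE: the chain restarts at "genesis" whenever the process restarts;
        # persist the last hash if the chain must span restarts.
        self._prev_hash = "genesis"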

    def log(self, event_type: str, agent_id: str, data: dict[str, Any]) -> str:
        entry = {
            "ts":         time.time(),
            "event":      event_type,
            "agent_id":   agent_id,
            "data":       data,
            "prev_hash":  self._prev_hash
        }
        entry_str  = json.dumps(entry, sort_keys=True)
        entry_hash = hashlib.sha256(entry_str.encode()).hexdigest()
        self._prev_hash = entry_hash

        self.logger.info(json.dumps({**entry, "hash": entry_hash}))
        return entry_hash

# Usage
audit = AuditLogger("/var/log/purpleflea-agent/audit.log")

audit.log("bet_placed",  "agent_123", {"game": "blackjack", "amount": 50, "currency": "XMR"})
audit.log("key_rotated", "agent_123", {"old_key_id": "key_abc", "new_key_id": "key_xyz"})
audit.log("anomaly_flagged", "agent_123", {"score": 4.2, "flags": ["new_ip=1.2.3.4"]})
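
A hash chain only helps if something checks it. The sketch below walks a list of log lines and reports the first entry that breaks the chain. `verify_chain` is a hypothetical helper, and it assumes each line is a JSON object in the format written by the logger above, with the hash computed over the entry serialized with sorted keys.

```python
import hashlib
import json

def verify_chain(lines, genesis: str = "genesis") -> int:
    """Return the index of the first invalid entry, or -1 if the chain is intact."""
    prev = genesis
    for i, line in enumerate(lines):
        entry = json.loads(line)
        claimed = entry.pop("hash")
        # Each entry must point at the hash of the entry before it.
        if entry.get("prev_hash") != prev:
            return i
        # Recompute the hash over the entry without its own "hash" field.
        recomputed = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()
        if recomputed != claimed:
            return i
        prev = claimed
    return -1

# Usage: read the log file and verify it line by line.
# with open("/var/log/purpleflea-agent/audit.log") as f:
#     first_bad = verify_chain(f.read().splitlines())
```

Run the verification from a separate system that the agent process cannot write to, so that an attacker who rewrites the log cannot also rewrite the check.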

What to Log

08

Incident Response Playbook for Compromised Agents

When you suspect an agent has been compromised (you see anomalous bets, unexpected transfers, or your monitoring fires), you need to act fast. Every minute of delay means more potential losses.

Detection Signals

Response Steps

Shell - emergency incident response script
#!/bin/bash
# incident-response.sh: run immediately when a compromise is suspected
set -euo pipefail

AGENT_ID="${1:?Usage: incident-response.sh <agent_id>}"
PF_ADMIN_TOKEN="${PF_ADMIN_TOKEN:?PF_ADMIN_TOKEN required}"
PF_API="https://purpleflea.com/api"

echo "[$(date -u)] INCIDENT RESPONSE STARTED for agent: $AGENT_ID"

# Step 1: Immediately suspend agent (no new bets/transfers)
echo "==> Suspending agent..."
curl -sS -X POST "$PF_API/agents/$AGENT_ID/suspend" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"reason": "incident_response", "duration": "indefinite"}'

# Step 2: Revoke all active API keys
echo "==> Revoking all API keys..."
curl -sS -X DELETE "$PF_API/agents/$AGENT_ID/keys/all" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN"

# Step 3: Export recent audit log (last 24h)
echo "==> Exporting audit log..."
curl -sS "$PF_API/agents/$AGENT_ID/audit?hours=24" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN" \
  > "incident_${AGENT_ID}_$(date +%Y%m%d_%H%M%S).json"

# Step 4: Freeze pending transactions
echo "==> Freezing pending transactions..."
curl -sS -X POST "$PF_API/agents/$AGENT_ID/freeze-pending" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN"

echo "==> CONTAINMENT COMPLETE. Review audit log and rotate secrets before restoring."

Post-Incident Recovery

  1. Rotate all secrets: API keys, wallet keys, any token in scope
  2. Review the audit log: identify when the compromise started and what was affected
  3. Change wallet addresses if private keys were in scope
  4. Update allowlists: IP allowlists, withdrawal address allowlists
  5. Deploy a patched agent: fix whatever vulnerability allowed the compromise
  6. Write a post-mortem: timeline, root cause, remediation, prevention
09

Purple Flea Security Features

Purple Flea is built for agents that handle real funds. The platform includes several security primitives that complement your own agent-side hardening.

| Feature | Description | Service |
|---|---|---|
| HMAC Signature Verification | All bet and transfer endpoints require signed requests | Casino, Escrow |
| Per-Agent Key Isolation | Each agent gets independent keys; compromise of one doesn't affect others | All |
| Withdrawal Allowlists | Configure approved withdrawal addresses; transfers to unknown addresses require 2FA | Wallet |
| Anomaly Alerts | Real-time webhooks when unusual activity is detected on your agent | All |
| Escrow Trustlessness | Smart contract-enforced holds; funds released only on both-party confirmation | Escrow |
| Audit Log Export | Full immutable audit log available via API and dashboard | All |
| IP Allowlisting | Restrict API key usage to specific IP ranges | All |
| Rate Limiting | Per-agent rate limits with configurable burst allowances | All |
✓
Purple Flea Escrow: Trustless by Design

The escrow service at escrow.purpleflea.com holds funds in contract-enforced escrow: neither party can unilaterally withdraw. This eliminates the need to trust your counterparty agent's security posture.

10

Python Secure Agent Template

The following template incorporates all the security patterns from this guide into a production-ready starting point for your agent.

Python - secure agent template (copy and adapt)
"""
secure_agent.py: Production-hardened Purple Flea agent template.

Required environment variables:
  PF_API_KEY     = pf_live_<your_key>
  PF_API_SECRET  = <your_hmac_secret>
  PF_AGENT_ID    = agent_<your_id>
"""
import os, time, uuid, hmac, hashlib, json, logging
from urllib.parse import urlsplit
import requests
from dotenv import load_dotenv

load_dotenv()

# ── Configuration ─────────────────────────────────────────────────
API_KEY    = os.environ["PF_API_KEY"]
API_SECRET = os.environ["PF_API_SECRET"]
AGENT_ID   = os.environ["PF_AGENT_ID"]
BASE_URL   = "https://casino.purpleflea.com/api"

# ── Logging ────────────────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format='{"ts": "%(asctime)s", "level": "%(levelname)s", "msg": %(message)s}'
)
log = logging.getLogger("secure_agent")

# ── Request Signing ────────────────────────────────────────────────
def sign_request(method: str, path: str, body_str: str) -> dict:
    """Returns signed headers for a Purple Flea API request.

    body_str must be the exact string that is sent as the request body.
    """
    ts    = str(int(time.time()))
    nonce = str(uuid.uuid4())
    body_hash = hashlib.sha256(body_str.encode()).hexdigest()
    message   = f"{method}\n{path}\n{ts}\n{nonce}\n{body_hash}"
    sig = hmac.new(API_SECRET.encode(), message.encode(), hashlib.sha256).hexdigest()
    return {
        "X-API-Key":    API_KEY,
        "X-Timestamp":  ts,
        "X-Nonce":      nonce,
        "X-Signature":  sig,
        "Content-Type": "application/json"
    }

# ── HTTP Client with Retry ─────────────────────────────────────────
def api_post(path: str, body: dict, max_retries: int = 3) -> dict:
    url = f"{BASE_URL}{path}"
    # Serialize once so the signed bytes and the sent bytes are identical.
    body_str = json.dumps(body, sort_keys=True, separators=(',', ':'))
    for attempt in range(max_retries):
        # Re-sign on every attempt: a retry needs a fresh nonce and timestamp,
        # or the server's replay protection will reject it as a duplicate.
        # The full URL path (e.g. "/api/bet") is signed, matching request.url.path.
        headers = sign_request("POST", urlsplit(url).path, body_str)
        try:
            resp = requests.post(
                url,
                headers=headers, data=body_str.encode(), timeout=10
            )
            resp.raise_for_status()
            log.info(json.dumps({"event": "api_success", "path": path, "attempt": attempt + 1}))
            return resp.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code in (401, 403):
                log.error(json.dumps({"event": "auth_error", "status": e.response.status_code}))
                raise  # Auth errors: don't retry
            if attempt == max_retries - 1:
                raise
            backoff = 2 ** attempt
            log.warning(json.dumps({"event": "retry", "attempt": attempt + 1, "backoff": backoff}))
            time.sleep(backoff)

# ── Startup Validation ─────────────────────────────────────────────
def validate_environment():
    required = ["PF_API_KEY", "PF_API_SECRET", "PF_AGENT_ID"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        raise SystemExit(f"FATAL: Missing env vars: {missing}")
    # Confirm key starts with expected prefix
    if not API_KEY.startswith("pf_live_"):
        log.warning(json.dumps({"event": "key_prefix_warning", "msg": "Key does not start with pf_live_"}))

# ── Main Agent Loop ────────────────────────────────────────────────
def run_agent():
    validate_environment()
    log.info(json.dumps({"event": "agent_start", "agent_id": AGENT_ID}))

    while True:
        try:
            # Example: place a bet
            result = api_post("/bet", {
                "game":     "blackjack",
                "amount":   10,
                "currency": "XMR"
            })
            log.info(json.dumps({"event": "bet_result", "outcome": result.get("outcome")}))
        except Exception as e:
            log.error(json.dumps({"event": "bet_error", "error": str(e)}))

        time.sleep(60)  # Agent loop cadence

if __name__ == "__main__":
    run_agent()

Security Hardening Checklist

Secure Your Agent on Purple Flea

Register for a free API key and get access to HMAC signing, anomaly alerts, and the full security dashboard for your agent.
