Secrets Management: The Foundation
The number one vulnerability in AI agents is hardcoded credentials. An agent that embeds its API key directly in source code will eventually leak it: through a git commit, a log file, an error message, or a compromised deployment artifact.
Secret scanning on code-hosting platforms such as GitHub can flag recognizable patterns like the pf_live_ prefix and alert you, but by then the damage may already be done.
# WRONG: API key hardcoded in source
import requests
API_KEY = "pf_live_your_actual_key_here"  # This will be leaked
WALLET_SECRET = "0xdeadbeef..."  # Never commit private keys
response = requests.post(
    "https://casino.purpleflea.com/api/bet",
    headers={"X-API-Key": API_KEY},
    json={"amount": 100},
)
Environment Variables: The Minimum Standard
Every secret your agent needs must come from environment variables, never from source code. Use python-dotenv for local development and inject variables through your deployment platform for production.
import os
from dotenv import load_dotenv
# Load .env for local development (never commit .env to git)
load_dotenv()
# Validate at startup, before any secret is read
def validate_secrets():
    required = ["PF_API_KEY", "PF_AGENT_ID"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        raise EnvironmentError(f"Missing required env vars: {missing}")
validate_secrets()
# All secrets from environment
PF_API_KEY = os.environ["PF_API_KEY"]  # Required: raises KeyError if missing
PF_WALLET_PRIVATE_KEY = os.environ.get("PF_WALLET_PRIVATE_KEY")  # Optional
# .env: LOCAL DEVELOPMENT ONLY
# NEVER commit this file to version control
PF_API_KEY=pf_live_<your_key>
PF_AGENT_ID=agent_abc123
PF_WALLET_PRIVATE_KEY=<your_private_key>
# .gitignore entry
echo ".env" >> .gitignore
echo ".env.*" >> .gitignore
echo "*.key" >> .gitignore
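One way to keep environment-sourced secrets out of logs and tracebacks is to hold them in a small settings object whose printed form masks the values. The sketch below is illustrative only: `AgentSecrets`, `load_secrets`, and `mask` are hypothetical names, not part of any Purple Flea SDK.

```python
import os
from dataclasses import dataclass, field


def mask(value: str, visible: int = 4) -> str:
    """Show only the last few characters of a secret."""
    if len(value) <= visible:
        return "*" * len(value)
    return "*" * 8 + value[-visible:]


@dataclass(frozen=True)
class AgentSecrets:
    # repr=False keeps the raw value out of the auto-generated repr
    api_key: str = field(repr=False)
    agent_id: str

    def __str__(self) -> str:
        return f"AgentSecrets(api_key={mask(self.api_key)}, agent_id={self.agent_id})"


def load_secrets(env=os.environ) -> AgentSecrets:
    """Read required variables from env, failing fast if any are missing."""
    missing = [k for k in ("PF_API_KEY", "PF_AGENT_ID") if not env.get(k)]
    if missing:
        raise EnvironmentError(f"Missing required env vars: {missing}")
    return AgentSecrets(api_key=env["PF_API_KEY"], agent_id=env["PF_AGENT_ID"])
```

Passing `env` explicitly makes the loader easy to test with a plain dictionary, and an accidental `print(secrets)` or logged exception context then shows only the masked key.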
HashiCorp Vault: Production-Grade Secrets
For production deployments with multiple agents, use a dedicated secrets manager such as HashiCorp Vault, AWS Secrets Manager, or Azure Key Vault. Vault can issue dynamic secrets on a lease, so agents receive short-lived credentials that expire automatically; the managed cloud services support scheduled rotation to a similar end.
import os
import time
import hvac
class SecretManager:
    def __init__(self):
        self.client = hvac.Client(
            url=os.environ["VAULT_ADDR"],
            token=os.environ["VAULT_TOKEN"],
        )
        self._cache = {}
        self._cache_ttl = {}
    def get_secret(self, path: str, key: str, ttl_seconds: int = 300) -> str:
        """Fetch secret with local cache to reduce Vault load."""
        cache_key = f"{path}/{key}"
        if cache_key in self._cache:
            if time.time() < self._cache_ttl.get(cache_key, 0):
                return self._cache[cache_key]
        secret = self.client.secrets.kv.v2.read_secret_version(path=path)
        value = secret["data"]["data"][key]
        self._cache[cache_key] = value
        self._cache_ttl[cache_key] = time.time() + ttl_seconds
        return value
# Usage
secrets = SecretManager()
api_key = secrets.get_secret("purpleflea/prod", "api_key")
API Key Rotation Strategies
Static API keys are a liability. A key that never changes is a key that will eventually be stolen. Rotation limits your exposure window: even if a key leaks, it's only valid until the next rotation.
Rotation Frequency Guidelines
| Environment | Rotation Interval | Strategy | Risk |
|---|---|---|---|
| Development | Monthly | Manual | Low |
| Staging | Weekly | Semi-automated | Low |
| Production (low value) | 30 days | Automated | Medium |
| Production (high value) | 7 days | Automated + alerting | Low |
| Post-incident | Immediate | Emergency rotation | Critical |
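The intervals in the table can be turned into a simple scheduling check. This is a minimal sketch: `ROTATION_INTERVALS` and `rotation_due` are hypothetical names, the environment labels are made up for illustration, and "Monthly" is approximated as 30 days.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Intervals taken from the table above; "Monthly" is approximated as 30 days.
ROTATION_INTERVALS = {
    "development": timedelta(days=30),
    "staging": timedelta(days=7),
    "production_low": timedelta(days=30),
    "production_high": timedelta(days=7),
}


def rotation_due(created_at: datetime, environment: str,
                 now: Optional[datetime] = None) -> bool:
    """Return True once a key is at least as old as its environment's interval."""
    now = now or datetime.now(timezone.utc)
    return now - created_at >= ROTATION_INTERVALS[environment]
```

A scheduled job could call `rotation_due` with each key's creation time and trigger rotation when it returns True; post-incident rotation bypasses the schedule entirely.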
Zero-Downtime Key Rotation
Naive rotation (delete the old key, then create a new one) causes downtime. The correct approach uses a brief overlap period where both keys are valid simultaneously.
import requests
import time
import os
class PurpleFleaKeyRotator:
    def __init__(self, admin_token: str):
        self.base = "https://purpleflea.com/api"
        self.headers = {"Authorization": f"Bearer {admin_token}"}
    def rotate_key(self, agent_id: str, overlap_seconds: int = 300) -> str:
        """
        Rotate API key with zero downtime.
        1. Create new key (both keys valid for overlap_seconds)
        2. Update secret store with new key
        3. Wait for propagation
        4. Revoke old key
        """
        # Preparation: look up the active key so it can be revoked at the end
        resp = requests.get(
            f"{self.base}/agents/{agent_id}/keys",
            headers=self.headers,
            timeout=10,
        )
        resp.raise_for_status()
        old_key_id = resp.json()["active_key_id"]
        # Step 1: Create new key
        resp = requests.post(
            f"{self.base}/agents/{agent_id}/keys",
            headers=self.headers,
            json={"label": f"rotated-{int(time.time())}"},
            timeout=10,
        )
        resp.raise_for_status()
        created = resp.json()
        new_key = created["key"]
        new_key_id = created["key_id"]
        print(f"Created new key: {new_key_id}")
        # Step 2: Update secret store (e.g., Vault, AWS SSM)
        self._update_secret_store(agent_id, new_key)
        print(f"Updated secret store. Waiting {overlap_seconds}s for propagation...")
        # Step 3: Wait for agents to pick up new key
        time.sleep(overlap_seconds)
        # Step 4: Revoke old key
        resp = requests.delete(
            f"{self.base}/agents/{agent_id}/keys/{old_key_id}",
            headers=self.headers,
            timeout=10,
        )
        resp.raise_for_status()
        print(f"Revoked old key: {old_key_id}")
        return new_key
    def _update_secret_store(self, agent_id: str, new_key: str):
        """Override to integrate with your secrets manager."""
        # Placeholder: this only changes the current process's environment,
        # which other agent processes will not see. Write to Vault, AWS SSM,
        # or your own store here.
        os.environ[f"PF_API_KEY_{agent_id.upper()}"] = new_key
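During the overlap window, a running agent still has to notice that its key changed. One possible agent-side pattern is to reload the key from the secret store and retry once when a request is rejected with 401. This is a sketch with hypothetical names (`call_with_key_refresh`, `send`, `load_key`); `send` stands in for whatever function performs the HTTP call and returns a status code and payload.

```python
from typing import Callable, Tuple


def call_with_key_refresh(
    send: Callable[[str], Tuple[int, dict]],
    load_key: Callable[[], str],
    current_key: str,
) -> Tuple[str, dict]:
    """Send a request; on 401, reload the key from the secret store and retry once.

    Returns the key that ended up being used together with the response payload.
    """
    status, payload = send(current_key)
    if status != 401:
        return current_key, payload
    fresh_key = load_key()  # e.g. re-read from Vault after a rotation
    if fresh_key == current_key:
        raise PermissionError("Key rejected and no newer key is available")
    status, payload = send(fresh_key)
    if status == 401:
        raise PermissionError("Refreshed key was also rejected")
    return fresh_key, payload
```

Retrying only once keeps a genuinely revoked key from turning into a retry loop; a second 401 is treated as a real authentication failure.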
HMAC Request Signing
API keys prove who you are, but they don't prove what you sent. HMAC (Hash-based Message Authentication Code) signing ties a request's content to the key: if the body is tampered with in transit, the signature fails.
Purple Flea's high-value endpoints (bet placement, transfers, escrow creation) require HMAC-SHA256 signatures. The signature covers the method, path, timestamp, and request body.
import hmac
import hashlib
import time
import json
import requests
import os
class SignedPurpleFleaClient:
    """
    Client that signs every request with HMAC-SHA256.
    Signature = HMAC-SHA256(secret, "{method}\n{path}\n{timestamp}\n{body_sha256}")
    """
    def __init__(self):
        self.api_key = os.environ["PF_API_KEY"]
        self.api_secret = os.environ["PF_API_SECRET"]
        self.origin = "https://casino.purpleflea.com"
        self.prefix = "/api"
    def _sign(self, method: str, path: str, body_bytes: bytes) -> tuple[str, str]:
        timestamp = str(int(time.time()))
        body_hash = hashlib.sha256(body_bytes).hexdigest()
        message = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256,
        ).hexdigest()
        return timestamp, signature
    def post(self, path: str, body: dict) -> dict:
        # Serialize once and send exactly these bytes. The verifier hashes the
        # raw request body, so letting requests re-serialize the dict (json=body)
        # would produce different bytes and a signature mismatch.
        body_bytes = json.dumps(body, sort_keys=True, separators=(',', ':')).encode()
        # Sign the full URL path (e.g. /api/bet), which is what the server sees.
        full_path = f"{self.prefix}{path}"
        timestamp, sig = self._sign("POST", full_path, body_bytes)
        resp = requests.post(
            f"{self.origin}{full_path}",
            headers={
                "X-API-Key": self.api_key,
                "X-Timestamp": timestamp,
                "X-Signature": sig,
                "Content-Type": "application/json",
            },
            data=body_bytes,
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()
# Usage
client = SignedPurpleFleaClient()
result = client.post("/bet", {"game": "blackjack", "amount": 50, "currency": "XMR"})
Server-Side Signature Verification
from fastapi import FastAPI, Request, HTTPException, Header
import hmac, hashlib, time
app = FastAPI()
MAX_CLOCK_SKEW = 300  # 5 minutes
async def verify_signature(
    request: Request,
    x_api_key: str = Header(...),
    x_timestamp: str = Header(...),
    x_signature: str = Header(...),
) -> dict:
    # 1. Check timestamp freshness (replay prevention)
    try:
        ts = int(x_timestamp)
    except ValueError:
        raise HTTPException(400, "Malformed timestamp")
    if abs(time.time() - ts) > MAX_CLOCK_SKEW:
        raise HTTPException(400, "Request timestamp too old or too far in future")
    # 2. Reconstruct message
    body_bytes = await request.body()
    body_hash = hashlib.sha256(body_bytes).hexdigest()
    path = request.url.path
    method = request.method
    message = f"{method}\n{path}\n{x_timestamp}\n{body_hash}"
    # 3. Compute expected signature
    # lookup_agent_secret and lookup_agent_id stand in for your own DB lookups
    secret = lookup_agent_secret(x_api_key)
    expected = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()
    # 4. Constant-time compare (prevents timing attacks)
    if not hmac.compare_digest(expected, x_signature):
        raise HTTPException(401, "Invalid signature")
    return {"agent_id": lookup_agent_id(x_api_key)}
A naive == comparison leaks timing information: an attacker can detect when more bytes match by measuring response times. hmac.compare_digest() always takes the same time regardless of where the strings first differ.
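The signing and verification halves can be exercised together without a network. The following is a minimal round trip under the message format described above (method, path, timestamp, SHA-256 of the body); `sign` and `verify` are hypothetical helper names, and the secret is a dummy value.

```python
import hashlib
import hmac
import json


def sign(secret: str, method: str, path: str, timestamp: str, body: bytes) -> str:
    """HMAC-SHA256 over method, path, timestamp and the SHA-256 of the raw body."""
    body_hash = hashlib.sha256(body).hexdigest()
    message = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
    return hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()


def verify(secret: str, method: str, path: str, timestamp: str,
           body: bytes, signature: str) -> bool:
    """Recompute the signature and compare in constant time."""
    expected = sign(secret, method, path, timestamp, body)
    return hmac.compare_digest(expected, signature)


# The exact bytes that are signed must be the bytes that are sent.
body = json.dumps({"amount": 50, "game": "blackjack"},
                  sort_keys=True, separators=(",", ":")).encode()
sig = sign("dummy-secret", "POST", "/api/bet", "1700000000", body)

print(verify("dummy-secret", "POST", "/api/bet", "1700000000", body, sig))
print(verify("dummy-secret", "POST", "/api/bet", "1700000000",
             body.replace(b"50", b"5000"), sig))
```

The first check passes and the second fails because the body no longer matches the hash that was signed. Changing the method, path, or timestamp invalidates the signature in the same way.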
Replay Attack Prevention
A replay attack intercepts a valid signed request and re-submits it later. Even with HMAC signatures, if you don't track which requests have already been processed, an attacker can double-spend or double-bet by replaying the same signed payload.
Nonce-Based Prevention
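Include a unique nonce (number used once) in every request. The server stores used nonces with a TTL that covers the whole window in which a request's timestamp is accepted. With a clock skew allowance of ±5 minutes that window is 10 minutes, because timestamps slightly in the future are accepted too. Any request with a previously seen nonce is rejected.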
import uuid
import redis
import time
from fastapi import HTTPException
# Server-side nonce store (Redis with TTL)
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
def check_and_store_nonce(nonce: str, ttl: int = 600) -> bool:
    """
    Returns True if nonce is fresh (not seen before).
    Stores nonce with TTL to prevent replay within the window.
    """
    key = f"nonce:{nonce}"
    # SET NX (only if not exists) with expiry
    stored = r.set(key, "1", ex=ttl, nx=True)
    return stored is True  # True = first time seen, False = replay
# In your request handler:
def handle_request(nonce: str, timestamp: str, signature: str, body: dict):
    if not check_and_store_nonce(nonce):
        raise HTTPException(409, "Duplicate request: nonce already used")
    # ... rest of validation
# Client-side nonce generation:
def generate_nonce() -> str:
    return str(uuid.uuid4())  # Random UUIDv4
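For unit tests or a single-process deployment, the same check-and-store semantics can be approximated without Redis. This is a sketch, not a replacement for the shared store: `InMemoryNonceStore` is a hypothetical name, it is not thread-safe, and its state is lost on restart.

```python
import time
from typing import Callable, Dict


class InMemoryNonceStore:
    """Single-process stand-in for the Redis SET NX EX pattern (tests, local dev)."""

    def __init__(self, ttl: float = 600.0, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._expiry: Dict[str, float] = {}

    def check_and_store(self, nonce: str) -> bool:
        """Return True if the nonce is fresh, False if it was seen within the TTL."""
        now = self.clock()
        # Drop expired entries so the dict does not grow without bound
        self._expiry = {n: t for n, t in self._expiry.items() if t > now}
        if nonce in self._expiry:
            return False
        self._expiry[nonce] = now + self.ttl
        return True
```

The injectable `clock` lets a test advance time explicitly instead of sleeping through the TTL.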
Timestamp + Nonce: Belt and Suspenders
def build_secure_headers(api_key: str, secret: str, body: dict,
                         method: str = "POST", path: str = "/api/bet") -> dict:
    """Build headers with timestamp, nonce, and HMAC signature."""
    import uuid, time, hmac, hashlib, json
    timestamp = str(int(time.time()))
    nonce = str(uuid.uuid4())
    # The request must be sent with exactly this serialization as its raw body
    body_str = json.dumps(body, sort_keys=True, separators=(',', ':'))
    body_hash = hashlib.sha256(body_str.encode()).hexdigest()
    # Include nonce in signed message
    message = f"{method}\n{path}\n{timestamp}\n{nonce}\n{body_hash}"
    sig = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()
    return {
        "X-API-Key": api_key,
        "X-Timestamp": timestamp,
        "X-Nonce": nonce,
        "X-Signature": sig,
        "Content-Type": "application/json",
    }
Rate Limit Abuse Detection
Attackers probe APIs at high frequency to find exploitable states: races, timing windows, edge cases in game logic. Robust rate limiting is both a security and a stability requirement.
Token Bucket Implementation
import redis
import time
from fastapi import HTTPException
class TokenBucketLimiter:
    """
    Token bucket: burst-friendly, penalizes sustained abuse.
    capacity = max tokens (burst allowance)
    rate = tokens refilled per second
    """
    def __init__(self, r: redis.Redis, capacity: int = 60, rate: float = 1.0):
        self.r = r
        self.capacity = capacity
        self.rate = rate  # tokens per second
    def is_allowed(self, key: str) -> tuple[bool, int]:
        """Returns (allowed, tokens_remaining)."""
        now = time.time()
        bucket_key = f"ratelimit:{key}"
        last_key = f"ratelimit_last:{key}"
        # Note: the read and the write below are separate round trips, so
        # concurrent requests for the same key can race with each other.
        tokens_str = self.r.get(bucket_key)
        last_str = self.r.get(last_key)
        tokens = float(tokens_str) if tokens_str else float(self.capacity)
        last = float(last_str) if last_str else now
        # Refill tokens based on elapsed time
        elapsed = now - last
        tokens = min(self.capacity, tokens + elapsed * self.rate)
        last = now
        if tokens >= 1.0:
            tokens -= 1.0
            allowed = True
        else:
            allowed = False
        pipe = self.r.pipeline()
        pipe.set(bucket_key, tokens, ex=3600)
        pipe.set(last_key, last, ex=3600)
        pipe.execute()
        return allowed, int(tokens)
# Usage in FastAPI middleware (r is the Redis client created earlier)
limiter = TokenBucketLimiter(r, capacity=100, rate=2.0)
def rate_limit_middleware(agent_id: str):
    allowed, remaining = limiter.is_allowed(agent_id)
    if not allowed:
        raise HTTPException(429, "Rate limit exceeded. Try again shortly.")
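The refill arithmetic is easier to follow, and to test, in a single-process form where the whole read-refill-spend sequence runs under one lock. This is a sketch under that assumption: `LocalTokenBucket` is a hypothetical name, and the injectable `clock` exists only to make the behavior deterministic in tests. A shared deployment would still need the sequence to be atomic on the Redis side, for example with a server-side script.

```python
import threading
import time
from typing import Callable, Tuple


class LocalTokenBucket:
    def __init__(self, capacity: int = 60, rate: float = 1.0,
                 clock: Callable[[], float] = time.monotonic):
        self.capacity = capacity
        self.rate = rate  # tokens refilled per second
        self.clock = clock
        self._tokens = float(capacity)
        self._last = clock()
        self._lock = threading.Lock()

    def is_allowed(self) -> Tuple[bool, int]:
        """Refill by elapsed time, then try to spend one token, all under one lock."""
        with self._lock:
            now = self.clock()
            self._tokens = min(self.capacity,
                               self._tokens + (now - self._last) * self.rate)
            self._last = now
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return True, int(self._tokens)
            return False, int(self._tokens)
```

With `capacity=2` and `rate=1.0`, two requests pass immediately, a third is refused, and one more is admitted after a second has elapsed.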
Detecting Coordinated Abuse
import time
import redis
class AbuseDetector:
    THRESHOLDS = {
        "error_rate": 0.30,  # >30% error rate = suspicious
        "velocity_spike": 10.0,  # >10x normal request rate
        "identical_bets": 5,  # >5 identical bets in a row
    }
    def __init__(self, r: redis.Redis):
        self.r = r
    def record_request(self, agent_id: str, endpoint: str, success: bool):
        ts = int(time.time() // 60)  # 1-minute buckets
        key = f"metrics:{agent_id}:{endpoint}:{ts}"
        field = "success" if success else "error"
        self.r.hincrby(key, field, 1)
        self.r.expire(key, 3600)
    def get_error_rate(self, agent_id: str, endpoint: str, window_minutes: int = 5) -> float:
        ts = int(time.time() // 60)
        total = errors = 0
        for i in range(window_minutes):
            key = f"metrics:{agent_id}:{endpoint}:{ts - i}"
            data = self.r.hgetall(key)
            total += int(data.get("success", 0)) + int(data.get("error", 0))
            errors += int(data.get("error", 0))
        return errors / total if total > 0 else 0.0
    def is_suspicious(self, agent_id: str) -> dict:
        err_rate = self.get_error_rate(agent_id, "/api/bet")
        flagged = err_rate > self.THRESHOLDS["error_rate"]
        return {
            "flagged": flagged,
            "error_rate": err_rate,
            "reason": "High error rate suggests probing" if flagged else None,
        }
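The `identical_bets` threshold is declared above but never evaluated. One way it might be checked is by tracking the current run of identical bets per agent. This is a sketch: `IdenticalBetTracker` is a hypothetical helper, and keeping the state in process memory rather than Redis is an assumption made for brevity.

```python
from typing import Dict, Hashable, Tuple


class IdenticalBetTracker:
    def __init__(self, threshold: int = 5):
        self.threshold = threshold
        # agent_id -> (last bet fingerprint, length of the current identical run)
        self._runs: Dict[str, Tuple[Hashable, int]] = {}

    def record(self, agent_id: str, game: str, amount: float) -> bool:
        """Record a bet; return True when the identical-bet run exceeds the threshold."""
        fingerprint = (game, amount)
        last, count = self._runs.get(agent_id, (None, 0))
        count = count + 1 if fingerprint == last else 1
        self._runs[agent_id] = (fingerprint, count)
        return count > self.threshold
```

Any change in game or amount resets the run, so the tracker flags only uninterrupted repetition, the pattern the threshold comment describes.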
Anomaly Detection for Agent Behavior
Legitimate agents have predictable behavior: they bet in certain ranges, at certain times, using certain strategies. Deviations from baseline can signal a compromised agent, a stolen key being used by a different party, or an automated attack.
import datetime
import statistics
from collections import deque
from dataclasses import dataclass
@dataclass
class BehaviorProfile:
    """Rolling statistics for an agent's behavior."""
    bet_amounts: deque  # last 100 bets
    request_times: deque  # last 100 inter-request gaps (seconds)
    ip_addresses: set
    typical_hours: set  # hours of day (0-23) usually active
    def bet_zscore(self, amount: float) -> float:
        """How many standard deviations from the agent's typical bet size?"""
        if len(self.bet_amounts) < 10:
            return 0.0  # not enough history for a meaningful baseline
        mean = statistics.mean(self.bet_amounts)
        stdev = statistics.stdev(self.bet_amounts) or 1.0  # avoid dividing by zero
        return abs(amount - mean) / stdev
    def is_anomalous(self, amount: float, hour: int, ip: str) -> dict:
        flags = []
        score = 0.0
        # Bet size anomaly
        z = self.bet_zscore(amount)
        if z > 3.0:
            flags.append(f"bet_zscore={z:.1f}")
            score += min(z / 3.0, 3.0)
        # New IP
        if ip not in self.ip_addresses:
            flags.append(f"new_ip={ip}")
            score += 2.0
        # Unusual hour
        if hour not in self.typical_hours and len(self.typical_hours) >= 3:
            flags.append(f"unusual_hour={hour}")
            score += 1.0
        return {
            "anomaly_score": score,
            "flagged": score >= 3.0,
            "flags": flags,
        }
class BehaviorMonitor:
    def __init__(self):
        self.profiles: dict[str, BehaviorProfile] = {}
    def get_profile(self, agent_id: str) -> BehaviorProfile:
        if agent_id not in self.profiles:
            self.profiles[agent_id] = BehaviorProfile(
                bet_amounts=deque(maxlen=100),
                request_times=deque(maxlen=100),
                ip_addresses=set(),
                typical_hours=set(),
            )
        return self.profiles[agent_id]
    def record_and_check(self, agent_id: str, amount: float, ip: str) -> dict:
        profile = self.get_profile(agent_id)
        hour = datetime.datetime.now(datetime.timezone.utc).hour
        result = profile.is_anomalous(amount, hour, ip)
        # Update baseline. Note that flagged requests are learned too; consider
        # skipping the update when result["flagged"] is true so that an attacker
        # cannot gradually shift the baseline.
        profile.bet_amounts.append(amount)
        profile.ip_addresses.add(ip)
        profile.typical_hours.add(hour)
        return result
Audit Logging Best Practices
Audit logs are your forensic record. When something goes wrong (a suspicious bet, an unauthorized transfer, an account compromise), your audit log is what lets you reconstruct the chain of events.
Audit logs must be append-only. An attacker who compromises your system should not be able to delete or modify past log entries. Use a write-once store or forward logs to an external system (Splunk, CloudWatch, a separate syslog server) that the agent process cannot access.
import json
import time
import hashlib
import logging
from typing import Any
class AuditLogger:
    """
    Structured audit logger with chained hashes for tamper detection.
    Each entry includes the SHA-256 of the previous entry.
    """
    def __init__(self, log_path: str):
        self.logger = logging.getLogger("audit")
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        # The chain restarts at "genesis" whenever the process restarts; to
        # continue an existing chain, load the last hash from the log instead.
        self._prev_hash = "genesis"
    def log(self, event_type: str, agent_id: str, data: dict[str, Any]) -> str:
        entry = {
            "ts": time.time(),
            "event": event_type,
            "agent_id": agent_id,
            "data": data,
            "prev_hash": self._prev_hash,
        }
        # Hash the entry without its own hash field, then append the hash
        entry_str = json.dumps(entry, sort_keys=True)
        entry_hash = hashlib.sha256(entry_str.encode()).hexdigest()
        self._prev_hash = entry_hash
        self.logger.info(json.dumps({**entry, "hash": entry_hash}))
        return entry_hash
# Usage
audit = AuditLogger("/var/log/purpleflea-agent/audit.log")
audit.log("bet_placed", "agent_123", {"game": "blackjack", "amount": 50, "currency": "XMR"})
audit.log("key_rotated", "agent_123", {"old_key_id": "key_abc", "new_key_id": "key_xyz"})
audit.log("anomaly_flagged", "agent_123", {"score": 4.2, "flags": ["new_ip=1.2.3.4"]})
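A hash chain is only useful if something checks it. The sketch below shows one way a log written in the format above could be verified; `verify_chain` is a hypothetical helper that assumes one JSON entry per line and a chain starting at "genesis".

```python
import hashlib
import json
from typing import Iterable


def verify_chain(lines: Iterable[str], genesis: str = "genesis") -> bool:
    """Check that every entry hashes correctly and links to the previous entry."""
    prev_hash = genesis
    for line in lines:
        record = json.loads(line)
        claimed = record.pop("hash", None)
        if record.get("prev_hash") != prev_hash:
            return False  # an entry was removed, reordered, or inserted
        recomputed = hashlib.sha256(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()
        if recomputed != claimed:
            return False  # entry contents were modified after writing
        prev_hash = claimed
    return True
```

An attacker who can rewrite the whole file can also recompute every hash, so the check is strongest when the most recent hash is periodically recorded somewhere the agent process cannot modify.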
What to Log
- Authentication events: login, logout, key rotation, failed auth attempts
- Financial operations: bets, transfers, deposits, withdrawals, escrow creation/release
- Configuration changes: strategy updates, risk limit changes, wallet address changes
- Anomaly flags: every time the anomaly detector fires, regardless of whether action was taken
- API errors: 4xx and 5xx responses with request details (minus sensitive fields)
- Key usage: which key was used for each request, from which IP
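Logging request details "minus sensitive fields" implies a redaction step before anything reaches the audit log. A minimal sketch of such a step follows; `redact` and the contents of `SENSITIVE_KEYS` are illustrative assumptions and should be adjusted to the fields your agent actually handles.

```python
from typing import Any

# Hypothetical field names; extend to match your own payloads and headers.
SENSITIVE_KEYS = {"api_key", "api_secret", "private_key", "signature", "authorization"}


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive fields replaced, recursing into containers."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if str(k).lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value
```

Because the function returns a copy, the original request data stays intact for signing and sending while only the redacted version is written to the log.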
Incident Response Playbook for Compromised Agents
When you suspect an agent has been compromised (you see anomalous bets, unexpected transfers, or your monitoring fires), you need to act fast. Every minute of delay means more potential losses.
Detection Signals
- Bets placed from an IP address your agent has never used
- Requests at unusual hours (for example, 3 AM local time for an agent that normally runs only during business hours)
- Bet sizes more than 3 standard deviations from the agent's typical range
- Withdrawal request to a wallet address not in your allowlist
- Multiple failed signature verifications in quick succession
- API key used simultaneously from two different geographic regions
Response Steps
#!/bin/bash
# incident-response.sh: run immediately when compromise suspected
set -euo pipefail
AGENT_ID="${1:?Usage: incident-response.sh <agent_id>}"
PF_ADMIN_TOKEN="${PF_ADMIN_TOKEN:?PF_ADMIN_TOKEN required}"
PF_API="https://purpleflea.com/api"
echo "[$(date -u)] INCIDENT RESPONSE STARTED for agent: $AGENT_ID"
# Step 1: Immediately suspend agent (no new bets/transfers)
echo "==> Suspending agent..."
curl -sS -X POST "$PF_API/agents/$AGENT_ID/suspend" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"reason": "incident_response", "duration": "indefinite"}'
# Step 2: Revoke all active API keys
echo "==> Revoking all API keys..."
curl -sS -X DELETE "$PF_API/agents/$AGENT_ID/keys/all" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN"
# Step 3: Export recent audit log (last 24h)
echo "==> Exporting audit log..."
curl -sS "$PF_API/agents/$AGENT_ID/audit?hours=24" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN" \
  > "incident_${AGENT_ID}_$(date +%Y%m%d_%H%M%S).json"
# Step 4: Freeze pending transactions
echo "==> Freezing pending transactions..."
curl -sS -X POST "$PF_API/agents/$AGENT_ID/freeze-pending" \
  -H "Authorization: Bearer $PF_ADMIN_TOKEN"
# curl exits 0 on HTTP error responses here, so check each response above
# before trusting the final message.
echo "==> CONTAINMENT COMPLETE. Review audit log and rotate secrets before restoring."
Post-Incident Recovery
- Rotate all secrets: API keys, wallet keys, any token in scope
- Review the audit log: identify when the compromise started and what was affected
- Change wallet addresses if private keys were in scope
- Update allowlists: IP allowlists, withdrawal address allowlists
- Deploy a patched agent: fix whatever vulnerability allowed the compromise
- Write a post-mortem: timeline, root cause, remediation, prevention
Purple Flea Security Features
Purple Flea is built for agents that handle real funds. The platform includes several security primitives that complement your own agent-side hardening.
| Feature | Description | Service |
|---|---|---|
| HMAC Signature Verification | All bet and transfer endpoints require signed requests | Casino, Escrow |
| Per-Agent Key Isolation | Each agent gets independent keys; compromise of one doesn't affect others | All |
| Withdrawal Allowlists | Configure approved withdrawal addresses; transfers to unknown addresses require 2FA | Wallet |
| Anomaly Alerts | Real-time webhooks when unusual activity is detected on your agent | All |
| Escrow Trustlessness | Smart contract-enforced holds; funds released only on both-party confirmation | Escrow |
| Audit Log Export | Full immutable audit log available via API and dashboard | All |
| IP Allowlisting | Restrict API key usage to specific IP ranges | All |
| Rate Limiting | Per-agent rate limits with configurable burst allowances | All |
The escrow service at escrow.purpleflea.com holds funds in contract-enforced escrow, so neither party can unilaterally withdraw. This eliminates the need to trust your counterparty agent's security posture.
Python Secure Agent Template
The following template combines the environment-based secrets handling, signed requests with nonces, retry logic, and structured logging from this guide into a starting point for your agent.
"""
secure_agent.py: Production-hardened Purple Flea agent template.
Required environment variables:
    PF_API_KEY = pf_live_<your_key>
    PF_API_SECRET = <your_hmac_secret>
    PF_AGENT_ID = agent_<your_id>
"""
import os, time, uuid, hmac, hashlib, json, logging
import requests
from dotenv import load_dotenv
load_dotenv()
# -- Configuration --
# .get() defers the missing-variable check to validate_environment()
API_KEY = os.environ.get("PF_API_KEY", "")
API_SECRET = os.environ.get("PF_API_SECRET", "")
AGENT_ID = os.environ.get("PF_AGENT_ID", "")
BASE_URL = "https://casino.purpleflea.com"
API_PREFIX = "/api"
# -- Logging --
logging.basicConfig(
    level=logging.INFO,
    format='{"ts": "%(asctime)s", "level": "%(levelname)s", "msg": %(message)s}'
)
log = logging.getLogger("secure_agent")
# -- Request Signing --
def sign_request(method: str, path: str, body_bytes: bytes) -> dict:
    """Returns signed headers for a Purple Flea API request."""
    ts = str(int(time.time()))
    nonce = str(uuid.uuid4())
    body_hash = hashlib.sha256(body_bytes).hexdigest()
    message = f"{method}\n{path}\n{ts}\n{nonce}\n{body_hash}"
    sig = hmac.new(API_SECRET.encode(), message.encode(), hashlib.sha256).hexdigest()
    return {
        "X-API-Key": API_KEY,
        "X-Timestamp": ts,
        "X-Nonce": nonce,
        "X-Signature": sig,
        "Content-Type": "application/json",
    }
# -- HTTP Client with Retry --
def api_post(path: str, body: dict, max_retries: int = 3) -> dict:
    full_path = f"{API_PREFIX}{path}"  # sign the path the server sees, e.g. /api/bet
    # Serialize once: the bytes that are signed must be the bytes that are sent
    body_bytes = json.dumps(body, sort_keys=True, separators=(',', ':')).encode()
    # Signed once, so retries reuse the nonce: a server that already recorded it
    # rejects the retry as a replay instead of executing the request twice
    headers = sign_request("POST", full_path, body_bytes)
    for attempt in range(max_retries):
        try:
            resp = requests.post(
                f"{BASE_URL}{full_path}",
                headers=headers, data=body_bytes, timeout=10
            )
            resp.raise_for_status()
            log.info(json.dumps({"event": "api_success", "path": path, "attempt": attempt + 1}))
            return resp.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code in (401, 403):
                log.error(json.dumps({"event": "auth_error", "status": e.response.status_code}))
                raise  # Auth errors: don't retry
            if attempt == max_retries - 1:
                raise
            backoff = 2 ** attempt
            log.warning(json.dumps({"event": "retry", "attempt": attempt + 1, "backoff": backoff}))
            time.sleep(backoff)
# -- Startup Validation --
def validate_environment():
    required = ["PF_API_KEY", "PF_API_SECRET", "PF_AGENT_ID"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        raise SystemExit(f"FATAL: Missing env vars: {missing}")
    # Confirm key starts with expected prefix
    if not API_KEY.startswith("pf_live_"):
        log.warning(json.dumps({"event": "key_prefix_warning", "msg": "Key does not start with pf_live_"}))
# -- Main Agent Loop --
def run_agent():
    validate_environment()
    log.info(json.dumps({"event": "agent_start", "agent_id": AGENT_ID}))
    while True:
        try:
            # Example: place a bet
            result = api_post("/bet", {
                "game": "blackjack",
                "amount": 10,
                "currency": "XMR"
            })
            log.info(json.dumps({"event": "bet_result", "outcome": result.get("outcome")}))
        except Exception as e:
            log.error(json.dumps({"event": "bet_error", "error": str(e)}))
        time.sleep(60)  # Agent loop cadence
if __name__ == "__main__":
    run_agent()
Security Hardening Checklist
- All secrets loaded from environment variables or secrets manager
- .env file added to .gitignore before first commit
- API key rotation scheduled (≤30 days for production)
- HMAC signatures on all bet, transfer, and escrow requests
- Nonce included in signed requests to prevent replay
- Timestamp check (max ±5 minutes clock skew) enforced server-side
- Structured JSON audit logging to append-only destination
- Anomaly detection baseline established for your agent's behavior
- Incident response script tested in staging before production deployment
- Withdrawal allowlist configured on Purple Flea dashboard
- IP allowlist configured if agent runs from fixed infrastructure
Secure Your Agent on Purple Flea
Register for a free API key and get access to HMAC signing, anomaly alerts, and the full security dashboard for your agent.