1. Threat Model for Agent Secrets
Before choosing a secrets management approach, you must understand who you are defending against and what an attacker can access. Autonomous agents have a distinct threat surface from traditional applications:
- Compromised LLM output — prompt injection could instruct an agent to exfiltrate its own API keys
- Server breach — a compromised host exposes all in-memory and on-disk secrets
- Log leakage — secrets accidentally logged in error messages or request traces
- Supply chain attack — a malicious npm/PyPI package reads environment variables
- Insider threat — a developer or CI system with broad secret access
- MCP tool abuse — an agent exposed via MCP tools could be tricked into revealing credentials
| Threat | Impact | Mitigation | Priority |
|---|---|---|---|
| Prompt injection exfiltration | Full key exposure | Never echo secrets; sanitize LLM outputs | Critical |
| Environment variable dump | All env secrets | Use vault, not env for prod secrets | High |
| Log leakage | Partial key exposure | Scrub logs; structured logging | High |
| Server breach | All in-memory secrets | HSM/MPC for private keys, short TTLs | High |
| Supply chain | Env var theft | Audit dependencies; least privilege | Medium |
2. Environment Variable Patterns
Environment variables are the floor, not the ceiling, of secrets management. They are appropriate for development and simple deployments, but must be used carefully even there.
The right way to use environment variables
Load secrets from env vars once at startup, validate their presence, and never log them. Provide a clear error if a required secret is missing.
import os
import re
from typing import Optional
class SecretsFromEnv:
    """Load and validate secrets from environment variables at startup.

    Every name in ``REQUIRED`` must be set to a non-empty value; names in
    ``OPTIONAL`` fall back to the default given there. Values are kept in
    a private dict and are masked by ``repr()``.

    Raises:
        EnvironmentError: if one or more required variables are unset or
            empty (all missing names are reported together).
        ValueError: if ``PURPLEFLEA_API_KEY`` or ``WALLET_PRIVATE_KEY``
            does not match its expected format.
    """

    REQUIRED = [
        "PURPLEFLEA_API_KEY",
        "WALLET_PRIVATE_KEY",
    ]
    OPTIONAL = {
        "TELEGRAM_TOKEN": None,
        "DATABASE_URL": "sqlite:///agent.db",
    }

    def __init__(self):
        self._secrets: dict = {}
        self._load()

    def _load(self):
        """Read required and optional variables, then validate known formats."""
        missing = []
        for key in self.REQUIRED:
            val = os.environ.get(key)
            if not val:
                missing.append(key)
            else:
                self._secrets[key] = val
        if missing:
            raise EnvironmentError(
                f"Missing required environment variables: {', '.join(missing)}\n"
                f"Set them in your .env file or deployment configuration."
            )
        for key, default in self.OPTIONAL.items():
            self._secrets[key] = os.environ.get(key, default)
        # Validate format of known secrets
        self._validate_api_key(self._secrets["PURPLEFLEA_API_KEY"])
        self._validate_private_key(self._secrets["WALLET_PRIVATE_KEY"])

    def _validate_api_key(self, key: str):
        """Raise ValueError unless ``key`` is ``pf_live_`` + 32 lowercase hex chars."""
        # fullmatch (not match + "$") so a trailing newline is rejected too.
        if not re.fullmatch(r'pf_live_[a-f0-9]{32}', key):
            raise ValueError("PURPLEFLEA_API_KEY has invalid format")

    def _validate_private_key(self, key: str):
        """Raise ValueError unless ``key`` is 64 hex chars, optionally 0x-prefixed."""
        # Remove the literal "0x" prefix only. str.lstrip("0x") would strip
        # *any* run of leading '0'/'x' characters, which rejects valid keys
        # that start with zero digits and accepts malformed ones like "x…".
        hex_part = key[2:] if key.startswith("0x") else key
        if not re.fullmatch(r'[a-fA-F0-9]{64}', hex_part):
            raise ValueError("WALLET_PRIVATE_KEY must be 32-byte hex")

    def get(self, key: str) -> Optional[str]:
        """Return the loaded value for ``key``, or None if it was not loaded."""
        return self._secrets.get(key)

    def __repr__(self) -> str:
        # Safe repr: mask all values
        masked = {k: "***" for k in self._secrets}
        return f"SecretsFromEnv({masked})"
Commit a .env.example file with placeholder values to git to document the required variables, and list the real .env file in .gitignore so actual values stay out of the repository. Never commit .env.
3. HashiCorp Vault Integration
HashiCorp Vault provides a production-grade secrets store with dynamic secrets, lease management, audit logging, and fine-grained access control. It is ideal for multi-agent deployments where different agents need different permission scopes.
Key Vault concepts for agents:
- KV v2 secrets engine — stores static secrets (API keys, seed phrases) with versioning
- Dynamic secrets — generate database credentials on-demand with automatic expiry
- AppRole authentication — agents authenticate with a role-id + secret-id, no long-lived tokens
- Token TTLs — all tokens expire; agents re-authenticate automatically
import aiohttp, os, time, logging
from typing import Optional, Any
class VaultClient:
    """Asynchronous HashiCorp Vault client that logs in through AppRole.

    A token is obtained on first use and again whenever the locally
    tracked expiry time has passed. Each request opens its own
    ``aiohttp.ClientSession``.
    """

    def __init__(self, vault_addr: str, role_id: str, secret_id: str,
                 mount_path: str = "secret"):
        # Trailing slashes are dropped so URL templates can add their own.
        self.vault_addr = vault_addr.rstrip("/")
        self.role_id = role_id
        self.secret_id = secret_id
        self.mount_path = mount_path
        # No token until the first authenticated call.
        self._token: Optional[str] = None
        self._token_expires_at: float = 0
        self.log = logging.getLogger("vault")

    async def _authenticate(self):
        """Log in with role_id/secret_id and remember the returned token."""
        login_url = f"{self.vault_addr}/v1/auth/approle/login"
        credentials = {"role_id": self.role_id, "secret_id": self.secret_id}
        async with aiohttp.ClientSession() as session:
            response = await session.post(login_url, json=credentials)
            response.raise_for_status()
            payload = await response.json()
        auth = payload["auth"]
        self._token = auth["client_token"]
        # Treat the token as expired 60s early to leave a safety margin.
        self._token_expires_at = time.time() + auth["lease_duration"] - 60
        self.log.info(f"Vault authenticated; token valid for ~{auth['lease_duration']}s")

    async def _ensure_token(self):
        """Authenticate unless a token exists and its tracked expiry is in the future."""
        if self._token is not None and time.time() < self._token_expires_at:
            return
        await self._authenticate()

    async def get_secret(self, path: str) -> dict[str, Any]:
        """Read the secret stored at ``path`` (e.g. 'agents/my-agent').

        Returns the ``data.data`` object of the JSON response.

        Raises:
            KeyError: if the server answers 404 for ``path``.
        """
        await self._ensure_token()
        url = f"{self.vault_addr}/v1/{self.mount_path}/data/{path}"
        async with aiohttp.ClientSession() as session:
            response = await session.get(
                url, headers={"X-Vault-Token": self._token}
            )
            if response.status == 404:
                raise KeyError(f"Secret not found: {path}")
            response.raise_for_status()
            payload = await response.json()
            return payload["data"]["data"]

    async def put_secret(self, path: str, data: dict):
        """Write ``data`` as the secret at ``path`` (sent as ``{"data": data}``)."""
        await self._ensure_token()
        url = f"{self.vault_addr}/v1/{self.mount_path}/data/{path}"
        async with aiohttp.ClientSession() as session:
            response = await session.post(
                url,
                headers={"X-Vault-Token": self._token},
                json={"data": data},
            )
            response.raise_for_status()

    async def delete_secret_version(self, path: str, versions: list[int]):
        """Soft-delete the listed ``versions`` of the secret at ``path``."""
        await self._ensure_token()
        url = f"{self.vault_addr}/v1/{self.mount_path}/delete/{path}"
        async with aiohttp.ClientSession() as session:
            response = await session.post(
                url,
                headers={"X-Vault-Token": self._token},
                json={"versions": versions},
            )
            response.raise_for_status()
# Usage:
# vault = VaultClient(
# vault_addr="https://vault.internal:8200",
# role_id=os.environ["VAULT_ROLE_ID"],
# secret_id=os.environ["VAULT_SECRET_ID"]
# )
# secrets = await vault.get_secret("agents/trading-bot-1")
# api_key = secrets["purpleflea_api_key"]
4. AWS Secrets Manager Integration
For agents deployed on AWS (EC2, ECS, Lambda), AWS Secrets Manager provides native integration via IAM roles — no vault token management needed. The EC2 instance or ECS task role grants access to specific secrets.
import boto3, json, logging
from functools import lru_cache
from typing import Any
import time
class AWSSecretsManager:
    """Wrapper around the boto3 Secrets Manager client with a per-secret TTL cache."""

    def __init__(self, region: str = "us-east-1", cache_ttl: int = 300):
        self.client = boto3.client("secretsmanager", region_name=region)
        self.cache_ttl = cache_ttl
        # Maps secret_name -> (value, expires_at).
        self._cache: dict = {}
        self.log = logging.getLogger("aws_secrets")

    def _forget(self, secret_name: str) -> None:
        """Drop any cached entry for ``secret_name``."""
        self._cache.pop(secret_name, None)

    def get_secret(self, secret_name: str) -> dict[str, Any]:
        """Return the secret's value, served from cache while the entry is fresh.

        A ``SecretString`` is parsed as JSON; otherwise the base64-decoded
        ``SecretBinary`` is returned under the ``"binary"`` key.

        Raises:
            KeyError: if AWS reports the secret does not exist.
        """
        entry = self._cache.get(secret_name)
        if entry:
            payload, valid_until = entry
            if time.time() < valid_until:
                return payload

        try:
            response = self.client.get_secret_value(SecretId=secret_name)
        except self.client.exceptions.ResourceNotFoundException:
            raise KeyError(f"Secret not found: {secret_name}")

        if "SecretString" not in response:
            import base64
            value = {"binary": base64.b64decode(response["SecretBinary"])}
        else:
            value = json.loads(response["SecretString"])

        self._cache[secret_name] = (value, time.time() + self.cache_ttl)
        self.log.info(f"Fetched secret '{secret_name}' from AWS (cached {self.cache_ttl}s)")
        return value

    def rotate_secret(self, secret_name: str):
        """Ask AWS to rotate the secret, then drop its cached value."""
        self.client.rotate_secret(SecretId=secret_name)
        self._forget(secret_name)
        self.log.info(f"Triggered rotation for '{secret_name}'")

    def update_secret(self, secret_name: str, data: dict):
        """Store ``data`` (JSON-encoded) as the new secret value and drop the cache entry."""
        serialized = json.dumps(data)
        self.client.put_secret_value(SecretId=secret_name, SecretString=serialized)
        self._forget(secret_name)
        self.log.info(f"Updated secret '{secret_name}'")
# Usage with IAM role (no credentials needed on EC2/ECS):
# sm = AWSSecretsManager(region="us-east-1")
# secrets = sm.get_secret("purpleflea/trading-agent")
# api_key = secrets["api_key"]
5. Rotating API Keys Without Downtime
Key rotation is the process of replacing credentials before they expire or are compromised. The challenge for autonomous agents is continuity: the agent cannot stop while the key rotates. The solution is the dual-key pattern:
- Generate a new key while the old key is still valid
- Store both keys in the secret store
- Agent begins using the new key for new requests
- Wait for any in-flight requests using the old key to complete
- Revoke the old key
- Remove old key from secret store
import asyncio, aiohttp, time, logging
from typing import Optional
class RotatingKeyManager:
    """Dual-key pattern for zero-downtime API key rotation.

    The manager keeps the current key (and, during a rotation, the
    previous one) in memory and mirrors both to the vault under
    ``secret_path``.

    ``_in_flight`` is the count of requests still using the old key.
    Nothing in this class modifies it; the code issuing requests is
    expected to maintain it, otherwise the drain wait in ``rotate()``
    ends immediately.
    """

    # Upper bound, in seconds, on waiting for old-key requests to finish.
    DRAIN_TIMEOUT_SECONDS = 30

    def __init__(self, vault: "VaultClient", secret_path: str,
                 purpleflea_api_base: str):
        self.vault = vault
        self.secret_path = secret_path
        self.api_base = purpleflea_api_base
        self.log = logging.getLogger("key_rotation")
        self._current_key: Optional[str] = None
        self._old_key: Optional[str] = None
        self._in_flight: int = 0  # count of requests using old key

    async def load_current_key(self):
        """Read ``api_key`` (and ``api_key_old`` if present) from the vault."""
        secrets = await self.vault.get_secret(self.secret_path)
        self._current_key = secrets["api_key"]
        self._old_key = secrets.get("api_key_old")

    async def rotate(self):
        """Perform a live rotation of the Purple Flea API key.

        Raises:
            RuntimeError: if no current key has been loaded yet.
            aiohttp.ClientResponseError: if the rotate or revoke request
                returns an error status. When revocation fails the old
                key is left in the vault and in memory.
        """
        if not self._current_key:
            # Without this guard the request would carry "Bearer None".
            raise RuntimeError(
                "No current API key loaded; call load_current_key() before rotate()"
            )
        self.log.info("Starting key rotation...")

        # 1. Request new key from Purple Flea
        async with aiohttp.ClientSession() as s:
            async with s.post(
                f"{self.api_base}/api/keys/rotate",
                headers={"Authorization": f"Bearer {self._current_key}"}
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()
        new_key = data["new_key"]
        old_key = self._current_key

        # 2. Store both keys in vault (new = current, old = old)
        # NOTE(review): only these fields are written; confirm nothing else
        # is stored at secret_path that would need to be carried over.
        await self.vault.put_secret(self.secret_path, {
            "api_key": new_key,
            "api_key_old": old_key,
            "rotated_at": time.time()
        })
        self.log.info(f"Vault updated. New key: ...{new_key[-4:]}")

        # 3. Switch current; keep old for in-flight drain
        self._old_key = old_key
        self._current_key = new_key

        # 4. Wait for in-flight requests using old key to complete
        deadline = time.time() + self.DRAIN_TIMEOUT_SECONDS
        while self._in_flight > 0 and time.time() < deadline:
            await asyncio.sleep(1)
        if self._in_flight > 0:
            self.log.warning(
                f"Drain timed out with {self._in_flight} request(s) still using the old key"
            )

        # 5. Revoke old key. The status is checked so that a failed
        # revocation is not reported as success and the old key is not
        # dropped from the vault while it is still valid.
        # NOTE(review): the old key itself is placed in the URL path; if the
        # API identifies keys by a separate id, prefer that — confirm.
        async with aiohttp.ClientSession() as s:
            async with s.delete(
                f"{self.api_base}/api/keys/{old_key}",
                headers={"Authorization": f"Bearer {self._current_key}"}
            ) as resp:
                resp.raise_for_status()

        # 6. Remove old key from vault
        await self.vault.put_secret(self.secret_path, {
            "api_key": new_key,
            "rotated_at": time.time()
        })
        self._old_key = None
        self.log.info("Rotation complete. Old key revoked.")

    def get_key(self) -> Optional[str]:
        """Return the current key, or None if none has been loaded."""
        return self._current_key
6. Wallet Private Key Security: HSM and MPC
Private keys for crypto wallets are the most sensitive secrets an agent manages. If a private key is stolen, funds are irreversibly lost. Two enterprise approaches:
Hardware Security Modules (HSM)
An HSM is a physical device that generates and stores private keys in tamper-resistant hardware. The key never leaves the HSM; instead, you send data to sign and receive the signature. Providers: AWS CloudHSM, Azure Dedicated HSM, YubiHSM.
Multi-Party Computation (MPC)
MPC splits the private key into shards distributed across multiple parties. A transaction requires a threshold of parties to cooperate (e.g., 2-of-3). No single party or machine ever holds the full key. Providers: Fireblocks, Privy, Web3Auth.
from abc import ABC, abstractmethod
from eth_account import Account
from eth_account.messages import encode_defunct
import boto3, json
class WalletSigner(ABC):
    """Signing interface for wallet backends.

    Agent code depends on this interface only, so one implementation can
    be replaced by another without touching the agent.
    """

    @abstractmethod
    async def sign_transaction(self, tx: dict) -> str:
        """Sign the transaction dict ``tx`` and return the signed transaction as hex."""

    @abstractmethod
    async def get_address(self) -> str:
        """Return the public address of the wallet."""
class EnvWalletSigner(WalletSigner):
    """Signer backed by a raw private key held in process memory — development only."""

    def __init__(self, private_key: str):
        # The account object created from the key is kept for the signer's lifetime.
        self._acct = Account.from_key(private_key)

    async def sign_transaction(self, tx: dict) -> str:
        """Sign ``tx`` with the local account and return the raw transaction as hex."""
        raw_tx = self._acct.sign_transaction(tx).rawTransaction
        return raw_tx.hex()

    async def get_address(self) -> str:
        """Return the address of the local account."""
        account = self._acct
        return account.address
class KMSWalletSigner(WalletSigner):
    """AWS KMS asymmetric key signer (secp256k1 ECC).

    Only address derivation is implemented; ``sign_transaction`` is a
    placeholder that raises ``NotImplementedError``.
    """

    def __init__(self, key_id: str, region: str = "us-east-1"):
        self.key_id = key_id
        self.kms = boto3.client("kms", region_name=region)
        # Derived lazily on the first get_address() call, then reused.
        self._address: Optional[str] = None

    async def get_address(self) -> str:
        """Return the checksum address derived from the KMS public key.

        The public key is fetched from KMS on the first call and the
        resulting address is cached on the instance.
        """
        if self._address is not None:
            return self._address

        # Imported here so these packages are only needed when this signer
        # is actually used.
        from eth_keys import keys
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            PublicFormat,
            load_der_public_key,
        )

        # Fetch public key from KMS.
        # NOTE(review): this is a synchronous client call inside an async
        # method — confirm that blocking here is acceptable.
        resp = self.kms.get_public_key(KeyId=self.key_id)

        # Parse DER-encoded public key to Ethereum address
        pub_key_der = resp["PublicKey"]
        pub_key_obj = load_der_public_key(pub_key_der, backend=default_backend())
        pub_bytes = pub_key_obj.public_bytes(Encoding.X962,
                                             PublicFormat.UncompressedPoint)
        # pub_bytes[1:] is the 64-byte uncompressed point (skip 0x04 prefix)
        eth_key = keys.PublicKey(pub_bytes[1:])
        self._address = eth_key.to_checksum_address()
        return self._address

    async def sign_transaction(self, tx: dict) -> str:
        """Not implemented: always raises ``NotImplementedError``.

        A full implementation would hash the encoded transaction, have
        KMS sign it (DER-encoded signature), then convert DER -> (r, s)
        and compute the EIP-155 ``v`` value.
        """
        raise NotImplementedError("Full KMS sign implementation omitted for brevity")
7. Purple Flea API Key Lifecycle
Purple Flea API keys follow a defined lifecycle. Understanding it helps you build agents that handle key transitions gracefully.
- Creation — keys are created via POST /api/keys with a scope (read, trade, admin)
- Active use — keys are sent as Authorization: Bearer pf_live_... headers
- Expiry warning — 7 days before expiry, the API returns an X-Key-Expires-In header
- Rotation — use POST /api/keys/rotate to get a new key before expiry
- Revocation — DELETE /api/keys/{key_id} immediately invalidates a key
import aiohttp, time, logging
from typing import Optional
class PurpleFleatKeyManager:
    """Manages Purple Flea API key lifecycle: expiry detection + auto-rotation.

    Args:
        initial_key: the API key to start with.
        secret_store: object exposing an awaitable
            ``put_secret(path, data)`` used to persist rotated keys.
        api_base: base URL of the Purple Flea API.
        secret_path: path under which rotated keys are persisted in
            ``secret_store``.
    """

    EXPIRY_WARNING_DAYS = 7

    def __init__(self, initial_key: str, secret_store, api_base: str,
                 secret_path: str = "agents/my-agent"):
        self.api_base = api_base
        self.secret_store = secret_store  # vault or AWS SM client
        self.secret_path = secret_path
        self._key = initial_key
        # Unknown until an X-Key-Expires-In header has been seen.
        self._key_expires_at: Optional[float] = None
        self.log = logging.getLogger("pf_keys")

    @property
    def key(self) -> str:
        """The API key currently in use."""
        return self._key

    async def check_and_rotate(self, session: "aiohttp.ClientSession"):
        """Rotate the key if its known expiry is within the warning window.

        Call this after each API response. Does nothing while no expiry
        time has been recorded.
        """
        if self._key_expires_at is None:
            return
        days_left = (self._key_expires_at - time.time()) / 86400
        if days_left <= self.EXPIRY_WARNING_DAYS:
            self.log.warning(f"API key expires in {days_left:.1f} days — rotating")
            await self._do_rotate(session)

    async def handle_response_headers(self, headers: dict):
        """Record the key expiry time from the ``X-Key-Expires-In`` header.

        The header value is read as a number of seconds from now. A
        missing, empty, or non-integer value leaves the recorded expiry
        unchanged.
        """
        expires_in = headers.get("X-Key-Expires-In")
        if not expires_in:
            return
        try:
            seconds = int(expires_in)
        except (TypeError, ValueError):
            # A bad header should not break handling of the response itself.
            self.log.warning(
                f"Ignoring malformed X-Key-Expires-In header: {expires_in!r}"
            )
            return
        self._key_expires_at = time.time() + seconds

    async def _do_rotate(self, session: "aiohttp.ClientSession"):
        """Request a new key, persist it to the secret store, and switch to it.

        On a non-200 response, or a response without a key, an error is
        logged and the current key is kept.
        """
        resp = await session.post(
            f"{self.api_base}/api/keys/rotate",
            headers={"Authorization": f"Bearer {self._key}"}
        )
        if resp.status != 200:
            self.log.error(f"Key rotation failed: {resp.status}")
            return
        data = await resp.json()
        # RotatingKeyManager reads this endpoint's result from "new_key",
        # so accept either field name.
        new_key = data.get("key") or data.get("new_key")
        if not new_key:
            self.log.error("Key rotation response did not contain a new key")
            return
        old_key = self._key

        # Persist new key to secret store
        await self.secret_store.put_secret(self.secret_path, {
            "api_key": new_key,
            "api_key_old": old_key,
            "rotated_at": time.time()
        })
        self._key = new_key
        # The recorded expiry belonged to the old key.
        self._key_expires_at = None
        self.log.info(f"API key rotated successfully. New key: ...{new_key[-4:]}")
8. SecretsManager Wrapper Class
In production agents, you need a unified interface that works across development (env vars), staging (HashiCorp Vault), and production (AWS Secrets Manager). The wrapper pattern provides this abstraction.
import os, logging
from enum import Enum
from typing import Any, Optional
class SecretBackend(Enum):
    """Names of the secret storage backends that can be selected."""

    # Values are the lowercase strings matched against SECRETS_BACKEND.
    ENV = "env"
    VAULT = "vault"
    AWS = "aws"
class SecretsManager:
    """
    Single entry point for secrets, independent of where they are stored.

    The backend is chosen from the SECRETS_BACKEND environment variable
    (case-insensitive); when it is unset, environment variables are used.
    """

    def __init__(self):
        self.log = logging.getLogger("secrets")
        selected = os.environ.get("SECRETS_BACKEND", "env")
        # An unrecognised name makes the Enum lookup raise ValueError.
        self.backend = SecretBackend(selected.lower())
        self._impl = self._init_backend()
        self.log.info(f"Secrets backend: {self.backend.value}")

    def _init_backend(self):
        """Build the implementation object for the selected backend."""
        chosen = self.backend
        if chosen == SecretBackend.ENV:
            return EnvSecretsBackend()
        if chosen == SecretBackend.VAULT:
            # These three variables are mandatory for the Vault backend.
            return VaultSecretsBackend(
                vault_addr=os.environ["VAULT_ADDR"],
                role_id=os.environ["VAULT_ROLE_ID"],
                secret_id=os.environ["VAULT_SECRET_ID"],
            )
        if chosen == SecretBackend.AWS:
            aws_region = os.environ.get("AWS_REGION", "us-east-1")
            return AWSSecretsBackend(region=aws_region)

    async def get(self, key: str, namespace: str = "default") -> Optional[str]:
        """Fetch one secret by key and namespace; failures are logged and re-raised."""
        try:
            value = await self._impl.get(key, namespace)
        except Exception as e:
            self.log.error(f"Failed to get secret '{key}' from {self.backend.value}: {e}")
            raise
        return value

    async def set(self, key: str, value: str, namespace: str = "default"):
        """Store ``value`` under ``key`` in ``namespace`` via the backend."""
        await self._impl.set(key, value, namespace)

    async def rotate(self, key: str, namespace: str = "default"):
        """Ask the backend to rotate the secret identified by key and namespace."""
        await self._impl.rotate(key, namespace)
class EnvSecretsBackend:
    """Secrets backend that reads values from process environment variables."""

    async def get(self, key: str, namespace: str) -> Optional[str]:
        """Look up a secret in the environment.

        Names are tried in this order, and the first non-empty value wins:

        1. ``<NAMESPACE>_<KEY>`` — upper-cased, with ``-`` replaced by ``_``
        2. ``<KEY>`` — upper-cased, with ``-`` replaced by ``_``
        3. ``<KEY>`` — upper-cased only (the original fallback name)
        """
        env_key = f"{namespace.upper()}_{key.upper()}".replace("-", "_")
        # Normalise the un-namespaced fallback the same way as env_key, so a
        # hyphenated key such as "api-key" can match API_KEY.
        bare_key = key.upper().replace("-", "_")
        val = (
            os.environ.get(env_key)
            or os.environ.get(bare_key)
            or os.environ.get(key.upper())
        )
        return val

    async def set(self, key: str, value: str, namespace: str):
        """Store nothing; only log a warning that env vars are not persistent."""
        # Cannot set env vars persistently; log warning
        logging.getLogger("secrets").warning(
            f"EnvSecretsBackend.set() called for '{key}' — env vars are not persistent"
        )

    async def rotate(self, key: str, namespace: str):
        """Rotation is unsupported for this backend; always raises NotImplementedError."""
        raise NotImplementedError("Use vault or AWS backend for rotation")
# In your agent:
# secrets = SecretsManager()
# api_key = await secrets.get("API_KEY", namespace="purpleflea")
# private_key = await secrets.get("WALLET_PRIVATE_KEY", namespace="wallet")
Secure your agent with Purple Flea
Purple Flea provides API keys, wallet infrastructure, escrow, and a full financial stack for autonomous agents. Start with the free faucet — no risk, no commitment.
Get Started Free