1. Why Agents Need Governance
Autonomous AI agents executing financial transactions without oversight create risks that compound rapidly. A misconfigured trading agent can blow up a portfolio in minutes. A payment agent with no spending cap can drain a wallet before any human notices. Governance is the set of structures, policies, and mechanisms that keep autonomous agents accountable — even when no human is watching.
Governance for AI financial agents rests on three pillars:
Without governance, autonomous financial agents operate in a regulatory grey zone. More importantly, they operate without the safety nets that even traditional automated trading systems require. Regimes such as MiCA, SEC guidance on AI in financial services, and CFTC algorithmic trading rules generally expect that humans can explain and justify the decisions their automated systems made, and that is only possible with governance in place.
The Principal-Agent Problem for AI
Traditional finance has long grappled with the principal-agent problem: when you delegate authority to an agent (a fund manager, a broker, an employee), their incentives may diverge from yours. AI agents introduce a new variant: the agent may not have misaligned incentives, but it can make systematic errors at machine speed. Governance frameworks manage this through:
- Principal-bound authorization — agents can only act within scopes explicitly granted by their principal
- Ceiling-based limits — hard caps on transaction size, frequency, and aggregate exposure
- Circuit breakers — automatic stops triggered by anomalous behavior patterns
- Escrow-based commitment — funds are locked and conditionally released rather than transferred directly
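The first two mechanisms in the list above, scoped authorization and hard ceilings, can be sketched in a few lines. This is a minimal illustration, not a reference implementation: `CapabilityGrant`, `is_authorized`, and the action names are hypothetical and do not come from any particular framework.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CapabilityGrant:
    """Hypothetical grant: what one agent may do, and up to what size."""
    agent_id: str
    allowed_actions: frozenset   # e.g. {"pay", "swap"} (illustrative names)
    max_amount_usdc: float       # hard ceiling per action


def is_authorized(grant: CapabilityGrant, agent_id: str, action: str, amount_usdc: float) -> bool:
    """Return True only if the request falls inside the granted scope and ceiling."""
    if agent_id != grant.agent_id:
        return False
    if action not in grant.allowed_actions:
        return False
    return 0 < amount_usdc <= grant.max_amount_usdc


grant = CapabilityGrant("payments-agent-01", frozenset({"pay"}), max_amount_usdc=50.0)
print(is_authorized(grant, "payments-agent-01", "pay", 25.0))   # True
print(is_authorized(grant, "payments-agent-01", "swap", 25.0))  # False: action not granted
print(is_authorized(grant, "payments-agent-01", "pay", 75.0))   # False: above the ceiling
```

The grant is frozen so that an agent holding a reference to it cannot widen its own scope at runtime; changes have to go through whoever issues grants.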
What Governance Covers
A complete governance framework addresses the entire agent lifecycle:
| Phase | Governance Concern | Mechanism |
|---|---|---|
| Deployment | Identity & authorization scope | Agent registry, capability grants |
| Operation | Transaction approval | Policy engine, multi-sig, spending limits |
| Monitoring | Real-time anomaly detection | Circuit breakers, rate monitors |
| Audit | Tamper-resistant log retention | On-chain events, WORM storage |
| Incident | Containment & recovery | Kill switches, override protocols |
| Upgrade | Controlled policy changes | Timelock, multi-sig governance |
2. Multi-Sig Approval for Large Transactions
Multi-signature (multi-sig) approval is the first line of defense for large transactions. Rather than allowing a single agent or private key to authorize high-value transfers, multi-sig requires a threshold of approvers — typically M-of-N — before a transaction can execute.
For AI agent systems, multi-sig takes two forms: cryptographic multi-sig (on-chain, using ECDSA or EdDSA signatures from multiple wallets) and application-level multi-sig (off-chain, where multiple agents or humans must approve before the transaction is submitted).
Threshold Tiers
A practical multi-sig governance model uses tiered thresholds based on transaction value:
| Tier | Value Range | Required Signers | Time Limit |
|---|---|---|---|
| Micro | < 10 USDC | Agent only (1-of-1) | Instant |
| Standard | 10 to < 500 USDC | Agent + policy check (1-of-1 + rules) | Instant |
| Large | 500 to < 5,000 USDC | Agent + supervisor (2-of-2) | 5 minutes |
| Major | 5,000 to < 50,000 USDC | Agent + 2 humans (3-of-3) | 1 hour |
| Critical | ≥ 50,000 USDC | Multi-sig council (4-of-6) | 24 hours + timelock |
import hashlib
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class ApprovalStatus(Enum):
    PENDING = "pending"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPIRED = "expired"


@dataclass
class MultiSigRequest:
    request_id: str
    initiator: str
    amount_usdc: float
    destination: str
    purpose: str
    required_signers: List[str]
    signatures: Dict[str, str] = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)
    expires_at: float = 0
    status: ApprovalStatus = ApprovalStatus.PENDING

    def is_expired(self) -> bool:
        return time.time() > self.expires_at

    def is_approved(self, threshold: int) -> bool:
        return len(self.signatures) >= threshold

    def digest(self) -> str:
        # Deterministic hash for signing
        payload = f"{self.request_id}:{self.amount_usdc}:{self.destination}"
        return hashlib.sha256(payload.encode()).hexdigest()


class MultiSigCoordinator:
    """Coordinates multi-signature approval for high-value transactions."""

    TIER_CONFIG = {
        "micro": {"max_usdc": 10, "threshold": 1, "timeout_s": 0},
        "standard": {"max_usdc": 500, "threshold": 1, "timeout_s": 0},
        "large": {"max_usdc": 5000, "threshold": 2, "timeout_s": 300},
        "major": {"max_usdc": 50000, "threshold": 3, "timeout_s": 3600},
        "critical": {"max_usdc": float("inf"), "threshold": 4, "timeout_s": 86400},
    }

    def __init__(self):
        self.pending_requests: Dict[str, MultiSigRequest] = {}
        self.authorized_signers: Dict[str, str] = {}  # signer_id -> public_key

    def get_tier(self, amount_usdc: float) -> str:
        # Lower bounds are inclusive: exactly 500 USDC is "large", not "standard"
        if amount_usdc < 10:
            return "micro"
        elif amount_usdc < 500:
            return "standard"
        elif amount_usdc < 5000:
            return "large"
        elif amount_usdc < 50000:
            return "major"
        else:
            return "critical"

    async def request_approval(
        self,
        initiator: str,
        amount_usdc: float,
        destination: str,
        purpose: str,
        signers: List[str],
    ) -> MultiSigRequest:
        tier = self.get_tier(amount_usdc)
        config = self.TIER_CONFIG[tier]
        # A timeout of 0 means the tier has no approval deadline
        expires_at = (
            time.time() + config["timeout_s"] if config["timeout_s"] > 0 else float("inf")
        )
        req = MultiSigRequest(
            request_id=str(uuid.uuid4()),
            initiator=initiator,
            amount_usdc=amount_usdc,
            destination=destination,
            purpose=purpose,
            required_signers=signers,
            expires_at=expires_at,
        )
        self.pending_requests[req.request_id] = req
        if config["threshold"] == 1:
            # Auto-approve single-signer tiers after policy check
            req.signatures[initiator] = req.digest()
            req.status = ApprovalStatus.APPROVED
        return req

    def add_signature(self, request_id: str, signer_id: str, signature: str) -> bool:
        req = self.pending_requests.get(request_id)
        if not req:
            return False
        # Only pending requests accept signatures; this also keeps an
        # already-approved request from being flipped to EXPIRED later
        if req.status != ApprovalStatus.PENDING:
            return False
        if req.is_expired():
            req.status = ApprovalStatus.EXPIRED
            return False
        if signer_id not in req.required_signers:
            raise ValueError(f"Signer {signer_id} not authorized for this request")
        # The signature is stored as given; it is not verified cryptographically here
        req.signatures[signer_id] = signature
        tier = self.get_tier(req.amount_usdc)
        threshold = self.TIER_CONFIG[tier]["threshold"]
        if req.is_approved(threshold):
            req.status = ApprovalStatus.APPROVED
        return True
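The coordinator above counts signatures without checking them against the request digest. A rough sketch of that missing verification step follows. It uses HMAC-SHA256 from the standard library purely as a stand-in for a real ECDSA or EdDSA wallet signature; `sign_digest`, `count_valid_signatures`, and the shared-secret scheme are assumptions made for illustration only.

```python
import hashlib
import hmac


def sign_digest(secret: bytes, digest_hex: str) -> str:
    """Produce an HMAC-SHA256 tag over the request digest (stand-in for a wallet signature)."""
    return hmac.new(secret, digest_hex.encode(), hashlib.sha256).hexdigest()


def count_valid_signatures(digest_hex: str, signatures: dict, signer_secrets: dict) -> int:
    """Count only the signatures that verify against the digest for a known signer."""
    valid = 0
    for signer_id, signature in signatures.items():
        secret = signer_secrets.get(signer_id)
        if secret is None:
            continue  # unknown signer: ignore
        expected = sign_digest(secret, digest_hex)
        # compare_digest avoids leaking match position through timing
        if hmac.compare_digest(expected, signature):
            valid += 1
    return valid


digest = hashlib.sha256(b"req-1:750.0:0xDestination").hexdigest()
secrets_by_signer = {"agent": b"agent-secret", "supervisor": b"supervisor-secret"}
signatures = {
    "agent": sign_digest(b"agent-secret", digest),
    "supervisor": "forged-signature",
}
print(count_valid_signatures(digest, signatures, secrets_by_signer))  # 1
```

A 2-of-2 request would stay pending here, because only one of the two submitted signatures verifies. In an on-chain deployment the same role is played by public-key signature recovery rather than shared secrets.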
3. Policy Engines: Rules That Run Before Every Transaction
While multi-sig handles large-value approval, policy engines enforce continuous rule-based governance on every transaction, regardless of size. A policy engine evaluates a proposed transaction against a set of rules and returns a decision: allow, deny, or flag for review.
Policy engines are the workhorses of agent governance. They run synchronously before any transaction is submitted to a blockchain or payment system, providing a final checkpoint that can catch errors, fraud patterns, and policy violations before they become irreversible.
Spending Limits
Spending limits operate across multiple time windows simultaneously. An agent might be allowed to spend up to 50 USDC per transaction, 200 USDC per hour, and 1,000 USDC per day. All three limits must pass for a transaction to proceed.
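To make the interaction between windows concrete, here is a small worked sketch using the figures from the paragraph above (50 USDC per transaction, 200 USDC per hour, 1,000 USDC per day). The function name `would_pass` and the in-memory history are illustrative assumptions; timestamps are passed in explicitly so the arithmetic is easy to follow.

```python
from collections import deque

# (window length in seconds, limit in USDC), taken from the example figures above
WINDOWS = [(3600, 200.0), (86400, 1000.0)]
PER_TX_LIMIT = 50.0


def would_pass(history, now, amount):
    """history holds (timestamp, amount) pairs for spends that were already approved."""
    if amount > PER_TX_LIMIT:
        return False
    for window_s, limit in WINDOWS:
        spent = sum(a for t, a in history if t >= now - window_s)
        if spent + amount > limit:
            return False
    return True


history = deque()
now = 1_000_000.0
# Four 50 USDC payments in the same hour use up the hourly budget exactly
for i in range(4):
    assert would_pass(history, now + i, 50.0)
    history.append((now + i, 50.0))

print(would_pass(history, now + 10, 50.0))    # False: hourly window already holds 200 USDC
print(would_pass(history, now + 3700, 50.0))  # True: the earlier spends have left the hourly window
print(would_pass(history, now + 10, 60.0))    # False: above the per-transaction limit
```

The fifth payment is individually within the per-transaction cap but still fails, which is the point of checking every window on every request.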
Whitelist / Blacklist
Address-based filtering controls which counterparties an agent can interact with. Whitelists specify only allowed destinations; blacklists specify prohibited ones. For regulated agents, blacklists often incorporate OFAC sanction lists, known scam addresses, and internally flagged counterparties.
Time-Based Restrictions
Time-based policies limit when an agent can act. A trading agent might only be allowed to place orders during market hours. A payroll agent might only be able to run disbursements on specific days. Time restrictions add a predictability layer that makes anomaly detection significantly easier.
4. The AgentGovernancePolicy Class
The following Python class encapsulates a complete governance policy for a single agent. It combines spending limit tracking, address filtering, time-based rules, and explicit approve/veto methods into a unified interface.
import time
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Tuple
from enum import Enum

logger = logging.getLogger("agent_governance")


class PolicyDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    REVIEW = "review"  # Flag for human review but don't block


@dataclass
class TransactionRequest:
    tx_id: str
    agent_id: str
    amount_usdc: float
    destination_address: str
    purpose: str
    token: str = "USDC"
    chain: str = "ethereum"
    metadata: Dict = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)


@dataclass
class PolicyResult:
    decision: PolicyDecision
    reason: str
    rule_triggered: Optional[str] = None
    requires_multisig: bool = False
    multisig_threshold: int = 1
    metadata: Dict = field(default_factory=dict)
class AgentGovernancePolicy:
    """
    Comprehensive governance policy for autonomous AI financial agents.

    Enforces:
    - Per-transaction spending limits
    - Rolling window spending limits (hourly, daily, weekly)
    - Address whitelist / blacklist
    - Time-of-day and day-of-week restrictions
    - Velocity checks (max transactions per window)
    - Explicit veto capabilities

    Usage:
        policy = AgentGovernancePolicy(
            agent_id="trading-agent-001",
            max_tx_usdc=500,
            daily_limit_usdc=5000,
            allowed_destinations=["0xEscrowContract..."],
        )
        result = policy.approve_transaction(tx_request)
    """

    def __init__(
        self,
        agent_id: str,
        # Spending limits
        max_tx_usdc: float = 100.0,
        hourly_limit_usdc: float = 500.0,
        daily_limit_usdc: float = 2000.0,
        weekly_limit_usdc: float = 10000.0,
        # Transaction velocity
        max_tx_per_hour: int = 20,
        max_tx_per_day: int = 100,
        # Multi-sig thresholds
        multisig_threshold_usdc: float = 500.0,
        # Address controls
        allowed_destinations: Optional[List[str]] = None,
        blocked_destinations: Optional[List[str]] = None,
        # Time controls (0-23, UTC)
        allowed_hours: Optional[List[int]] = None,
        allowed_weekdays: Optional[List[int]] = None,  # 0=Monday
        # Override
        emergency_pause: bool = False,
    ):
        self.agent_id = agent_id
        self.max_tx_usdc = max_tx_usdc
        self.hourly_limit_usdc = hourly_limit_usdc
        self.daily_limit_usdc = daily_limit_usdc
        self.weekly_limit_usdc = weekly_limit_usdc
        self.max_tx_per_hour = max_tx_per_hour
        self.max_tx_per_day = max_tx_per_day
        self.multisig_threshold_usdc = multisig_threshold_usdc
        self.allowed_destinations = set(allowed_destinations or [])
        self.blocked_destinations = set(blocked_destinations or [])
        self.allowed_hours = set(allowed_hours) if allowed_hours else None
        self.allowed_weekdays = set(allowed_weekdays) if allowed_weekdays else None
        self.emergency_pause = emergency_pause
        # Rolling window ledger: list of (timestamp, amount_usdc)
        self._tx_log: List[Tuple[float, float]] = []
        # Manual vetoes indexed by tx_id
        self._vetoes: Dict[str, str] = {}

    # -------------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------------
    def approve_transaction(self, tx: TransactionRequest) -> PolicyResult:
        """
        Evaluate a transaction request against all governance rules.
        Returns a PolicyResult with decision, reason, and any flags.
        """
        # Manual veto check first
        if tx.tx_id in self._vetoes:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Transaction explicitly vetoed: {self._vetoes[tx.tx_id]}",
                rule_triggered="manual_veto",
            )
        # Emergency pause
        if self.emergency_pause:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason="Agent is in emergency pause state",
                rule_triggered="emergency_pause",
            )
        # Run each governance check in order; stop at the first DENY so later
        # checks are not evaluated for a transaction that is already rejected
        checks = (
            self._check_per_tx_limit,
            self._check_rolling_limits,
            self._check_velocity,
            self._check_address_rules,
            self._check_time_rules,
        )
        for check in checks:
            result = check(tx)
            if result.decision == PolicyDecision.DENY:
                logger.warning(f"[{self.agent_id}] TX {tx.tx_id} DENIED: {result.reason}")
                return result
        # Determine if multi-sig is required: 2 signers at the threshold,
        # 3 signers at ten times the threshold (the original expression
        # returned 2 in both cases, which made the second tier a no-op)
        requires_multisig = tx.amount_usdc >= self.multisig_threshold_usdc
        if tx.amount_usdc >= self.multisig_threshold_usdc * 10:
            threshold = 3
        elif requires_multisig:
            threshold = 2
        else:
            threshold = 1
        # All checks passed: record and allow
        self._record_transaction(tx)
        logger.info(f"[{self.agent_id}] TX {tx.tx_id} APPROVED: {tx.amount_usdc} USDC to {tx.destination_address}")
        return PolicyResult(
            decision=PolicyDecision.ALLOW,
            reason="All governance checks passed",
            requires_multisig=requires_multisig,
            multisig_threshold=threshold,
        )
    def veto_transaction(self, tx_id: str, reason: str, vetoed_by: str) -> bool:
        """
        Explicitly veto a specific transaction by ID. Once vetoed,
        any approval attempt for this tx_id will be denied regardless
        of policy state. Returns True if veto was recorded.
        """
        self._vetoes[tx_id] = f"{reason} (by {vetoed_by} at {time.time():.0f})"
        logger.warning(f"[{self.agent_id}] VETO recorded for TX {tx_id}: {reason}")
        return True

    def pause_agent(self, reason: str = "Manual emergency pause"):
        """Immediately halt all future transaction approvals."""
        self.emergency_pause = True
        logger.critical(f"[{self.agent_id}] EMERGENCY PAUSE activated: {reason}")

    def resume_agent(self, authorized_by: str):
        """Resume normal operation after pause. Requires explicit authorization."""
        self.emergency_pause = False
        logger.info(f"[{self.agent_id}] Agent resumed by {authorized_by}")

    def get_spend_summary(self) -> Dict:
        """Return current rolling window spend totals."""
        now = time.time()
        return {
            "hourly_spent": self._sum_window(now - 3600),
            "daily_spent": self._sum_window(now - 86400),
            "weekly_spent": self._sum_window(now - 604800),
            "hourly_limit": self.hourly_limit_usdc,
            "daily_limit": self.daily_limit_usdc,
            "weekly_limit": self.weekly_limit_usdc,
            "tx_count_today": self._count_window(now - 86400),
        }
    # -------------------------------------------------------------------------
    # Private checks
    # -------------------------------------------------------------------------
    def _check_per_tx_limit(self, tx: TransactionRequest) -> PolicyResult:
        if tx.amount_usdc > self.max_tx_usdc:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"TX amount {tx.amount_usdc} USDC exceeds per-transaction limit of {self.max_tx_usdc}",
                rule_triggered="per_tx_limit",
            )
        return PolicyResult(decision=PolicyDecision.ALLOW, reason="ok")

    def _check_rolling_limits(self, tx: TransactionRequest) -> PolicyResult:
        now = tx.timestamp
        hourly = self._sum_window(now - 3600) + tx.amount_usdc
        daily = self._sum_window(now - 86400) + tx.amount_usdc
        weekly = self._sum_window(now - 604800) + tx.amount_usdc
        if hourly > self.hourly_limit_usdc:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Would exceed hourly limit: {hourly:.2f}/{self.hourly_limit_usdc} USDC",
                rule_triggered="hourly_limit",
            )
        if daily > self.daily_limit_usdc:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Would exceed daily limit: {daily:.2f}/{self.daily_limit_usdc} USDC",
                rule_triggered="daily_limit",
            )
        if weekly > self.weekly_limit_usdc:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Would exceed weekly limit: {weekly:.2f}/{self.weekly_limit_usdc} USDC",
                rule_triggered="weekly_limit",
            )
        return PolicyResult(decision=PolicyDecision.ALLOW, reason="ok")

    def _check_velocity(self, tx: TransactionRequest) -> PolicyResult:
        now = tx.timestamp
        tx_last_hour = self._count_window(now - 3600)
        tx_today = self._count_window(now - 86400)
        if tx_last_hour >= self.max_tx_per_hour:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Velocity limit: {tx_last_hour} tx in last hour (max {self.max_tx_per_hour})",
                rule_triggered="velocity_hourly",
            )
        if tx_today >= self.max_tx_per_day:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Velocity limit: {tx_today} tx today (max {self.max_tx_per_day})",
                rule_triggered="velocity_daily",
            )
        return PolicyResult(decision=PolicyDecision.ALLOW, reason="ok")

    def _check_address_rules(self, tx: TransactionRequest) -> PolicyResult:
        # Addresses are compared case-insensitively
        dest = tx.destination_address.lower()
        if dest in {d.lower() for d in self.blocked_destinations}:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Destination {tx.destination_address} is blacklisted",
                rule_triggered="address_blacklist",
            )
        # An empty whitelist means "no whitelist configured", not "deny all"
        if self.allowed_destinations and dest not in {d.lower() for d in self.allowed_destinations}:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Destination {tx.destination_address} not in whitelist",
                rule_triggered="address_whitelist",
            )
        return PolicyResult(decision=PolicyDecision.ALLOW, reason="ok")

    def _check_time_rules(self, tx: TransactionRequest) -> PolicyResult:
        import datetime
        # Timezone-aware UTC conversion (utcfromtimestamp is deprecated since Python 3.12)
        dt = datetime.datetime.fromtimestamp(tx.timestamp, tz=datetime.timezone.utc)
        if self.allowed_hours and dt.hour not in self.allowed_hours:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Transaction outside allowed hours (hour={dt.hour} UTC)",
                rule_triggered="time_restriction_hour",
            )
        if self.allowed_weekdays and dt.weekday() not in self.allowed_weekdays:
            return PolicyResult(
                decision=PolicyDecision.DENY,
                reason=f"Transaction outside allowed days (weekday={dt.weekday()})",
                rule_triggered="time_restriction_day",
            )
        return PolicyResult(decision=PolicyDecision.ALLOW, reason="ok")

    def _record_transaction(self, tx: TransactionRequest):
        self._tx_log.append((tx.timestamp, tx.amount_usdc))
        # Prune entries older than 1 week
        cutoff = time.time() - 604800
        self._tx_log = [(t, a) for t, a in self._tx_log if t > cutoff]

    def _sum_window(self, since_ts: float) -> float:
        return sum(a for t, a in self._tx_log if t >= since_ts)

    def _count_window(self, since_ts: float) -> int:
        return sum(1 for t, _ in self._tx_log if t >= since_ts)
The approve_transaction() method is the main entry point. It evaluates the checks in a fixed order and returns the first DENY verdict, so a rejected transaction always reports the specific rule that blocked it. The veto_transaction() method provides an escape hatch that any authorized principal (a human or another governance agent) can call to permanently block a specific transaction ID before it executes. The class records who issued a veto but does not authenticate the caller, so that check belongs to whatever layer exposes these methods.
Usage Example
# Configure governance for a trading agent
policy = AgentGovernancePolicy(
    agent_id="trader-alpha-001",
    max_tx_usdc=250,
    hourly_limit_usdc=1000,
    daily_limit_usdc=5000,
    multisig_threshold_usdc=500,
    allowed_hours=list(range(9, 22)),  # 9am-10pm UTC
    allowed_weekdays=[0, 1, 2, 3, 4],  # Mon-Fri
    blocked_destinations=["0xBadActorAddress..."],
)

tx = TransactionRequest(
    tx_id="pf_tx_20260307_001",
    agent_id="trader-alpha-001",
    amount_usdc=100,
    destination_address="0xEscrowContractAddress...",
    purpose="Arbitrage profit settlement",
)


async def main():
    # `coordinator` (a MultiSigCoordinator) and `submit_transaction` are assumed
    # to be provided by the surrounding application; `await` needs an async context.
    result = policy.approve_transaction(tx)
    if result.decision == PolicyDecision.ALLOW:
        if result.requires_multisig:
            # Route to multi-sig coordinator.
            # With max_tx_usdc=250, amounts at or above the 500 USDC multi-sig
            # threshold are denied outright, so this branch only runs if the
            # per-transaction cap is raised.
            await coordinator.request_approval(...)
        else:
            # Execute directly
            await submit_transaction(tx)
    else:
        print(f"Blocked: {result.reason}")
5. Audit Trails: On-Chain vs. Off-Chain Storage
Governance without an audit trail is unverifiable. Every transaction approval, denial, veto, policy change, and override must be recorded in a form that cannot be tampered with after the fact. The choice between on-chain and off-chain audit storage involves a classic tradeoff: cost vs. trustlessness.
On-Chain Audit Events
On-chain audit storage emits structured events to a blockchain. These events are immutable, timestamped by block time, and globally verifiable. The cost is gas per event — manageable for high-value transactions, but prohibitive for micro-transactions at high frequency.
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.24;

contract AgentAuditLog {
    event TransactionApproved(
        bytes32 indexed agentId,
        bytes32 indexed txId,
        uint256 amountUsdc,
        address destination,
        uint256 timestamp
    );
    event TransactionDenied(
        bytes32 indexed agentId,
        bytes32 indexed txId,
        string reason,
        uint256 timestamp
    );
    event PolicyUpdated(
        bytes32 indexed agentId,
        string field,
        string oldValue,
        string newValue,
        address updatedBy,
        uint256 timestamp
    );
    event AgentPaused(bytes32 indexed agentId, address pausedBy, uint256 timestamp);
    event AgentResumed(bytes32 indexed agentId, address resumedBy, uint256 timestamp);

    // Only authorized governance contracts can emit
    mapping(address => bool) public authorizedEmitters;
    address public owner;

    constructor() { owner = msg.sender; }

    modifier onlyAuthorized() {
        require(authorizedEmitters[msg.sender] || msg.sender == owner, "Not authorized");
        _;
    }

    // Without a setter the mapping above can never be populated, so only the
    // owner could log. This lets the owner grant or revoke emitter rights.
    function setAuthorizedEmitter(address emitter, bool allowed) external {
        require(msg.sender == owner, "Only owner");
        authorizedEmitters[emitter] = allowed;
    }

    function logApproval(
        bytes32 agentId, bytes32 txId, uint256 amount, address dest
    ) external onlyAuthorized {
        emit TransactionApproved(agentId, txId, amount, dest, block.timestamp);
    }

    function logDenial(
        bytes32 agentId, bytes32 txId, string calldata reason
    ) external onlyAuthorized {
        emit TransactionDenied(agentId, txId, reason, block.timestamp);
    }
}
Off-Chain Audit Storage
For high-frequency agents, off-chain storage with cryptographic integrity is more practical. A common pattern: write structured logs to an append-only database (or WORM storage like AWS S3 Object Lock), and periodically anchor a Merkle root of those logs on-chain.
import json
import hashlib
import time
from typing import List, Dict


class AuditLogger:
    """Off-chain audit log with Merkle anchoring capability."""

    def __init__(self, agent_id: str, log_file: str):
        self.agent_id = agent_id
        self.log_file = log_file
        self.entries: List[Dict] = []
        self._load_existing()

    def log(self, event_type: str, tx_id: str, **kwargs):
        entry = {
            "agent_id": self.agent_id,
            "event_type": event_type,
            "tx_id": tx_id,
            "timestamp": time.time(),
            "timestamp_iso": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            **kwargs,
        }
        # Each entry includes hash of previous entry (chain)
        prev_hash = self.entries[-1]["entry_hash"] if self.entries else "genesis"
        entry["prev_hash"] = prev_hash
        entry["entry_hash"] = hashlib.sha256(
            json.dumps(entry, sort_keys=True).encode()
        ).hexdigest()
        self.entries.append(entry)
        self._persist(entry)

    def compute_merkle_root(self) -> str:
        """Compute Merkle root of all entry hashes for on-chain anchoring."""
        hashes = [e["entry_hash"].encode() for e in self.entries]
        while len(hashes) > 1:
            # Duplicate the last node when a level has an odd number of nodes
            if len(hashes) % 2:
                hashes.append(hashes[-1])
            hashes = [
                hashlib.sha256(hashes[i] + hashes[i + 1]).hexdigest().encode()
                for i in range(0, len(hashes), 2)
            ]
        return hashes[0].decode() if hashes else ""

    def verify_integrity(self) -> bool:
        """Verify each entry's hash and its prev_hash link to the entry before it."""
        prev_hash = "genesis"
        for entry in self.entries:
            # Recompute the hash over everything except the stored hash itself
            body = {k: v for k, v in entry.items() if k != "entry_hash"}
            computed = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
            if entry.get("entry_hash") != computed:
                return False  # Entry content was altered
            if entry.get("prev_hash") != prev_hash:
                return False  # Entry was removed, inserted, or reordered
            prev_hash = entry["entry_hash"]
        return True

    def _persist(self, entry: Dict):
        with open(self.log_file, "a") as f:
            f.write(json.dumps(entry) + "\n")

    def _load_existing(self):
        try:
            with open(self.log_file) as f:
                self.entries = [json.loads(line) for line in f if line.strip()]
        except FileNotFoundError:
            pass
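Anchoring a Merkle root is only useful if a single log entry can later be checked against it without replaying the whole log. The sketch below shows one way to build and verify such an inclusion proof. It assumes the same tree convention as compute_merkle_root above (hex-encoded SHA-256 nodes, last node duplicated on odd levels); the function names are hypothetical and the log must be non-empty.

```python
import hashlib


def _parent(left: bytes, right: bytes) -> bytes:
    """Combine two nodes the same way the audit logger does."""
    return hashlib.sha256(left + right).hexdigest().encode()


def _next_level(level):
    return [_parent(level[i], level[i + 1]) for i in range(0, len(level), 2)]


def merkle_root(leaves):
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = _next_level(level)
    return level[0]


def merkle_proof(leaves, index):
    """Return [(sibling, sibling_is_left), ...] walking from the leaf up to the root."""
    proof = []
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        sibling = index ^ 1  # neighbour within the same pair
        proof.append((level[sibling], sibling < index))
        level = _next_level(level)
        index //= 2
    return proof


def verify_proof(leaf, proof, root):
    node = leaf
    for sibling, sibling_is_left in proof:
        node = _parent(sibling, node) if sibling_is_left else _parent(node, sibling)
    return node == root


# Five stand-in entry hashes (the real leaves would be the log's entry_hash values)
leaves = [hashlib.sha256(f"entry-{i}".encode()).hexdigest().encode() for i in range(5)]
root = merkle_root(leaves)
proof = merkle_proof(leaves, 4)
print(verify_proof(leaves[4], proof, root))      # True
print(verify_proof(b"tampered", proof, root))    # False
```

The proof for one entry is logarithmic in the number of entries, so an auditor only needs the anchored root, the entry, and a handful of sibling hashes.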
6. Human-in-the-Loop Override Mechanisms
Even the most sophisticated autonomous agent system needs a human override path. The question is not whether to include human override, but how to make it fast enough to be useful without making it so easy that it undermines autonomy.
Override Architecture
A practical override architecture has three layers:
- Soft override — A human flags a pending transaction for additional review. The transaction is held, not cancelled. The agent continues with other tasks while waiting for a decision.
- Hard veto — A human explicitly blocks a specific transaction or class of transactions. The veto is recorded in the audit log. The agent receives a structured rejection and can surface it to its principal.
- Emergency pause — A human (or automated monitor) halts all agent transactions immediately. No transactions are approved until the pause is explicitly lifted by an authorized party.
import asyncio
from typing import Awaitable, Callable, Dict, Optional


class HumanOverrideInterface:
    """
    Async interface for human override of pending agent transactions.
    In production, this connects to a dashboard webhook, Slack bot,
    or PagerDuty-style alerting system.
    """

    def __init__(
        self,
        policy: AgentGovernancePolicy,
        audit_log: AuditLogger,
        review_timeout_s: int = 300,  # 5 minutes to review
        # The callback is awaited, so it must be an async function
        alert_callback: Optional[Callable[[dict], Awaitable[None]]] = None,
    ):
        self.policy = policy
        self.audit_log = audit_log
        self.review_timeout_s = review_timeout_s
        self._alert_callback = alert_callback
        self._pending_review: Dict[str, asyncio.Event] = {}  # tx_id -> asyncio.Event
        self._review_decisions: Dict[str, PolicyDecision] = {}

    async def request_human_review(
        self, tx: TransactionRequest, reason: str
    ) -> PolicyResult:
        """
        Hold a transaction pending human approval.
        Returns after human decision or timeout.
        """
        event = asyncio.Event()
        self._pending_review[tx.tx_id] = event
        self.audit_log.log("human_review_requested", tx.tx_id, reason=reason)
        try:
            # Notify human reviewers
            if self._alert_callback:
                await self._alert_callback({
                    "type": "review_needed",
                    "tx_id": tx.tx_id,
                    "amount_usdc": tx.amount_usdc,
                    "destination": tx.destination_address,
                    "reason": reason,
                })
            await asyncio.wait_for(event.wait(), timeout=self.review_timeout_s)
        except asyncio.TimeoutError:
            # Default deny on timeout: conservative governance
            self.audit_log.log("review_timeout", tx.tx_id)
            return PolicyResult(decision=PolicyDecision.DENY, reason="Human review timed out")
        finally:
            # Always drop the pending entry so finished reviews do not accumulate
            self._pending_review.pop(tx.tx_id, None)
        decision = self._review_decisions.pop(tx.tx_id, PolicyDecision.DENY)
        return PolicyResult(decision=decision, reason="Human decision")

    def human_approve(self, tx_id: str, approved_by: str):
        """Called by the human review dashboard to approve a pending tx."""
        self._review_decisions[tx_id] = PolicyDecision.ALLOW
        self.audit_log.log("human_approved", tx_id, approved_by=approved_by)
        if tx_id in self._pending_review:
            self._pending_review[tx_id].set()

    def human_reject(self, tx_id: str, rejected_by: str, reason: str):
        """Called by the human review dashboard to reject a pending tx."""
        self._review_decisions[tx_id] = PolicyDecision.DENY
        self.policy.veto_transaction(tx_id, reason, rejected_by)
        self.audit_log.log("human_rejected", tx_id, rejected_by=rejected_by, reason=reason)
        if tx_id in self._pending_review:
            self._pending_review[tx_id].set()
Circuit Breakers
Circuit breakers are automated human-in-the-loop triggers. They monitor agent behavior in real time and automatically pause or escalate when anomalies are detected — without requiring a human to be watching continuously.
import time


class CircuitBreaker:
    """Automatically pauses agent on anomalous patterns."""

    def __init__(self, policy: AgentGovernancePolicy):
        self.policy = policy
        self._denial_count = 0
        self._denial_window_start = time.time()
        self.max_denials_per_minute = 10

    def record_denial(self):
        now = time.time()
        # Fixed one-minute window: the counter resets once the window has elapsed
        if now - self._denial_window_start > 60:
            self._denial_count = 0
            self._denial_window_start = now
        self._denial_count += 1
        if self._denial_count >= self.max_denials_per_minute:
            self.policy.pause_agent(f"Circuit breaker: {self._denial_count} denials in <60s")
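The breaker above counts denials in fixed one-minute windows, so a burst that straddles a window reset can be split across two counters and go unnoticed. One possible alternative is a sliding window, sketched below. `SlidingWindowBreaker` is a hypothetical name, it sets a flag instead of calling a policy object, and timestamps are passed in explicitly to keep the example deterministic.

```python
from collections import deque


class SlidingWindowBreaker:
    """Trips when max_events denials fall within any window_s-second span."""

    def __init__(self, max_events: int = 10, window_s: float = 60.0):
        self.max_events = max_events
        self.window_s = window_s
        self._events = deque()
        self.tripped = False

    def record(self, now: float) -> bool:
        """Record one denial at time `now`; return True once the breaker has tripped."""
        self._events.append(now)
        # Drop denials that are no longer inside the window ending at `now`
        while self._events and self._events[0] <= now - self.window_s:
            self._events.popleft()
        if len(self._events) >= self.max_events:
            self.tripped = True
        return self.tripped


breaker = SlidingWindowBreaker(max_events=3, window_s=60.0)
print(breaker.record(0.0))    # False
print(breaker.record(59.0))   # False
print(breaker.record(61.0))   # False: the denial at t=0 has aged out, two remain
print(breaker.record(62.0))   # True: three denials between t=59 and t=62
```

A fixed window that started at t=0 would have reset at t=61 and counted only two denials at t=62, even though three arrived within three seconds.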
7. Integration with Purple Flea Escrow for Governed Payments
The Purple Flea escrow service provides a natural integration point for agent governance. Rather than an agent making direct on-chain transfers — which are irreversible — governed payments route through escrow, creating a time window for review and cancellation.
The escrow workflow adds a reversibility layer to agent payments:
- Agent passes governance checks and gets approval to pay 200 USDC to counterparty
- Instead of direct transfer, agent deposits to Purple Flea escrow with a 24-hour release delay
- Governance monitors can flag the escrow for additional review
- If no objection, funds auto-release after the delay
- If flagged, a human can call the dispute function before release
import time
import uuid
from typing import Optional

import httpx

ESCROW_API = "https://escrow.purpleflea.com"


async def make_governed_payment(
    agent_api_key: str,  # pf_live_... format
    amount_usdc: float,
    recipient_agent_id: str,
    purpose: str,
    release_delay_hours: int = 24,
    policy: Optional[AgentGovernancePolicy] = None,
) -> dict:
    """
    Make a payment through Purple Flea escrow with governance checks.
    The escrow holds funds for release_delay_hours before auto-releasing,
    giving governance monitors time to flag or cancel.
    """
    if policy:
        tx = TransactionRequest(
            # A UUID avoids ID collisions between payments made in the same second
            tx_id=f"escrow_{uuid.uuid4().hex}",
            agent_id=policy.agent_id,
            amount_usdc=amount_usdc,
            destination_address=recipient_agent_id,
            purpose=purpose,
        )
        result = policy.approve_transaction(tx)
        if result.decision != PolicyDecision.ALLOW:
            raise PermissionError(f"Governance denied: {result.reason}")
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{ESCROW_API}/create",
            headers={"Authorization": f"Bearer {agent_api_key}"},
            json={
                "recipient": recipient_agent_id,
                "amount_usdc": amount_usdc,
                "purpose": purpose,
                "release_delay_hours": release_delay_hours,
                "governance_metadata": {
                    "policy_version": "1.0",
                    "approved_at": time.time(),
                },
            },
        )
        resp.raise_for_status()
        return resp.json()
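The review window in the escrow workflow can be reasoned about locally before wiring anything to a live service. The sketch below is a purely in-memory model of that workflow and does not represent the Purple Flea API: `SimulatedEscrow`, its state names, and the rule that a flagged escrow does not auto-release until it is resolved are all assumptions made for illustration.

```python
from dataclasses import dataclass
from enum import Enum


class EscrowState(Enum):
    HELD = "held"
    FLAGGED = "flagged"
    RELEASED = "released"
    CANCELLED = "cancelled"


@dataclass
class SimulatedEscrow:
    amount_usdc: float
    created_at: float
    release_delay_s: float
    state: EscrowState = EscrowState.HELD

    def flag(self) -> None:
        """A governance monitor asks for additional review."""
        if self.state is EscrowState.HELD:
            self.state = EscrowState.FLAGGED

    def cancel(self) -> None:
        """A human resolves a review by cancelling before release."""
        if self.state in (EscrowState.HELD, EscrowState.FLAGGED):
            self.state = EscrowState.CANCELLED

    def tick(self, now: float) -> EscrowState:
        """Auto-release only if nobody objected and the delay has elapsed."""
        if self.state is EscrowState.HELD and now >= self.created_at + self.release_delay_s:
            self.state = EscrowState.RELEASED
        return self.state


day = 24 * 3600.0
unchallenged = SimulatedEscrow(200.0, created_at=0.0, release_delay_s=day)
print(unchallenged.tick(3600.0).value)   # held
print(unchallenged.tick(day).value)      # released

challenged = SimulatedEscrow(200.0, created_at=0.0, release_delay_s=day)
challenged.flag()
print(challenged.tick(day + 1).value)    # flagged: the delay elapsed but release is blocked
challenged.cancel()
print(challenged.state.value)            # cancelled
```

Modelling the states explicitly makes it easy to see which transitions are reversible: everything before RELEASED can still be stopped, nothing after it can.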
8. DAO-Style Governance for Multi-Agent Cooperatives
When multiple agents pool resources or collaborate on tasks, governance becomes a collective problem. DAO-style governance — where policy changes require proposals, voting, and time-locked execution — scales naturally to multi-agent systems.
Governance Tokens for Agents
In a multi-agent cooperative, each agent holds governance tokens proportional to its contribution (capital, reputation, or stake). When an agent wants to change a shared policy — say, increase the daily spending limit — it submits a proposal that other agents vote on.
import time
import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional


class ProposalState(Enum):
    ACTIVE = "active"
    PASSED = "passed"
    REJECTED = "rejected"
    QUEUED = "queued"  # In timelock
    EXECUTED = "executed"


@dataclass
class GovernanceProposal:
    proposal_id: str
    proposer: str
    title: str
    description: str
    policy_changes: Dict  # e.g., {"daily_limit_usdc": 10000}
    votes_for: Dict[str, float] = field(default_factory=dict)
    votes_against: Dict[str, float] = field(default_factory=dict)
    voting_deadline: float = 0
    execution_eta: Optional[float] = None
    state: ProposalState = ProposalState.ACTIVE

    @property
    def total_for(self) -> float:
        return sum(self.votes_for.values())

    @property
    def total_against(self) -> float:
        return sum(self.votes_against.values())

    # A plain method rather than a property: a property cannot take arguments,
    # and quorum has to be measured against the total stake, not a raw sum
    def quorum_reached(self, total_stake: float, quorum_pct: float = 0.4) -> bool:
        if total_stake <= 0:
            return False
        return (self.total_for + self.total_against) / total_stake >= quorum_pct


class MultiAgentGovernor:
    """DAO-style governance for multi-agent cooperatives."""

    def __init__(
        self,
        voting_period_s: int = 86400,  # 24h voting
        timelock_s: int = 43200,  # 12h execution delay
        quorum_pct: float = 0.4,
    ):
        self.voting_period_s = voting_period_s
        self.timelock_s = timelock_s
        self.quorum_pct = quorum_pct
        self.proposals: Dict[str, GovernanceProposal] = {}
        self.agent_stakes: Dict[str, float] = {}  # agent_id -> stake

    def register_agent(self, agent_id: str, stake: float):
        self.agent_stakes[agent_id] = stake

    def submit_proposal(
        self,
        proposer: str,
        title: str,
        description: str,
        policy_changes: Dict,
    ) -> GovernanceProposal:
        proposal = GovernanceProposal(
            proposal_id=str(uuid.uuid4()),
            proposer=proposer,
            title=title,
            description=description,
            policy_changes=policy_changes,
            voting_deadline=time.time() + self.voting_period_s,
        )
        self.proposals[proposal.proposal_id] = proposal
        return proposal

    def cast_vote(self, proposal_id: str, voter: str, support: bool) -> bool:
        proposal = self.proposals.get(proposal_id)
        if not proposal or proposal.state != ProposalState.ACTIVE:
            return False
        if time.time() > proposal.voting_deadline:
            self._finalize(proposal)
            return False
        vote_weight = self.agent_stakes.get(voter, 0)
        # A changed vote replaces the earlier one instead of counting on both sides
        proposal.votes_for.pop(voter, None)
        proposal.votes_against.pop(voter, None)
        if support:
            proposal.votes_for[voter] = vote_weight
        else:
            proposal.votes_against[voter] = vote_weight
        return True

    def _finalize(self, proposal: GovernanceProposal):
        total_stake = sum(self.agent_stakes.values())
        if not proposal.quorum_reached(total_stake, self.quorum_pct):
            proposal.state = ProposalState.REJECTED
        elif proposal.total_for > proposal.total_against:
            proposal.state = ProposalState.QUEUED
            proposal.execution_eta = time.time() + self.timelock_s
        else:
            proposal.state = ProposalState.REJECTED

    def try_execute(
        self, proposal_id: str, target_policy: AgentGovernancePolicy
    ) -> bool:
        proposal = self.proposals.get(proposal_id)
        if not proposal:
            return False
        # Close voting here too, so a proposal is not stuck in ACTIVE when
        # nobody happens to vote after the deadline
        if proposal.state == ProposalState.ACTIVE and time.time() > proposal.voting_deadline:
            self._finalize(proposal)
        if proposal.state != ProposalState.QUEUED:
            return False
        if time.time() < proposal.execution_eta:
            return False  # Still in timelock
        # `attr` rather than `field`, which would shadow dataclasses.field
        for attr, value in proposal.policy_changes.items():
            if hasattr(target_policy, attr):
                setattr(target_policy, attr, value)
        proposal.state = ProposalState.EXECUTED
        return True
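The stake-weighted tally is easiest to check with concrete numbers. The standalone sketch below applies the quorum and majority rules described in this section to a three-agent cooperative; the `tally` function, the agent names, and the stake values are made up for the example.

```python
def tally(stakes, votes, quorum_pct=0.4):
    """stakes: agent_id -> stake; votes: agent_id -> True (for) or False (against)."""
    total_stake = sum(stakes.values())
    if total_stake <= 0:
        return "rejected"
    weight_for = sum(stakes.get(a, 0.0) for a, support in votes.items() if support)
    weight_against = sum(stakes.get(a, 0.0) for a, support in votes.items() if not support)
    participation = (weight_for + weight_against) / total_stake
    if participation < quorum_pct:
        return "rejected"  # not enough stake took part
    return "queued" if weight_for > weight_against else "rejected"


stakes = {"agent-a": 500.0, "agent-b": 300.0, "agent-c": 200.0}

# 700 of 1,000 stake voted (70%), 500 for and 200 against
print(tally(stakes, {"agent-a": True, "agent-c": False}))   # queued

# Only 200 of 1,000 stake voted (20%), below the 40% quorum
print(tally(stakes, {"agent-c": True}))                     # rejected

# Quorum met (80%), but 300 for loses to 500 against
print(tally(stakes, {"agent-b": True, "agent-a": False}))   # rejected
```

The second case shows why quorum matters: a unanimous vote still fails when the voters hold too little of the total stake.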
Practical Multi-Agent Governance Patterns
9. Incident Response When Governance Is Breached
No governance system is perfect. When a breach occurs — whether from an exploit, a misconfigured policy, a compromised agent key, or an unexpected interaction between systems — a pre-defined incident response plan determines how quickly damage can be contained.
Breach Classification
| Severity | Example | Response Time | First Action |
|---|---|---|---|
| P1 — Critical | Agent key compromised, funds draining | < 5 minutes | Emergency pause all agents, rotate keys |
| P2 — High | Policy bypass detected, unexpected spend | < 15 minutes | Pause affected agent, freeze escrows |
| P3 — Medium | Anomalous transaction pattern, not yet costly | < 1 hour | Flag for review, increase monitoring |
| P4 — Low | Policy audit finding, theoretical vulnerability | < 24 hours | Scheduled patch, document in changelog |
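The severity table can be encoded as data so that monitoring code and humans work from the same definitions. This is a minimal sketch: the deadlines and first actions are copied from the table above, while `SeverityPolicy`, `classify`, `is_overdue`, and the three boolean signals are hypothetical simplifications of what a real detector would report.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SeverityPolicy:
    response_deadline_s: int
    first_action: str


SEVERITY_TABLE = {
    "P1": SeverityPolicy(5 * 60, "Emergency pause all agents, rotate keys"),
    "P2": SeverityPolicy(15 * 60, "Pause affected agent, freeze escrows"),
    "P3": SeverityPolicy(60 * 60, "Flag for review, increase monitoring"),
    "P4": SeverityPolicy(24 * 60 * 60, "Scheduled patch, document in changelog"),
}


def classify(key_compromised: bool, policy_bypassed: bool, anomalous_pattern: bool) -> str:
    """Pick the most severe level whose signal is present (signals are illustrative)."""
    if key_compromised:
        return "P1"
    if policy_bypassed:
        return "P2"
    if anomalous_pattern:
        return "P3"
    return "P4"


def is_overdue(severity: str, detected_at: float, now: float) -> bool:
    """True when the response deadline for this severity has passed."""
    return now - detected_at > SEVERITY_TABLE[severity].response_deadline_s


level = classify(key_compromised=False, policy_bypassed=True, anomalous_pattern=True)
print(level)                                   # P2
print(SEVERITY_TABLE[level].first_action)      # Pause affected agent, freeze escrows
print(is_overdue(level, detected_at=0.0, now=600.0))    # False: 10 minutes elapsed, 15 allowed
print(is_overdue(level, detected_at=0.0, now=1000.0))   # True: past the 15-minute deadline
```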
Automated Incident Response
from typing import List

import httpx


class IncidentResponder:
    """Automated incident response for governance breaches."""

    def __init__(self, policies: List[AgentGovernancePolicy], audit_log: AuditLogger):
        self.policies = policies
        self.audit_log = audit_log
        self._incident_count = 0

    async def handle_p1_breach(self, agent_id: str, description: str):
        """Critical: pause everything immediately."""
        # Pause locally first so containment does not depend on the network call
        for policy in self.policies:
            policy.pause_agent(f"P1 incident response: {description}")
        self._incident_count += 1
        self.audit_log.log(
            "incident_p1", agent_id,
            description=description,
            incident_number=self._incident_count,
        )
        # Post to escrow dispute endpoint to freeze pending payments
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://escrow.purpleflea.com/freeze-all",
                json={"agent_id": agent_id, "incident": self._incident_count},
            )
            # Surface a failed freeze instead of silently assuming it worked
            resp.raise_for_status()

    async def post_mortem(self, incident_id: int) -> dict:
        """Generate post-mortem report from audit logs."""
        all_entries = self.audit_log.entries
        # Include the incident's own records plus everything from the last
        # hour of the log (measured back from the most recent entry)
        latest_ts = all_entries[-1]["timestamp"] if all_entries else 0.0
        entries = [
            e for e in all_entries
            if e.get("incident_number") == incident_id
            or e["timestamp"] > latest_ts - 3600
        ]
        denied = [e for e in entries if e["event_type"] == "tx_denied"]
        approved = [e for e in entries if e["event_type"] == "tx_approved"]
        return {
            "incident_id": incident_id,
            "total_events": len(entries),
            "approved_during_window": len(approved),
            "denied_during_window": len(denied),
            "merkle_root": self.audit_log.compute_merkle_root(),
        }
Recovery Checklist
After containing an incident, a structured recovery process prevents recurrence:
- Containment — Pause all affected agents, freeze escrows, revoke compromised API keys
- Assessment — Pull and verify audit logs; determine scope of unauthorized transactions
- Communication — Notify affected counterparties, document incident in public changelog if material
- Root cause analysis — Identify the policy gap or exploit that allowed the breach
- Remediation — Patch the vulnerability, update policies, add new policy checks
- Restoration — Resume agents with enhanced monitoring, lower limits initially
- Post-mortem publication — Publish findings to build trust with the agent ecosystem
Build Governed Agents on Purple Flea
Use Purple Flea's escrow infrastructure to add reversibility and dispute resolution to your agent payment flows — with a built-in time window that doubles as a governance review period.