Signal Generation Pipelines for AI Trading Agents
Raw market data is noise. Signals are noise transformed into actionable predictions. This guide walks through building a production-grade signal generation pipeline — from data ingestion and feature engineering to ensemble combination and live order placement via the Purple Flea Trading API.
A signal is a quantitative prediction about future price movement with an associated strength, direction, and decay time. Good signals are forward-looking, statistically significant in out-of-sample testing, and combined intelligently with other signals. Bad signals are backtested into existence and forgotten as soon as they hit live markets. This guide teaches the difference — and shows how to build a pipeline that survives contact with reality.
1. Signal Types: Technical, Fundamental, Sentiment, On-Chain, Cross-Asset
Signals fall into five broad categories, each with different data sources, latencies, and decay characteristics. Robust trading agents combine signals from multiple categories to reduce dependence on any single source of alpha.
Technical Signals
Derived from price, volume, and order flow data. Examples include momentum indicators (RSI, MACD), moving average crossovers, Bollinger Band breakouts, order book imbalance, and VWAP deviation. Technical signals are high-frequency — they can be computed in milliseconds and decay within minutes to hours. They are the most common signal type for crypto trading agents because the data is readily available and standardized.
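As a concrete example of a technical signal, order book imbalance compares resting bid and ask depth. This is a minimal sketch — the top-of-book sizes below are made up for illustration:

```python
import numpy as np

def order_book_imbalance(bid_sizes: np.ndarray, ask_sizes: np.ndarray) -> float:
    """Order book imbalance in [-1, +1]: +1 = all resting size on the bid,
    -1 = all on the ask. Positive values suggest short-term buying pressure."""
    total_bid = float(np.sum(bid_sizes))
    total_ask = float(np.sum(ask_sizes))
    denom = total_bid + total_ask
    if denom == 0:
        return 0.0
    return (total_bid - total_ask) / denom

# Hypothetical top-5 levels of a snapshot: 32 units bid vs 16 units offered
bids = np.array([12.0, 8.0, 5.0, 4.0, 3.0])
asks = np.array([4.0, 3.0, 3.0, 2.0, 4.0])
print(round(order_book_imbalance(bids, asks), 3))  # 0.333
```

As the table below notes, a signal like this decays almost instantly in liquid markets, so it is only useful with very short TTLs.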
Fundamental Signals
Based on underlying asset value metrics: protocol revenue, active addresses, developer activity (GitHub commits), token emission schedules, and treasury balances. Fundamental signals change slowly and have decay times of days to weeks. They work best as filters or regime identifiers rather than direct trade triggers.
Sentiment Signals
Aggregated from social media, news sources, and AI-powered NLP models. Crypto-specific sources include Twitter/X post volume and tone, Reddit mentions, the Fear and Greed index, and funding rate sentiment. Sentiment signals are noisy but can capture crowd psychology ahead of price moves. Decay is typically 1 to 12 hours for event-driven sentiment.
On-Chain Signals
Derived from blockchain transaction data: whale wallet flows, exchange inflows/outflows, large transfer clustering, miner activity, and DeFi protocol utilization. On-chain data is transparent and manipulation-resistant. Signal decay is 4 to 48 hours depending on the metric — whale accumulation patterns last longer than single large transactions.
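A common way to turn raw on-chain flows into a signal is a rolling z-score, so that only unusually large moves register. This is a sketch on synthetic data — the flow series and window are illustrative:

```python
import numpy as np
import pandas as pd

def whale_flow_zscore(net_exchange_inflow: pd.Series, window: int = 30) -> pd.Series:
    """Rolling z-score of net exchange inflow. Large positive values
    (heavy inflow to exchanges) are historically read as bearish;
    large negative values (withdrawals to cold storage) as bullish."""
    mean = net_exchange_inflow.rolling(window).mean()
    std = net_exchange_inflow.rolling(window).std()
    return (net_exchange_inflow - mean) / std

# Synthetic daily net inflows (BTC): flat baseline, then one large inflow spike
flows = pd.Series([100.0] * 32 + [900.0])
z = whale_flow_zscore(flows, window=30)
print(z.iloc[-1] > 2)  # True — the spike stands out against the rolling baseline
```

Note that windows of constant flow produce a zero standard deviation (and hence NaN z-scores); a production version would floor the denominator.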
Cross-Asset Signals
Exploit correlations between related markets: BTC dominance shifts, the BTC/ETH ratio, crypto vs equity beta, stablecoin dominance trends, and DeFi TVL changes. Cross-asset signals provide a diversification benefit because they are often uncorrelated with pure technical signals on the same asset.
| Signal Type | Data Latency | Typical Decay | Best Use | Difficulty |
|---|---|---|---|---|
| Technical | Milliseconds | Minutes – Hours | Trade triggering | Low |
| Fundamental | Hours – Days | Days – Weeks | Regime filter | Medium |
| Sentiment | Seconds – Minutes | 1 – 12 Hours | Short-term momentum | Medium |
| On-Chain | Minutes – Hours | 4 – 48 Hours | Directional bias | High |
| Cross-Asset | Milliseconds | Hours – Days | Portfolio regime | Medium |
2. Signal Pipeline Architecture
A signal pipeline transforms raw market data into an actionable trade signal through a structured sequence of stages. Each stage has a single responsibility, making the pipeline testable, replaceable, and observable.
Stage Descriptions
- Raw Data: OHLCV bars, order book snapshots, funding rates, on-chain transactions, social feeds.
- Feature Engineering: Normalization, rolling statistics, derived ratios, cross-asset features.
- Signal Generation: Apply signal logic to features to produce raw directional predictions.
- Scoring: Assign each signal a strength score (typically −1 to +1) and confidence estimate.
- Filtering: Remove weak or stale signals below the quality threshold.
- Combination: Merge surviving signals via ensemble methods into a single composite signal.
- Trade Signal: Final directional output (long / short / flat) with size recommendation.
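The seven stages above can be sketched as a chain of small pure functions. The feature names, thresholds, and signal formulas here are illustrative only, not a production implementation:

```python
import numpy as np

def engineer_features(raw: dict) -> dict:
    """Stage 2: derive normalised features from raw inputs."""
    return {"rsi_z": (raw["rsi_14"] - 50) / 50,
            "funding": raw["funding_rate_8h"]}

def generate_signals(feats: dict) -> dict:
    """Stage 3: raw directional predictions per signal."""
    return {"rsi": -feats["rsi_z"],                         # mean reversion
            "funding": -np.tanh(feats["funding"] / 0.001)}  # fade the crowded side

def score_signals(signals: dict) -> dict:
    """Stage 4: clamp each signal to a strength in [-1, +1]."""
    return {k: float(np.clip(v, -1, 1)) for k, v in signals.items()}

def filter_signals(scored: dict, min_abs: float = 0.1) -> dict:
    """Stage 5: drop weak signals below the quality threshold."""
    return {k: v for k, v in scored.items() if abs(v) >= min_abs}

def combine(filtered: dict) -> float:
    """Stage 6: equal-weight ensemble into one composite strength."""
    return float(np.mean(list(filtered.values()))) if filtered else 0.0

def to_trade_signal(strength: float) -> str:
    """Stage 7: discrete direction from composite strength."""
    return "long" if strength > 0.15 else "short" if strength < -0.15 else "flat"

# Oversold RSI plus negative funding (shorts paying longs) → composite long
raw = {"rsi_14": 28.0, "funding_rate_8h": -0.0005}
composite = combine(filter_signals(score_signals(generate_signals(engineer_features(raw)))))
print(to_trade_signal(composite))  # long
```

Because each stage is a standalone function, every stage can be unit-tested and replaced independently — which is the point of the architecture.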
3. Signal Strength Scoring and Filtering Weak Signals
Not all signals are equal. Signal scoring assigns a quantitative quality measure to each generated signal, allowing the pipeline to discard low-quality inputs before they pollute the ensemble.
Information Coefficient (IC)
The Information Coefficient is the Spearman rank correlation between predicted signal direction and actual subsequent returns. An IC of 0.0 indicates no predictive power; 0.05–0.10 is considered good in equities; 0.10+ is excellent for crypto agents given shorter holding periods and greater noise.
```python
import numpy as np
import pandas as pd
from scipy import stats


def information_coefficient(signals: np.ndarray, returns: np.ndarray) -> float:
    """
    Compute Spearman rank IC between signals and forward returns.

    Args:
        signals: Array of signal values at time t
        returns: Array of realized returns at time t+h (same length)

    Returns:
        IC value in [-1, +1]. Values > 0.05 indicate useful predictive power.
    """
    if len(signals) < 20:
        return 0.0
    ic, _ = stats.spearmanr(signals, returns)
    return float(ic)


def rolling_ic(signal_series: pd.Series,
               return_series: pd.Series,
               window: int = 60) -> pd.Series:
    """Rolling IC for temporal stability assessment."""
    ics = []
    for i in range(window, len(signal_series)):
        s = signal_series.iloc[i - window:i].values
        r = return_series.iloc[i - window:i].values
        ics.append(information_coefficient(s, r))
    return pd.Series(ics, index=signal_series.index[window:])
```
Signal Quality Thresholds
| IC Range | Quality Grade | Action |
|---|---|---|
| > 0.10 | A — Excellent | Full weight in ensemble |
| 0.05 – 0.10 | B — Good | Standard weight in ensemble |
| 0.02 – 0.05 | C — Marginal | Half weight; monitor closely |
| 0 – 0.02 | D — Weak | Exclude from ensemble |
| < 0 | F — Inverted | Investigate or discard |
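The thresholds table above can be encoded directly as a grading function. The weight multipliers here are one plausible interpretation of the "Action" column (full and standard weight both mapped to 1.0), not a prescribed scheme:

```python
from typing import Tuple

def ic_grade_and_weight(ic: float) -> Tuple[str, float]:
    """Map an IC estimate to a quality grade and an ensemble weight
    multiplier, following the quality-threshold table."""
    if ic > 0.10:
        return "A", 1.0   # excellent — full weight
    if ic >= 0.05:
        return "B", 1.0   # good — standard weight
    if ic >= 0.02:
        return "C", 0.5   # marginal — half weight, monitor closely
    if ic >= 0.0:
        return "D", 0.0   # weak — exclude from ensemble
    return "F", 0.0       # inverted — investigate or discard

print(ic_grade_and_weight(0.12))   # ('A', 1.0)
print(ic_grade_and_weight(0.03))   # ('C', 0.5)
print(ic_grade_and_weight(-0.02))  # ('F', 0.0)
```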
4. Python: SignalPipeline Class
The SignalPipeline class encapsulates the full seven-stage pipeline from raw feature input to
composite trade signal. It supports dynamic signal registration, configurable quality filters,
and multiple ensemble combination methods.
```python
import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Optional

from scipy import stats


class SignalDirection(Enum):
    LONG = 1
    FLAT = 0
    SHORT = -1


@dataclass
class RawSignal:
    """Output of a single signal generator."""
    name: str
    value: float              # raw signal value, any scale
    strength: float           # normalised to [-1, +1]
    confidence: float         # IC or model confidence [0, 1]
    generated_at: pd.Timestamp
    ttl_seconds: int = 3600   # time-to-live (freshness window)

    @property
    def is_fresh(self) -> bool:
        age = (pd.Timestamp.now() - self.generated_at).total_seconds()
        return age < self.ttl_seconds


@dataclass
class CompositeSignal:
    """Final pipeline output: composite trade signal."""
    direction: SignalDirection
    strength: float                    # composite strength [-1, +1]
    contributing_signals: List[str]
    generated_at: pd.Timestamp
    raw_signals: List[RawSignal] = field(default_factory=list)


class SignalPipeline:
    """
    Modular signal generation pipeline for AI trading agents.

    Register signal generators, set quality filters, and call generate()
    to produce a composite trade signal from current market features.

    Example:
        pipeline = SignalPipeline(min_ic=0.04, min_signals=2)
        pipeline.register(rsi_signal, name="rsi_14", ttl_seconds=900, weight=1.0)
        pipeline.register(funding_signal, name="funding_rate", ttl_seconds=3600, weight=1.5)
        composite = pipeline.generate(features)
    """

    def __init__(self,
                 min_ic: float = 0.04,
                 min_signals: int = 2,
                 combination_method: str = "ic_weighted"):
        """
        Args:
            min_ic: Minimum IC for a signal to pass quality filter
            min_signals: Minimum passing signals to produce output
            combination_method: One of 'equal', 'ic_weighted',
                'confidence_weighted', 'rank_combination'
        """
        self.min_ic = min_ic
        self.min_signals = min_signals
        self.combination_method = combination_method
        # Registry: {name: {"fn": generator, "ttl": ..., "weight": ..., "ic_history": [...]}}
        self._registry: Dict[str, Dict] = {}

    def register(self,
                 generator: Callable[..., RawSignal],
                 name: str,
                 ttl_seconds: int = 3600,
                 weight: float = 1.0):
        """Register a signal generator function."""
        self._registry[name] = {
            "fn": generator,
            "ttl": ttl_seconds,
            "weight": weight,
            "ic_history": [],
        }

    def generate(self, features: Dict[str, float]) -> Optional[CompositeSignal]:
        """
        Run all registered generators on current features.
        Returns composite signal or None if insufficient passing signals.
        """
        raw_signals: List[RawSignal] = []
        for name, config in self._registry.items():
            try:
                sig = config["fn"](features)
                sig.name = name
                sig.ttl_seconds = config["ttl"]
                raw_signals.append(sig)
            except Exception as e:
                print(f"[WARN] Signal {name} failed: {e}")

        passing = self.filter(raw_signals)
        if len(passing) < self.min_signals:
            return None   # insufficient signal quality — stay flat

        composite_strength = self.combine(passing)
        direction = self._to_direction(composite_strength)

        return CompositeSignal(
            direction=direction,
            strength=composite_strength,
            contributing_signals=[s.name for s in passing],
            generated_at=pd.Timestamp.now(),
            raw_signals=passing,
        )

    def score(self, signal: RawSignal) -> float:
        """
        Score a single signal for quality.

        Returns an IC estimate from recent performance history if available,
        otherwise the signal's own confidence value.
        """
        history = self._registry.get(signal.name, {}).get("ic_history", [])
        if len(history) >= 30:
            return float(np.mean(history[-30:]))
        return signal.confidence

    def filter(self, signals: List[RawSignal]) -> List[RawSignal]:
        """
        Filter signals by quality (IC) and freshness.
        Returns only signals that pass both criteria.
        """
        passing = []
        for sig in signals:
            if not sig.is_fresh:
                continue
            ic_est = self.score(sig)
            if ic_est >= self.min_ic:
                passing.append(sig)
        return passing

    def combine(self, signals: List[RawSignal]) -> float:
        """
        Combine filtered signals into a single composite strength value
        using the configured combination method.
        """
        if not signals:
            return 0.0

        if self.combination_method == "equal":
            return float(np.mean([s.strength for s in signals]))

        elif self.combination_method == "ic_weighted":
            weights = np.array([max(0, self.score(s)) for s in signals])
            values = np.array([s.strength for s in signals])
            total_w = weights.sum()
            if total_w == 0:
                return float(np.mean(values))
            return float(np.dot(weights, values) / total_w)

        elif self.combination_method == "confidence_weighted":
            weights = np.array([s.confidence for s in signals])
            values = np.array([s.strength for s in signals])
            total_w = weights.sum()
            return float(np.dot(weights, values) / total_w) if total_w > 0 else 0.0

        elif self.combination_method == "rank_combination":
            # Rank-based combination (more robust to outliers)
            ranks = stats.rankdata([s.strength for s in signals])
            weights = np.array([max(0, self.score(s)) for s in signals])
            # Normalise ranks to [-1, +1]
            n = len(ranks)
            norm_ranks = (ranks - (n + 1) / 2) / (n / 2)
            total_w = weights.sum()
            return float(np.dot(weights, norm_ranks) / total_w) if total_w > 0 else 0.0

        return 0.0

    def _to_direction(self, strength: float) -> SignalDirection:
        """Convert composite strength to discrete direction."""
        if strength > 0.15:
            return SignalDirection.LONG
        elif strength < -0.15:
            return SignalDirection.SHORT
        return SignalDirection.FLAT

    def update_ic(self, signal_name: str, ic: float):
        """Feed back realised IC after trade closes for adaptive weighting."""
        if signal_name in self._registry:
            self._registry[signal_name]["ic_history"].append(ic)
            # Keep only the last 90 observations
            self._registry[signal_name]["ic_history"] = \
                self._registry[signal_name]["ic_history"][-90:]
```
5. Combining Signals with Ensemble Methods
The four combination methods in SignalPipeline.combine() each have different properties.
Understanding their tradeoffs helps you choose the right method for your strategy.
Equal Weighting
The simplest approach: average the strength values of all passing signals. Equal weighting is robust when you have no reliable estimate of each signal's quality, or when signal qualities are similar. It is the baseline all other methods should outperform.
IC-Weighted Combination
Weights each signal's contribution by its historical Information Coefficient. Signals with higher predictive accuracy get more influence on the composite. This is the preferred default for most strategies — it is mathematically optimal under the assumption that ICs are stable and signals are independent.
Confidence-Weighted Combination
Uses the signal generator's own confidence estimate as the weight. Useful when signals come from probabilistic models (e.g., ML classifiers) that output calibrated probability scores. Works best when the confidence estimates are well-calibrated — a common failure mode is overconfident models that produce high confidence even for low-IC predictions.
Rank-Based Combination
Converts signal strengths to ranks before weighting and combining. More robust to outliers and heavy-tailed signal distributions. Recommended when signals have very different scales or when a single strong signal could otherwise dominate the ensemble inappropriately.
- Use IC-weighted when you have 60+ days of signal history and stable ICs.
- Use confidence-weighted when signals come from calibrated ML models.
- Use rank combination when signal scales vary widely or outliers are common.
- Use equal weighting as the baseline when none of the above conditions are met.
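The differences between the methods show up clearly on a toy example. The strengths and IC estimates below are made up; the third signal is an outlier that equal weighting lets dominate:

```python
import numpy as np
from scipy import stats

# Three hypothetical signal strengths with one outlier, plus IC estimates
strengths = np.array([0.2, 0.3, -0.9])   # third signal is an outlier short
ics       = np.array([0.08, 0.06, 0.03])

# Equal weighting: the outlier drags the composite negative
equal = strengths.mean()

# IC weighting: higher-IC signals dominate, composite stays positive
ic_w = np.dot(ics, strengths) / ics.sum()

# Rank combination: magnitudes are discarded, only the ordering survives
ranks = stats.rankdata(strengths)                          # [2, 3, 1]
norm = (ranks - (len(ranks) + 1) / 2) / (len(ranks) / 2)   # normalise to [-1, +1]
rank_c = np.dot(ics, norm) / ics.sum()

print(round(equal, 3), round(ic_w, 3), round(rank_c, 3))  # -0.133 0.041 0.118
```

The outlier flips the equal-weighted composite short, while both weighted methods keep the higher-quality long signals in charge — the practical reason to move beyond the equal-weight baseline.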
6. Signal Decay and Freshness Management
Every signal has a freshness window — the period during which it retains predictive power. Using a stale signal is equivalent to trading on no signal at all, but without the protection of staying flat.
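A hard TTL cutoff (fresh or dead) is the simplest freshness model, but an alternative worth considering is exponential decay, where a signal's weight halves every half-life. This is a sketch of that idea — the half-life values are assumptions, not measured decay rates:

```python
import math
from typing import Optional

import pandas as pd

def decayed_strength(strength: float,
                     generated_at: pd.Timestamp,
                     half_life_seconds: float,
                     now: Optional[pd.Timestamp] = None) -> float:
    """Exponentially decay signal strength with age instead of a hard
    TTL cutoff: the effective strength halves every half_life_seconds."""
    now = now or pd.Timestamp.now()
    age = (now - generated_at).total_seconds()
    return strength * math.pow(0.5, age / half_life_seconds)

t0 = pd.Timestamp("2025-01-01 00:00:00")
# One half-life (15 min) later, the signal is worth half its original strength
print(decayed_strength(0.8, t0, half_life_seconds=900,
                       now=t0 + pd.Timedelta(minutes=15)))  # 0.4
```

Decay weighting composes naturally with the ensemble methods above: multiply each signal's strength by its decay factor before combination.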
TTL by Signal Type
| Signal Type | Suggested TTL | Notes |
|---|---|---|
| Order book imbalance | 10 – 60 seconds | Decays almost instantly in liquid markets |
| RSI / MACD | 5 – 30 minutes | Depends on bar interval; 1m bar → 5m TTL |
| Funding rate signal | 1 – 8 hours | Rates update every 1-8h on most exchanges |
| Sentiment (social) | 30 min – 4 hours | News-driven sentiment spikes decay fastest |
| On-chain whale flow | 4 – 24 hours | Large moves take time to reflect in price |
| Fundamental (protocol revenue) | 24 – 168 hours | Weekly revenue data; use as long-horizon filter |
```python
import threading
from typing import Dict, Optional


class SignalCache:
    """
    Thread-safe signal cache with TTL-based invalidation.
    Avoids redundant signal computation within the freshness window.
    """

    def __init__(self):
        self._cache: Dict[str, RawSignal] = {}
        self._lock = threading.Lock()

    def get(self, name: str) -> Optional[RawSignal]:
        with self._lock:
            sig = self._cache.get(name)
        if sig and sig.is_fresh:
            return sig
        return None

    def set(self, name: str, signal: RawSignal):
        with self._lock:
            self._cache[name] = signal

    def invalidate(self, name: str):
        with self._lock:
            self._cache.pop(name, None)

    def prune_stale(self):
        """Remove all expired signals from cache."""
        with self._lock:
            stale = [k for k, v in self._cache.items() if not v.is_fresh]
            for k in stale:
                del self._cache[k]


# Usage: wrap pipeline.generate() with the cache
cache = SignalCache()

def get_or_generate(pipeline: SignalPipeline,
                    features: Dict,
                    signal_name: str) -> Optional[RawSignal]:
    """Return a fresh cached RawSignal if available; otherwise run the
    pipeline, cache its raw signals, and return the requested one."""
    cached = cache.get(signal_name)
    if cached:
        return cached
    composite = pipeline.generate(features)
    if composite:
        for sig in composite.raw_signals:
            cache.set(sig.name, sig)
        for sig in composite.raw_signals:
            if sig.name == signal_name:
                return sig
    return None
```
7. Backtesting Signals Independently Before Live Deployment
Before connecting any signal to a live trading strategy, backtest it in isolation. Individual signal backtests reveal the true IC, optimal holding period, and decay characteristics — before any execution noise or ensemble effects obscure the signal's standalone quality.
The Signal Backtest Protocol
1. Historical feature generation. Reconstruct the exact feature set the signal would have received at each historical point in time. Avoid look-ahead bias by using only data available at that moment.
2. Signal value computation. Apply the signal generator to historical features. Store the raw signal value at each bar.
3. Forward return pairing. For each signal value at time t, record the actual return at t+h for a range of holding periods h (e.g., 15m, 1h, 4h, 24h).
4. IC computation. Compute Spearman IC between signal values and forward returns for each holding period. The holding period with the highest IC is the signal's natural horizon.
5. Quintile analysis. Bucket signals into quintiles by strength. Check that the top quintile earns the highest forward returns and the bottom quintile earns the lowest. Monotonic quintile returns confirm the signal is well-specified.
6. Out-of-sample validation. Reserve the most recent 30% of data as a hold-out set. A signal that only works in-sample is overfitted and will fail live.
```python
from typing import Dict, List, Optional

import pandas as pd
from scipy import stats


def signal_backtest_report(signal_values: pd.Series,
                           ohlcv: pd.DataFrame,
                           holding_periods: List[int] = [15, 60, 240, 1440]
                           ) -> Dict[str, Optional[dict]]:
    """
    Generate a standalone IC report for a signal across multiple
    holding periods (in minutes).

    Args:
        signal_values: Timestamped signal values, index = pd.DatetimeIndex
        ohlcv: OHLCV DataFrame (must cover signal_values period + max horizon)
        holding_periods: List of forward return horizons in minutes

    Returns:
        Dict mapping each horizon label to a stats dict (ic, p_value,
        n_obs, significant), or None when there are too few observations.
    """
    results = {}
    for h in holding_periods:
        # Forward returns over h bars: close[t+h] / close[t] - 1
        # (assumes 1-minute bars, so h bars ≈ h minutes)
        fwd_returns = ohlcv["close"].pct_change(h).shift(-h)
        # Align on common index
        aligned = pd.concat([signal_values, fwd_returns],
                            axis=1, join="inner").dropna()
        aligned.columns = ["signal", "fwd_ret"]
        if len(aligned) < 30:
            results[f"{h}m"] = None
            continue
        ic, pval = stats.spearmanr(aligned["signal"], aligned["fwd_ret"])
        results[f"{h}m"] = {
            "ic": round(ic, 4),
            "p_value": round(pval, 4),
            "n_obs": len(aligned),
            "significant": pval < 0.05,
        }
    return results


# Quintile analysis
def quintile_returns(signal_values: pd.Series,
                     forward_returns: pd.Series) -> pd.DataFrame:
    """Check monotonicity: Q5 should have the highest returns."""
    df = pd.concat([signal_values, forward_returns],
                   axis=1, join="inner").dropna()
    df.columns = ["signal", "fwd_ret"]
    df["quintile"] = pd.qcut(df["signal"], q=5, labels=[1, 2, 3, 4, 5])
    return df.groupby("quintile")["fwd_ret"].agg(["mean", "std", "count"])
```
8. Integration with Purple Flea Trading API for Signal-Triggered Orders
Once signals pass quality checks and backtesting validation, connect the pipeline to the Purple Flea Trading
API. The integration translates a CompositeSignal into a live order with size determined by
signal strength.
Signal-to-Order Translation
```python
import asyncio
from typing import Dict

import httpx
import numpy as np
import pandas as pd

PF_BASE = "https://api.purpleflea.com"
API_KEY = "pf_live_YOUR_KEY"   # replace with your key


def signal_to_size(signal: CompositeSignal,
                   base_size: float,
                   max_leverage: float = 3.0) -> float:
    """
    Convert composite signal strength to position size.
    Scales linearly from 0.5x of max_leverage at the direction threshold
    (0.15) to 1.0x of max_leverage at full strength.
    """
    abs_strength = abs(signal.strength)
    min_threshold = 0.15
    # Linear interpolation between threshold (scale 0.5) and full strength (scale 1.0)
    scale = 0.5 + (0.5 * (abs_strength - min_threshold) / (1.0 - min_threshold))
    return round(base_size * scale * max_leverage, 4)


async def execute_signal(client: httpx.AsyncClient,
                         signal: CompositeSignal,
                         market: str,
                         base_size: float):
    """
    Place an order on Purple Flea Trading based on composite signal output.
    Returns immediately if signal direction is FLAT.
    """
    if signal.direction == SignalDirection.FLAT:
        print(f"[SIGNAL] FLAT on {market} — no order placed")
        return

    side = "buy" if signal.direction == SignalDirection.LONG else "sell"
    size = signal_to_size(signal, base_size)

    payload = {
        "market": market,
        "side": side,
        "size": size,
        "type": "market",
        "metadata": {
            "signal_strength": signal.strength,
            "contributing_signals": signal.contributing_signals,
            "generated_at": signal.generated_at.isoformat(),
        },
    }
    resp = await client.post(
        f"{PF_BASE}/v1/orders",
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    resp.raise_for_status()
    order = resp.json()
    print(f"[ORDER] {side.upper()} {size} {market} | ID: {order['id']}")
    return order


async def signal_trading_loop(market: str,
                              pipeline: SignalPipeline,
                              base_size: float = 100.0,
                              interval_seconds: int = 300):
    """
    Main loop: fetch features, generate signal, execute order.
    Runs every interval_seconds (default 5 minutes).
    """
    async with httpx.AsyncClient(timeout=15) as client:
        while True:
            # 1. Fetch current market data
            resp = await client.get(
                f"{PF_BASE}/v1/markets/{market}/features",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
            features = resp.json()

            # 2. Run signal pipeline
            composite = pipeline.generate(features)

            # 3. Execute if we have a signal
            if composite:
                await execute_signal(client, composite, market, base_size)
            else:
                print("[PIPELINE] Insufficient signals — holding flat")

            await asyncio.sleep(interval_seconds)


# Example signal generators
def rsi_signal(features: Dict) -> RawSignal:
    """Simple RSI-based signal: oversold = long, overbought = short."""
    rsi = features["rsi_14"]
    strength = -(rsi - 50) / 50   # RSI 30 → +0.4, RSI 70 → -0.4
    return RawSignal(
        name="rsi_14",
        value=rsi,
        strength=strength,
        confidence=0.06,
        generated_at=pd.Timestamp.now(),
        ttl_seconds=900,
    )


def funding_rate_signal(features: Dict) -> RawSignal:
    """High positive funding → crowded longs → fade long, go short."""
    rate = features["funding_rate_8h"]
    strength = -np.tanh(rate / 0.001)   # normalise around a 0.1% rate
    return RawSignal(
        name="funding_rate",
        value=rate,
        strength=strength,
        confidence=0.08,
        generated_at=pd.Timestamp.now(),
        ttl_seconds=3600,
    )


# Bootstrap a pipeline
if __name__ == "__main__":
    pipe = SignalPipeline(min_ic=0.04, min_signals=2,
                          combination_method="ic_weighted")
    pipe.register(rsi_signal, name="rsi_14", ttl_seconds=900, weight=1.0)
    pipe.register(funding_rate_signal, name="funding_rate",
                  ttl_seconds=3600, weight=1.5)

    asyncio.run(signal_trading_loop(
        market="BTC-PERP",
        pipeline=pipe,
        base_size=200.0,
        interval_seconds=300,
    ))
```
API Endpoints for Signal-Driven Trading
| Endpoint | Method | Purpose |
|---|---|---|
| /v1/markets/:market/features | GET | Fetch pre-computed market features for signal generation |
| /v1/markets/:market/ohlcv | GET | Historical OHLCV data for backtesting signals |
| /v1/orders | POST | Place market or limit order triggered by composite signal |
| /v1/positions | GET | Check current positions before generating new signal |
| /v1/trades | GET | Fetch closed trades to compute realised IC for signal feedback |
Connect Your Signal Pipeline to Purple Flea Trading
Live market features, historical OHLCV data, and programmatic order placement — everything your signal pipeline needs to go from backtest to live in a single API.
Key Takeaways
- Diversify signal types. Technical, fundamental, sentiment, on-chain, and cross-asset signals each carry different information and decay at different rates.
- Build a structured pipeline. Separate raw data → features → signals → scoring → filtering → combination → execution into discrete, testable stages.
- Filter aggressively. A signal with IC below 0.04 adds noise, not alpha. Only pass quality-verified signals to the ensemble.
- Backtest signals independently. Validate IC in out-of-sample data and check quintile return monotonicity before live deployment.
- Manage freshness. Set realistic TTLs and invalidate stale signals. Trading on expired signals is equivalent to random entry.
- Adapt weights over time. Update signal ICs from realised trade outcomes. IC-weighted ensembles improve automatically as new data arrives.
- Automate the loop. The Purple Flea API provides everything needed to close the signal-to-order loop with minimal latency.