NLP Trading Signals for AI Agents
Markets move on information before they move on price. For AI agents operating in financial environments, the ability to extract structured signals from unstructured text — news articles, earnings transcripts, SEC filings, social media — is a fundamental edge. This guide walks through a complete NLP pipeline: from raw text ingestion to FinBERT scoring to live trade execution via the Purple Flea trading API.
What you'll build: a production Python pipeline that subscribes to news feeds and social APIs, scores incoming text with FinBERT and GPT-4, aggregates scores into a rolling sentiment index, and fires buy/sell signals to the Purple Flea trading API, all running autonomously as a PM2 process.
Why NLP Gives Agents an Edge
Traditional quantitative strategies operate on OHLCV data — open, high, low, close, volume. These signals are available to every market participant simultaneously. Text data is different: it arrives in an enormous variety of forms, requires interpretation, and most market participants either ignore it or process it slowly.
AI agents are uniquely positioned to exploit this asymmetry. A well-designed agent can monitor thousands of sources simultaneously, parse earnings calls within seconds of release, and translate nuanced language into numeric signals faster than any human trader.
The NLP signal taxonomy
| Signal Type | Source | Latency | Alpha Decay |
|---|---|---|---|
| Breaking news sentiment | RSS, newsapi.org | < 30s | Minutes |
| Earnings call tone | Whisper + transcript APIs | ~60s post-call | Hours |
| SEC filing language shifts | EDGAR full-text | Minutes | Days |
| Social media momentum | Twitter/Reddit APIs | Real-time | Minutes–hours |
| Fear/Greed index proxy | Aggregated multi-source | ~15 min | Days |
| Central bank language | Fed/ECB press releases | Seconds | Days–weeks |
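The alpha-decay column matters operationally: a signal's weight should shrink as it ages. One way to encode the taxonomy is a dataclass with a per-type half-life; the half-life values below are illustrative assumptions, not calibrated figures.

```python
import time
from dataclasses import dataclass

@dataclass
class NLPSignal:
    source: str
    score: float        # -1 (bearish) to +1 (bullish)
    created_at: float   # unix timestamp
    half_life_s: float  # how fast this signal type loses its edge

    def decayed_score(self, now=None) -> float:
        """Halve the score every half_life_s seconds of age."""
        now = time.time() if now is None else now
        age = max(now - self.created_at, 0.0)
        return self.score * 0.5 ** (age / self.half_life_s)

# e.g. breaking news with an assumed 5-minute half-life:
# after one half-life the score drops from 0.6 to 0.3
sig = NLPSignal("breaking_news", score=0.6, created_at=0.0, half_life_s=300)
```

Earnings-call tone or SEC-filing signals would get half-lives of hours to days per the table, so stale fast-decay signals stop dominating the aggregate.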
News Sentiment Analysis
News sentiment is the most actionable short-term NLP signal. The pipeline starts with ingesting headlines and articles from structured feeds, then scoring each article against a target entity.
Source ingestion
The two most accessible sources for agents are RSS feeds (free, no API key required) and newsapi.org (free tier: 100 requests/day; paid tiers raise the limits). For crypto assets, cryptopanic.com provides pre-labeled bullish/bearish tags that can serve as a weak supervision signal.
# news_ingestion.py
import feedparser
import httpx
import asyncio
from datetime import datetime, timezone
from typing import AsyncGenerator
RSS_FEEDS = {
"reuters_markets": "https://feeds.reuters.com/reuters/businessNews",
"coindesk": "https://www.coindesk.com/arc/outboundfeeds/rss/",
"cointelegraph": "https://cointelegraph.com/rss",
"wsj_markets": "https://feeds.a.wsj.com/rss/RSSWSJD.xml",
}
async def stream_rss_headlines(
feeds: dict[str, str],
poll_interval: int = 30
) -> AsyncGenerator[dict, None]:
"""Yield new headlines as they appear across all RSS feeds."""
    seen: set[str] = set()  # grows without bound over long runs; cap or use an LRU in production
while True:
for name, url in feeds.items():
try:
feed = feedparser.parse(url)
for entry in feed.entries:
uid = entry.get("id") or entry.get("link", "")
if uid not in seen:
seen.add(uid)
yield {
"source": name,
"title": entry.get("title", ""),
"summary": entry.get("summary", ""),
"link": entry.get("link", ""),
"published": entry.get("published", ""),
"fetched_at": datetime.now(timezone.utc).isoformat(),
}
except Exception as e:
print(f"[RSS error] {name}: {e}")
await asyncio.sleep(poll_interval)
Entity-level filtering
A global news stream contains far more noise than signal for any specific asset. The first filter step is entity recognition: only process articles that mention your target assets, and track the sentiment of the entity-specific sentences, not the entire article.
import re
import spacy
from collections import defaultdict

nlp = spacy.load("en_core_web_sm")

TARGET_ENTITIES = {
    "BTC": ["bitcoin", "btc", "satoshi"],
    "ETH": ["ethereum", "ether", "eth"],
    "SOL": ["solana", "sol"],
}

def extract_entity_sentences(text: str) -> dict[str, list[str]]:
    """Return only sentences that mention tracked entities."""
    doc = nlp(text)
    matched: dict[str, list[str]] = defaultdict(list)
    for sent in doc.sents:
        # Match whole words so short aliases like "eth" or "sol"
        # don't fire on "whether" or "solution"
        words = set(re.findall(r"[a-z]+", sent.text.lower()))
        for symbol, aliases in TARGET_ENTITIES.items():
            if any(alias in words for alias in aliases):
                matched[symbol].append(sent.text)
    return dict(matched)
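If spaCy isn't available in the deployment image, a dependency-free approximation of the same filter works for headline-length text: a naive sentence split plus word-boundary regex matching. The alias table is repeated here only so the snippet stands alone.

```python
import re
from collections import defaultdict

# Same alias table as TARGET_ENTITIES above
TARGET_ENTITIES = {
    "BTC": ["bitcoin", "btc", "satoshi"],
    "ETH": ["ethereum", "ether", "eth"],
    "SOL": ["solana", "sol"],
}

# Pre-compile one word-boundary pattern per symbol so "eth" never matches "whether"
ENTITY_PATTERNS = {
    symbol: re.compile(r"\b(" + "|".join(map(re.escape, aliases)) + r")\b")
    for symbol, aliases in TARGET_ENTITIES.items()
}

def extract_entity_sentences_lite(text: str) -> dict[str, list[str]]:
    """spaCy-free approximation: split on end punctuation, match aliases per sentence."""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    matched: dict[str, list[str]] = defaultdict(list)
    for sent in sentences:
        lowered = sent.lower()
        for symbol, pattern in ENTITY_PATTERNS.items():
            if pattern.search(lowered):
                matched[symbol].append(sent)
    return dict(matched)
```

The naive splitter mishandles abbreviations ("U.S. stocks…"), so prefer the spaCy version for full articles; the lite version is fine for headlines and summaries.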
FinBERT and LLM Sentiment Scoring
Generic sentiment models (VADER, TextBlob) perform poorly on financial text — they misinterpret domain-specific phrases like "volatile growth" or "strong resistance." FinBERT, a BERT variant fine-tuned on financial communications, achieves significantly better calibration on financial news.
FinBERT setup
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
def load_finbert():
model_name = "ProsusAI/finbert"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)
device = 0 if torch.cuda.is_available() else -1
return pipeline(
"text-classification",
model=model,
tokenizer=tokenizer,
device=device,
top_k=None, # return all class probabilities
)
finbert = load_finbert()
def score_finbert(text: str) -> dict:
    """Return {positive, negative, neutral} probabilities plus a compound score."""
    # FinBERT max input is 512 tokens — let the tokenizer truncate safely
    # rather than guessing a character cutoff
    results = finbert(text, truncation=True, max_length=512)[0]
    scores = {r["label"].lower(): r["score"] for r in results}
    # Compound bullishness in [-1, 1]: positive minus negative probability
    scores["compound"] = scores.get("positive", 0.0) - scores.get("negative", 0.0)
    return scores
LLM scoring with Purple Flea API context
For higher-stakes decisions, pure FinBERT scores can be augmented with an LLM that has access to broader market context. The LLM prompt includes recent price action and the text to score, producing a richer signal.
import httpx
import json
PURPLE_FLEA_API = "https://api.purpleflea.com"
async def llm_score_with_context(
article_text: str,
symbol: str,
api_key: str = "pf_live_<your_key>"
) -> dict:
"""Score article sentiment with LLM + live market context."""
# Fetch current price from Purple Flea
async with httpx.AsyncClient() as client:
price_resp = await client.get(
f"{PURPLE_FLEA_API}/v1/market/price/{symbol}",
headers={"Authorization": f"Bearer {api_key}"}
)
price_data = price_resp.json()
prompt = f"""You are a financial sentiment analyst.
Asset: {symbol}
Current price: {price_data.get('price', 'unknown')}
24h change: {price_data.get('change_24h', 'unknown')}%
Article:
{article_text[:1200]}
Respond with JSON only:
{{
"sentiment": "bullish|bearish|neutral",
"confidence": 0.0-1.0,
"key_phrases": ["phrase1", "phrase2"],
"trading_implication": "brief one-sentence implication"
}}"""
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PURPLE_FLEA_API}/v1/agent/llm",
headers={"Authorization": f"Bearer {api_key}"},
json={"prompt": prompt, "model": "gpt-4o-mini", "response_format": "json"}
)
return resp.json()
FinBERT is fast and cheap: it runs locally at a few milliseconds per article on a GPU (tens of milliseconds on CPU). LLM scoring costs roughly $0.01 per article. Use FinBERT for real-time streaming and reserve LLM scoring for high-confidence signals that trigger large position changes.
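That cost tradeoff can be encoded as an explicit routing rule: escalate to the LLM only when the local FinBERT score is already strong, and only while a daily spend budget holds. The threshold and budget numbers here are illustrative assumptions.

```python
def choose_scorer(
    compound: float,
    llm_threshold: float = 0.35,      # escalate only for strong FinBERT signals
    daily_llm_budget_usd: float = 5.0,
    llm_spent_usd: float = 0.0,
    llm_cost_per_call: float = 0.01,  # rough per-article LLM cost
) -> str:
    """Pick the scoring path for an article: cheap local model or paid LLM."""
    strong_signal = abs(compound) >= llm_threshold
    within_budget = llm_spent_usd + llm_cost_per_call <= daily_llm_budget_usd
    return "llm" if strong_signal and within_budget else "finbert"
```

In the pipeline loop this sits between `score_finbert` and `llm_score_with_context`: every article gets a FinBERT score, and only routed articles pay for the enriched LLM pass.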
Social Media Signal Extraction
Social media provides volume-weighted sentiment — a strong divergence between retail social sentiment and price action is a historically predictive signal. The challenge is filtering signal from noise (spam, bots, coordinated pumps).
Reddit PRAW integration
import praw
from textblob import TextBlob
import re
reddit = praw.Reddit(
client_id="YOUR_CLIENT_ID",
client_secret="YOUR_CLIENT_SECRET",
user_agent="PurpleFlea-NLP-Agent/1.0"
)
CRYPTO_SUBREDDITS = ["CryptoCurrency", "Bitcoin", "ethereum", "solana"]
def clean_text(text: str) -> str:
text = re.sub(r"http\S+", "", text)
text = re.sub(r"[^a-zA-Z\s]", " ", text)
return text.lower().strip()
def fetch_reddit_sentiment(symbol: str, limit: int = 100) -> dict:
"""Aggregate sentiment score from recent Reddit posts."""
scores = []
for sub in CRYPTO_SUBREDDITS:
subreddit = reddit.subreddit(sub)
for post in subreddit.hot(limit=limit // len(CRYPTO_SUBREDDITS)):
if symbol.lower() in post.title.lower():
clean = clean_text(post.title + " " + (post.selftext or ""))
if len(clean) > 20:
scores.append(TextBlob(clean).sentiment.polarity)
if not scores:
return {"symbol": symbol, "score": 0.0, "n": 0}
return {
"symbol": symbol,
"score": sum(scores) / len(scores),
"n": len(scores),
"bullish_pct": sum(1 for s in scores if s > 0.1) / len(scores),
"bearish_pct": sum(1 for s in scores if s < -0.1) / len(scores),
}
Earnings Call Transcription Analysis
Earnings calls are among the highest-alpha NLP events. Management tone, word choice, and hedging language are predictive of near-term price movements. Studies show that "uncertainty language" (words like "challenging," "headwinds," "visibility") correlates with post-call price drops even when reported numbers beat consensus.
Transcription with OpenAI Whisper
import whisper
import httpx
import re
model = whisper.load_model("base") # or "medium" for better accuracy
def transcribe_earnings_audio(audio_path: str) -> str:
"""Transcribe earnings call audio to text."""
result = model.transcribe(audio_path, language="en")
return result["text"]
UNCERTAINTY_PHRASES = [
"challenging environment", "headwinds", "limited visibility",
"difficult to predict", "uncertainty", "macro pressures",
"supply chain", "inflationary", "cautious outlook",
]
CONFIDENCE_PHRASES = [
"strong pipeline", "record revenue", "exceeding expectations",
"accelerating growth", "confident", "optimistic", "robust demand",
"margin expansion", "share buyback",
]
def analyze_earnings_tone(transcript: str) -> dict:
"""Score an earnings call transcript for management tone."""
text_lower = transcript.lower()
uncertainty_hits = sum(
1 for phrase in UNCERTAINTY_PHRASES if phrase in text_lower
)
confidence_hits = sum(
1 for phrase in CONFIDENCE_PHRASES if phrase in text_lower
)
total = uncertainty_hits + confidence_hits
tone_score = (confidence_hits - uncertainty_hits) / max(total, 1)
return {
"tone_score": tone_score, # -1 (bearish) to +1 (bullish)
"uncertainty_count": uncertainty_hits,
"confidence_count": confidence_hits,
"signal": "bullish" if tone_score > 0.2 else "bearish" if tone_score < -0.2 else "neutral",
}
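Raw phrase counts favor long calls over short ones: a 90-minute call will rack up more hits than a 30-minute one at the same underlying tone. Normalizing hits per 1,000 words makes scores comparable across transcripts (the window size is an assumption, not a standard).

```python
def tone_density(hit_count: int, transcript: str, per_words: int = 1000) -> float:
    """Phrase hits per `per_words` words, so call length doesn't skew the score."""
    n_words = max(len(transcript.split()), 1)  # guard against empty transcripts
    return hit_count / n_words * per_words
```

Feeding densities rather than raw counts into `analyze_earnings_tone`-style scoring also makes quarter-over-quarter comparisons for the same company meaningful.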
Constructing a Fear/Greed Index
The classic CNN Fear and Greed Index uses seven sub-components. Agents can construct their own version from raw data sources — this has the advantage of being real-time (CNN's index updates once per day) and customizable to specific asset classes.
Fear/Greed sub-components
| Component | Proxy Metric | Weight |
|---|---|---|
| Market momentum | Price vs. 125-day MA | 25% |
| News sentiment | FinBERT aggregate (rolling 24h) | 25% |
| Social volume | Reddit/Twitter mention velocity | 15% |
| Safe haven demand | BTC dominance delta | 15% |
| Volatility | 30d realized vol vs. 90d avg | 10% |
| Options skew | Put/call ratio (Deribit) | 10% |
import numpy as np
def compute_fear_greed(components: dict) -> dict:
"""
Aggregate sub-signals into a 0-100 Fear/Greed index.
Each component should already be normalized to [0, 1].
0 = extreme fear, 100 = extreme greed.
"""
weights = {
"momentum": 0.25,
"news_sentiment": 0.25,
"social_volume": 0.15,
"safe_haven": 0.15,
"volatility": 0.10, # inverted: low vol = greed
"options_skew": 0.10,
}
score = sum(
components.get(k, 0.5) * w
for k, w in weights.items()
) * 100
if score >= 75: label = "Extreme Greed"
elif score >= 55: label = "Greed"
elif score >= 45: label = "Neutral"
elif score >= 25: label = "Fear"
else: label = "Extreme Fear"
return {"score": round(score, 1), "label": label}
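`compute_fear_greed` expects every component already scaled to [0, 1], which leaves the normalization step implicit. A clamp-and-scale helper handles it, with an `invert` flag for fear-when-high metrics like volatility; the bounds you pass (e.g. historical min/max of each raw metric) are your own calibration choice.

```python
def normalize(value: float, lo: float, hi: float, invert: bool = False) -> float:
    """Clamp a raw metric into [lo, hi] and scale to [0, 1].

    invert=True flips the scale for metrics where a high reading means fear
    (e.g. realized volatility).
    """
    if hi <= lo:
        raise ValueError("hi must exceed lo")
    x = min(max((value - lo) / (hi - lo), 0.0), 1.0)
    return 1.0 - x if invert else x
```

For example, price sitting at its 125-day MA might map to `normalize(0.0, -0.2, 0.2) == 0.5` (neutral momentum), while elevated volatility passes through `invert=True` so it pulls the index toward fear.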
The Full Python Pipeline
The complete pipeline connects ingestion, entity filtering, FinBERT scoring, LLM enrichment (for high-confidence signals), fear/greed aggregation, and trade execution into a single async loop running as a persistent process.
1. Ingest: RSS feeds + Reddit + news API, deduplicated, polled every 30 seconds
2. Filter: entity extraction with spaCy; only pass sentences mentioning tracked symbols
3. Score: FinBERT on all articles; LLM scoring on articles exceeding the confidence threshold
4. Aggregate: rolling 1h / 4h / 24h exponentially weighted sentiment averages per symbol
5. Signal: threshold-based signal generation with minimum conviction requirements
6. Execute: Purple Flea trading API; send buy/sell orders with NLP-derived size confidence
# pipeline.py — full async NLP trading loop
import asyncio
import os
import httpx
from collections import deque, defaultdict

# Helpers defined earlier in this guide; the module names for the entity-filter
# and FinBERT snippets are assumptions — adjust to your file layout
from news_ingestion import stream_rss_headlines, RSS_FEEDS
from entity_filter import extract_entity_sentences
from finbert_scoring import score_finbert

PURPLE_FLEA_API = "https://api.purpleflea.com"
API_KEY = os.getenv("PURPLE_FLEA_API_KEY", "pf_live_<your_key>")
# Rolling sentiment windows (stores compound scores)
sentiment_windows: dict[str, deque] = defaultdict(lambda: deque(maxlen=200))
SIGNAL_THRESHOLDS = {
"strong_bull": 0.45,
"bull": 0.25,
"bear": -0.25,
"strong_bear": -0.45,
}
async def execute_trade(symbol: str, side: str, confidence: float):
"""Fire a trade via Purple Flea trading API."""
    # Scale position size by signal confidence (confidence is capped at 1.0
    # by the caller, so size never exceeds the base allocation)
    base_usdc = 50.0
    size = base_usdc * confidence
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PURPLE_FLEA_API}/v1/trade/order",
headers={"Authorization": f"Bearer {API_KEY}"},
json={
"symbol": symbol,
"side": side, # "buy" or "sell"
"amount_usdc": size,
"order_type": "market",
"source": "nlp_pipeline",
"metadata": {"confidence": confidence}
},
timeout=10.0
)
result = resp.json()
print(f"[TRADE] {side} {symbol} | size={size:.2f} USDC | confidence={confidence:.2f} | id={result.get('order_id')}")
def compute_rolling_sentiment(symbol: str) -> float:
"""Exponentially-weighted average of recent scores for a symbol."""
window = sentiment_windows[symbol]
if not window:
return 0.0
weights = [0.95 ** i for i in range(len(window))]
total_weight = sum(weights)
weighted_sum = sum(s * w for s, w in zip(reversed(window), weights))
return weighted_sum / total_weight
async def run_pipeline():
print("[NLP Pipeline] Starting...")
async for article in stream_rss_headlines(RSS_FEEDS):
# Entity filtering
entity_sentences = extract_entity_sentences(
article["title"] + " " + article.get("summary", "")
)
for symbol, sentences in entity_sentences.items():
text = " ".join(sentences)
if len(text) < 30:
continue
# Fast FinBERT score
scores = score_finbert(text)
compound = scores["compound"]
sentiment_windows[symbol].append(compound)
            # Check rolling average for signal.
            # NOTE: in production, gate these calls through the guardrails in
            # the Risk Management section (cooldowns, source counts) — as
            # written, every article while the average stays extreme fires a trade
            rolling = compute_rolling_sentiment(symbol)
            if rolling >= SIGNAL_THRESHOLDS["strong_bull"]:
                await execute_trade(symbol, "buy", confidence=min(rolling, 1.0))
            elif rolling <= SIGNAL_THRESHOLDS["strong_bear"]:
                await execute_trade(symbol, "sell", confidence=min(abs(rolling), 1.0))
if __name__ == "__main__":
asyncio.run(run_pipeline())
Purple Flea Trading API Integration
Purple Flea's trading API accepts standard REST orders with optional metadata fields for attaching signal provenance. This enables post-hoc analysis of which signal sources contributed to profitable vs. unprofitable trades.
# trade_client.py
import httpx
from dataclasses import dataclass
@dataclass
class TradeResult:
order_id: str
symbol: str
side: str
filled_amount: float
price: float
fee: float
status: str
class PurpleFleatradingClient:
BASE = "https://api.purpleflea.com/v1"
def __init__(self, api_key: str):
self.headers = {"Authorization": f"Bearer {api_key}"}
async def place_nlp_order(
self,
symbol: str,
side: str,
amount_usdc: float,
nlp_signal: dict,
) -> TradeResult:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.BASE}/trade/order",
headers=self.headers,
json={
"symbol": symbol,
"side": side,
"amount_usdc": amount_usdc,
"order_type": "market",
"source": "nlp",
"metadata": nlp_signal,
},
timeout=15.0
)
resp.raise_for_status()
data = resp.json()
return TradeResult(
order_id=data["order_id"],
symbol=data["symbol"],
side=data["side"],
filled_amount=data["filled_amount"],
price=data["avg_price"],
fee=data["fee"],
status=data["status"],
)
async def get_positions(self) -> list[dict]:
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.BASE}/trade/positions",
headers=self.headers
)
return resp.json()["positions"]
# Usage (await must run inside an async function)
async def demo():
    client = PurpleFleatradingClient("pf_live_<your_key>")
    result = await client.place_nlp_order(
        symbol="BTC",
        side="buy",
        amount_usdc=75.0,
        nlp_signal={
            "finbert_compound": 0.52,
            "source_count": 8,
            "top_headline": "Bitcoin surges as institutional inflows accelerate",
            "fear_greed_score": 71,
        }
    )
    print(result)
Backtesting NLP Signals
Before deploying NLP signals live, backtest them against historical data. The key challenge is avoiding look-ahead bias: news must be matched to the price after publication, not before. A 15-minute delay is a safe minimum for most news sources.
import pandas as pd
import numpy as np
def backtest_nlp_signals(
signals_df: pd.DataFrame, # cols: timestamp, symbol, compound_score
prices_df: pd.DataFrame, # cols: timestamp, symbol, close
holding_period_hours: int = 4,
entry_threshold: float = 0.3,
) -> dict:
"""
Simple backtest: enter on |compound_score| > threshold,
exit after holding_period_hours.
Returns performance metrics.
"""
results = []
for _, row in signals_df.iterrows():
if abs(row["compound_score"]) < entry_threshold:
continue
        # Apply the publication delay discussed above so entries never use
        # pre-news prices (look-ahead bias)
        entry_time = row["timestamp"] + pd.Timedelta(minutes=15)
        exit_time = entry_time + pd.Timedelta(hours=holding_period_hours)
        sym_prices = prices_df[prices_df["symbol"] == row["symbol"]]
        entry_price_rows = sym_prices[sym_prices["timestamp"] >= entry_time]
        exit_price_rows = sym_prices[sym_prices["timestamp"] >= exit_time]
if entry_price_rows.empty or exit_price_rows.empty:
continue
entry_price = entry_price_rows.iloc[0]["close"]
exit_price = exit_price_rows.iloc[0]["close"]
direction = 1 if row["compound_score"] > 0 else -1
pnl_pct = direction * (exit_price - entry_price) / entry_price
results.append({"pnl_pct": pnl_pct, "direction": direction})
if not results:
return {}
df = pd.DataFrame(results)
return {
"n_trades": len(df),
"win_rate": (df["pnl_pct"] > 0).mean(),
"avg_pnl_pct": df["pnl_pct"].mean(),
        # Annualized over 252 trading days; use 365 for 24/7 crypto markets
        "sharpe": df["pnl_pct"].mean() / df["pnl_pct"].std() * np.sqrt(252 * 24 / holding_period_hours),
"max_drawdown": (df["pnl_pct"].cumsum() - df["pnl_pct"].cumsum().cummax()).min(),
}
NLP signals are especially prone to survivorship bias (only articles that reached your feed survived) and publication-time errors (article timestamps vs. market-impact timestamps differ). Always validate with out-of-sample data before going live.
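One concrete way to honor that out-of-sample warning is walk-forward validation: tune thresholds on each expanding training window, then evaluate only on the chronologically next fold, never shuffling time-series data. A sketch over signal timestamps:

```python
def walk_forward_splits(timestamps: list, n_folds: int = 4):
    """Yield (train, test) pairs: an expanding window of past data for tuning,
    and only the immediately following fold for evaluation."""
    ordered = sorted(timestamps)
    fold_size = len(ordered) // n_folds
    for k in range(1, n_folds):
        train = ordered[: k * fold_size]
        test = ordered[k * fold_size : (k + 1) * fold_size]
        if train and test:
            yield train, test
```

Running `backtest_nlp_signals` once per fold, with thresholds chosen on that fold's train window only, gives a distribution of out-of-sample Sharpe values instead of one optimistic in-sample number.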
Live Signal Dashboard
A minimal Flask dashboard lets you monitor the NLP pipeline state in real time. It surfaces current rolling sentiment per symbol, recent signal events, and fear/greed index.
from flask import Flask, jsonify
import threading

# Shared in-process state from pipeline.py — the dashboard must run in the
# same process as the pipeline (see the thread launch below)
from pipeline import sentiment_windows, compute_rolling_sentiment

app = Flask(__name__)
dashboard_state = {
"signals": {},
"fear_greed": 50,
"last_articles": [],
}
@app.route("/state")
def get_state():
return jsonify({
"signals": {
symbol: {
"rolling_sentiment": compute_rolling_sentiment(symbol),
"label": (
"bullish" if compute_rolling_sentiment(symbol) > 0.25
else "bearish" if compute_rolling_sentiment(symbol) < -0.25
else "neutral"
),
}
for symbol in sentiment_windows
},
"fear_greed": dashboard_state["fear_greed"],
"recent_articles": dashboard_state["last_articles"][-10:],
})
@app.route("/health")
def health():
return jsonify({"status": "ok"})
# Run in background thread alongside the main pipeline
def run_dashboard():
app.run(host="0.0.0.0", port=5100, debug=False)
threading.Thread(target=run_dashboard, daemon=True).start()
Deploying with PM2
Run the NLP pipeline as a persistent PM2 process alongside other Purple Flea agent services. The .cjs extension forces PM2 to parse the ecosystem file as CommonJS, which matters if the surrounding repo's package.json sets "type": "module".
// ecosystem.nlp.cjs
module.exports = {
apps: [{
name: "nlp-pipeline",
script: "pipeline.py",
interpreter: "python3",
watch: false,
autorestart: true,
max_restarts: 10,
restart_delay: 5000,
env: {
PURPLE_FLEA_API_KEY: "pf_live_<your_key>",
PYTHONUNBUFFERED: "1",
},
log_date_format: "YYYY-MM-DD HH:mm:ss",
}]
};
pm2 start ecosystem.nlp.cjs
pm2 save
pm2 logs nlp-pipeline --lines 50
Risk Management for NLP Signals
NLP signals carry unique risks that OHLCV strategies do not. Coordinated manipulation (pump-and-dump social campaigns), satire misclassified as news, and model hallucinations in LLM-scored signals can all generate false positives.
Guardrails to implement
- Source diversity requirement: only fire a signal if it appears across at least 3 independent sources
- Price-sentiment divergence cap: if price is already up 10% and sentiment is bullish, reduce position size by 50% — the signal may already be priced in
- Cooldown timer: minimum 15-minute gap between consecutive NLP-driven orders per symbol
- Stale signal decay: sentiment scores older than 2 hours receive a 0.5x weight multiplier
- Manual override endpoint: a simple HTTP endpoint to pause signal generation during known high-noise events (FOMC, geopolitical crises)
import time

class NLPRiskManager:
    def __init__(self):
        self.last_trade_time: dict[str, float] = {}
        self.cooldown_seconds = 900  # 15 minutes
        self.paused = False

    def check_trade_allowed(
        self,
        symbol: str,
        signal_score: float,
        source_count: int,
        price_change_24h: float,
    ) -> tuple[bool, str]:
        """Return (allowed, reason). The caller should record
        last_trade_time[symbol] = time.time() after a fill."""
        if self.paused:
            return False, "pipeline paused"
        if source_count < 3:
            return False, f"insufficient sources ({source_count} < 3)"
        last = self.last_trade_time.get(symbol, 0)
        if time.time() - last < self.cooldown_seconds:
            return False, "cooldown active"
# Price already moved in signal direction?
if signal_score > 0 and price_change_24h > 10:
return False, "signal likely priced in"
if signal_score < 0 and price_change_24h < -10:
return False, "signal likely priced in"
return True, "ok"
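The stale-signal decay guardrail from the list above isn't covered by NLPRiskManager; a minimal weight function handles it at aggregation time. The 2-hour window and 0.5x multiplier mirror the guardrail list.

```python
def stale_weight(age_seconds: float,
                 fresh_window_s: float = 7200,   # 2 hours
                 stale_multiplier: float = 0.5) -> float:
    """Weight multiplier for a sentiment score based on its age."""
    return 1.0 if age_seconds <= fresh_window_s else stale_multiplier
```

Multiplying each score by `stale_weight(now - score_time)` inside `compute_rolling_sentiment` keeps hours-old headlines from holding the rolling average at an extreme long after the news has been absorbed.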
Conclusion
NLP trading signals give AI agents access to a dimension of market information that purely quantitative approaches miss. The pipeline described here — RSS ingestion, entity-level FinBERT scoring, LLM enrichment, fear/greed aggregation, and Purple Flea API execution — can be deployed in under an hour and extended incrementally as you validate which signal sources contribute the most alpha.
Register your agent at purpleflea.com/register to get your API key. New agents can claim free testnet funds from the Purple Flea Faucet to paper-trade the NLP pipeline before committing real capital.
Next steps: connect the NLP pipeline output to the Multi-Model Agent Systems pattern to route high-conviction signals to more capable (and expensive) decision-making models while keeping routine sentiment scoring cheap.