# Re-implemented: compute sentiment and return True if entries should be blocked.
# This method belongs in the strategy class defined at the bottom of this file.
def fetch_news(self, asset: str, current_time: datetime) -> bool:
    try:
        # Get cached-or-fresh sentiment
        sentiment = self.news_service.get_symbol_sentiment(asset)
        self._last_sentiment[asset] = sentiment
        # Decide if we block entries:
        # - Require at least one article
        # - Require minimum confidence (quality/recency/amount of news)
        # - Require minimum absolute sentiment magnitude
        should_block = (
            sentiment.articles_used > 0
            and sentiment.confidence >= self.news_block_min_confidence
            and abs(sentiment.score) >= self.news_block_min_abs_score
        )
        if should_block:
            self.logger.info(
                f"Recent news for {asset}: bias={sentiment.bias} "
                f"score={sentiment.score:.3f} conf={sentiment.confidence:.2f} "
                f"articles={sentiment.articles_used}. Blocking entry."
            )
        else:
            self.logger.debug(
                f"No strong recent news for {asset}: score={sentiment.score:.3f}, "
                f"conf={sentiment.confidence:.2f}, articles={sentiment.articles_used}."
            )
        return bool(should_block)
    except Exception as e:
        # Fallback: do not block if sentiment retrieval fails
        self.logger.warning(f"News sentiment computation failed for {asset}: {e}", exc_info=True)
        return False
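
# Note (not part of the original gist): fetch_news acts as a veto, not a signal.
# True means "block new entries right now"; the richer per-asset detail remains
# available in self._last_sentiment for sizing/adjustment decisions downstream.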
import re
import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ... existing code ...
import requests
from datetime import datetime, timedelta, timezone
# ... existing code ...
# ---------------------------
# Sentiment + TTL cache types
# ---------------------------
news_sentiment_analyzer = None # Placeholder for compatibility with your snippet (unused)

@dataclass
class SentimentResult:
    symbol: str
    score: float           # -1.0 (bearish) ... +1.0 (bullish)
    confidence: float      # 0.0 ... 1.0 (quality/amount/recency of news)
    bias: str              # 'bearish', 'neutral', 'bullish'
    bias_strength: float   # 0.0 ... 1.0
    adjustments: Dict[str, float]
    analyzed_at: float     # epoch seconds
    articles_used: int

class TTLCache:
    def __init__(self, ttl_seconds: int = 900, max_items: int = 300, persist_path: Path = Path(".cache/news_cache.json")):
        self.ttl = ttl_seconds
        self.max_items = max_items
        self.persist_path = persist_path
        self._data: Dict[str, Dict[str, Any]] = {}
        self._load()

    def _load(self) -> None:
        try:
            if self.persist_path.exists():
                with self.persist_path.open("r", encoding="utf-8") as f:
                    self._data = json.load(f)
        except Exception:
            self._data = {}

    def _save(self) -> None:
        try:
            self.persist_path.parent.mkdir(parents=True, exist_ok=True)
            with self.persist_path.open("w", encoding="utf-8") as f:
                json.dump(self._data, f)
        except Exception:
            pass

    def get(self, key: str) -> Optional[Any]:
        now = time.time()
        item = self._data.get(key)
        if not item:
            return None
        if now - item["ts"] > self.ttl:
            # expired
            self._data.pop(key, None)
            self._save()
            return None
        return item["value"]

    def set(self, key: str, value: Any) -> None:
        # prune the oldest items if the cache is full
        if len(self._data) >= self.max_items:
            oldest = sorted(self._data.items(), key=lambda kv: kv[1]["ts"])[: max(1, len(self._data) - self.max_items + 1)]
            for k, _ in oldest:
                self._data.pop(k, None)
        self._data[key] = {"ts": time.time(), "value": value}
        self._save()
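
# Example (illustrative, not part of the original gist): exercising TTLCache
# directly. The path and key below are hypothetical; the second get() misses
# because the entry has outlived its TTL.
def _demo_ttl_cache() -> None:
    cache = TTLCache(ttl_seconds=2, max_items=10, persist_path=Path(".cache/demo_cache.json"))
    cache.set("BTC", {"score": 0.4})
    assert cache.get("BTC") == {"score": 0.4}  # fresh entry is returned
    time.sleep(3)
    assert cache.get("BTC") is None            # expired and evicted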

class NewsSentimentService:
    def __init__(self, ttl_seconds: int = 900):
        self.cache = TTLCache(ttl_seconds=ttl_seconds)

    # Integrate your existing news function here. It should return a list[dict]
    # with keys: title, summary (or description), published_at (ISO8601 or epoch seconds)
    def fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        # To be overridden in StrategyNewsService below
        return []

    def get_symbol_sentiment(self, symbol: str) -> SentimentResult:
        cache_key = f"sentiment::{symbol.upper()}"
        cached = self.cache.get(cache_key)
        if cached:
            return self._dict_to_sentiment(cached)
        articles = self.fetch_news_for_symbol(symbol)
        score, confidence, n_used = self._analyze_articles(articles)
        bias, bias_strength = self._score_to_bias(score, confidence)
        adjustments = self._compute_adjustments(score, confidence)
        result = SentimentResult(
            symbol=symbol.upper(),
            score=score,
            confidence=confidence,
            bias=bias,
            bias_strength=bias_strength,
            adjustments=adjustments,
            analyzed_at=time.time(),
            articles_used=n_used,
        )
        self.cache.set(cache_key, self._sentiment_to_dict(result))
        return result

    # -----------------
    # Sentiment engine
    # -----------------
    def _normalize_text(self, text: str) -> List[str]:
        text = text.lower()
        # Basic tokenization: lowercase words, allowing internal apostrophes
        tokens = re.findall(r"[a-z]+(?:'[a-z]+)?", text)
        return tokens

    def _simple_sentiment_score(self, text: str) -> float:
        # Lightweight finance-leaning lexicon (expand as needed).
        # Matching is per-token, so multi-word phrases ("guidance cut") cannot
        # match and are omitted; "strong" lives in the intensifier set below,
        # so it is not repeated here.
        positive = {
            "beat", "beats", "beating", "growth", "bull", "bullish", "surge", "rally",
            "record", "profit", "profits", "upgrade", "upgrades", "optimistic",
            "positive", "outperform", "improve", "improves", "improved",
            "rebound", "rebounded", "expansion", "exceeds", "exceeded",
        }
        negative = {
            "miss", "misses", "warning", "warns",
            "bear", "bearish", "plunge", "selloff", "downgrade", "downgrades",
            "weak", "loss", "losses", "lawsuit", "probe", "investigation",
            "negative", "underperform", "decline", "declines", "declined",
            "recession", "slowdown", "slump", "bankruptcy", "default", "defaults",
        }
        negators = {"no", "not", "never", "without"}
        intensifiers = {"very", "significant", "strong", "massive", "huge", "sharp"}
        deintensifiers = {"slight", "slightly", "modest", "minor"}
        tokens = self._normalize_text(text)
        score = 0.0
        window_negate = 0
        window_boost = 1.0
        for t in tokens:
            if t in negators:
                window_negate = 2  # negate the next couple of sentiment tokens
                continue
            if t in intensifiers:
                window_boost = min(2.0, window_boost + 0.25)
                continue
            if t in deintensifiers:
                window_boost = max(0.5, window_boost - 0.2)
                continue
            word_score = 0.0
            if t in positive:
                word_score = +1.0
            elif t in negative:
                word_score = -1.0
            if word_score != 0.0:
                if window_negate > 0:
                    word_score *= -1.0
                    window_negate -= 1
                word_score *= window_boost
                # decay boost after use
                window_boost = 1.0
                score += word_score
        # scale, then clamp to [-1, 1] (a linear squash, not a true tanh)
        return max(-1.0, min(1.0, score / 5.0))
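
    # Worked example (illustrative): "not a strong rebound" tokenizes to
    # ["not", "a", "strong", "rebound"]; "not" arms negation, "strong" raises
    # the boost to 1.25, and "rebound" then scores -1.0 * 1.25 = -1.25, so the
    # method returns -1.25 / 5.0 = -0.25 despite the superficially upbeat words.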

    def _parse_published_ts(self, v: Any) -> float:
        if v is None:
            return time.time()
        if isinstance(v, (int, float)):
            # assume epoch seconds
            return float(v)
        s = str(v)
        # very lenient ISO8601 parse (YYYY-MM-DDTHH:MM:SSZ)
        m = re.match(r"(\d{4})-(\d{2})-(\d{2})[T\s](\d{2}):(\d{2})(?::(\d{2}))?Z?", s)
        if not m:
            return time.time()
        y, mo, d, h, mi, se = m.groups()
        se = se or "00"
        try:
            import datetime as _dt  # local alias; `datetime` is shadowed by the class import above
            dt = _dt.datetime(int(y), int(mo), int(d), int(h), int(mi), int(se), tzinfo=_dt.timezone.utc)
            return dt.timestamp()
        except Exception:
            return time.time()
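
    # Example (illustrative): "2024-05-01T12:30:00Z" and 1714566600 (epoch
    # seconds) both parse to the same UTC timestamp; anything unrecognized
    # falls back to "now", which simply gives that article full recency weight.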

    def _analyze_articles(self, articles: List[Dict[str, Any]]) -> Tuple[float, float, int]:
        if not articles:
            return 0.0, 0.0, 0
        now = time.time()
        total_weight = 0.0
        weighted_score = 0.0
        used = 0
        for a in articles:
            title = (a.get("title") or "").strip()
            summary = (a.get("summary") or a.get("description") or "").strip()
            text = title
            if summary:
                text += ". " + summary
            if not text:
                continue
            # recency weight: newer = heavier
            ts = self._parse_published_ts(a.get("published_at"))
            hours_ago = max(0.0, (now - ts) / 3600.0)
            recency_weight = 1.0 / (1.0 + hours_ago)  # 1.0 for now, ~0.5 for 1h ago, etc.
            s = self._simple_sentiment_score(text)
            # magnitude weight: stronger statements get a touch more weight
            mag_weight = 0.75 + 0.25 * abs(s)
            w = recency_weight * mag_weight
            total_weight += w
            weighted_score += s * w
            used += 1
        if total_weight == 0.0:
            return 0.0, 0.0, used
        score = weighted_score / total_weight
        # confidence based on number of articles and recency spread
        base_conf = min(1.0, used / 10.0)  # saturates around 10 articles
        # if most of the weight is recent, confidence is a bit higher
        confidence = min(1.0, base_conf * (0.75 + 0.25 * min(1.0, total_weight)))
        return float(score), float(confidence), used
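
    # Worked example (illustrative): one fresh article scoring +0.6 gets weight
    # 1.0 * (0.75 + 0.25*0.6) = 0.90; one 3h-old article scoring -0.4 gets
    # 0.25 * 0.85 ≈ 0.21. The blend is (0.6*0.90 - 0.4*0.21) / 1.11 ≈ +0.41,
    # with confidence 2/10 * 1.0 = 0.2 (few articles keeps confidence low).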

    def _score_to_bias(self, score: float, confidence: float) -> Tuple[str, float]:
        # threshold bands at +/-0.2
        if score > 0.2:
            return "bullish", min(1.0, (score - 0.2) * 1.25 * confidence + 0.2 * confidence)
        elif score < -0.2:
            return "bearish", min(1.0, (-score - 0.2) * 1.25 * confidence + 0.2 * confidence)
        else:
            return "neutral", max(0.0, 0.5 * (0.2 - abs(score)) * (1.0 - abs(score))) * confidence

    def _compute_adjustments(self, score: float, confidence: float) -> Dict[str, float]:
        # Convert sentiment into risk and grid/TP/SL adjustments
        impact = min(1.0, 0.6 + 0.4 * confidence)  # stronger effect with confidence
        intensity = abs(score) * impact
        # Position size between 0.7x and 1.3x based on sentiment
        if score >= 0:
            size_mult = 1.0 + 0.3 * intensity
        else:
            size_mult = 1.0 - 0.3 * intensity
        # Grid steps widen with higher intensity (news => volatility)
        grid_mult = 1.0 + 0.4 * intensity
        # TP and SL
        if score >= 0:
            tp_mult = 1.0 + 0.25 * intensity  # slightly more ambitious TP when bullish
            sl_mult = 1.0 + 0.15 * intensity  # allow a little more room
        else:
            tp_mult = 1.0 - 0.2 * intensity   # more conservative TP when bearish
            sl_mult = 1.0 - 0.15 * intensity  # tighten a bit
        # Signal threshold offset: require stronger contrary signals against the bias
        signal_offset = 0.15 * intensity  # 0..0.15
        return {
            "position_size_multiplier": float(max(0.5, min(1.5, size_mult))),
            "grid_step_multiplier": float(min(1.8, grid_mult)),
            "take_profit_multiplier": float(max(0.7, min(1.4, tp_mult))),
            "stop_loss_multiplier": float(max(0.7, min(1.3, sl_mult))),
            "signal_threshold_offset": float(signal_offset),
        }
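
    # Worked example (illustrative): score=+0.5, confidence=0.5 gives
    # impact=0.8 and intensity=0.4, so size 1.12x, grid 1.16x, TP 1.10x,
    # SL 1.06x, and a 0.06 signal-threshold offset.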

    # -------------
    # Serialization
    # -------------
    def _sentiment_to_dict(self, s: SentimentResult) -> Dict[str, Any]:
        return {
            "symbol": s.symbol,
            "score": s.score,
            "confidence": s.confidence,
            "bias": s.bias,
            "bias_strength": s.bias_strength,
            "adjustments": s.adjustments,
            "analyzed_at": s.analyzed_at,
            "articles_used": s.articles_used,
        }

    def _dict_to_sentiment(self, d: Dict[str, Any]) -> SentimentResult:
        return SentimentResult(
            symbol=d["symbol"],
            score=float(d["score"]),
            confidence=float(d["confidence"]),
            bias=str(d["bias"]),
            bias_strength=float(d["bias_strength"]),
            adjustments=dict(d["adjustments"]),
            analyzed_at=float(d["analyzed_at"]),
            articles_used=int(d.get("articles_used", 0)),
        )
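
# Example (illustrative, not part of the original gist): a stubbed service for
# offline testing. The two articles below are made up; only the dict keys match
# what _analyze_articles expects.
class _StubNewsService(NewsSentimentService):
    def fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        return [
            {"title": "BTC hits record high", "summary": "Profits surge.", "published_at": time.time() - 600},
            {"title": "Analysts flag a slowdown", "summary": "", "published_at": time.time() - 7200},
        ]

def _demo_sentiment() -> None:
    svc = _StubNewsService(ttl_seconds=60)
    result = svc.get_symbol_sentiment("BTC")
    print(result.bias, round(result.score, 3), round(result.confidence, 2))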

# Strategy-integrated sentiment service using your NewsAPI flow
class StrategyNewsService(NewsSentimentService):
    def __init__(self, api_key: Optional[str], ttl_seconds: int = 900, logger=None):
        super().__init__(ttl_seconds=ttl_seconds)
        self.api_key = api_key
        self.logger = logger

    def fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        if not self.api_key:
            if self.logger:
                self.logger.debug(f"No NewsAPI key. Skipping news fetch for {symbol}.")
            return []
        try:
            now = datetime.now(timezone.utc)
            from_time = (now - timedelta(hours=2)).strftime("%Y-%m-%dT%H:%M:%SZ")
            # "+ crypto" constrains results to a crypto context; passing params=
            # lets requests URL-encode the query and key safely
            url = "https://newsapi.org/v2/everything"
            params = {
                "q": f"{symbol} crypto",
                "from": from_time,
                "sortBy": "publishedAt",
                "apiKey": self.api_key,
            }
            time.sleep(2)  # respectful delay to avoid rate limits
            resp = requests.get(url, params=params, timeout=10)
            if resp.status_code == 429:
                if self.logger:
                    self.logger.warning(f"NewsAPI rate limit reached for {symbol}.")
                return []
            resp.raise_for_status()
            data = resp.json()
            if data.get("status") != "ok":
                return []
            articles = data.get("articles", []) or []
            # Normalize articles into the format _analyze_articles expects
            normalized = []
            for a in articles:
                normalized.append({
                    "title": a.get("title") or "",
                    "summary": a.get("description") or a.get("content") or "",
                    "published_at": a.get("publishedAt") or a.get("published_at") or "",
                })
            return normalized
        except requests.exceptions.RequestException as e:
            if self.logger:
                self.logger.warning(f"NewsAPI request failed for {symbol}: {e}")
            return []
        except Exception as e:
            if self.logger:
                self.logger.error(f"Unexpected error fetching news for {symbol}: {e}", exc_info=True)
            return []
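
# Example (illustrative): constructing the service directly. NEWS_API_KEY is a
# hypothetical environment variable name; substitute your own configuration.
# Commented out to avoid a live network call on import:
#
#   import os
#   svc = StrategyNewsService(api_key=os.getenv("NEWS_API_KEY"), ttl_seconds=900)
#   print(svc.get_symbol_sentiment("BTC"))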

# ... existing code ...
class YourStrategyClassNameHere:  # replace with your actual class name
    # ... existing code ...
    def __init__(self, *args, **kwargs):
        # ... existing code ...
        # Ensure self.news_api_key is defined somewhere in your config/params.
        # Initialize the strategy-integrated news service
        self.news_service = StrategyNewsService(
            api_key=getattr(self, 'news_api_key', None),
            ttl_seconds=900,
            logger=getattr(self, 'logger', None),
        )
        # Thresholds for blocking entries based on sentiment
        self.news_block_min_confidence = 0.35
        self.news_block_min_abs_score = 0.25
        # Optional: store last sentiment per base asset
        self._last_sentiment: Dict[str, SentimentResult] = {}
        # ... existing code ...
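
# Example (illustrative): consuming the cached adjustments in an entry path.
# maybe_enter, base_size, and place_order are hypothetical names.
#
#   def maybe_enter(self, asset: str) -> None:
#       if self.fetch_news(asset, datetime.now(timezone.utc)):
#           return  # strong recent news sentiment: skip this entry
#       adj = self._last_sentiment[asset].adjustments
#       self.place_order(asset, self.base_size * adj["position_size_multiplier"])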