Last active
August 19, 2025 03:58
-
-
Save seangeleno/06c51737d93304d30460e370fb364388 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fetch_news(self, asset: str, current_time: datetime) -> bool:
    """Decide whether recent news sentiment should block a new entry for ``asset``.

    The sentiment is obtained from ``self.news_service.get_symbol_sentiment``
    and remembered in ``self._last_sentiment[asset]``. Entries are blocked
    only when all three conditions hold:

    * at least one article contributed to the sentiment,
    * the confidence reaches ``self.news_block_min_confidence``,
    * the absolute score reaches ``self.news_block_min_abs_score``.

    Args:
        asset: Symbol whose news sentiment is evaluated.
        current_time: Accepted for interface compatibility; not used here.

    Returns:
        True when entries should be blocked, False otherwise. Any failure
        while computing the sentiment is logged and treated as "do not block".
    """
    try:
        # Cached-or-fresh sentiment; remembered for later inspection.
        result = self.news_service.get_symbol_sentiment(asset)
        self._last_sentiment[asset] = result

        # Evaluated left to right with short-circuiting: articles first,
        # then confidence, then magnitude of the score.
        news_is_strong = (
            result.articles_used > 0
            and result.confidence >= self.news_block_min_confidence
            and abs(result.score) >= self.news_block_min_abs_score
        )

        if not news_is_strong:
            self.logger.debug(
                f"No strong recent news for {asset}: score={result.score:.3f}, "
                f"conf={result.confidence:.2f}, articles={result.articles_used}."
            )
            return False

        self.logger.info(
            f"Recent news for {asset}: bias={result.bias} "
            f"score={result.score:.3f} conf={result.confidence:.2f} "
            f"articles={result.articles_used}. Blocking entry."
        )
        return True
    except Exception as e:
        # Deliberate fallback: a broken news pipeline must never block trading.
        self.logger.warning(f'News sentiment computation failed for {asset}: {str(e)}', exc_info=True)
        return False
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import re | |
| import json | |
| import time | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| # ... existing code ... | |
| import requests | |
| from datetime import datetime, timedelta | |
| # ... existing code ... | |
| # --------------------------- | |
| # Sentiment + TTL cache types | |
| # --------------------------- | |
# Placeholder for compatibility with your snippet (unused)
news_sentiment_analyzer = None


@dataclass
class SentimentResult:
    """Aggregated news sentiment for one symbol.

    Field order is part of the positional constructor and must not change.
    """

    # Symbol the sentiment refers to.
    symbol: str
    # -1.0 (bearish) ... +1.0 (bullish)
    score: float
    # 0.0 ... 1.0 (quality/amount/recency of news)
    confidence: float
    # One of 'bearish', 'neutral', 'bullish'
    bias: str
    # 0.0 ... 1.0
    bias_strength: float
    # Named multipliers/offsets derived from the sentiment.
    adjustments: Dict[str, float]
    # Epoch seconds at which the analysis was produced.
    analyzed_at: float
    # Number of articles that contributed to the score.
    articles_used: int
class TTLCache:
    """Small key/value cache with a time-to-live, persisted as one JSON file.

    Every entry is stored as ``{"ts": <epoch seconds>, "value": <payload>}``.
    The whole cache is rewritten to ``persist_path`` after each mutation, so
    values must be JSON-serializable for persistence to succeed. Persistence
    is best-effort: I/O or serialization failures never raise to the caller.

    Args:
        ttl_seconds: Age in seconds after which an entry is considered expired.
        max_items: Maximum number of entries kept; the oldest are evicted first.
        persist_path: JSON file used to load and save the cache.
    """

    def __init__(self, ttl_seconds: int = 900, max_items: int = 300, persist_path: Path = Path(".cache/news_cache.json")):
        self.ttl = ttl_seconds
        self.max_items = max_items
        self.persist_path = persist_path
        self._data: Dict[str, Dict[str, Any]] = {}
        self._load()

    @staticmethod
    def _is_valid_entry(entry: Any) -> bool:
        """Return True when ``entry`` has the ``{"ts": number, "value": ...}`` shape."""
        if not isinstance(entry, dict) or "value" not in entry:
            return False
        ts = entry.get("ts")
        return isinstance(ts, (int, float)) and not isinstance(ts, bool)

    def _load(self) -> None:
        """Populate the cache from ``persist_path``, ignoring unusable content."""
        self._data = {}
        try:
            if not self.persist_path.exists():
                return
            with self.persist_path.open("r", encoding="utf-8") as f:
                loaded = json.load(f)
        except Exception:
            # Best-effort: an unreadable or corrupt file means an empty cache.
            return
        if not isinstance(loaded, dict):
            # Valid JSON but not a mapping (e.g. a list): nothing usable.
            return
        # Drop malformed entries now so get()/set() can rely on the entry shape.
        self._data = {k: v for k, v in loaded.items() if self._is_valid_entry(v)}

    def _save(self) -> None:
        """Persist the cache atomically; failures are swallowed on purpose."""
        tmp_path = None
        try:
            self.persist_path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a sibling temp file and rename it over the target so a
            # failed or interrupted dump never truncates the existing cache.
            tmp_path = self.persist_path.with_name(self.persist_path.name + ".tmp")
            with tmp_path.open("w", encoding="utf-8") as f:
                json.dump(self._data, f)
            tmp_path.replace(self.persist_path)
        except Exception:
            # Best-effort persistence: the in-memory cache stays authoritative.
            if tmp_path is not None:
                try:
                    tmp_path.unlink()
                except OSError:
                    pass

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None if missing or expired.

        An expired entry is removed and the removal is persisted.
        """
        item = self._data.get(key)
        if not item:
            return None
        if time.time() - item["ts"] > self.ttl:
            # expired
            self._data.pop(key, None)
            self._save()
            return None
        return item["value"]

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` with the current time and persist.

        When inserting would exceed ``max_items``, the oldest *other* entries
        are evicted first. Overwriting an existing key does not evict anything
        unless the cache is already over capacity.
        """
        # Count entries other than ``key``: replacing a key does not grow the cache.
        others = len(self._data) - (1 if key in self._data else 0)
        if others >= self.max_items:
            excess = others - self.max_items + 1
            victims = sorted(
                (k for k in self._data if k != key),
                key=lambda k: self._data[k]["ts"],
            )[: max(1, excess)]
            for k in victims:
                self._data.pop(k, None)
        self._data[key] = {"ts": time.time(), "value": value}
        self._save()
class NewsSentimentService:
    """Lexicon-based news sentiment for a symbol, cached with a TTL.

    Subclasses provide the articles by overriding ``fetch_news_for_symbol``;
    this base class scores them, derives a bias and a set of strategy
    adjustments, and caches the result per upper-cased symbol.

    Args:
        ttl_seconds: How long a computed sentiment is served from the cache.
    """

    # Lightweight finance-leaning lexicon (expand as needed).
    # NOTE: "strong" is also an intensifier; the intensifier check runs first,
    # so it boosts the next sentiment word instead of scoring on its own.
    _POSITIVE_WORDS = frozenset({
        "beat", "beats", "beating", "growth", "bull", "bullish", "surge", "rally",
        "strong", "record", "profit", "profits", "upgrade", "upgrades", "optimistic",
        "positive", "outperform", "improve", "improves", "improved",
        "rebound", "rebounded", "expansion", "exceeds", "exceeded",
    })
    _NEGATIVE_WORDS = frozenset({
        "miss", "misses", "warning", "warns",
        "bear", "bearish", "plunge", "selloff", "downgrade", "downgrades",
        "weak", "loss", "losses", "lawsuit", "probe", "investigation",
        "negative", "underperform", "decline", "declines", "declined",
        "recession", "slowdown", "slump", "bankruptcy", "default", "defaults",
    })
    # Two-word negative phrases, matched on consecutive tokens. They cannot
    # live in the single-word set because the text is scored token by token.
    _NEGATIVE_PHRASES = frozenset({("guidance", "cut"), ("cuts", "guidance")})
    _NEGATORS = frozenset({"no", "not", "never", "without"})
    _INTENSIFIERS = frozenset({"very", "significant", "strong", "massive", "huge", "sharp"})
    _DEINTENSIFIERS = frozenset({"slight", "slightly", "modest", "minor"})

    _TOKEN_RE = re.compile(r"[a-z]+(?:'[a-z]+)?")
    # Lenient ISO-8601: date, time (seconds optional), optional fraction,
    # optional "Z" or numeric UTC offset (+HH:MM / -HHMM).
    _ISO_TS_RE = re.compile(
        r"(\d{4})-(\d{2})-(\d{2})[T\s](\d{2}):(\d{2})(?::(\d{2}))?"
        r"(?:\.\d+)?\s*(?:Z|([+-])(\d{2}):?(\d{2}))?"
    )

    def __init__(self, ttl_seconds: int = 900):
        self.cache = TTLCache(ttl_seconds=ttl_seconds)

    # Integrate your existing news function here:
    # It should return a list[dict] with keys: title, summary (or description),
    # published_at (ISO8601 or epoch seconds)
    def fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        """Return the articles to analyze for ``symbol``.

        The base implementation returns no articles; it is meant to be
        overridden by a subclass that talks to a real news source.
        """
        return []

    def get_symbol_sentiment(self, symbol: str) -> "SentimentResult":
        """Return the sentiment for ``symbol``, from cache when available.

        On a cache miss (or when the cached payload cannot be decoded) the
        articles are fetched and analyzed, and the fresh result is cached.
        """
        cache_key = f"sentiment::{symbol.upper()}"
        cached = self.cache.get(cache_key)
        if cached:
            try:
                return self._dict_to_sentiment(cached)
            except (KeyError, TypeError, ValueError):
                # Stale or malformed cache payload: recompute instead of failing.
                pass
        articles = self.fetch_news_for_symbol(symbol)
        score, confidence, n_used = self._analyze_articles(articles)
        bias, bias_strength = self._score_to_bias(score, confidence)
        adjustments = self._compute_adjustments(score, confidence)
        result = SentimentResult(
            symbol=symbol.upper(),
            score=score,
            confidence=confidence,
            bias=bias,
            bias_strength=bias_strength,
            adjustments=adjustments,
            analyzed_at=time.time(),
            articles_used=n_used,
        )
        self.cache.set(cache_key, self._sentiment_to_dict(result))
        return result

    # -----------------
    # Sentiment engine
    # -----------------
    def _normalize_text(self, text: str) -> List[str]:
        """Lower-case ``text`` and split it into alphabetic word tokens."""
        return self._TOKEN_RE.findall(text.lower())

    def _simple_sentiment_score(self, text: str) -> float:
        """Score ``text`` in [-1.0, 1.0] using the class lexicons.

        Each positive word adds 1 and each negative word or phrase subtracts 1.
        A negator flips the sign of the next two sentiment hits. Intensifiers
        and de-intensifiers scale the next sentiment hit (bounded to 0.5..2.0).
        The sum is divided by 5 and clamped to the unit interval.
        """
        score = 0.0
        negations_left = 0
        boost = 1.0
        previous = None
        for token in self._normalize_text(text):
            pair = (previous, token)
            previous = token
            if token in self._NEGATORS:
                negations_left = 2  # negate next couple of sentiment tokens
                continue
            if token in self._INTENSIFIERS:
                boost = min(2.0, boost + 0.25)
                continue
            if token in self._DEINTENSIFIERS:
                boost = max(0.5, boost - 0.2)
                continue
            if token in self._POSITIVE_WORDS:
                word_score = 1.0
            elif token in self._NEGATIVE_WORDS or pair in self._NEGATIVE_PHRASES:
                word_score = -1.0
            else:
                continue
            if negations_left > 0:
                word_score = -word_score
                negations_left -= 1
            score += word_score * boost
            # the boost applies to a single sentiment hit only
            boost = 1.0
        # scale and clamp to -1..1 (five net hits saturate the score)
        return max(-1.0, min(1.0, score / 5.0))

    def _parse_published_ts(self, v: Any) -> float:
        """Convert a publication time to epoch seconds.

        Numbers are taken as epoch seconds. Strings are parsed as lenient
        ISO-8601; a trailing ``Z`` or no zone means UTC, and a numeric offset
        such as ``+02:00`` is honored. ``None`` and unparseable values fall
        back to the current time, i.e. the article is treated as just published.
        """
        if v is None:
            return time.time()
        if isinstance(v, (int, float)):
            # assume seconds
            return float(v)
        m = self._ISO_TS_RE.match(str(v))
        if not m:
            return time.time()
        y, mo, d, h, mi, se, sign, off_h, off_m = m.groups()
        import datetime as _dt
        try:
            offset = _dt.timedelta(0)
            if sign:
                offset = _dt.timedelta(hours=int(off_h), minutes=int(off_m))
                if sign == "-":
                    offset = -offset
            dt = _dt.datetime(
                int(y), int(mo), int(d), int(h), int(mi), int(se or "00"),
                tzinfo=_dt.timezone(offset),
            )
            return dt.timestamp()
        except (ValueError, OverflowError):
            # Out-of-range date/time components or offset.
            return time.time()

    def _analyze_articles(self, articles: List[Dict[str, Any]]) -> Tuple[float, float, int]:
        """Aggregate article sentiment into ``(score, confidence, articles_used)``.

        Each article's text (title plus summary/description) is scored and
        weighted by recency and by the magnitude of its own score. Articles
        without text, and entries that are not dicts, are skipped.
        """
        if not articles:
            return 0.0, 0.0, 0
        now = time.time()
        total_weight = 0.0
        weighted_score = 0.0
        used = 0
        for a in articles:
            if not isinstance(a, dict):
                continue
            title = str(a.get("title") or "").strip()
            summary = str(a.get("summary") or a.get("description") or "").strip()
            text = title
            if summary:
                text += ". " + summary
            if not text:
                continue
            # recency weight: newer = heavier
            ts = self._parse_published_ts(a.get("published_at"))
            hours_ago = max(0.0, (now - ts) / 3600.0)
            recency_weight = 1.0 / (1.0 + hours_ago)  # 1.0 for now, 0.5 for 1h ago, etc.
            s = self._simple_sentiment_score(text)
            # magnitude weight: stronger statements get a touch more weight
            mag_weight = 0.75 + 0.25 * abs(s)
            w = recency_weight * mag_weight
            total_weight += w
            weighted_score += s * w
            used += 1
        if total_weight == 0.0:
            return 0.0, 0.0, used
        score = weighted_score / total_weight
        # confidence based on number of articles and recency spread
        base_conf = min(1.0, used / 10.0)  # saturate around 10 articles
        # If most of the weight is recent, confidence a bit higher
        confidence = min(1.0, base_conf * (0.75 + 0.25 * min(1.0, total_weight)))
        return float(score), float(confidence), used

    def _score_to_bias(self, score: float, confidence: float) -> Tuple[str, float]:
        """Map a score to ``(bias, bias_strength)`` using +/-0.2 threshold bands."""
        if score > 0.2:
            return "bullish", min(1.0, (score - 0.2) * 1.25 * confidence + 0.2 * confidence)
        elif score < -0.2:
            return "bearish", min(1.0, (-score - 0.2) * 1.25 * confidence + 0.2 * confidence)
        else:
            return "neutral", max(0.0, 0.5 * (0.2 - abs(score)) * (1.0 - abs(score))) * confidence

    def _compute_adjustments(self, score: float, confidence: float) -> Dict[str, float]:
        """Convert sentiment into risk and grid/TP/SL adjustment multipliers."""
        impact = min(1.0, 0.6 + 0.4 * confidence)  # stronger effect with confidence
        intensity = abs(score) * impact
        # Position size between 0.7x and 1.3x based on sentiment
        if score >= 0:
            size_mult = 1.0 + 0.3 * intensity
        else:
            size_mult = 1.0 - 0.3 * intensity
        # Grid steps widen with higher intensity (news => volatility)
        grid_mult = 1.0 + 0.4 * intensity
        # TP and SL
        if score >= 0:
            tp_mult = 1.0 + 0.25 * intensity  # slightly more ambitious TP when bullish
            sl_mult = 1.0 + 0.15 * intensity  # allow a little more room
        else:
            tp_mult = 1.0 - 0.2 * intensity  # more conservative TP when bearish
            sl_mult = 1.0 - 0.15 * intensity  # tighten a bit
        # Signal threshold offset: require stronger contrary signals against the bias
        signal_offset = 0.15 * intensity  # 0..0.15
        return {
            "position_size_multiplier": float(max(0.5, min(1.5, size_mult))),
            "grid_step_multiplier": float(min(1.8, grid_mult)),
            "take_profit_multiplier": float(max(0.7, min(1.4, tp_mult))),
            "stop_loss_multiplier": float(max(0.7, min(1.3, sl_mult))),
            "signal_threshold_offset": float(signal_offset),
        }

    # -------------
    # Serialization
    # -------------
    def _sentiment_to_dict(self, s: "SentimentResult") -> Dict[str, Any]:
        """Flatten a sentiment result into a JSON-friendly dict for the cache."""
        return {
            "symbol": s.symbol,
            "score": s.score,
            "confidence": s.confidence,
            "bias": s.bias,
            "bias_strength": s.bias_strength,
            "adjustments": s.adjustments,
            "analyzed_at": s.analyzed_at,
            "articles_used": s.articles_used,
        }

    def _dict_to_sentiment(self, d: Dict[str, Any]) -> "SentimentResult":
        """Rebuild a sentiment result from its cached dict form.

        Raises:
            KeyError, TypeError, ValueError: if ``d`` lacks a required key or
                holds a value that cannot be coerced to the field type.
        """
        return SentimentResult(
            symbol=d["symbol"],
            score=float(d["score"]),
            confidence=float(d["confidence"]),
            bias=str(d["bias"]),
            bias_strength=float(d["bias_strength"]),
            adjustments=dict(d["adjustments"]),
            analyzed_at=float(d["analyzed_at"]),
            articles_used=int(d.get("articles_used", 0)),
        )
| # Strategy-integrated sentiment service using your NewsAPI flow | |
class StrategyNewsService(NewsSentimentService):
    """Sentiment service that pulls recent articles from NewsAPI.

    Args:
        api_key: NewsAPI key. When falsy, no request is made and no articles
            are returned.
        ttl_seconds: Cache lifetime passed to the base service.
        logger: Optional logger; when None, nothing is logged.
        lookback_hours: Only articles published within this many hours are
            requested.
        request_timeout: Timeout in seconds for the HTTP request.
        min_request_interval: Minimum number of seconds between two requests
            issued by this instance (simple rate-limit courtesy).
    """

    NEWSAPI_URL = "https://newsapi.org/v2/everything"

    def __init__(
        self,
        api_key: Optional[str],
        ttl_seconds: int = 900,
        logger=None,
        lookback_hours: float = 2.0,
        request_timeout: float = 10.0,
        min_request_interval: float = 2.0,
    ):
        super().__init__(ttl_seconds=ttl_seconds)
        self.api_key = api_key
        self.logger = logger
        self.lookback_hours = lookback_hours
        self.request_timeout = request_timeout
        self.min_request_interval = min_request_interval
        # Monotonic time of the previous request; None until the first one.
        self._last_request_at: Optional[float] = None

    def _throttle(self) -> None:
        """Sleep just long enough to keep ``min_request_interval`` between requests."""
        if self._last_request_at is not None:
            remaining = self.min_request_interval - (time.monotonic() - self._last_request_at)
            if remaining > 0:
                time.sleep(remaining)
        self._last_request_at = time.monotonic()

    def fetch_news_for_symbol(self, symbol: str) -> List[Dict[str, Any]]:
        """Fetch recent articles for ``symbol`` and normalize them.

        Returns:
            A list of dicts with ``title``, ``summary`` and ``published_at``
            keys. An empty list is returned when there is no API key, when the
            API is rate limited or reports a non-"ok" status, and on any
            request or parsing error (which is logged, not raised).
        """
        if not self.api_key:
            if self.logger:
                self.logger.debug(f'No NewsAPI key. Skipping news fetch for {symbol}.')
            return []
        try:
            # UTC timestamp of the start of the lookback window.
            from_time = time.strftime(
                '%Y-%m-%dT%H:%M:%SZ',
                time.gmtime(time.time() - self.lookback_hours * 3600.0),
            )
            # "crypto" constrains the search to crypto context. Passing params
            # lets requests URL-encode the symbol safely.
            params = {
                "q": f"{symbol} crypto",
                "from": from_time,
                "sortBy": "publishedAt",
            }
            # The key travels in a header, not the URL: requests' exception
            # messages include the URL, which would leak the key into logs.
            headers = {"X-Api-Key": self.api_key}
            self._throttle()  # Respectful delay to avoid rate limits
            resp = requests.get(
                self.NEWSAPI_URL,
                params=params,
                headers=headers,
                timeout=self.request_timeout,
            )
            if resp.status_code == 429:
                if self.logger:
                    self.logger.warning(f'NewsAPI rate limit reached for {symbol}.')
                return []
            resp.raise_for_status()
            data = resp.json()
            if data.get('status') != 'ok':
                if self.logger:
                    self.logger.warning(
                        f"NewsAPI returned status {data.get('status')!r} for {symbol}."
                    )
                return []
            articles = data.get('articles', []) or []
            # Normalize articles into the format the sentiment engine expects.
            return [
                {
                    "title": a.get("title") or "",
                    "summary": a.get("description") or a.get("content") or "",
                    "published_at": a.get("publishedAt") or a.get("published_at") or "",
                }
                for a in articles
                if isinstance(a, dict)
            ]
        except requests.exceptions.RequestException as e:
            if self.logger:
                self.logger.warning(f'NewsAPI request failed for {symbol}: {str(e)}')
            return []
        except Exception as e:
            if self.logger:
                self.logger.error(f'Unexpected error fetching news for {symbol}: {str(e)}', exc_info=True)
            return []
| # ... existing code ... |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class YourStrategyClassNameHere:  # replace with your actual class name
    """Template showing how a strategy wires in the news sentiment service."""

    # ... existing code ...
    def __init__(self, *args, **kwargs):
        # ... existing code ...

        # ``news_api_key`` and ``logger`` are expected to be set earlier by the
        # strategy's own config/params; both default to None when absent.
        configured_key = getattr(self, 'news_api_key', None)
        strategy_logger = getattr(self, 'logger', None)

        # Strategy-integrated news service (sentiment cached for 15 minutes).
        self.news_service = StrategyNewsService(
            api_key=configured_key,
            ttl_seconds=900,
            logger=strategy_logger,
        )

        # Entries are blocked only when sentiment meets both thresholds.
        self.news_block_min_confidence = 0.35
        self.news_block_min_abs_score = 0.25

        # Most recent sentiment seen per base asset (optional bookkeeping).
        self._last_sentiment: Dict[str, SentimentResult] = {}
        # ... existing code ...
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment