earnings_provider.py
import os
import sys
import traceback
import re
import json
import uuid
from typing import List, Dict
import time
import arrow
from datetime import datetime, timedelta, timezone
import asyncio
import argparse
import logging

import pandas as pd
from tabulate import tabulate
from redis import StrictRedis
from redis.exceptions import ConnectionError as RedisConnectionError
# https://www.analyticsvidhya.com/blog/2021/06/download-financial-dataset-using-yahoo-finance-in-python-a-complete-guide/
from yahoofinancials import YahooFinancials
'''
https://www.tradingview.com/markets/stocks-usa/earnings

Usage:
    From command line:
        python earnings_provider.py --tickers NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA --export_json_file_name earnings --earnings_full_topic earnings_full_yf --earnings_current_topic earnings_current_yf --redis_url localhost --redis_port 6379 --redis_db 0 --redis_ttl_ms 3600000

    From debugger, launch.json:
        {
            "version": "0.2.0",
            "configurations": [
                {
                    "name": "Python: Current File",
                    "type": "python",
                    "request": "launch",
                    "justMyCode": false,
                    "program": "${file}",
                    "console": "internalConsole",
                    "args" : [
                        "--tickers", "NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA",
                        "--export_json_file_name", "earnings",
                        "--earnings_full_topic", "earnings_full_yf",
                        "--earnings_current_topic", "earnings_current_yf",
                        "--redis_url", "localhost",
                        "--redis_port", "6379",
                        "--redis_db", "0",
                        "--redis_ttl_ms", "3600000"
                    ],
                    "env" : {
                        "ENV" : "prod"
                    }
                }
            ]
        }
'''
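
# Illustrative shape of one entry in the exported/published JSON list, as
# assembled by fetch_earnings() below. Values are examples lifted from the
# sample data in the docstrings (the dates are made up for illustration), and
# only a few of the repeated qoq_/yoy_ keys are shown:
#
#   {
#       "ticker": "NVDA",
#       "ccy": "USD",
#       "earning_date_0": "2024-08-28 00:00:00",
#       "earning_date_1": "2024-09-02 00:00:00",
#       "last_updated_ts_ms": null,
#       "qoq_2Q2024_eps_act": 0.61,
#       "qoq_2Q2024_eps_est": 0.56,
#       "qoq_2Q2024_bullbear": "bullish",
#       "latest_quarter": "2Q2024",
#       "latest_bullbear": "bullish",
#       "qoq_2Q2024_revenue": 26044000000,
#       "yoy_2024_revenue": 60922000000,
#       "yoy_2024_earnings": 29760000000
#   }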
class EarningsProvider:
    def __init__(self) -> None:
        self.param = {}

        # Log timestamps in UTC.
        logging.Formatter.converter = time.gmtime
        self.logger = logging.getLogger('earnings_provider')
        log_level = logging.INFO  # DEBUG --> INFO --> WARNING --> ERROR
        self.logger.setLevel(log_level)
        format_str = '%(asctime)s %(message)s'
        formatter = logging.Formatter(format_str)
        sh = logging.StreamHandler()
        sh.setLevel(log_level)
        sh.setFormatter(formatter)
        self.logger.addHandler(sh)
        fh = logging.FileHandler('earnings_provider.log')
        fh.setLevel(log_level)
        fh.setFormatter(formatter)
        self.logger.addHandler(fh)

        self.argparse()
        self.init_redis_client()
    def argparse(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("--tickers", help="Comma separated list of tickers, example: NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA", default="NVDA")
        parser.add_argument("--export_json_file_name", help="Base name for exports, default: earnings (writes earnings.json plus one CSV per ticker). New entries are appended.", default="earnings")
        parser.add_argument("--earnings_full_topic", help="redis topic published to, for the full data set. Default: None", default=None)
        parser.add_argument("--earnings_current_topic", help="redis topic published to, for current, newly updated entries only. Default: None", default=None)
        parser.add_argument("--redis_url", help="redis url. Example: localhost. Default: None (will not publish to redis)", default=None)
        parser.add_argument("--redis_port", help="redis port", default=6379)
        parser.add_argument("--redis_db", help="redis db", default=0)
        parser.add_argument("--redis_ttl_ms", help="TTL for items published to redis. Default: 1000*60*60 (i.e. 1hr)", default=1000*60*60)
        parser.add_argument("--loop_max_freq_sec", help="Minimum seconds per loop iteration. Default 5 sec (avoids rate limit issues)", default=5)
        args = parser.parse_args()

        self.param['tickers'] = args.tickers.split(',')
        self.param['export_json_file_name'] = args.export_json_file_name
        self.param['earnings_full_topic'] = args.earnings_full_topic
        self.param['earnings_current_topic'] = args.earnings_current_topic
        self.param['redis_url'] = args.redis_url
        self.param['redis_port'] = int(args.redis_port)
        self.param['redis_db'] = int(args.redis_db)
        self.param['redis_ttl_ms'] = int(args.redis_ttl_ms)
        self.param['loop_max_freq_sec'] = int(args.loop_max_freq_sec)

        print(f"Startup args: {args}")  # Don't use logger, not set up yet.
        print(f"param: {json.dumps(self.param, indent=2)}")

        return parser
    def init_redis_client(self):
        if self.param['redis_url']:
            self.redis_client = StrictRedis(
                host=self.param['redis_url'],
                port=self.param['redis_port'],
                db=self.param['redis_db'],
                ssl=False
            )
            try:
                # Round trip to verify the connection actually works.
                self.redis_client.keys()
            except RedisConnectionError as redis_conn_error:
                err_msg = f"@slack Failed to connect to redis: {self.param['redis_url']}, port: {self.param['redis_port']}"
                raise ConnectionError(err_msg) from redis_conn_error
        else:
            self.redis_client = None
    def send_notification(self, title: str, message: str):
        print(f'todo {title} {message}')
    async def start(self):
        def publish_earnings(
            earnings: List[Dict],
            topic: str,
            redis_client,
            ttl_ms: int = 1000*60*60
        ):
            '''
            https://redis.io/commands/publish/
            '''
            redis_client.set(name=topic, value=json.dumps(earnings), ex=int(ttl_ms/1000))
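            # Note: despite the PUBLISH link above, this stores a snapshot under a
            # key with a TTL (SET with EX), so consumers poll it with GET. If true
            # pub/sub fan-out were wanted instead, a sketch (untested here) would be:
            #   redis_client.publish(topic, json.dumps(earnings))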
            '''
            To restore from redis:
                restored = json.loads(redis_client.get(topic))
                for earning in restored:
                    earning['earning_date_0'] = arrow.get(earning['earning_date_0']).datetime
                    earning['earning_date_0'] = earning['earning_date_0'].replace(tzinfo=None)
                    earning['earning_date_1'] = arrow.get(earning['earning_date_1']).datetime
                    earning['earning_date_1'] = earning['earning_date_1'].replace(tzinfo=None)
            '''
        async def fetch_earnings(ticker: str):
            # Note: YahooFinancials calls are blocking, so the asyncio.gather below
            # effectively runs these sequentially rather than concurrently.
            result = None
            try:
                stock = YahooFinancials(ticker)
                earnings = stock.get_stock_earnings_data()[ticker]
                self.logger.info(f"{ticker}: earnings fetched")
                financial_ccy = earnings['financialCurrency']
                earnings_dates_ts = earnings['earningsChart']['earningsDate']
                earnings_dates = []
                for ts in earnings_dates_ts:
                    earnings_dates.append(
                        datetime.fromtimestamp(ts)
                    )
                result = {
                    'ticker': ticker,
                    'ccy': financial_ccy,
                    'earning_date_0': earnings_dates[0].strftime("%Y-%m-%d %H:%M:%S") if earnings_dates else '',  # datetime won't serialize to JSON
                    'earning_date_1': earnings_dates[1].strftime("%Y-%m-%d %H:%M:%S") if earnings_dates and len(earnings_dates) >= 2 else '',
                    'last_updated_ts_ms': None  # None indicates the entry came from the original fetch
                }
                '''
                What's the UNIT of 'actual' vs 'estimate'? Billions of USD? NO: it's EPS!
                    {'date': '3Q2023', 'actual': 0.27, 'estimate': 0.21}
                    {'date': '4Q2023', 'actual': 0.4, 'estimate': 0.34}
                    {'date': '1Q2024', 'actual': 0.52, 'estimate': 0.46}
                    {'date': '2Q2024', 'actual': 0.61, 'estimate': 0.56}
                '''
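                # Worked example: item['date'] == '3Q2023' parses to quarter=3, year=2023;
                # the running max below picks out the most recent (year, quarter) pair.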
                max_quarter: int = -1
                max_year: int = -1
                quarterly_earnings_actual_vs_estimate = earnings['earningsChart']['quarterly']
                for item in quarterly_earnings_actual_vs_estimate:
                    quarter = int(item['date'][0])
                    year = int(item['date'][2:])
                    # Only bump max_quarter within the same year; otherwise an older
                    # year's Q4 could mask a newer year's Q1.
                    if year > max_year:
                        max_year = year
                        max_quarter = quarter
                    elif year == max_year and quarter > max_quarter:
                        max_quarter = quarter
                    result[f"qoq_{item['date']}_eps_act"] = item['actual']
                    result[f"qoq_{item['date']}_eps_est"] = item['estimate']
                    if item['actual'] > item['estimate']:
                        result[f"qoq_{item['date']}_bullbear"] = 'bullish'
                    elif item['actual'] == item['estimate']:
                        result[f"qoq_{item['date']}_bullbear"] = 'neutral'
                    else:
                        result[f"qoq_{item['date']}_bullbear"] = 'bearish'
                    if year == max_year and quarter == max_quarter:
                        # Make it convenient for the consumer to see if the latest update is bullish vs bearish
                        result["latest_quarter"] = item['date']
                        result["latest_bullbear"] = result[f"qoq_{item['date']}_bullbear"]
                financials = earnings['financialsChart']
                '''
                0: {'date': '3Q2023', 'revenue': 13507000000, 'earnings': 6188000000}
                1: {'date': '4Q2023', 'revenue': 18120000000, 'earnings': 9243000000}
                2: {'date': '1Q2024', 'revenue': 22103000000, 'earnings': 12285000000}
                3: {'date': '2Q2024', 'revenue': 26044000000, 'earnings': 14881000000}
                '''
                qoq_financials = financials['quarterly']
                '''
                0: {'date': 2021, 'revenue': 16675000000, 'earnings': 4332000000}
                1: {'date': 2022, 'revenue': 26914000000, 'earnings': 9752000000}
                2: {'date': 2023, 'revenue': 26974000000, 'earnings': 4368000000}
                3: {'date': 2024, 'revenue': 60922000000, 'earnings': 29760000000}
                '''
                yoy_financials = financials['yearly']
                for item in qoq_financials:
                    result[f"qoq_{item['date']}_revenue"] = item['revenue']
                for item in yoy_financials:
                    result[f"yoy_{item['date']}_revenue"] = item['revenue']
                    result[f"yoy_{item['date']}_earnings"] = item['earnings']
            except Exception as fetch_error:
                self.logger.error(f"Error while fetching earnings for {ticker}. {fetch_error} {str(sys.exc_info()[0])} {str(sys.exc_info()[1])} {traceback.format_exc()}")
            return result
        while True:
            try:
                dt_loop_start = datetime.now()
                export_json_file_name = f"{self.param['export_json_file_name']}.json"
                existing_json: bool = os.path.isfile(export_json_file_name) and os.stat(export_json_file_name).st_size != 0
                if existing_json:
                    with open(export_json_file_name) as json_file:
                        old_earnings = json.load(json_file)

                tasks = [asyncio.create_task(fetch_earnings(ticker)) for ticker in self.param['tickers']]
                new_earnings = await asyncio.gather(*tasks)
                # Detect updates
                updated: bool = False
                if existing_json:
                    for new_earning in new_earnings:
                        ticker = new_earning['ticker']
                        new_columns = list(new_earning.keys())
                        updates: List[Dict] = []  # updates for this ticker only
                        old_earning = [old_earning for old_earning in old_earnings if old_earning['ticker'] == ticker]
                        if old_earning:
                            old_earning = old_earning[0]
                            old_columns = list(old_earning.keys())
                            if len(new_columns) != len(old_columns):
                                updated = True
                                # For HTF (higher-timeframe) analysis this may be inaccurate:
                                # it is the detection time, not the actual release time.
                                last_updated_ts_ms = datetime.now().timestamp() * 1000
                                new_earning['last_updated_ts_ms'] = last_updated_ts_ms  # so the "current" publish below can pick this entry up
                                updated_fields = [new_column for new_column in new_columns if new_column not in old_columns]
                                self.logger.info(f"{ticker} update detected! updated fields: {' '.join(updated_fields)}. new_earning: {new_earning}")
                                for update in updated_fields:
                                    updates.append(
                                        {
                                            'key': update,
                                            'val': new_earning[update]
                                        }
                                    )
                                    self.logger.info(f"{ticker} update {update}: {new_earning[update]}")
                                self.send_notification(title=f"earnings_provider {ticker} earnings update. Updated: {' '.join(updated_fields)}", message=str(new_earning))
                        pd_earning = pd.read_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")
                        pd_earning.drop(pd_earning.columns[pd_earning.columns.str.contains('unnamed', case=False)], axis=1, inplace=True)
                        if updates:  # append only this ticker's updates to its CSV
                            for update in updates:
                                earning_row = {
                                    'key': update['key'],
                                    'val': update['val'],
                                    'last_updated_ts_ms': last_updated_ts_ms
                                }
                                pd_earning = pd.concat([pd_earning, pd.DataFrame([earning_row])], axis=0, ignore_index=True)
                            pd_earning.to_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")
                else:
                    # First run: write one CSV per ticker, one (key, val) row per field.
                    for new_earning in new_earnings:
                        ticker = new_earning['ticker']
                        pd_earning = pd.DataFrame([new_earning])
                        pd_earning = pd_earning.transpose()
                        pd_earning.reset_index(inplace=True)
                        pd_earning.columns = ['key', 'val']
                        pd_earning['last_updated_ts_ms'] = None
                        self.logger.info(f"{tabulate(pd_earning, headers='keys', tablefmt='psql')}")
                        pd_earning.to_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")

                with open(export_json_file_name, 'w') as json_file:
                    json_file.write(json.dumps(new_earnings))
                self.logger.info(f"Earnings written to {export_json_file_name}")
                if self.redis_client:
                    if self.param['earnings_full_topic']:
                        publish_earnings(
                            earnings=new_earnings,
                            topic=self.param['earnings_full_topic'],
                            redis_client=self.redis_client,
                            ttl_ms=self.param['redis_ttl_ms']
                        )
                    if updated and self.param['earnings_current_topic']:
                        new_earning_current = [x for x in new_earnings if x['last_updated_ts_ms']]
                        publish_earnings(
                            earnings=new_earning_current,
                            topic=self.param['earnings_current_topic'],
                            redis_client=self.redis_client,
                            ttl_ms=self.param['redis_ttl_ms']
                        )
                loop_elapsed = datetime.now() - dt_loop_start
                self.logger.info(f"loop elapsed (sec): {loop_elapsed.total_seconds()}")
            except Exception as loop_error:
                err_msg = f"{loop_error} {str(sys.exc_info()[0])} {str(sys.exc_info()[1])} {traceback.format_exc()}"
                self.logger.error(err_msg)
                self.send_notification(title="earnings_provider error", message=err_msg[:50])
            await asyncio.sleep(self.param['loop_max_freq_sec'])  # time.sleep would block the event loop
async def _run_jobs():
    provider = EarningsProvider()
    await provider.start()

if __name__ == '__main__':
    asyncio.run(_run_jobs())
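
# ----------------------------------------------------------------------------
# Consumer sketch (illustrative, not part of this script): restore the snapshot
# that publish_earnings() stored under the full topic. Assumes the example CLI
# values above (localhost:6379, db 0, topic 'earnings_full_yf').
#
#   import json
#   import arrow
#   from redis import StrictRedis
#
#   redis_client = StrictRedis(host='localhost', port=6379, db=0)
#   raw = redis_client.get('earnings_full_yf')
#   if raw:
#       for earning in json.loads(raw):
#           # Dates were serialized as naive '%Y-%m-%d %H:%M:%S' strings; '' means missing.
#           if earning['earning_date_0']:
#               earning['earning_date_0'] = arrow.get(earning['earning_date_0']).datetime.replace(tzinfo=None)
#           print(earning['ticker'], earning.get('latest_bullbear'))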