earnings_provider.py
import os
import sys
import traceback
import re
import json
import uuid
from typing import List, Dict
import time
import arrow
from datetime import datetime, timedelta, timezone
import asyncio
import argparse
import logging
import pandas as pd
from tabulate import tabulate
from redis import StrictRedis
from redis.exceptions import ConnectionError as RedisConnectionError
# https://www.analyticsvidhya.com/blog/2021/06/download-financial-dataset-using-yahoo-finance-in-python-a-complete-guide/
from yahoofinancials import YahooFinancials
'''
https://www.tradingview.com/markets/stocks-usa/earnings
Usage:
From command line:
python earnings_provider.py --tickers NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA --export_json_file_name earnings --earnings_full_topic earnings_full_yf --earnings_current_topic earnings_current_yf --redis_url localhost --redis_port 6379 --redis_db 0 --redis_ttl_ms 3600000
From Debugger:
Launch.json
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "justMyCode": false,
            "program": "${file}",
            "console": "internalConsole",
            "args": [
                "--tickers", "NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA",
                "--export_json_file_name", "earnings",
                "--earnings_full_topic", "earnings_full_yf",
                "--earnings_current_topic", "earnings_current_yf",
                "--redis_url", "localhost",
                "--redis_port", "6379",
                "--redis_db", "0",
                "--redis_ttl_ms", "3600000"
            ],
            "env": {
                "ENV": "prod"
            }
        }
    ]
}
'''
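'''
Example (a minimal sketch): consuming the published snapshot from another process.
Assumes the provider was started with --earnings_full_topic earnings_full_yf against
the same Redis instance; the key and connection values mirror the CLI defaults above.

    from redis import StrictRedis
    import json

    redis_client = StrictRedis(host='localhost', port=6379, db=0)
    raw = redis_client.get('earnings_full_yf')  # None once the TTL has expired
    if raw:
        for earning in json.loads(raw):
            print(earning['ticker'], earning.get('latest_quarter'), earning.get('latest_bullbear'))
'''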
class EarningsProvider:
def __init__(self) -> None:
self.param = {}
logging.Formatter.converter = time.gmtime
self.logger = logging.getLogger('earnings_provider')
log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR
self.logger.setLevel(log_level)
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)
sh = logging.StreamHandler()
sh.setLevel(log_level)
sh.setFormatter(formatter)
self.logger.addHandler(sh)
fh = logging.FileHandler('earnings_provider.log')
fh.setLevel(log_level)
fh.setFormatter(formatter)
self.logger.addHandler(fh)
self.argparse()
self.init_redis_client()
def argparse(self):
parser = argparse.ArgumentParser()
parser.add_argument("--tickers", help="Comma separated list of tickers, example: NVDA,AAPL,MSFT,META,AMZN,GOOG,TSLA",default="NVDA")
parser.add_argument("--export_json_file_name", help="default: earnings.csv. New entries will be appended to this file.",default="earnings")
parser.add_argument("--earnings_full_topic", help="redis topic published to, this is for full data set. Default: None",default=None)
parser.add_argument("--earnings_current_topic", help="redis topic published to, this is for current, newly updated only. Default: None",default=None)
parser.add_argument("--redis_url", help="redis url. Example, localhost. Default: None (Will not publish to redis)",default=None)
parser.add_argument("--redis_port", help="redis port",default=6379)
parser.add_argument("--redis_db", help="redis db",default=0)
parser.add_argument("--redis_ttl_ms", help="TTL for items published to redis. Default: 1000*60*60 (i.e. 1hr)",default=1000*60*60)
parser.add_argument("--loop_max_freq_sec", help="min loop time. Default 5 sec (Avoid rate limit issues)",default=5)
args = parser.parse_args()
self.param['tickers'] = args.tickers.split(',')
self.param['export_json_file_name'] = args.export_json_file_name
self.param['earnings_full_topic'] = args.earnings_full_topic
self.param['earnings_current_topic'] = args.earnings_current_topic
self.param['redis_url'] = args.redis_url
self.param['redis_port'] = int(args.redis_port)
self.param['redis_db'] = int(args.redis_db)
self.param['redis_ttl_ms'] = int(args.redis_ttl_ms)
self.param['loop_max_freq_sec'] = int(args.loop_max_freq_sec)
print(f"Startup args: {args}") # Dont use logger, not yet setup yet.
print(f"param: {print(json.dumps(self.param, indent=2))}")
return parser
def init_redis_client(self):
if self.param['redis_url']:
self.redis_client = StrictRedis(
host = self.param['redis_url'],
port = self.param['redis_port'],
db = self.param['redis_db'],
ssl = False
)
            try:
                self.redis_client.keys()
            except RedisConnectionError as redis_conn_error:
                # redis-py's ConnectionError subclasses RedisError, not the builtin ConnectionError,
                # so the builtin would never be raised here.
                err_msg = f"@slack Failed to connect to redis: {self.param['redis_url']}, port: {self.param['redis_port']}"
                raise ConnectionError(err_msg) from redis_conn_error
else:
self.redis_client = None
def send_notification(self, title : str, message : str):
print(f'todo {title} {message}')
async def start(self):
        def publish_earnings(
earnings : List[Dict],
topic : str,
redis_client,
ttl_ms : int = 1000*60*60
):
            '''
            Stores the snapshot under `topic` via SET with a TTL (https://redis.io/commands/set/)
            rather than PUBLISH, so late-joining consumers can still read the latest value.
            '''
            redis_client.set(name=topic, value=json.dumps(earnings), ex=int(ttl_ms/1000))
'''
To restore from redis:
restored = json.loads(redis_client.get(topic))
for earning in restored:
earning['earning_date_0'] = arrow.get(earning['earning_date_0']).datetime
earning['earning_date_0'] = earning['earning_date_0'].replace(tzinfo=None)
earning['earning_date_1'] = arrow.get(earning['earning_date_1']).datetime
earning['earning_date_1'] = earning['earning_date_1'].replace(tzinfo=None)
'''
async def fetch_earnings(ticker : str):
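            # NOTE: YahooFinancials is a blocking (synchronous) client, so despite
            # asyncio.gather the per-ticker fetches effectively run sequentially.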
result = None
try:
stock = YahooFinancials(ticker)
earnings = stock.get_stock_earnings_data()[ticker]
self.logger.info(f"{ticker}: earnings fetched")
financial_ccy = earnings['financialCurrency']
earnings_dates_ts = earnings['earningsChart']['earningsDate']
earnings_dates = []
for ts in earnings_dates_ts:
earnings_dates.append(
datetime.fromtimestamp(ts)
)
result = {
'ticker' : ticker,
'ccy' : financial_ccy,
                    'earning_date_0' : earnings_dates[0].strftime("%Y-%m-%d %H:%M:%S") if earnings_dates else '', # datetime won't serialize to JSON
'earning_date_1' : earnings_dates[1].strftime("%Y-%m-%d %H:%M:%S") if earnings_dates and len(earnings_dates)>=2 else '',
'last_updated_ts_ms' : None # None to indicate entry came from original fetch
}
'''
                What's the unit of 'actual' vs 'estimate'? Billions USD? No: it's EPS (USD per share).
{'date': '3Q2023', 'actual': 0.27, 'estimate': 0.21}
{'date': '4Q2023', 'actual': 0.4, 'estimate': 0.34}
{'date': '1Q2024', 'actual': 0.52, 'estimate': 0.46}
{'date': '2Q2024', 'actual': 0.61, 'estimate': 0.56}
'''
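                # e.g. 3Q2023: actual 0.27 vs estimate 0.21 is a ~+28.6% EPS surprise
                # ((0.27 - 0.21) / 0.21), which the loop below tags as 'bullish'.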
                max_quarter : int = -1
                max_year : int = -1
                quarterly_earnings_actual_vs_estimate = earnings['earningsChart']['quarterly']
                for item in quarterly_earnings_actual_vs_estimate:
                    quarter = int(item['date'][0])  # '3Q2023' --> 3
                    year = int(item['date'][2:])    # '3Q2023' --> 2023
                    if year>max_year:
                        max_year = year
                        max_quarter = -1    # new year: restart the quarter search
                    if year==max_year and quarter>max_quarter:  # guard on year so an out-of-order older quarter can't win
                        max_quarter = quarter
result[f"qoq_{item['date']}_eps_act"] = item['actual']
result[f"qoq_{item['date']}_eps_est"] = item['estimate']
if item['actual']>item['estimate']:
result[f"qoq_{item['date']}_bullbear"] = 'bullish'
elif item['actual']==item['estimate']:
result[f"qoq_{item['date']}_bullbear"] = 'neutral'
else:
result[f"qoq_{item['date']}_bullbear"] = 'bearish'
if year==max_year and quarter==max_quarter:
# Make it convenient for consumer to see if latest update is bullish vs bearish
result["latest_quarter"] = item['date']
result["latest_bullbear"] = result[f"qoq_{item['date']}_bullbear"]
financials = earnings['financialsChart']
'''
0: {'date': '3Q2023', 'revenue': 13507000000, 'earnings': 6188000000}
1: {'date': '4Q2023', 'revenue': 18120000000, 'earnings': 9243000000}
2: {'date': '1Q2024', 'revenue': 22103000000, 'earnings': 12285000000}
3: {'date': '2Q2024', 'revenue': 26044000000, 'earnings': 14881000000}
'''
qoq_financials = financials['quarterly']
'''
0: {'date': 2021, 'revenue': 16675000000, 'earnings': 4332000000}
1: {'date': 2022, 'revenue': 26914000000, 'earnings': 9752000000}
2: {'date': 2023, 'revenue': 26974000000, 'earnings': 4368000000}
3: {'date': 2024, 'revenue': 60922000000, 'earnings': 29760000000}
'''
yoy_financials = financials['yearly']
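                # e.g. yearly revenue 2023 --> 2024: 26974000000 --> 60922000000, roughly +125.9% YoY.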
for item in qoq_financials:
result[f"qoq_{item['date']}_revenue"] = item['revenue']
for item in yoy_financials:
result[f"yoy_{item['date']}_revenue"] = item['revenue']
result[f"yoy_{item['date']}_earnings"] = item['earnings']
except Exception as fetch_error:
self.logger.error(f"Error while fetching earnings for {ticker}. {fetch_error} {str(sys.exc_info()[0])} {str(sys.exc_info()[1])} {traceback.format_exc()}")
return result
while True:
try:
dt_loop_start = datetime.now()
export_json_file_name = f"{self.param['export_json_file_name']}.json"
existing_json : bool = os.path.isfile(export_json_file_name) and os.stat(export_json_file_name).st_size!=0
                if existing_json:
                    with open(export_json_file_name) as json_file:
                        old_earnings = json.load(json_file)
tasks = [ asyncio.create_task(fetch_earnings(ticker)) for ticker in self.param['tickers'] ]
new_earnings = await asyncio.gather(*tasks)
# Detect updates
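                # A fresh earnings release surfaces as extra qoq_/yoy_ keys on a ticker's
                # dict, so a key-count mismatch vs the saved JSON flags an update.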
updated : bool = False
updates : List[Dict] = []
if existing_json:
for new_earning in new_earnings:
ticker = new_earning['ticker']
new_columns = list(new_earning.keys())
old_earning = [ old_earning for old_earning in old_earnings if old_earning['ticker'] == ticker ]
if old_earning:
old_earning = old_earning[0]
old_columns = list(old_earning.keys())
if len(new_columns)!=len(old_columns):
                                updated = True
                                # Stamped at detection time, not the actual release time, so it can lag for higher-time-frame (HTF) analysis.
                                last_updated_ts_ms = datetime.now().timestamp() * 1000
                                new_earning['last_updated_ts_ms'] = last_updated_ts_ms # lets the 'current' publish below filter freshly updated entries
                                updated_fields = [ new_column for new_column in new_columns if new_column not in old_columns ]
                                self.logger.info(f"{ticker} update detected! updated fields: {' '.join(updated_fields)}. new_earning: {new_earning}")
                                for update in updated_fields:
                                    updates.append(
                                        {
                                            'ticker' : ticker,  # tag so each ticker's CSV only gets its own updates
                                            'key' : update,
                                            'val' : new_earning[update]
                                        }
                                    )
                                    self.logger.info(f"{ticker} update {update}: {new_earning[update]}")
self.send_notification(title=f"earnings_provider {ticker} earnings update. Updated: {' '.join(updated_fields)}", message=new_earning)
                            pd_earning = pd.read_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")
                            pd_earning.drop(pd_earning.columns[pd_earning.columns.str.contains('unnamed', case=False)], axis=1, inplace=True) # drop the index column written back by to_csv
                            ticker_updates = [ u for u in updates if u['ticker']==ticker ] # only this ticker's updates, not other tickers' from the same pass
                            if ticker_updates:
                                for update in ticker_updates:
                                    earning_row = {
                                        'key' : update['key'],
                                        'val' : update['val'],
                                        'last_updated_ts_ms' : new_earning['last_updated_ts_ms']
                                    }
                                    pd_earning = pd.concat([pd_earning, pd.DataFrame([earning_row])], axis=0, ignore_index=True)
                                pd_earning.to_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")
else:
for new_earning in new_earnings:
ticker = new_earning['ticker']
pd_earning = pd.DataFrame([new_earning])
pd_earning = pd_earning.transpose()
pd_earning.reset_index(inplace=True)
pd_earning.columns = [ 'key', 'val' ]
pd_earning['last_updated_ts_ms'] = None
self.logger.info(f"{tabulate(pd_earning, headers='keys', tablefmt='psql')}")
pd_earning.to_csv(f"{self.param['export_json_file_name']}_{ticker}.csv")
with open(export_json_file_name, 'w') as json_file:
json_file.write(json.dumps(new_earnings))
self.logger.info(f"Earnings written to {export_json_file_name}")
                if self.redis_client:
                    if self.param['earnings_full_topic']:
                        publish_earnings(
                            earnings=new_earnings,
                            topic=self.param['earnings_full_topic'],
                            redis_client=self.redis_client,
                            ttl_ms=self.param['redis_ttl_ms']
                        )
                    if updated and self.param['earnings_current_topic']:
                        # Publish only the freshly updated entries to the 'current' topic.
                        new_earning_current = [ x for x in new_earnings if x['last_updated_ts_ms'] ]
                        publish_earnings(
                            earnings=new_earning_current,
                            topic=self.param['earnings_current_topic'],
                            redis_client=self.redis_client,
                            ttl_ms=self.param['redis_ttl_ms']
                        )
loop_elapsed = datetime.now() - dt_loop_start
self.logger.info(f"loop elapsed (sec): {loop_elapsed.total_seconds()}")
except Exception as loop_error:
err_msg = f"{loop_error} {str(sys.exc_info()[0])} {str(sys.exc_info()[1])} {traceback.format_exc()}"
self.logger.error(err_msg)
self.send_notification(title="earnings_provider error", message=err_msg[:50])
            await asyncio.sleep(self.param['loop_max_freq_sec'])
async def _run_jobs():
    provider = EarningsProvider()
    await provider.start()

if __name__ == '__main__':
    asyncio.run(_run_jobs())