import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dash_utils
import concurrent.futures as cf
def calc_errors_page():
    st.title("Calc-Errors report")
    st.write("Calc-Errors report for PROD, UAT, and DEV environments.")

    # Create tabs for each environment
    tab_prod, tab_uat, tab_dev = st.tabs(["PROD", "UAT", "DEV"])
    tabs = {"PROD": tab_prod, "UAT": tab_uat, "DEV": tab_dev}

    # Create placeholders for each tab so we can update them later
    _ = {env: tabs[env].empty() for env in ["PROD", "UAT", "DEV"]}

    # Launch database calls concurrently using ThreadPoolExecutor
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each environment to a future
        future_to_env = {
            executor.submit(dash_utils.get_calc_error_report, env=env): env
            for env in ["PROD", "UAT", "DEV"]
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                data = f"Error fetching data for {env}: {e}"
            # Update the corresponding tab with the results
            with tabs[env]:
                st.subheader(f"Calc-Errors Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def calc_errors_page_expanders():
    st.title("Calc-Errors report")
    st.write("Calc-Errors report for PROD, UAT, and DEV environments.")

    # Create expanders for each environment
    expander_prod = st.expander("PROD", expanded=False)
    expander_uat = st.expander("UAT", expanded=False)
    expander_dev = st.expander("DEV", expanded=False)
    expanders = {"PROD": expander_prod, "UAT": expander_uat, "DEV": expander_dev}

    # Launch database calls concurrently using ThreadPoolExecutor
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each environment to a future
        future_to_env = {
            executor.submit(dash_utils.get_calc_error_report, env=env): env
            for env in ["PROD", "UAT", "DEV"]
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                data = f"Error fetching data for {env}: {e}"
            # Update the corresponding expander with the results
            with expanders[env]:
                st.subheader(f"Calc-Errors Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def collect_errors_page():
    st.title("Collect-Errors report")
    st.write("Collect-Errors report for PROD, UAT, and DEV environments.")

    # Create tabs for each environment
    tab_prod, tab_uat, tab_dev = st.tabs(["PROD", "UAT", "DEV"])
    tabs = {"PROD": tab_prod, "UAT": tab_uat, "DEV": tab_dev}

    # Create placeholders for each tab so we can update them later
    _ = {env: tabs[env].empty() for env in ["PROD", "UAT", "DEV"]}

    # Launch database calls concurrently using ThreadPoolExecutor
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each environment to a future
        future_to_env = {
            executor.submit(dash_utils.get_collect_error_report, env=env): env
            for env in ["PROD", "UAT", "DEV"]
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                data = f"Error fetching data for {env}: {e}"
            # Update the corresponding tab with the results
            with tabs[env]:
                st.subheader(f"Collect-Errors Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def discontinued_series_page():
    st.title("Discontinued-Series report")
    st.write("Discontinued-Series report for PROD, UAT, and DEV environments.")

    # Create tabs for each environment
    tab_prod, tab_uat, tab_dev = st.tabs(["PROD", "UAT", "DEV"])
    tabs = {"PROD": tab_prod, "UAT": tab_uat, "DEV": tab_dev}

    # Create placeholders for each tab so we can update them later
    _ = {env: tabs[env].empty() for env in ["PROD", "UAT", "DEV"]}

    # Launch database calls concurrently using ThreadPoolExecutor
    with cf.ThreadPoolExecutor(max_workers=3) as executor:
        # Map each environment to a future
        future_to_env = {
            executor.submit(dash_utils.get_discontinued_series_report, env=env): env
            for env in ["PROD", "UAT", "DEV"]
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                data = f"Error fetching data for {env}: {e}"
            # Update the corresponding tab with the results
            with tabs[env]:
                st.subheader(f"Discontinued-Series Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def chart_page():
    st.title("Diff Chart Page")
    st.write(
        "This chart shows the difference between the indicator published today and yesterday."
    )
    tickers_list = dash_utils.get_tickers_list()
    ticker = st.selectbox("Select Ticker", tickers_list)

    curr_df, prev_df = dash_utils.get_ticker_files(ticker)
    old_name, new_name = f"{ticker} (old)", f"{ticker} (new)"
    curr_df = curr_df.set_index("real_date")["value"].rename(new_name)
    prev_df = prev_df.set_index("real_date")["value"].rename(old_name)
    diff_df = curr_df - prev_df

    # Combined old/new/diff series, forward-filled over gaps
    data = pd.concat([prev_df, curr_df, diff_df], axis=1)
    data = data.ffill()
    st.line_chart(diff_df)
def sources_chart_page():
    st.markdown(
        """
        <iframe
            src="https://macrosynergy.github.io/msy-content-links/"
            width="1500"
            height="1500"
            style="border:none;">
        </iframe>
        """,
        unsafe_allow_html=True,
    )
def buttons_page():
    st.title("Buttons Page")
    st.write("Here are some buttons with links.")
    st.button(
        "JPMaQS on JPMM",
        on_click=lambda: st.write("[JPMaQS on JPMM](https://www.jpmm.com/#jpmaqs)"),
    )
    st.button(
        "JPMaQS Confluence",
        on_click=lambda: st.write(
            "[JPMaQS Confluence](https://example.com/confluence)"
        ),
    )
    st.button(
        "Macrosynergy Package docs",
        on_click=lambda: st.write(
            "[Macrosynergy Package docs](https://docs.macrosynergy.com)"
        ),
    )
def multi_charts_page():
    st.title("Multi-Chart Page")
    st.write("Select a chart type from the dropdown.")
    chart_type = st.selectbox(
        "Select Chart Type", ["Line Chart", "Bar Chart", "Scatter Plot"]
    )
    data = pd.DataFrame(np.random.randn(20, 2), columns=["X", "Y"])
    if chart_type == "Line Chart":
        st.line_chart(data)
    elif chart_type == "Bar Chart":
        st.bar_chart(data)
    elif chart_type == "Scatter Plot":
        st.scatter_chart(data)
def email_page():
    # Initialize session_state so it's loaded only once per session
    if "email_data" not in st.session_state:
        st.session_state["email_data"] = dash_utils.load_email_text()

    # For convenience, refer to the dict in session_state
    d = st.session_state["email_data"]

    # Current text in our text area
    text = st.text_area("Email Text", d["message"], disabled=d["locked"])

    def on_save():
        if d["locked"]:
            st.warning("Cannot save – message is locked!")
        else:
            # Update session data
            dash_utils.save_email_text(d, new_message=text)
            st.success("Message saved!")

    def on_save_lock():
        # Save & lock
        dash_utils.save_email_text(d, new_message=text, lock_state=True)
        st.success("Message saved and locked!")

    def on_unlock():
        # Unlock only (leave message as-is in session state)
        dash_utils.save_email_text(d, lock_state=False)
        st.success("Message unlocked!")

    st.button("Save", on_click=on_save)
    st.button("Save & Lock", on_click=on_save_lock)
    st.button("Unlock", on_click=on_unlock)
PAGE_KEYS = {
    "Buttons": buttons_page,
    "Multi-Charts": multi_charts_page,
    "Diff Chart": chart_page,
    "Sources Chart": sources_chart_page,
    "Calc Errors": calc_errors_page,
    "Calc Errors (Alt. Layout)": calc_errors_page_expanders,
    "Collect Errors": collect_errors_page,
    "Discontinued Series": discontinued_series_page,
    "Email": email_page,
}
def main():
    st.set_page_config(page_title="JPMaQS Dashboard", layout="wide")
    st.sidebar.title("Navigation")
    page = st.sidebar.radio(
        "Go to",
        list(PAGE_KEYS.keys()),
    )
    assert page in PAGE_KEYS, f"Invalid page: {page}"
    PAGE_KEYS[page]()


if __name__ == "__main__":
    main()
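
# To run the dashboard locally (assuming this file is saved as dashboard.py,
# with the dash_utils.py module below importable from the same directory):
#
#     streamlit run dashboard.py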
# dash_utils.py: helper module imported by the dashboard app above.
import glob
import os
import pandas as pd
import streamlit
import datetime
import json
import time
from typing import Any

CURR_TICKERS_DIR = "/jpmaqs-isc-git/"
PREV_TICKERS_DIR = "/jpmaqs-isc-git-copy/"
TEMP_FOLDER = "_tmp/"
EMAIL_JSON_FILE = f"{TEMP_FOLDER}email_data.json"
def get_files_list():
    return sorted(glob.glob("data/**/*.csv", recursive=True))
def cache_buster(freq="5min"):
    """
    Returns the current time rounded to the nearest defined time period.
    """
    rounded = pd.Timestamp.now().round(freq)
    return rounded.strftime("%Y%m%d%H%M%S")
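
# Illustrative behaviour of cache_buster (hypothetical timestamps):
# pd.Timestamp.round snaps to the nearest multiple of `freq`, so every call
# made within the same bucket returns the same string.
#
# >>> pd.Timestamp("2025-03-05 12:06:00").round("5min").strftime("%Y%m%d%H%M%S")
# '20250305120500'
# >>> pd.Timestamp("2025-03-05 12:09:00").round("5min").strftime("%Y%m%d%H%M%S")
# '20250305121000'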
@streamlit.cache_data
def _get_report_file(
    prefix: str,
    env: str,
    sep: str,
    date: str = None,
    cache_bust: Any = None,
):
    time.sleep(15)
    env_str = env.strip().upper()
    base_pattern = f"{prefix}{env_str}{sep}"
    files = [
        f for f in get_files_list() if os.path.basename(f).startswith(base_pattern)
    ]
    err_str = f"No report found for prefix={prefix}, env={env}, date={date}"
    if not files:
        raise FileNotFoundError(err_str)
    if date is None:
        return files[-1]
    full_pattern = f"{base_pattern}{date}.csv"
    matching_files = [f for f in files if os.path.basename(f) == full_pattern]
    result = matching_files[0] if len(matching_files) == 1 else None
    if result is None:
        raise FileNotFoundError(err_str)
    return result
def _cached_get_report_file(
    prefix: str,
    env: str,
    sep: str,
    date: str = None,
    cache_bust: Any = None,
):
    cache_bust = cache_bust or cache_buster()
    return _get_report_file(prefix, env, sep, date, cache_bust)
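
# Note on the cache_bust argument: streamlit.cache_data keys cached results on
# the hash of all function arguments, so passing cache_buster()'s rounded
# timestamp into _get_report_file makes the cached lookup expire whenever the
# timestamp rolls over to the next bucket (roughly every five minutes),
# without having to clear the cache manually.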
def get_calc_error_report(env="PROD", date=None):
    file_path = _cached_get_report_file("calc-error_", env, "_", date)
    return pd.read_csv(file_path)


def get_collect_error_report(env="PROD", date=None):
    file_path = _cached_get_report_file("Collect-Errors-", env, "-", date)
    return pd.read_csv(file_path)


def get_discontinued_series_report(env="PROD", date=None):
    file_path = _cached_get_report_file("Discontinued-Series-", env, "-", date)
    return pd.read_csv(file_path)
@streamlit.cache_data
def _get_tickers_list(cache_bust: Any = None):
    curr_files = glob.glob(f"{CURR_TICKERS_DIR}**/*.csv", recursive=True)
    prev_files = glob.glob(f"{PREV_TICKERS_DIR}**/*.csv", recursive=True)
    curr_files = list(map(os.path.basename, curr_files))
    prev_files = list(map(os.path.basename, prev_files))
    curr_files = sorted(set(curr_files + prev_files))
    curr_files = [f.replace(".csv", "") for f in curr_files]
    return curr_files


def get_tickers_list():
    return _get_tickers_list(cache_bust=cache_buster("1min"))
@streamlit.cache_data
def _get_ticker_files(
    ticker: str,
    curr_date: str = None,
    prev_date: str = None,
    cache_bust: Any = None,
):
    def get_ticker_file(ticker, folder):
        files = glob.glob(f"{folder}**/{ticker}.csv", recursive=True)
        if not files:
            raise FileNotFoundError(f"No file found for ticker={ticker}")
        return pd.read_csv(files[0])

    curr_file = get_ticker_file(ticker, CURR_TICKERS_DIR)
    prev_file = get_ticker_file(ticker, PREV_TICKERS_DIR)
    return curr_file, prev_file


def _cached_get_ticker_files(
    ticker: str,
    curr_date: str = None,
    prev_date: str = None,
    cache_bust: Any = None,
):
    cache_bust = cache_bust or cache_buster()
    return _get_ticker_files(ticker, curr_date, prev_date, cache_bust)


def get_ticker_files(ticker: str, curr_date: str = None, prev_date: str = None):
    return _cached_get_ticker_files(ticker, curr_date, prev_date)
def load_email_text():
    if os.path.exists(EMAIL_JSON_FILE):
        with open(EMAIL_JSON_FILE) as f:
            return json.load(f)
    return {
        "message": "",
        "locked": False,
        "last_saved_timestamp": "",
        "today_date": "",
        "history": {},
    }
def save_email_text(data, new_message=None, lock_state=None):
    if new_message is not None:
        now = datetime.datetime.now().isoformat(timespec="seconds")
        data["message"] = new_message
        data["last_saved_timestamp"] = now
        data["today_date"] = datetime.date.today().isoformat()
        data["history"][now] = new_message
    if lock_state is not None:
        data["locked"] = lock_state
    # Ensure the temp folder exists before writing the JSON file
    os.makedirs(TEMP_FOLDER, exist_ok=True)
    with open(EMAIL_JSON_FILE, "w") as f:
        json.dump(data, f)
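
# A minimal usage sketch of the email helpers (illustrative message text only):
# load the persisted state, save a new message into the history, and lock it.
#
#     data = load_email_text()
#     save_email_text(data, new_message="Daily report sent.", lock_state=True)
#     assert load_email_text()["locked"] is True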