Skip to content

Instantly share code, notes, and snippets.

@Magnus167
Last active March 5, 2025 12:19
Show Gist options
  • Save Magnus167/a5bee20de7749f3a9841ffdefeea6e0a to your computer and use it in GitHub Desktop.
Save Magnus167/a5bee20de7749f3a9841ffdefeea6e0a to your computer and use it in GitHub Desktop.
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import dash_utils
import concurrent.futures as cf
def calc_errors_page():
    """Render the Calc-Errors report page with one tab per environment.

    The reports for PROD, UAT and DEV are requested concurrently from
    ``dash_utils.get_calc_error_report`` on a thread pool.  Every tab holds
    a placeholder that shows a loading notice until that environment's
    result arrives; the placeholder is then replaced by the report (or by
    the error text if the fetch raised).
    """
    envs = ("PROD", "UAT", "DEV")

    st.title("Calc-Errors report")
    st.write("Calc-Errors report for PROD, UAT, and DEV environments.")

    # One tab per environment, each with a single-slot placeholder that is
    # overwritten once the corresponding fetch completes.
    placeholders = {}
    for env, tab in zip(envs, st.tabs(list(envs))):
        placeholders[env] = tab.empty()
        placeholders[env].info(f"Loading {env} report...")

    # Launch the fetches concurrently; results are rendered from this
    # (script) thread in completion order.
    with cf.ThreadPoolExecutor(max_workers=len(envs)) as executor:
        future_to_env = {
            executor.submit(dash_utils.get_calc_error_report, env=env): env
            for env in envs
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                # Page-level boundary: show the failure in the tab instead
                # of aborting the other environments.
                data = f"Error fetching data for {env}: {e}"

            # Replace the loading notice with the final content.
            with placeholders[env].container():
                st.subheader(f"Calc-Errors Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def calc_errors_page_extenders():
    """Render the Calc-Errors report page using collapsed expanders.

    Alternative layout to the tabbed page: one expander per environment
    (PROD, UAT, DEV), filled in as each concurrent call to
    ``dash_utils.get_calc_error_report`` completes.
    """
    env_names = ["PROD", "UAT", "DEV"]

    st.title("Calc-Errors report")
    st.write("Calc-Errors report for PROD, UAT, and DEV environments.")

    # Expanders are created in display order and start collapsed.
    sections = {name: st.expander(name, expanded=False) for name in env_names}

    with cf.ThreadPoolExecutor(max_workers=3) as pool:
        pending = {}
        for name in env_names:
            job = pool.submit(dash_utils.get_calc_error_report, env=name)
            pending[job] = name

        # Render each environment as soon as its fetch is done.
        for job in cf.as_completed(pending):
            env = pending[job]
            try:
                outcome = job.result()
            except Exception as e:
                outcome = f"Error fetching data for {env}: {e}"

            with sections[env]:
                st.subheader(f"Calc-Errors Report ({env})")
                render = st.dataframe if isinstance(outcome, pd.DataFrame) else st.write
                render(outcome)
def collect_errors_page():
    """Render the Collect-Errors report page with one tab per environment.

    The reports for PROD, UAT and DEV are requested concurrently from
    ``dash_utils.get_collect_error_report`` on a thread pool.  Every tab
    holds a placeholder that shows a loading notice until that
    environment's result arrives; the placeholder is then replaced by the
    report (or by the error text if the fetch raised).
    """
    envs = ("PROD", "UAT", "DEV")

    st.title("Collect-Errors report")
    st.write("Collect-Errors report for PROD, UAT, and DEV environments.")

    # One tab per environment, each with a single-slot placeholder that is
    # overwritten once the corresponding fetch completes.
    placeholders = {}
    for env, tab in zip(envs, st.tabs(list(envs))):
        placeholders[env] = tab.empty()
        placeholders[env].info(f"Loading {env} report...")

    # Launch the fetches concurrently; results are rendered from this
    # (script) thread in completion order.
    with cf.ThreadPoolExecutor(max_workers=len(envs)) as executor:
        future_to_env = {
            executor.submit(dash_utils.get_collect_error_report, env=env): env
            for env in envs
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                # Page-level boundary: show the failure in the tab instead
                # of aborting the other environments.
                data = f"Error fetching data for {env}: {e}"

            # Replace the loading notice with the final content.
            with placeholders[env].container():
                st.subheader(f"Collect-Errors Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def discontinued_series_page():
    """Render the Discontinued-Series report page with one tab per environment.

    The reports for PROD, UAT and DEV are requested concurrently from
    ``dash_utils.get_discontinued_series_report`` on a thread pool.  Every
    tab holds a placeholder that shows a loading notice until that
    environment's result arrives; the placeholder is then replaced by the
    report (or by the error text if the fetch raised).
    """
    envs = ("PROD", "UAT", "DEV")

    st.title("Discontinued-Series report")
    st.write("Discontinued-Series report for PROD, UAT, and DEV environments.")

    # One tab per environment, each with a single-slot placeholder that is
    # overwritten once the corresponding fetch completes.
    placeholders = {}
    for env, tab in zip(envs, st.tabs(list(envs))):
        placeholders[env] = tab.empty()
        placeholders[env].info(f"Loading {env} report...")

    # Launch the fetches concurrently; results are rendered from this
    # (script) thread in completion order.
    with cf.ThreadPoolExecutor(max_workers=len(envs)) as executor:
        future_to_env = {
            executor.submit(dash_utils.get_discontinued_series_report, env=env): env
            for env in envs
        }
        for future in cf.as_completed(future_to_env):
            env = future_to_env[future]
            try:
                data = future.result()
            except Exception as e:
                # Page-level boundary: show the failure in the tab instead
                # of aborting the other environments.
                data = f"Error fetching data for {env}: {e}"

            # Replace the loading notice with the final content.
            with placeholders[env].container():
                st.subheader(f"Discontinued-Series Report ({env})")
                if isinstance(data, pd.DataFrame):
                    st.dataframe(data)
                else:
                    st.write(data)
def chart_page():
    """Render a line chart of the change in a ticker between two vintages.

    The user picks a ticker from ``dash_utils.get_tickers_list()``; the
    current and previous files for it are loaded through
    ``dash_utils.get_ticker_files`` and the element-wise difference of
    their ``value`` columns (aligned on ``real_date``) is plotted.

    If no tickers are available, or a file for the selected ticker cannot
    be found, a message is shown instead of the chart.
    """
    st.title("Diff Chart Page")
    st.write(
        "This chart shows the difference between the indicator published today and yesterday."
    )

    tickers_list = dash_utils.get_tickers_list()
    if not tickers_list:
        # Nothing to select; avoid calling the loader with ticker=None.
        st.warning("No tickers available.")
        return

    ticker = st.selectbox("Select Ticker", tickers_list)

    try:
        curr_df, prev_df = dash_utils.get_ticker_files(ticker)
    except FileNotFoundError as exc:
        # NOTE(review): the ticker list appears to be the union of both
        # folders, so a ticker may exist in only one of them -- confirm.
        st.error(str(exc))
        return

    # Label each series by vintage before aligning them on the date index.
    old_name, new_name = f"{ticker} (old)", f"{ticker} (new)"
    curr_series = curr_df.set_index("real_date")["value"].rename(new_name)
    prev_series = prev_df.set_index("real_date")["value"].rename(old_name)

    diff_series = curr_series - prev_series
    st.line_chart(diff_series)
def sources_chart_page():
    """Embed the msy-content-links site in the page as a borderless iframe."""
    # Assembled line by line; the resulting text is passed to st.markdown
    # as raw HTML.
    iframe_html = (
        "\n"
        "<iframe\n"
        'src="https://macrosynergy.github.io/msy-content-links/"\n'
        'width="1500"\n'
        'height="1500"\n'
        'style="border:none;">\n'
        "</iframe>\n"
    )
    st.markdown(iframe_html, unsafe_allow_html=True)
def buttons_page():
    """Render a page of buttons that each reveal a Markdown link when clicked.

    The link is written in the normal script flow, directly after the
    button that was clicked, rather than from an ``on_click`` callback:
    callbacks run before the script body, so anything they write is placed
    ahead of the page content instead of next to the button.
    """
    # (button label / link text, target URL)
    links = (
        ("JPMaQS on JPMM", "https://www.jpmm.com/#jpmaqs"),
        ("JPMaQS Confluence", "https://example.com/confluence"),
        ("Macrosynergy Package docs", "https://docs.macrosynergy.com"),
    )

    st.title("Buttons Page")
    st.write("Here are some buttons with links.")

    for label, url in links:
        # st.button returns True only on the rerun triggered by its click.
        if st.button(label):
            st.write(f"[{label}]({url})")
def multi_charts_page():
    """Render a demo page that draws random data as the selected chart type.

    A fresh 20x2 frame of standard-normal samples (columns ``X`` and ``Y``)
    is generated on every run and drawn as a line, bar or scatter chart
    according to the dropdown selection.
    """
    st.title("Multi-Chart Page")
    st.write("Select a chart type from the dropdown.")

    choice = st.selectbox(
        "Select Chart Type", ["Line Chart", "Bar Chart", "Scatter Plot"]
    )
    samples = pd.DataFrame(np.random.randn(20, 2), columns=["X", "Y"])

    # Map each dropdown label to the Streamlit call that draws it.
    renderers = {
        "Line Chart": st.line_chart,
        "Bar Chart": st.bar_chart,
        "Scatter Plot": st.scatter_chart,
    }
    draw = renderers.get(choice)
    if draw is not None:
        draw(samples)
def email_page():
    """Render the email-text editor with Save, Save & Lock and Unlock buttons.

    The email record is loaded once per session through
    ``dash_utils.load_email_text`` and kept in ``st.session_state``.  The
    text area is disabled while the record is locked.

    Button callbacks read the text from ``st.session_state`` via the text
    area's widget key instead of closing over the value returned by
    ``st.text_area``: a callback runs before the script body of the rerun
    it triggers, so a captured local can be one edit behind what the user
    sees.
    """
    # Load the record only once per session.
    if "email_data" not in st.session_state:
        st.session_state["email_data"] = dash_utils.load_email_text()
    # For convenience, refer to the dict in session_state.
    d = st.session_state["email_data"]

    text_key = "email_text_area"

    def current_text():
        # Live widget value; fall back to the stored message if the widget
        # has not been registered yet.
        return st.session_state.get(text_key, d["message"])

    def on_save():
        if d["locked"]:
            st.warning("Cannot save – message is locked!")
        else:
            dash_utils.save_email_text(d, new_message=current_text())
            st.success("Message saved!")

    def on_save_lock():
        # Save & lock
        dash_utils.save_email_text(d, new_message=current_text(), lock_state=True)
        st.success("Message saved and locked!")

    def on_unlock():
        # Unlock only (leave message as-is in session state)
        dash_utils.save_email_text(d, lock_state=False)
        st.success("Message unlocked!")

    st.text_area("Email Text", d["message"], disabled=d["locked"], key=text_key)
    st.button("Save", on_click=on_save)
    st.button("Save & Lock", on_click=on_save_lock)
    st.button("Unlock", on_click=on_unlock)
# Sidebar navigation registry: maps the label shown in the sidebar radio
# to the zero-argument function that renders that page.  Dict order is the
# order in which the options are listed.
PAGE_KEYS = {
    "Buttons": buttons_page,
    "Multi-Charts": multi_charts_page,
    "Diff Chart": chart_page,
    "Sources Chart": sources_chart_page,
    "Calc Errors": calc_errors_page,
    "Calc Errors (Alt. Layout)": calc_errors_page_extenders,
    "Collect Errors": collect_errors_page,
    "Discontinued Series": discontinued_series_page,
    "Email": email_page,
}
def main():
    """Configure the app, draw the sidebar navigation and render the chosen page."""
    st.set_page_config(page_title="JPMaQS Dashboard", layout="wide")

    sidebar = st.sidebar
    sidebar.title("Navigation")
    page_names = list(PAGE_KEYS)
    page = sidebar.radio("Go to", page_names)

    # Sanity check: the radio only offers keys of PAGE_KEYS.
    assert page in PAGE_KEYS, f"Invalid page: {page}"
    render_page = PAGE_KEYS[page]
    render_page()
# Build the page only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
import glob
import os
import pandas as pd
import streamlit
import datetime
import json
import time
from typing import Any
# Root folders searched (recursively) for per-ticker CSV files: the
# "current" vintage and the "previous" vintage it is compared against.
CURR_TICKERS_DIR = "/jpmaqs-isc-git/"
PREV_TICKERS_DIR = "/jpmaqs-isc-git-copy/"
# Relative folder holding the persisted email record.
TEMP_FOLDER = "_tmp/"
# JSON file read by load_email_text and written by save_email_text.
EMAIL_JSON_FILE = f"{TEMP_FOLDER}email_data.json"
def get_files_list():
    """Return every ``*.csv`` path under ``data/`` (recursive), sorted ascending."""
    csv_paths = glob.glob("data/**/*.csv", recursive=True)
    csv_paths.sort()
    return csv_paths
def cache_buster(freq="5min"):
    """
    Return the current local time, rounded to the nearest multiple of
    ``freq``, formatted as ``YYYYmmddHHMMSS``.

    The value only changes when the rounded time changes, so it can be
    passed as an extra argument to a cached function to force a refresh
    once per period.
    """
    stamp_format = "%Y%m%d%H%M%S"
    now = pd.Timestamp.now()
    bucket = now.round(freq)
    return bucket.strftime(stamp_format)
@streamlit.cache_data
def _get_report_file(
    prefix: str,
    env: str,
    sep: str,
    date: str = None,
    cache_bust: Any = None,
):
    """Locate the report CSV for a prefix/environment, optionally for one date.

    Candidate files are those from ``get_files_list()`` whose base name
    starts with ``{prefix}{ENV}{sep}`` (``env`` is stripped and
    upper-cased).  Without ``date`` the last candidate in sorted order is
    returned; with ``date`` the base name must equal
    ``{prefix}{ENV}{sep}{date}.csv`` and match exactly one file.

    ``cache_bust`` is not read here; it only takes part in the cache key of
    ``streamlit.cache_data``.

    Raises:
        FileNotFoundError: no candidate exists, or the dated file is not
            matched exactly once.
    """
    # NOTE(review): fixed 15 s delay -- presumably simulating a slow
    # backend call; confirm before shipping.
    time.sleep(15)

    name_stem = f"{prefix}{env.strip().upper()}{sep}"
    not_found_msg = f"No report found for prefix={prefix}, env={env}, date={date}"

    candidates = []
    for path in get_files_list():
        if os.path.basename(path).startswith(name_stem):
            candidates.append(path)
    if len(candidates) == 0:
        raise FileNotFoundError(not_found_msg)

    if date is None:
        return candidates[-1]

    wanted_name = f"{name_stem}{date}.csv"
    exact = [path for path in candidates if os.path.basename(path) == wanted_name]
    if len(exact) != 1:
        raise FileNotFoundError(not_found_msg)
    return exact[0]
def _cached_get_report_file(
    prefix: str,
    env: str,
    sep: str,
    date: str = None,
    cache_bust: Any = None,
):
    """Call ``_get_report_file`` with a time-bucketed cache key.

    When ``cache_bust`` is falsy, the default ``cache_buster()`` token is
    used so the cached lookup is refreshed once per period.
    """
    if not cache_bust:
        cache_bust = cache_buster()
    return _get_report_file(prefix, env, sep, date, cache_bust)
def get_calc_error_report(env="PROD", date=None):
    """Load the ``calc-error_<ENV>_...`` report CSV as a DataFrame."""
    return pd.read_csv(_cached_get_report_file("calc-error_", env, "_", date))
def get_collect_error_report(env="PROD", date=None):
    """Load the ``Collect-Errors-<ENV>-...`` report CSV as a DataFrame."""
    return pd.read_csv(_cached_get_report_file("Collect-Errors-", env, "-", date))
def get_discontinued_series_report(env="PROD", date=None):
    """Load the ``Discontinued-Series-<ENV>-...`` report CSV as a DataFrame."""
    return pd.read_csv(
        _cached_get_report_file("Discontinued-Series-", env, "-", date)
    )
@streamlit.cache_data
def _get_tickers_list(cache_bust: Any = None):
    """Return the sorted, de-duplicated ticker names found in either folder.

    A ticker name is the base name of a ``*.csv`` file under
    ``CURR_TICKERS_DIR`` or ``PREV_TICKERS_DIR`` (searched recursively)
    with its extension removed.

    ``cache_bust`` is not read here; it only takes part in the cache key of
    ``streamlit.cache_data``.
    """
    tickers = set()
    for folder in (CURR_TICKERS_DIR, PREV_TICKERS_DIR):
        for path in glob.glob(f"{folder}**/*.csv", recursive=True):
            # splitext drops only the trailing extension; str.replace would
            # also remove ".csv" occurring inside the name.
            tickers.add(os.path.splitext(os.path.basename(path))[0])
    # Sort the final names so the order is not influenced by the suffix.
    return sorted(tickers)
def get_tickers_list():
    """Return the ticker names, using a cache key that changes every minute."""
    minute_token = cache_buster("1min")
    return _get_tickers_list(cache_bust=minute_token)
@streamlit.cache_data
def _get_ticker_files(
    ticker: str,
    curr_date: str = None,
    prev_date: str = None,
    cache_bust: Any = None,
):
    """Load the current and previous CSV for ``ticker`` as DataFrames.

    Each folder (``CURR_TICKERS_DIR``, ``PREV_TICKERS_DIR``) is searched
    recursively for ``<ticker>.csv``.  If several files match, the first
    in sorted path order is used so the choice is deterministic.

    ``curr_date``, ``prev_date`` and ``cache_bust`` are not read here; they
    only take part in the cache key of ``streamlit.cache_data``.

    Returns:
        A ``(current, previous)`` tuple of DataFrames.

    Raises:
        FileNotFoundError: the ticker has no file in one of the folders.
    """

    def get_ticker_file(ticker, folder):
        # Escape the ticker so glob metacharacters in a name are matched
        # literally rather than interpreted as a pattern.
        pattern = f"{folder}**/{glob.escape(ticker)}.csv"
        files = sorted(glob.glob(pattern, recursive=True))
        if not files:
            raise FileNotFoundError(
                f"No file found for ticker={ticker} in {folder}"
            )
        return pd.read_csv(files[0])

    curr_file = get_ticker_file(ticker, CURR_TICKERS_DIR)
    prev_file = get_ticker_file(ticker, PREV_TICKERS_DIR)
    return curr_file, prev_file
def _cached_get_ticker_files(
    ticker: str,
    curr_date: str = None,
    prev_date: str = None,
    cache_bust: Any = None,
):
    """Call ``_get_ticker_files`` with a time-bucketed cache key.

    When ``cache_bust`` is falsy, the default ``cache_buster()`` token is
    used so the cached result is refreshed once per period.
    """
    if not cache_bust:
        cache_bust = cache_buster()
    return _get_ticker_files(ticker, curr_date, prev_date, cache_bust)
def get_ticker_files(ticker: str, curr_date: str = None, prev_date: str = None):
    """Return the ``(current, previous)`` DataFrames for ``ticker``."""
    frames = _cached_get_ticker_files(ticker, curr_date, prev_date)
    return frames
def load_email_text():
    """Load the persisted email record, or return an empty default record.

    Returns:
        The dict stored in ``EMAIL_JSON_FILE`` if that file exists;
        otherwise a new record with an empty message, ``locked`` set to
        ``False`` and an empty ``history``.
    """
    # Open directly and handle absence, rather than checking for existence
    # first: the file could disappear between the check and the open.
    try:
        with open(EMAIL_JSON_FILE, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {
            "message": "",
            "locked": False,
            "last_saved_timestamp": "",
            "today_date": "",
            "history": {},
        }
def save_email_text(data, new_message=None, lock_state=None):
    """Update the email record in place and persist it to ``EMAIL_JSON_FILE``.

    Args:
        data: The email record dict; it is mutated.
        new_message: If not ``None``, becomes the current message, is
            stamped with the current time/date and appended to
            ``data["history"]`` keyed by the timestamp.
        lock_state: If not ``None``, stored as ``data["locked"]``.

    The record is written on every call, even when both optional arguments
    are ``None``.
    """
    if new_message is not None:
        now = datetime.datetime.now().isoformat(timespec="seconds")
        data["message"] = new_message
        data["last_saved_timestamp"] = now
        data["today_date"] = datetime.date.today().isoformat()
        # Tolerate records that lack a history mapping.
        data.setdefault("history", {})[now] = new_message
    if lock_state is not None:
        data["locked"] = lock_state

    # The target folder may not exist yet on a fresh checkout.
    folder = os.path.dirname(EMAIL_JSON_FILE)
    if folder:
        os.makedirs(folder, exist_ok=True)
    with open(EMAIL_JSON_FILE, "w", encoding="utf-8") as f:
        json.dump(data, f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment