Created
May 27, 2022 11:46
-
-
Save KayvanShah1/ac5db4fb481b9683069f2c0d1784cd11 to your computer and use it in GitHub Desktop.
Prefect 2.0 ORION: Flow in Flow Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
tickers:
  - ticker_code: ^IXIC
    name: NASDAQ Composite
  - ticker_code: ^GSPC
    name: S&P 500
  - ticker_code: ^DJI
    name: Dow Jones Industrial Average
  - ticker_code: ^BSESN
    name: S&P BSE SENSEX
  - ticker_code: ^RUT
    name: Russell 2000
  - ticker_code: ^GDAXI
    name: DAX PERFORMANCE-INDEX
  - ticker_code: ^N225
    name: Nikkei 225
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import os | |
import random | |
from datetime import date | |
from pathlib import Path | |
import pandas as pd | |
import yaml | |
import yfinance as yf | |
from pandas_datareader import data as pdr | |
yf.pdr_override() | |
from prefect import flow, task | |
from prefect.logging import get_logger, get_run_logger | |
from prefect.task_runners import SequentialTaskRunner | |
# Directory containing this script; config and data locations are resolved relative to it.
BASE_PATH = Path(__file__).parent

# Both locations are plain strings (os.path.join returns str), not Path objects.
YAHOO_CONFIG_YAML, YAHOO_DATA_PATH = (
    os.path.join(BASE_PATH, leaf) for leaf in ("yahoo.yaml", "data")
)
@task
def read_config(metadata_path: str) -> dict:
    """Read configuration metadata from a JSON or YAML file.

    Args:
        metadata_path (str): Path to the metadata file. A ``.json`` extension
            (case-insensitive) is parsed with ``json``; any other extension is
            parsed with ``yaml.safe_load``.

    Returns:
        dict: The parsed configuration (a dict when the file's top level is a
        JSON object / YAML mapping).
    """
    # Dispatch on the file extension; YAML is the fallback format. Path() also
    # accepts pathlib.Path arguments, not just str.
    is_json = Path(metadata_path).suffix.lower() == ".json"
    with open(metadata_path, "r", encoding="utf-8") as f:
        if is_json:
            return json.load(f)
        # safe_load == load(..., Loader=SafeLoader): no arbitrary object construction.
        return yaml.safe_load(f)
@task
def get_ticker_data(
    ticker_dict: dict,
    start_date: str = None,
    end_date: str = None,
):
    """Download price history for one ticker via ``pdr.get_data_yahoo``.

    Args:
        ticker_dict (dict): Ticker entry; only its ``"ticker_code"`` key is used.
        start_date (str, optional): Start date as ``"YYYY-MM-DD"``. Defaults to
            today's date, computed when the task runs.
        end_date (str, optional): End date as ``"YYYY-MM-DD"``. Defaults to
            today's date, computed when the task runs.

    Returns:
        The DataFrame returned by ``pdr.get_data_yahoo``.
    """
    # Resolve "today" per call. A date expression in the signature would be
    # evaluated once at import time and go stale in a long-running process.
    today = date.today().strftime("%Y-%m-%d")
    if start_date is None:
        start_date = today
    if end_date is None:
        end_date = today
    # NOTE(review): yfinance documents `end` as exclusive, so start == end may
    # return no rows — confirm whether end_date should default to tomorrow.
    ticker = ticker_dict["ticker_code"]
    return pdr.get_data_yahoo(ticker, start=start_date, end=end_date)
@task
def clean_ticker_data(df):
    """Normalise a downloaded price frame.

    The index is moved into a regular column, column labels are lower-cased
    with spaces replaced by underscores, the ``date`` column is parsed to
    datetimes (unparseable values become NaT), and rows are ordered by date,
    oldest first.
    """
    frame = df.reset_index()
    frame.columns = frame.columns.map(lambda label: label.lower().replace(" ", "_"))
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    return frame.sort_values("date")
@task
def basic_preprocess(df):
    """Cast value columns to float64 and fill missing values.

    Every column except the first is cast to float64. The first column is
    left untouched; presumably it is the date column that ``reset_index``
    places first in ``clean_ticker_data``. Gaps are then filled by linear
    interpolation followed by a forward fill.

    Args:
        df (pd.DataFrame): Frame to preprocess. It is not modified.

    Returns:
        pd.DataFrame: A new, preprocessed frame.
    """
    value_columns = df.columns[1:]
    # astype with a column mapping returns a new frame. That guarantees the
    # dtype change: assigning through .iloc[:, 1:] may set values in place and
    # keep the old dtype on pandas >= 2. It also avoids mutating the input.
    df = df.astype({column: "float64" for column in value_columns})
    df = df.interpolate()
    # ffill() replaces the deprecated fillna(method="ffill").
    df = df.ffill()
    return df
@task
def save_ticker_data(ticker_dict: dict, cleaned_data):
    """Write a ticker's data to its Parquet file, merging with any existing file.

    The file is ``YAHOO_DATA_PATH/<name>(<ticker_code>).parquet``. If it
    already exists, the stored rows are combined with ``cleaned_data``. When a
    date appears in both, the row from ``cleaned_data`` wins.

    Args:
        ticker_dict (dict): Ticker entry with ``"ticker_code"`` and ``"name"`` keys.
        cleaned_data (pd.DataFrame): Frame to store; must contain a ``"date"`` column.
    """
    logger = get_run_logger()
    ticker, ticker_name = ticker_dict["ticker_code"], ticker_dict["name"]

    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(YAHOO_DATA_PATH, exist_ok=True)

    file_name = f"{ticker_name}({ticker}).parquet"
    file_path = os.path.join(YAHOO_DATA_PATH, file_name)

    if os.path.exists(file_path):
        previous = pd.read_parquet(file_path)
        # pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
        # ignore_index renumbers the rows. reset_index() would instead add an
        # extra "index" / "level_0" column on each run and eventually fail.
        cleaned_data = pd.concat([previous, cleaned_data], ignore_index=True)
        # Newly downloaded rows come last, so keep="last" lets them overwrite
        # stored rows for the same date.
        cleaned_data = cleaned_data.drop_duplicates(subset="date", keep="last")

    cleaned_data.to_parquet(file_path, index=False)
    logger.info(f"{ticker_name} data saved at {file_path}")
@flow(task_runner=SequentialTaskRunner())
def ticker_single_pipeline(ticker_dict: dict, start_date: str = "2022-05-01"):
    """Download, clean, preprocess and save the data for one ticker.

    Args:
        ticker_dict (dict): Ticker entry with ``"ticker_code"`` and ``"name"`` keys.
        start_date (str): First date to download, ``"YYYY-MM-DD"``.
            Defaults to ``"2022-05-01"``.
    """
    raw = get_ticker_data(ticker_dict, start_date=start_date)
    cleaned = clean_ticker_data(raw)
    processed = basic_preprocess(cleaned)
    # save_ticker_data returns nothing, so its result is not bound.
    save_ticker_data(ticker_dict, processed)
@flow
def ticker_mutli_pipeline():
    """Run ``ticker_single_pipeline`` once for each ticker in the YAML config.

    The config is read from ``YAHOO_CONFIG_YAML`` and must have a top-level
    ``"tickers"`` list. Each entry is passed to the subflow in config order.
    """
    config_future = read_config(YAHOO_CONFIG_YAML)
    ticker_entries = config_future.result()["tickers"]
    for entry in ticker_entries:
        ticker_single_pipeline(entry)
# Run the pipeline only when this file is executed as a script, not when the
# module is imported by another module or tool that just needs the flow objects.
if __name__ == "__main__":
    ticker_mutli_pipeline()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment