Last active
June 3, 2019 02:14
-
-
Save sakethramanujam/72e6a42109fc4ac54e57c3cd257af7a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.operators.bash_operator import BashOperator | |
from airflow.operators.python_operator import PythonOperator | |
from summary import read_file,daily,monthly | |
import datetime as dt | |
# Defaults handed to the DAG below through its ``default_args`` parameter.
# NOTE(review): 'concurrency' looks like a DAG-level setting rather than a
# per-task argument -- confirm it has any effect when supplied here.
default_args = dict(
    owner='airflow',
    start_date=dt.datetime(2019, 5, 31, 10, 0, 0),
    concurrency=1,
    retries=5,
)
def daily_summary(dataframe):
    """Task callable: run ``daily`` on *dataframe* and report completion.

    Args:
        dataframe: Object forwarded unchanged to ``daily``.

    Returns:
        The fixed status string ``"Daily Summary!"``; the value returned by
        ``daily`` itself is discarded.
    """
    daily(dataframe)
    status = "Daily Summary!"
    return status
def monthly_summary(dataframe):
    """Task callable: run ``monthly`` on *dataframe* and report completion.

    Args:
        dataframe: Object forwarded unchanged to ``monthly``.

    Returns:
        The fixed status string ``"Monthly Summary!"``; the value returned by
        ``monthly`` itself is discarded.
    """
    monthly(dataframe)
    status = "Monthly Summary!"
    return status
# Define the 'sample-workflow' DAG: two Python tasks, 'daily' then 'monthly',
# scheduled with the cron expression '*/10 * * * *'.
with DAG(
    'sample-workflow',
    default_args=default_args,
    schedule_interval='*/10 * * * *',
) as dag:
    # NOTE(review): the CSV is read here, while this module executes, and the
    # resulting object is handed to both tasks through op_kwargs -- it is not
    # re-read inside the task callables. Confirm that this is intended.
    dataframe = read_file(
        "~/Desktop/AirflowSample-Pipeline/afdir/data/sample-2.csv"
    )

    daily_opr = PythonOperator(
        task_id='daily',
        python_callable=daily_summary,
        op_kwargs={'dataframe': dataframe},
    )

    monthly_opr = PythonOperator(
        task_id='monthly',
        python_callable=monthly_summary,
        op_kwargs={'dataframe': dataframe},
    )

    # 'monthly' runs only after 'daily' (same dependency as daily >> monthly).
    daily_opr.set_downstream(monthly_opr)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import os | |
import sys | |
def read_file(filename):
    """Load a CSV and add calendar columns derived from its ``Date`` column.

    Args:
        filename: Location of the CSV, passed straight to ``pandas.read_csv``.
            The file must contain a ``Date`` column that pandas can parse.

    Returns:
        The loaded DataFrame with three extra columns: ``Month``, ``Day`` and
        ``WeekNum`` (week of the year). The ``Date`` column itself is left
        exactly as it was read from the file.
    """
    data = pd.read_csv(filename)

    # Parse the dates once instead of once per derived column.
    dates = pd.DatetimeIndex(data['Date'])
    data['Month'] = dates.month
    data['Day'] = dates.day

    # ``DatetimeIndex.week`` was deprecated in pandas 1.1 and removed in 2.0;
    # ``isocalendar()`` is its replacement. Fall back to ``.week`` only on
    # pandas versions that predate ``isocalendar``.
    if hasattr(dates, 'isocalendar'):
        # ``.array`` drops the date index so the values are assigned by
        # position rather than being aligned against ``data``'s own index.
        data["WeekNum"] = dates.isocalendar()['week'].array
    else:
        data["WeekNum"] = dates.week
    return data
def daily(dataframe):
    """Write one CSV per distinct ``Date`` value into ``./sales/daily``.

    Args:
        dataframe: DataFrame containing a ``Date`` column plus the helper
            columns ``Month``, ``Day`` and ``WeekNum``; the helper columns are
            dropped before the rows are written.

    Returns:
        The fixed status string ``"Daily Sales Generated!"``.
    """
    pathtodaily = "./sales/daily"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(pathtodaily, exist_ok=True)

    per_date = dataframe.drop(['Month', 'Day', 'WeekNum'], axis=1).groupby('Date')
    # Iterating the groupby already yields each group's rows, so no extra
    # get_group() lookup is needed.
    for date, datewise in per_date:
        filename = "{}.csv".format(date)
        print(filename)
        datewise.to_csv(os.path.join(pathtodaily, filename), index=False)
    return "Daily Sales Generated!"
def monthly(dataframe):
    """Write one per-store, per-variant totals CSV for each ``Month`` value.

    For every group of rows sharing a ``Month`` value, the ``Amount`` and
    ``Quantity`` columns are summed per (``Store``, ``Variant``) pair and the
    result is written to ``./sales/monthly``.

    Args:
        dataframe: DataFrame with the columns ``Month``, ``Day``, ``WeekNum``,
            ``Date``, ``Store``, ``Variant``, ``Amount`` and ``Quantity``.

    Returns:
        The fixed status string ``"Monthly Sales Generated!"``.
    """
    pathtomonthly = "./sales/monthly"
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(pathtomonthly, exist_ok=True)

    out_columns = ["Store", "Variant", "Date", "Amount", "Quantity"]
    by_month = dataframe.drop(["Day", "WeekNum"], axis=1).groupby('Month')
    for _month, monthdata in by_month:
        # Collect plain records and build the frame once: DataFrame.append was
        # removed in pandas 2.0 and copied the whole frame on every call.
        rows = []
        file_date = None
        for store, storedata in monthdata.groupby('Store'):
            # First Date seen for this store within the month; it labels all
            # of the store's rows.
            first_date = storedata['Date'].iloc[0]
            # The output file is named after the LAST store's first date
            # (this matches the previous behaviour, now made explicit).
            file_date = first_date
            for variant, variantgroup in storedata.groupby('Variant'):
                rows.append({
                    "Store": store,
                    "Variant": variant,
                    "Date": first_date,
                    "Amount": variantgroup["Amount"].sum(),
                    "Quantity": variantgroup["Quantity"].sum(),
                })
        if file_date is None:
            # No store groups in this month: nothing to name the file after.
            continue
        monthwise = pd.DataFrame(rows, columns=out_columns)
        fn = str(file_date) + ".csv"
        monthwise.to_csv(os.path.join(pathtomonthly, fn), index=False)
    return "Monthly Sales Generated!"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment