Skip to content

Instantly share code, notes, and snippets.

@sakethramanujam
Last active June 3, 2019 02:14
Show Gist options
  • Save sakethramanujam/72e6a42109fc4ac54e57c3cd257af7a5 to your computer and use it in GitHub Desktop.
Save sakethramanujam/72e6a42109fc4ac54e57c3cd257af7a5 to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from summary import read_file,daily,monthly
import datetime as dt
# Arguments handed to the DAG below as its ``default_args``.
# NOTE(review): 'concurrency' looks like a DAG-level setting rather than a
# per-task default -- confirm it has the intended effect when passed here.
default_args = dict(
    owner='airflow',
    start_date=dt.datetime(2019, 5, 31, 10, 0, 0),
    concurrency=1,
    retries=5,
)
def daily_summary(dataframe):
    """Task callable: run ``daily`` on *dataframe* and return a status label.

    The value returned by ``daily`` itself is discarded; only the fixed
    label below is returned to the caller.
    """
    daily(dataframe)
    status = "Daily Summary!"
    return status
def monthly_summary(dataframe):
    """Task callable: run ``monthly`` on *dataframe* and return a status label.

    The value returned by ``monthly`` itself is discarded; only the fixed
    label below is returned to the caller.
    """
    monthly(dataframe)
    status = "Monthly Summary!"
    return status
# Workflow definition: two Python tasks scheduled every ten minutes, with
# 'daily' running before 'monthly'.
with DAG(
    dag_id='sample-workflow',
    schedule_interval='*/10 * * * *',
    default_args=default_args,
) as dag:
    # NOTE(review): the CSV is read while this module is being executed, not
    # inside a task, and the resulting frame is handed to both operators.
    dataframe = read_file("~/Desktop/AirflowSample-Pipeline/afdir/data/sample-2.csv")

    daily_opr = PythonOperator(
        task_id='daily',
        python_callable=daily_summary,
        op_kwargs={'dataframe': dataframe},
    )
    monthly_opr = PythonOperator(
        task_id='monthly',
        python_callable=monthly_summary,
        op_kwargs={'dataframe': dataframe},
    )

    # Same dependency as ``daily_opr >> monthly_opr``.
    daily_opr.set_downstream(monthly_opr)
import pandas as pd
import os
import sys
def read_file(filename):
    """Load a sales CSV and add calendar helper columns.

    Parameters
    ----------
    filename : str or path-like
        Location of a CSV file that contains a ``Date`` column.

    Returns
    -------
    pandas.DataFrame
        The CSV contents with three extra columns derived from ``Date``:
        ``Month``, ``Day`` and ``WeekNum`` (ISO 8601 week number).  The
        ``Date`` column itself is left exactly as it was read.
    """
    data = pd.read_csv(filename)
    # Parse the dates once instead of once per derived column.
    dates = pd.to_datetime(data['Date'])
    data['Month'] = dates.dt.month
    data["Day"] = dates.dt.day
    # ``DatetimeIndex.week`` was removed in pandas 2.0; ``isocalendar().week``
    # is the replacement and yields the same ISO week numbers.
    data["WeekNum"] = dates.dt.isocalendar().week
    return data
def daily(dataframe):
    """Write one CSV per distinct ``Date`` value under ``./sales/daily``.

    The ``Month``, ``Day`` and ``WeekNum`` columns are dropped first; every
    remaining row is written to ``<date>.csv`` for its date.  Each file name
    is printed as it is produced.

    Returns the fixed string ``"Daily Sales Generated!"``.
    """
    out_dir = "./sales/daily"
    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    trimmed = dataframe.drop(columns=['Month', 'Day', 'WeekNum'])
    for date, rows in trimmed.groupby('Date'):
        filename = str(date) + ".csv"
        print(filename)
        target = os.path.join(out_dir, filename)
        rows.to_csv(target, index=False)
    return "Daily Sales Generated!"
def monthly(dataframe):
    """Write one CSV per month with per-store, per-variant totals.

    The ``Day`` and ``WeekNum`` columns are dropped, then rows are grouped by
    ``Month``.  Within each month, every (``Store``, ``Variant``) pair becomes
    one output row holding the summed ``Amount`` and ``Quantity``; its
    ``Date`` is the first date seen for that store in the month.

    Files go to ``./sales/monthly`` and are named after the first date of the
    last store processed in the month (stores are visited in group order).

    Returns the fixed string ``"Monthly Sales Generated!"``.
    """
    pathtomonthly = "./sales/monthly"
    os.makedirs(pathtomonthly, exist_ok=True)

    columns = ["Store", "Variant", "Date", "Amount", "Quantity"]
    by_month = dataframe.drop(["Day", "WeekNum"], axis=1).groupby('Month')
    for _, monthdata in by_month:
        # ``DataFrame.append`` was removed in pandas 2.0, so collect plain
        # rows and build the month's frame once at the end.
        rows = []
        first_date = None
        for store, storedata in monthdata.groupby('Store'):
            first_date = storedata['Date'].iloc[0]
            for variant, variantgroup in storedata.groupby('Variant'):
                rows.append({
                    "Store": store,
                    "Variant": variant,
                    "Date": first_date,
                    "Amount": variantgroup["Amount"].sum(),
                    "Quantity": variantgroup["Quantity"].sum(),
                })
        if first_date is None:
            # No store rows in this month: there is no date to name a file by.
            continue
        monthwise = pd.DataFrame(rows, columns=columns)
        fn = str(first_date) + ".csv"
        monthwise.to_csv(os.path.join(pathtomonthly, fn), index=False)
    return "Monthly Sales Generated!"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment