Created
April 3, 2018 15:54
-
-
Save skozz/20c8166c45bcc3bcaeb1f7c1144658b0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Code that goes along with the Airflow tutorial located at: | |
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py | |
""" | |
from airflow import DAG | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.operators.generic_transfer import GenericTransfer | |
from airflow.contrib.hooks import FTPHook | |
from airflow.hooks.mysql_hook import MySqlHook | |
from datetime import datetime, timedelta | |
import codecs | |
import os | |
import logging | |
# Anchor the DAG five days back, at midnight (naive local time).
five_days_ago = datetime.combine(datetime.today() - timedelta(days=5),
                                 datetime.min.time())

# Defaults applied to every task in this DAG.
default_args = {
    'owner': 'flolas',
    'depends_on_past': True,  # each run waits for the previous day's run
    'start_date': five_days_ago,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
""" | |
This function get a file from an FTP, then save it to a folder on worker fs | |
TODO: Delete blank file when file not found, then skip | |
""" | |
def download_file_from_ftp(remote_path, local_path, prefix, suffix,conn_id, ext, **kwargs): | |
conn = FTPHook(ftp_conn_id=conn_id) | |
date = str(kwargs['execution_date'].day) + '-' + str(kwargs['execution_date'].month) + '-' + str(kwargs['execution_date'].year) | |
fl = prefix + date + suffix + '.' + ext | |
remote_filepath = remote_path + fl | |
local_filepath = local_path + fl | |
logging.info('Getting file: {}'.format(remote_filepath)) | |
conn.retrieve_file(remote_filepath, local_filepath) | |
return local_filepath | |
""" | |
Expand columns process with pandas and save to the same csv | |
""" | |
def process_file_from_fs(header=None, sep=',', decimal='.', **kwargs): | |
import pandas as pd | |
def unpack_col(col_to_unpack, df_to_append = None, header = 'col', sep=',', na_value=''): | |
unpacked_cols = col_to_unpack.fillna(na_value).apply(lambda x: pd.Series(x.split(','))).fillna(na_value) | |
#add dynamic columns names based on # of rows and parameter header passed for prefix (header_#) | |
col_names = [] | |
for i in unpacked_cols.columns: | |
col_names.append(header + '_' + str(i)) | |
unpacked_cols.columns = col_names | |
if isinstance(df_to_append, pd.DataFrame): | |
#return df concatenated with previously unpacked columns | |
return pd.concat([df_to_append, unpacked_cols], axis=1) | |
else: | |
#return df only with unpacked columns | |
return unpacked_cols | |
# Esto lo que hace es recibir el output de la tarea anterior con xcom(local_filepath si se logro descargar el archivo) | |
local_filepath = kwargs['ti'].xcom_pull(task_ids='download_file') | |
df = pd.read_csv(local_filepath, sep = sep, header = header, decimal = '.', parse_dates=True, encoding='utf-8') | |
df = unpack_col(df[1], df, header='sent_mail') | |
df = unpack_col(df[2], df, header='recv_mail') | |
df.to_csv(local_filepath, sep='\t', encoding='utf-8') | |
return local_filepath | |
def bulk_load_sql(table_name, **kwargs):
    """Bulk-load the processed file into MySQL (LOAD DATA via MySqlHook.bulk_load).

    The file path is pulled from the ``download_file`` task's XCom; since
    ``process_file`` rewrites the same file in place, this is the processed file.

    :param table_name: target MySQL table.
    :return: the table name (pushed to XCom).
    """
    local_filepath = kwargs['ti'].xcom_pull(task_ids='download_file')
    # BUG FIX: MySqlHook takes the connection id as `mysql_conn_id`;
    # `conn_name_attr` is the hook's internal attribute name, so the original
    # call silently fell back to the default connection instead of 'ib_sql'.
    conn = MySqlHook(mysql_conn_id='ib_sql')
    conn.bulk_load(table_name, local_filepath)
    return table_name
# Daily pipeline: download from FTP -> expand packed columns -> load into MySQL.
dag = DAG(
    'carga-mails-ejecutivos-6',
    default_args=default_args,
    schedule_interval='@daily',
)

t1 = PythonOperator(
    task_id='download_file',
    python_callable=download_file_from_ftp,
    provide_context=True,
    op_kwargs={
        'remote_path': '/home/flolas/files/',
        'local_path': '/usr/local/airflow/files/',
        'ext': 'csv',
        'prefix': 'mail_',
        'suffix': '',
        'conn_id': 'ftp_servidor',
    },
    dag=dag,
)

t2 = PythonOperator(
    task_id='process_file',
    python_callable=process_file_from_fs,
    provide_context=True,
    op_kwargs={'sep': '|'},
    dag=dag,
)

t3 = PythonOperator(
    task_id='file_to_MySql',
    python_callable=bulk_load_sql,
    provide_context=True,
    op_kwargs={'table_name': 'MailsEjecutivos'},
    dag=dag,
)

t1 >> t2 >> t3
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment