Task: Parameterize the download of a file from a URL (https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page), Parquetize it (convert the CSV file to Parquet), upload it to GCS, and orchestrate the whole flow with Airflow.
DE_ZoomCamp Week2 Assignment
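The parameterization in the two trip-data DAGs below comes from Airflow's Jinja templating: {{ execution_date.strftime('%Y-%m') }} is rendered per DAG run, so each monthly run pulls its own file. As a minimal, Airflow-free sketch of how the templated URL resolves (the date below is an arbitrary example, not tied to a particular run):

# Illustrative only: how the templated URL resolves for one logical date.
# The date is a made-up example; in Airflow it is supplied per run.
from datetime import datetime

url_prefix = 'https://nyc-tlc.s3.amazonaws.com/trip+data'
execution_date = datetime(2019, 3, 1)

url = f"{url_prefix}/fhv_tripdata_{execution_date.strftime('%Y-%m')}.csv"
print(url)  # https://nyc-tlc.s3.amazonaws.com/trip+data/fhv_tripdata_2019-03.csv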
# DAG 1: download the monthly FHV trip-data CSV, convert it to Parquet,
# upload it to GCS, then remove the local CSV.
import os
import logging

import pyarrow.csv as pv
import pyarrow.parquet as pq
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "dataengineering-bizzy")
BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc_data_lake_dataengineering-bizzy")
BIGQUERY_DATASET = os.environ.get("FHV_DATASET", 'fhv_trips_data')
Airflow_Home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")

url_prefix = 'https://nyc-tlc.s3.amazonaws.com/trip+data'
url_template = url_prefix + '/fhv_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.csv'
output_file_template = Airflow_Home + '/fhv_output_{{ execution_date.strftime(\'%Y-%m\') }}.csv'
parquet_file = 'fhv_output_{{ execution_date.strftime(\'%Y-%m\') }}.parquet'


def format_2_parquet(src_file):
    # Convert a local CSV file to a Parquet file alongside it.
    if not src_file.endswith('.csv'):
        logging.error("Can only accept source files in CSV format, for the moment")
        return
    table = pv.read_csv(src_file)
    pq.write_table(table, src_file.replace('.csv', '.parquet'))


# NOTE: takes ~20 minutes at an upload speed of 800 kbps; faster with a better upload speed.
def upload_to_gcs(bucket, object_name, local_file):
    """
    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    :param bucket: GCS bucket name
    :param object_name: target path & file-name
    :param local_file: source path & file-name
    """
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB
    # End of workaround

    client = storage.Client()
    bucket = client.bucket(bucket)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(local_file)


load_workflow = DAG(
    "fhv_ingest_2_gcs_dag",
    catchup=True,
    schedule_interval="0 6 2 * *",  # 06:00 on the 2nd of every month
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2020, 12, 31),
    max_active_runs=3,
)

with load_workflow:
    curl_fhv_job = BashOperator(
        task_id='fhv_file_url',
        bash_command=f'curl -sSLf {url_template} > {output_file_template}'
    )

    format_csv_to_parquet_task = PythonOperator(
        task_id="format_csv_2_parquet_task",
        python_callable=format_2_parquet,
        op_kwargs={
            "src_file": f"{output_file_template}",
        },
    )

    upload_to_gcs_job = PythonOperator(
        task_id="upload_to_gcs_job",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket": BUCKET,
            "object_name": f"raw/fhv/{parquet_file}",
            "local_file": f"{Airflow_Home}/{parquet_file}",
        },
    )

    cleanup_job = BashOperator(
        task_id='cleanup_csv',
        bash_command=f'rm {output_file_template}'
    )

    curl_fhv_job >> format_csv_to_parquet_task >> upload_to_gcs_job >> cleanup_job
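format_2_parquet can be exercised outside Airflow before wiring it into a DAG. A minimal local check, assuming pyarrow is installed and format_2_parquet is defined as above; the sample path and CSV contents are made up for the test:

# Hypothetical local test: write a tiny CSV, convert it, and read the Parquet
# back to confirm the rows survive the round trip.
import pyarrow.parquet as pq

sample_csv = '/tmp/fhv_sample.csv'  # throwaway path for the test
with open(sample_csv, 'w') as f:
    f.write('dispatching_base_num,pickup_datetime\n')
    f.write('B00001,2019-01-01 00:30:00\n')

format_2_parquet(sample_csv)

table = pq.read_table(sample_csv.replace('.csv', '.parquet'))
print(table.num_rows)  # expect 1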
# DAG 2: download the monthly yellow taxi trip-data CSV, convert it to Parquet,
# upload it to GCS, register it as a BigQuery external table, then remove the local CSV.
import os
import logging

import pyarrow.csv as pv
import pyarrow.parquet as pq
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "dataengineering-bizzy")
BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc_data_lake_dataengineering-bizzy")
BIGQUERY_DATASET = os.environ.get("BIGQUERY_DATASET", 'trips_data_all')
Airflow_Home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")

url_prefix = 'https://s3.amazonaws.com/nyc-tlc/trip+data'
url_template = url_prefix + '/yellow_tripdata_{{ execution_date.strftime(\'%Y-%m\') }}.csv'
output_file_template = Airflow_Home + '/output_{{ execution_date.strftime(\'%Y-%m\') }}.csv'
parquet_file = 'output_{{ execution_date.strftime(\'%Y-%m\') }}.parquet'


def format_2_parquet(src_file):
    # Convert a local CSV file to a Parquet file alongside it.
    if not src_file.endswith('.csv'):
        logging.error("Can only accept source files in CSV format, for the moment")
        return
    table = pv.read_csv(src_file)
    pq.write_table(table, src_file.replace('.csv', '.parquet'))


# NOTE: takes ~20 minutes at an upload speed of 800 kbps; faster with a better upload speed.
def upload_to_gcs(bucket, object_name, local_file):
    """
    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    :param bucket: GCS bucket name
    :param object_name: target path & file-name
    :param local_file: source path & file-name
    """
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB
    # End of workaround

    client = storage.Client()
    bucket = client.bucket(bucket)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(local_file)


load_workflow = DAG(
    "download_ingest_2_gcs_dag",
    catchup=True,
    schedule_interval="0 6 2 * *",  # 06:00 on the 2nd of every month
    start_date=datetime(2019, 1, 1),
    end_date=datetime(2020, 12, 31),
    max_active_runs=3,
)

with load_workflow:
    curl_job = BashOperator(
        task_id='download_file_url',
        bash_command=f'curl -sSLf {url_template} > {output_file_template}'
    )

    format_to_parquet_task = PythonOperator(
        task_id="format_2_parquet_task",
        python_callable=format_2_parquet,
        op_kwargs={
            "src_file": f"{output_file_template}",
        },
    )

    local_to_gcs_job = PythonOperator(
        task_id="local_to_gcs_job",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket": BUCKET,
            "object_name": f"raw/{parquet_file}",
            "local_file": f"{Airflow_Home}/{parquet_file}",
        },
    )

    bigquery_external_table_job = BigQueryCreateExternalTableOperator(
        task_id="bigquery_external_table_job",
        table_resource={
            "tableReference": {
                "projectId": PROJECT_ID,
                "datasetId": BIGQUERY_DATASET,
                "tableId": "external_table",
            },
            "externalDataConfiguration": {
                "sourceFormat": "PARQUET",
                "sourceUris": [f"gs://{BUCKET}/raw/{parquet_file}"],
            },
        },
    )

    cleanup_job = BashOperator(
        task_id='cleanup_csv',
        bash_command=f'rm {output_file_template}'
    )

    curl_job >> format_to_parquet_task >> local_to_gcs_job >> bigquery_external_table_job >> cleanup_job
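Once bigquery_external_table_job has run, the Parquet data in GCS can be queried straight from BigQuery. A hedged sketch using the google-cloud-bigquery client; the project, dataset, and table names are the DAG's defaults, so adjust them to your environment:

# Sketch: count rows in the external table created by the DAG above.
# Assumes google-cloud-bigquery is installed and credentials are configured.
from google.cloud import bigquery

client = bigquery.Client(project="dataengineering-bizzy")
query = """
    SELECT COUNT(*) AS trip_count
    FROM `dataengineering-bizzy.trips_data_all.external_table`
"""
for row in client.query(query).result():
    print(row.trip_count)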
# DAG 3: download the taxi zone lookup CSV, convert it to Parquet,
# upload it to GCS, then remove the local CSV.
import os
import logging

import pyarrow.csv as pv
import pyarrow.parquet as pq
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from google.cloud import storage
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "dataengineering-bizzy")
BUCKET = os.environ.get("GCP_GCS_BUCKET", "dtc_data_lake_dataengineering-bizzy")
BIGQUERY_DATASET = os.environ.get("FHV_DATASET", 'fhv_trips_data')
Airflow_Home = os.environ.get("AIRFLOW_HOME", "/opt/airflow/")

url = 'https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv'
output_file_template = Airflow_Home + '/taxi+_zone_lookup.csv'
parquet_file = 'taxi+_zone_lookup.parquet'


def format_2_parquet(src_file):
    # Convert a local CSV file to a Parquet file alongside it.
    if not src_file.endswith('.csv'):
        logging.error("Can only accept source files in CSV format, for the moment")
        return
    table = pv.read_csv(src_file)
    pq.write_table(table, src_file.replace('.csv', '.parquet'))


def upload_to_gcs(bucket, object_name, local_file):
    """
    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python
    :param bucket: GCS bucket name
    :param object_name: target path & file-name
    :param local_file: source path & file-name
    """
    # WORKAROUND to prevent timeout for files > 6 MB on 800 kbps upload speed.
    # (Ref: https://github.com/googleapis/python-storage/issues/74)
    storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB
    # End of workaround

    client = storage.Client()
    bucket = client.bucket(bucket)
    blob = bucket.blob(object_name)
    blob.upload_from_filename(local_file)


load_workflow = DAG(
    "zone_ingest_2_gcs_dag",
    catchup=False,
    schedule_interval="0 6 2 * *",  # 06:00 on the 2nd of every month
    start_date=datetime(2019, 1, 1),
)

with load_workflow:
    curl_zone_job = BashOperator(
        task_id='zone_file_url',
        bash_command=f'curl -sSLf {url} > {output_file_template}'
    )

    format_csv_to_parquet_task = PythonOperator(
        task_id="format_csv_2_parquet_task_zone",
        python_callable=format_2_parquet,
        op_kwargs={
            "src_file": f"{output_file_template}",
        },
    )

    upload_to_gcs_job = PythonOperator(
        task_id="upload_to_gcs_job_zone",
        python_callable=upload_to_gcs,
        op_kwargs={
            "bucket": BUCKET,
            "object_name": f"raw/zone/{parquet_file}",
            "local_file": f"{Airflow_Home}/{parquet_file}",
        },
    )

    cleanup_job = BashOperator(
        task_id='cleanup_csv_zone',
        bash_command=f'rm {output_file_template}'
    )

    curl_zone_job >> format_csv_to_parquet_task >> upload_to_gcs_job >> cleanup_job
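After any of the DAGs has run, the uploads can be verified directly against the bucket with the same google-cloud-storage client the DAGs use. A small sketch; the bucket name is the DAG default and the prefix matches the zone DAG's upload path, so substitute your own values as needed:

# Sketch: list the Parquet objects the zone DAG uploaded under raw/zone/.
# Assumes GCP credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS) are configured.
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs("dtc_data_lake_dataengineering-bizzy", prefix="raw/zone/"):
    print(blob.name, blob.size)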