"""
A DAG docstring might be a good way to explain at a high level
what problem space the DAG is looking at.
Links to design documents, upstream dependencies, etc.
are highly recommended.
"""
from datetime import datetime, timedelta
from airflow.models import DAG  # Import the DAG class
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
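The operators imported above become tasks, and the DAG records which tasks must finish before others may start. As a stdlib-only illustration of that idea (not Airflow's scheduler), the sketch below orders a hypothetical three-task pipeline with Python's `graphlib` (Python 3.9+); the task name `compute_revenue` is made up for the example.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it waits on (its upstreams).
pipeline = {
    "waiting_for_my_awesome_hive_table": set(),
    "compute_revenue": {"waiting_for_my_awesome_hive_table"},  # hypothetical task name
    "data_to_mysql_transfer": {"compute_revenue"},
}


def execution_order(graph):
    """Return one valid run order in which every task comes after its upstreams."""
    return list(TopologicalSorter(graph).static_order())


def is_dag(graph):
    """True if the dependency graph has no cycles, i.e. it can be scheduled at all."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return False
    return True


print(execution_order(pipeline))
# ['waiting_for_my_awesome_hive_table', 'compute_revenue', 'data_to_mysql_transfer']
print(is_dag({"a": {"b"}, "b": {"a"}}))  # False: a and b wait on each other
```

A cycle is exactly what the "acyclic" in DAG rules out: with one, no task in the loop could ever start.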
"""
A DAG docstring might be a good way to explain at a high level
what problem space the DAG is looking at.
Links to design documents, upstream dependencies, etc.
are highly recommended.
"""
from datetime import datetime, timedelta
from airflow.models import DAG  # Import the DAG class
from airflow.operators.sensors import NamedHivePartitionSensor
from airflow.operators.hive_operator import HiveOperator
NamedHivePartitionSensor(
    task_id='waiting_for_my_awesome_hive_table',
    partition_names=['robert.fct_bitcoin_revenue/ds={{ ds }}'],  # schema.table/partition_spec
    dag=dag
)
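`NamedHivePartitionSensor` waits for partitions identified by fully qualified names of the form `schema.table/partition_spec`, for example `robert.fct_bitcoin_revenue/ds=2018-01-01`; a bare table name has no partition part to wait for. The helper below is a hypothetical, stdlib-only sketch of how such a name splits into its parts, written for illustration rather than copied from Airflow; the fallback schema name `default` is an assumption matching Hive's default database.

```python
def parse_partition_name(name, default_schema="default"):
    """Split 'schema.table/partition_spec' into (schema, table, partition_spec).

    Hypothetical helper for illustration; the schema prefix is optional.
    """
    table_part, slash, spec = name.partition("/")
    if not slash or not spec:
        raise ValueError(f"Could not parse {name!r} into table and partition")
    schema, dot, table = table_part.rpartition(".")
    if not dot:  # no schema prefix: rpartition leaves the whole string in `table`
        schema = default_schema
    return schema, table, spec


print(parse_partition_name("robert.fct_bitcoin_revenue/ds=2018-01-01"))
# ('robert', 'fct_bitcoin_revenue', 'ds=2018-01-01')
```

In a DAG the date part would normally come from the `{{ ds }}` template, so each run waits for its own day's partition.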
template = """
SELECT
    SUM(m_bitcoin_trade_revenue) AS m_revenue
FROM
    robert.fct_bitcoin_revenue
WHERE
    ds = '{{ ds }}'
"""
HiveOperator(
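Airflow renders the `{{ ds }}` placeholder with Jinja before the query runs, substituting the run's date as a `YYYY-MM-DD` string. The sketch below imitates only that substitution step with a regular expression so it runs without Jinja or Airflow installed; `render` is a hypothetical helper and the run date is made up.

```python
import re
from datetime import date

# Matches "{{ name }}" with optional whitespace inside the braces.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, **context):
    """Substitute {{ name }} placeholders from context (a stand-in for Jinja, not Airflow's renderer)."""
    def lookup(match):
        key = match.group(1)
        if key not in context:
            raise KeyError(f"undefined template variable: {key}")
        return str(context[key])
    return _PLACEHOLDER.sub(lookup, template)


template = """
SELECT
    SUM(m_bitcoin_trade_revenue) AS m_revenue
FROM
    robert.fct_bitcoin_revenue
WHERE
    ds = '{{ ds }}'
"""

run_date = date(2018, 1, 1)  # hypothetical run date
print(render(template, ds=run_date.isoformat()))
```

Unlike Jinja, this stand-in supports no filters or expressions; it only shows that the SQL is plain text until each run fills in its date.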
from airflow.operators.hive_to_mysql import HiveToMySqlTransfer

HiveToMySqlTransfer(
    task_id='data_to_mysql_transfer',
    sql="SELECT * FROM robert.dim_bitcoin_revenue WHERE ds = '{{ ds }}'",
    mysql_conn_id='robert_mysql',
    mysql_table='dim_bitcoin_revenue',
    hiveserver2_conn_id='hiveserver2_silver',
    dag=dag
)
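Conceptually, `HiveToMySqlTransfer` runs `sql` against Hive through the `hiveserver2_conn_id` connection and loads the result rows into `mysql_table` through `mysql_conn_id`. The sketch below reproduces that query-then-load pattern with two in-memory SQLite databases standing in for Hive and MySQL; the `transfer` helper, the column layout of `dim_bitcoin_revenue`, and the sample rows are all hypothetical.

```python
import sqlite3


def transfer(src, dst, sql, params, dst_table):
    """Run sql on src and append every result row to dst_table on dst (hypothetical helper)."""
    rows = src.execute(sql, params).fetchall()
    if rows:
        placeholders = ", ".join("?" * len(rows[0]))  # one "?" per column
        dst.executemany(f"INSERT INTO {dst_table} VALUES ({placeholders})", rows)
        dst.commit()
    return len(rows)


# Stand-ins for the Hive source and the MySQL target; the schema is assumed.
hive = sqlite3.connect(":memory:")
mysql = sqlite3.connect(":memory:")
for conn in (hive, mysql):
    conn.execute("CREATE TABLE dim_bitcoin_revenue (m_revenue REAL, ds TEXT)")
hive.executemany(
    "INSERT INTO dim_bitcoin_revenue VALUES (?, ?)",
    [(10.0, "2018-01-01"), (12.5, "2018-01-02")],  # made-up sample rows
)

copied = transfer(
    hive, mysql,
    "SELECT * FROM dim_bitcoin_revenue WHERE ds = ?", ("2018-01-01",),
    "dim_bitcoin_revenue",
)
print(copied, mysql.execute("SELECT * FROM dim_bitcoin_revenue").fetchall())
# 1 [(10.0, '2018-01-01')]
```

The `?` placeholders are SQLite's parameter style; the operator itself receives the date already rendered into the SQL via `{{ ds }}`. Because this helper only appends, running it twice for the same `ds` would duplicate rows, so a real load usually clears that day's rows first.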
SELECT
    b.dim_market
  , SUM(a.m_bookings) AS m_bookings
FROM (
    SELECT
        id_listing
      , 1 AS m_bookings
      , m_a  -- not used (for illustration only)
      , m_b  -- not used (for illustration only)
      , m_c  -- not used (for illustration only)
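The inner query keeps only the listing key and the measure, and `dim_market` is presumably attached by joining those fact rows to a listing dimension on `id_listing`; the join itself is cut off in the snippet. The sketch below runs a complete version of that fact-to-dimension join in SQLite, assuming a hypothetical dimension table `dim_listings` and toy rows.

```python
import sqlite3


def bookings_by_market(conn):
    """Sum bookings per market by joining fact rows to the listing dimension."""
    query = """
    SELECT
        b.dim_market
      , SUM(a.m_bookings) AS m_bookings
    FROM (
        SELECT id_listing, 1 AS m_bookings  -- one row per booking
        FROM fct_bookings
    ) a
    JOIN dim_listings b                      -- hypothetical dimension table
      ON a.id_listing = b.id_listing
    GROUP BY b.dim_market
    ORDER BY b.dim_market
    """
    return conn.execute(query).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE fct_bookings (id_listing INTEGER);
CREATE TABLE dim_listings (id_listing INTEGER, dim_market TEXT);
INSERT INTO fct_bookings VALUES (1), (1), (2), (3);
INSERT INTO dim_listings VALUES (1, 'Paris'), (2, 'Paris'), (3, 'Tokyo');
""")
print(bookings_by_market(conn))  # [('Paris', 3), ('Tokyo', 1)]
```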
INSERT OVERWRITE TABLE bookings_summary PARTITION (ds='{{ earliest_ds }}')
SELECT
    dim_market
  , SUM(m_bookings) AS m_bookings
FROM
    fct_bookings
WHERE
    ds = '{{ earliest_ds }}'
GROUP BY
    dim_market
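`INSERT OVERWRITE ... PARTITION (ds='{{ earliest_ds }}')` replaces the contents of that single partition instead of appending to it, so rerunning the task for the same date leaves the table in the same state. The sketch below models a partitioned table as a Python dict keyed by `ds` to show that property; the helpers and sample rows are hypothetical.

```python
from collections import defaultdict


def summarize(fct_rows, ds):
    """SUM(m_bookings) GROUP BY dim_market over the rows of one ds."""
    totals = defaultdict(int)
    for row in fct_rows:
        if row["ds"] == ds:
            totals[row["dim_market"]] += row["m_bookings"]
    return sorted(totals.items())


def insert_overwrite(table, ds, rows):
    """Replace partition ds wholesale; other partitions are left untouched."""
    table[ds] = list(rows)


fct_bookings = [  # made-up fact rows
    {"ds": "2018-01-01", "dim_market": "Paris", "m_bookings": 2},
    {"ds": "2018-01-01", "dim_market": "Tokyo", "m_bookings": 1},
    {"ds": "2018-01-02", "dim_market": "Paris", "m_bookings": 4},
]
bookings_summary = {"2017-12-31": [("Paris", 7)]}  # an older partition that must survive

for _ in range(2):  # running the same task twice...
    insert_overwrite(bookings_summary, "2018-01-01", summarize(fct_bookings, "2018-01-01"))
print(bookings_summary)  # ...still leaves a single copy of the 2018-01-01 rows
```

An append-style `INSERT INTO` would instead double the 2018-01-01 totals on the second run.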
INSERT OVERWRITE TABLE fct_bookings PARTITION (ds='{{ latest_ds }}')
SELECT
    id_listing
  , m_bookings
FROM
    some_up_stream_booking_tables
WHERE
    ds = '{{ latest_ds }}'
;
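`earliest_ds` and `latest_ds` are not among Airflow's built-in template variables (such as `ds`), so they are presumably supplied by the DAG author, for example as the bounds of a backfill. Without dynamic partitions, one way to backfill is to run a single-partition statement like the one above once per date. The hypothetical helper below enumerates those dates; the window is made up.

```python
from datetime import date, timedelta


def ds_range(earliest_ds, latest_ds):
    """Return every ds string from earliest_ds to latest_ds inclusive, as YYYY-MM-DD."""
    start = date.fromisoformat(earliest_ds)
    end = date.fromisoformat(latest_ds)
    if start > end:
        raise ValueError("earliest_ds must not be after latest_ds")
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


# Hypothetical backfill window spanning a month boundary.
for ds in ds_range("2018-01-30", "2018-02-02"):
    print(f"would run the single-partition INSERT OVERWRITE for ds='{ds}'")
```

Each date becomes a separate run of the statement, one partition at a time.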
INSERT OVERWRITE TABLE bookings_summary PARTITION (ds)
SELECT
    dim_market
  , SUM(m_bookings) AS m_bookings
  , ds  -- For Hive to know we are using dynamic partitions: ds must be the last column
FROM
    fct_bookings
WHERE
    ds BETWEEN '{{ earliest_ds }}' AND '{{ latest_ds }}'
GROUP BY
    dim_market
  , ds
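With `PARTITION (ds)` and no fixed value, Hive takes each output row's partition from the last column of the `SELECT`, so one statement can fill every partition between `earliest_ds` and `latest_ds`. Hive's default `strict` dynamic-partition mode rejects a statement in which every partition column is dynamic, so such jobs usually set `hive.exec.dynamic.partition.mode=nonstrict` first. The Python sketch below models the same behaviour on a dict-backed table; the helper and data are hypothetical.

```python
from collections import defaultdict


def dynamic_partition_insert(table, fct_rows, earliest_ds, latest_ds):
    """Aggregate rows with earliest_ds <= ds <= latest_ds and overwrite one partition per ds.

    Mirrors INSERT OVERWRITE ... PARTITION (ds): partitions come from the data,
    and only partitions that receive rows are replaced.
    """
    totals = defaultdict(int)  # (ds, dim_market) -> SUM(m_bookings)
    for row in fct_rows:
        if earliest_ds <= row["ds"] <= latest_ds:  # ISO dates compare correctly as strings
            totals[(row["ds"], row["dim_market"])] += row["m_bookings"]
    partitions = defaultdict(list)
    for (ds, market), total in sorted(totals.items()):
        partitions[ds].append((market, total))
    table.update(partitions)  # overwrite just the partitions written this run
    return sorted(partitions)


fct_bookings = [  # made-up fact rows
    {"ds": "2018-01-01", "dim_market": "Paris", "m_bookings": 2},
    {"ds": "2018-01-02", "dim_market": "Paris", "m_bookings": 4},
    {"ds": "2018-01-02", "dim_market": "Tokyo", "m_bookings": 1},
    {"ds": "2018-01-05", "dim_market": "Tokyo", "m_bookings": 9},  # outside the window
]
bookings_summary = {"2017-12-31": [("Paris", 7)]}
written = dynamic_partition_insert(bookings_summary, fct_bookings, "2018-01-01", "2018-01-03")
print(written)  # ['2018-01-01', '2018-01-02']
```

One pass over the window replaces the per-date loop, and partitions outside it (here 2017-12-31) are untouched.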
       Column       |  Type   |      Extra      | Comment
--------------------+---------+-----------------+-------------------------------------------
 id_listing         | bigint  |                 | Unique id of the listing.
 id_host            | bigint  |                 | Unique id of the host for the listing.
 dim_market         | varchar |                 | The market in which the listing is located.
 ds                 | varchar | partition key   |
(4 rows)