This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Point Airflow at its home directory (metadata DB, logs, dags/ folder live here).
export AIRFLOW_HOME=/path/to/airflow/
# Obviously you will need the proper Read/Write access
# to this directory
mkdir -p ${AIRFLOW_HOME}
# Initialize the Airflow metadata database (SQLite by default).
# NOTE(review): "initdb" is the Airflow 1.x command; Airflow 2+ renamed it
# to "airflow db init" -- confirm against the target Airflow version.
airflow initdb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Enable uuid_generate_v4() so primary keys can be generated server-side.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Raw clickstream events: one row per click/display/search event.
-- IF NOT EXISTS keeps the script idempotent, matching the extension statement above.
CREATE TABLE IF NOT EXISTS clickstream (
    click_id            UUID PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(),
    click_timestamp     TIMESTAMP WITH TIME ZONE,  -- event instant, TZ-aware
    user_id             UUID,
    is_ad_display_event BOOLEAN,
    is_ad_search_event  BOOLEAN
);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Register the warehouse DB as an Airflow connection (Airflow 1.x CLI).
# The scrape had mangled every long option ("--conn_id" became "- conn_id");
# restored here so the command actually parses.
# NOTE(review): credentials are passed in plaintext on the command line --
# prefer an environment variable or a secrets backend in real deployments.
airflow connections -a --conn_id "mini-warehouse-db" \
    --conn_type "postgres" \
    --conn_host "localhost" \
    --conn_port "5432" \
    --conn_schema "miniwarehousedb" \
    --conn_login "etl" \
    --conn_password "password" \
    --conn_extra ""
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow.operators.postgres_operator import PostgresOperator | |
from airflow.sensors.sql_sensor import SqlSensor | |
from datetime import datetime, timedelta | |
# Default DAG parameters | |
default_args = { | |
'owner': 'airflow', | |
'depends_on_past': False, | |
'start_date': datetime(2018, 10, 1), |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The connection ID to the datawarehouse DB (must match the connection | |
# handler defined in airflow for the data source) | |
target_db = 'mini-warehouse-db' | |
# A simple sensor checking wether clickstream data is present | |
# for the day following the execution_date of the DAG Run | |
sensor_query_template = ''' | |
SELECT TRUE | |
FROM clickstream | |
WHERE click_timestamp::DATE >= '{{ execution_date + macros.timedelta(days=1) }}'::DATE |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A minimalist idempotent aggregation query for clickstream data | |
aggregation_query_template = ''' | |
BEGIN; | |
DELETE FROM clickstream_aggregated | |
WHERE click_date = '{{ execution_date }}'::DATE; | |
INSERT INTO clickstream_aggregated | |
SELECT click_timestamp::DATE AS click_date, |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BEGIN;
-- Seed data: sample click events spread over three days.
-- The original scrape used en-dashes (2018–10–01) inside the timestamp
-- literals, which PostgreSQL cannot parse; restored to ASCII hyphens.
INSERT INTO clickstream
    (click_timestamp, user_id, is_ad_display_event, is_ad_search_event)
VALUES
    ('2018-10-01 03:00:00', uuid_generate_v4(), TRUE, FALSE),
    ('2018-10-01 04:00:00', uuid_generate_v4(), FALSE, TRUE),
    ('2018-10-02 03:00:00', uuid_generate_v4(), TRUE, FALSE),
    ('2018-10-02 05:00:00', uuid_generate_v4(), TRUE, FALSE),
    ('2018-10-03 04:00:00', uuid_generate_v4(), FALSE, TRUE),
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "topics": "leboncoin_staging_ads_ad-publications_public_avro",
  "s3.bucket.name": "leboncoin_kafka_events_bucket",
  "topics.dir": "staging/topics/ads/raw/parquet",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "path.format": "'schema_version'=VV/'event_date'=YYYY-MM-dd/'event_hour'=HH",
  "partitioner.class": "fr.leboncoin.data.archiver.partitioner.CustomTimePartitioner",
  "timestamp.extractor": "fr.leboncoin.data.archiver.parser.LeboncoinTimestampExtractor",
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface Partitioner<T> { | |
void configure(Map<String, Object> config); | |
/** | |
* Returns string representing the output path for a sinkRecord to be encoded and stored. | |
* | |
* @param sinkRecord The record to be stored by the Sink Connector | |
* @return The path/filename the SinkRecord will be stored into after it is encoded | |
*/ | |
String encodePartition(SinkRecord sinkRecord); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "errors.tolerance": "none",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": "leboncoin_staging_ads_ad-publications_public_avro",
  "s3.bucket.name": "leboncoin_kafka_events_bucket",
  "topics.dir": "staging/topics/ads/raw/parquet",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
OlderNewer