Skip to content

Instantly share code, notes, and snippets.

View kaxil's full-sized avatar

Kaxil Naik kaxil

View GitHub Profile
@kaxil
kaxil / airflow_default_args.py
Last active December 25, 2018 20:25
Airflow Default Arguments
# Default arguments inherited by every task in the DAG.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(2),
    # The settings below are BigQuery-specific; because they live in
    # default_args they are passed to every task in the DAG.
    bigquery_conn_id='gcp-bigquery-connection',
    write_disposition='WRITE_EMPTY',
    create_disposition='CREATE_IF_NEEDED',
    labels={'client': 'client-1'},
)
@kaxil
kaxil / airflow_params_usage_1.py
Last active December 25, 2018 19:55
Using Airflow params argument
# You can pass `params` dict to DAG object
# Default arguments applied to every task; `params` can also be passed
# to the DAG object itself (see below).
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(2),
)
dag = DAG(
dag_id='airflow_tutorial_2',
default_args=default_args,
@kaxil
kaxil / airflow_list_task_dependencies.py
Last active December 25, 2018 19:07
Using List to set Airflow Task Dependencies
# Setting task dependencies (the NORMAL way):
# `a >> b` makes b downstream of a, so one line per edge is needed.
task_one >> task_two
task_two >> task_two_1 >> end
task_two >> task_two_2 >> end
task_two >> task_two_3 >> end
# Using Lists (being a PRO :-D ): `>>` accepts a list of tasks, fanning
# out to every element and back in, so the three edges collapse to one line.
task_one >> task_two >> [task_two_1, task_two_2, task_two_3] >> end
@kaxil
kaxil / example_dag.py
Last active December 25, 2018 18:47
Example DAG to showcase the repeated `dag` parameter
# Normal DAG without Context Manager.
# Shared default arguments for every task in the example DAG.
args = dict(
    owner='airflow',
    start_date=airflow.utils.dates.days_ago(2),
)
dag = DAG(
dag_id='example_dag',
default_args=args,
schedule_interval='0 0 * * *',
@kaxil
kaxil / custom-log-filtering-and-formatting.py
Created December 17, 2018 00:48 — forked from acdha/custom-log-filtering-and-formatting.py
Example of how to filter or apply custom formatting using Python's logging library
#!/usr/bin/env python
# encoding: utf-8
from pprint import pformat, pprint
import logging
class PasswordMaskingFilter(logging.Filter):
"""Demonstrate how to filter sensitive data:"""
@kaxil
kaxil / slack.py
Created October 30, 2018 10:49 — forked from boxysean/slack.py
PythonSlackOperator -- how I've integrated notifications into my PythonOperators
# airflow/plugins/slack.py
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from slackclient import SlackClient
from titan.utils import config
@kaxil
kaxil / run.sh
Created September 20, 2018 14:33 — forked from nordineb/run.sh
Azure Instance Metadata service
# Queries against the Azure Instance Metadata Service (IMDS) at the
# link-local address 169.254.169.254; the `Metadata:true` header is
# sent on every request.
# Get all metadata
curl -H Metadata:true "http://169.254.169.254/metadata/instance?api-version=2017-04-02"
# Get all network metadata
curl -H Metadata:true "http://169.254.169.254/metadata/instance/network?api-version=2017-04-02"
# Get public ip only
# (format=text returns the bare value instead of JSON)
curl -H Metadata:true "http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0/publicIpAddress?api-version=2017-04-02&format=text"
@kaxil
kaxil / pubsub-to-bokeh.py
Created May 27, 2018 20:05
Google Cloud Pub/Sub to Bokeh Dashboard - Streaming Dashboard
# User module to receive tweets
from recevie_tweets_pubsub import receive_tweets
import pandas
from bokeh.io import curdoc
from bokeh.models import ColumnDataSource
from bokeh.models import DatetimeTickFormatter
from bokeh.plotting import figure, output_file
import sys
@kaxil
kaxil / twitter-streaming-pubsub.py
Last active April 26, 2020 08:01
Twitter Streaming API to PubSub
def publish(client, pubsub_topic, data_lines):
    """Publish a batch of lines to the given Pub/Sub topic.

    Each line becomes one message dict; the whole batch is JSON-serialized,
    base64url-encoded, and sent in a single ``client.publish`` call.

    Args:
        client: Pub/Sub client exposing ``publish(topic=..., data=...)``.
        pubsub_topic: Topic name to publish to.
        data_lines: Iterable of strings, one per message.
    """
    # Build the message list in one pass (comprehension instead of a
    # manual append loop).
    body = {'messages': [{'data': line} for line in data_lines]}
    str_body = json.dumps(body)
    # base64url-encode the serialized body before handing it to the client.
    data = base64.urlsafe_b64encode(bytearray(str_body, 'utf8'))
    client.publish(topic=pubsub_topic, data=data)
@kaxil
kaxil / ClickLogConsumer.java
Created May 13, 2018 03:40 — forked from zero-master/ClickLogConsumer.java
Write from Cloud Pub/Sub to BigQuery using Fileload and save cost on streaming inserts!
package ...
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;