Skip to content

Instantly share code, notes, and snippets.

View cr3a7ure's full-sized avatar

Goutis Dimitrios cr3a7ure

View GitHub Profile
@cr3a7ure
cr3a7ure / subdag_operator_sample.py
Created December 4, 2020 19:13 — forked from ktsmy/subdag_operator_sample.py
Airflow Dynamic Workflow Sample
# -*- coding: utf-8 -*-
import airflow
from airflow.executors.celery_executor import CeleryExecutor
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
def get_id_list():
    """Return a list of IDs. For the sake of the example, simply 0-99."""
    # NOTE(review): the function body is truncated in this listing, so the
    # 0-99 range comes only from the original docstring and cannot be
    # verified here -- confirm against the full gist source.
@cr3a7ure
cr3a7ure / economic_events_update_dag.py
Created December 4, 2020 19:09 — forked from alucarded/economic_events_update_dag.py
Airflow DAG definition file to dynamically generate DAGs based on a variable (pull economic data when it is released)
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import airflow
from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
# -*- coding: utf-8 -*-
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import Variable, DAG
from datetime import date, datetime, timedelta
@cr3a7ure
cr3a7ure / ssh_tunnel_postgres.py
Created December 4, 2020 19:07 — forked from edthix/ssh_tunnel_postgres.py
Sample Airflow DAG for an SSH tunnel + Postgres (assumes both the SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector connections are available)
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator
default_args = {
'owner': 'airflow',
@cr3a7ure
cr3a7ure / dag_dumper.py
Created December 4, 2020 19:04 — forked from alevene/dag_dumper.py
Dump DAG definitions to file/stdout
"""
Script to print DAG and task information in a text format. This can be used to quickly compare against other branches
or versions to confirm that the "compiled" version of a DAG matches expectations.
Usage:
1. ~/airflow-install (branch) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_branch
2. ~/airflow-install (branch) $ git checkout development
3. ~/airflow-install (development) $ ./worker/run --no-name python3 /opt/airflow/utils/dag_dumper.py -l dag1 dag2 -o /opt/airflow/dags/devenv_config/dumps/dag_dump_dev
4. Compare the two output files.
@cr3a7ure
cr3a7ure / README.md
Created December 4, 2020 18:51 — forked from sjednac/README.md
Submit a Spark job to an existing Amazon EMR cluster

Submit a Spark job to an existing Amazon EMR cluster

Creates a step in Amazon EMR for a given cluster_id and monitors its progress using a sensor. A more complex example that involves cluster creation/termination can be found here.

import airflowlib.emr_lib as emr
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
###############################################
# Parameters
###############################################
# Spark standalone master URL (format "spark://HOST:PORT"); presumably passed
# to the SparkSubmitOperator tasks (the operator is imported above). Assumes a
# host named "spark" is resolvable, e.g. a docker-compose service -- TODO confirm.
spark_master = "spark://spark:7077"
@cr3a7ure
cr3a7ure / slack.py
Created December 4, 2020 18:47 — forked from boxysean/slack.py
PythonSlackOperator -- how I've integrated notifications into my PythonOperators
# airflow/plugins/slack.py
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from slackclient import SlackClient
from titan.utils import config
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,