Chris White cicdw

airflow test tutorial print_date 2015-06-01
## Output
AIRFLOW_CTX_EXECUTION_DATE=2015-06-01T00:00:00+00:00
[2019-04-17 15:54:45,679] {bash_operator.py:110} INFO - Running command: date
[2019-04-17 15:54:45,685] {bash_operator.py:119} INFO - Output:
[2019-04-17 15:54:45,695] {bash_operator.py:123} INFO - Wed Apr 17 15:54:45 PDT 2019
# run your first Prefect flow from the command line
python -c "from prefect import Flow; f = Flow('empty'); f.run()"
def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    v1 = ti.xcom_pull(key=None, task_ids='push')
from prefect import task, Parameter, Flow

@task
def return_param(p):
    return p

with Flow("parameter-example") as flow:
    p = Parameter("p", default=42)
@cicdw
cicdw / dask_cluster_launch.sh
Created April 15, 2019 19:08
CLI for spinning up a local Dask cluster
dask-scheduler
# Scheduler at: tcp://10.0.0.41:8786
# in different terminal windows
dask-worker tcp://10.0.0.41:8786
dask-worker tcp://10.0.0.41:8786
cicdw / set_env_dask.sh
Created April 15, 2019 19:03
Necessary environment variables for running Prefect on Dask
export PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.engine.executors.DaskExecutor"
export PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS="tcp://10.0.0.41:8786"
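The variable names above follow Prefect's convention of separating nested configuration keys with double underscores, so `PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS` corresponds to the `engine.executor.default_class` setting. A minimal sketch of that naming scheme in plain Python; the helper `env_to_config_path` is hypothetical and is not part of Prefect:

```python
def env_to_config_path(name, prefix="PREFECT__"):
    """Hypothetical helper: turn an env var name into a dotted config path."""
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Double underscores separate nesting levels; single underscores stay
    # inside a key name (e.g. DEFAULT_CLASS -> default_class).
    return ".".join(part.lower() for part in name[len(prefix):].split("__"))


path = env_to_config_path("PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS")
print(path)
```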
cicdw / post_standup_flow.py
Created February 25, 2019 01:02
complete code of the standup Prefect flow
import datetime
from google.cloud.firestore import Client
import random
import requests
import prefect
from prefect import Flow, Parameter, task
from prefect.client import Secret
from prefect.schedules import CronSchedule
cicdw / mapping_snippet.py
Created February 23, 2019 19:35
Extract of the mapping code from the reminder flow
reminder_flag = is_reminder_needed.map(get_team, unmapped(updates))
res = send_reminder.map(reminder_flag)
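In the snippet above, `.map` calls the task once per element of its mapped argument, while anything wrapped in `unmapped` is passed whole to every call. A rough plain-Python analogue of that behavior, without Prefect; the helper `map_with_unmapped`, the body of `is_reminder_needed`, and the sample data are all assumptions made for illustration:

```python
def map_with_unmapped(fn, mapped, constant):
    """Hypothetical stand-in for task.map(mapped, unmapped(constant))."""
    # One call per element of `mapped`; `constant` is reused unchanged.
    return [fn(item, constant) for item in mapped]


def is_reminder_needed(member, updates):
    # Assumed logic: flag a member who has not posted an update yet.
    return member if member not in updates else None


team = ["alice", "bob", "carol"]      # made-up team list
updates = {"bob": "shipped the fix"}  # made-up updates

flags = map_with_unmapped(is_reminder_needed, team, updates)
print(flags)
```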
cicdw / reminder_flow.py
Last active July 30, 2019 04:27
choppy version of the Prefect reminder Flow
@task
def get_collection_name():
    """
    Returns the current date, formatted, which maps
    to a Google Firestore collection name.
    """
    date_format = "%Y-%m-%d"
    now = prefect.context["scheduled_start_time"]
    return now.strftime(date_format)
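The formatting step in this task can be checked without Prefect. A small sketch, assuming a fixed datetime in place of `prefect.context["scheduled_start_time"]`; the date used here is arbitrary:

```python
import datetime

date_format = "%Y-%m-%d"

# Stand-in for prefect.context["scheduled_start_time"]; the value is arbitrary.
scheduled_start_time = datetime.datetime(
    2019, 2, 25, 9, 0, tzinfo=datetime.timezone.utc
)

collection_name = scheduled_start_time.strftime(date_format)
print(collection_name)
```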
cicdw / flow_deployment.py
Last active February 27, 2019 16:23
Create Prefect environment and deploy Flow
from prefect.environments import DockerEnvironment
env = DockerEnvironment(
    base_image="python:3.6",
    registry_url="XXXXXXXX",
    python_dependencies=["google-cloud-firestore"],
    files={"~/google-creds.json": "/root/google-creds.json"},
    env_vars={"GOOGLE_APPLICATION_CREDENTIALS": "/root/google-creds.json"},
)