Skip to content

Instantly share code, notes, and snippets.

View cicdw's full-sized avatar

Chris White cicdw

View GitHub Profile
@cicdw
cicdw / set_env_dask.sh
Created April 15, 2019 19:03
Necessary environment variables for running Prefect on Dask
export PREFECT__ENGINE__EXECUTOR__DEFAULT_CLASS="prefect.engine.executors.DaskExecutor"
export PREFECT__ENGINE__EXECUTOR__DASK__ADDRESS="tcp://10.0.0.41:8786"
@cicdw
cicdw / dask_cluster_launch.sh
Created April 15, 2019 19:08
CLI for spinning up a local Dask cluster
dask-scheduler
# Scheduler at: tcp://10.0.0.41:8786
# in different terminal windows
dask-worker tcp://10.0.0.41:8786
dask-worker tcp://10.0.0.41:8786
from prefect import task, Parameter, Flow
@task
def return_param(p):
return p
with Flow("parameter-example") as flow:
p = Parameter("p", default=42)
def puller(**kwargs):
ti = kwargs['ti']
# get value_1
v1 = ti.xcom_pull(key=None, task_ids='push')
# run your first Prefect flow from the command line
python -c "from prefect import Flow; f = Flow('empty'); f.run()"
airflow test tutorial print_date 2015–06–01
## Output
AIRFLOW_CTX_EXECUTION_DATE=2015–06–01T00:00:00+00:00
[2019–04–17 15:54:45,679] {bash_operator.py:110} INFO - Running command: date
[2019–04–17 15:54:45,685] {bash_operator.py:119} INFO - Output:
[2019–04–17 15:54:45,695] {bash_operator.py:123} INFO - Wed Apr 17 15:54:45 PDT 2019
from prefect import task, Flow
@task
def create_list():
return [1, 1, 2, 3]
@task
def add_one(x):
return x + 1
@cicdw
cicdw / sql_create.py
Created October 13, 2019 19:19
Creates a SQLite Table for storing SSH attempts
from prefect.tasks.database.sqlite import SQLiteScript
create_script = "CREATE TABLE IF NOT EXISTS SSHATTEMPTS (timestamp TEXT, username TEXT, port INTEGER, city TEXT, country TEXT, latitude REAL, longitude REAL)"
create_table = SQLiteScript(
db="ssh.db", script=create_script, name="Create Database and Table"
)
@cicdw
cicdw / prefect_lambda.py
Last active October 28, 2019 17:26
A template for an AWS Lambda Function which triggers Prefect Flow Runs
import json
import os
import urllib.parse
import urllib.request
print("Loading function")
def lambda_handler(event, context):
@cicdw
cicdw / provide_params.py
Created October 26, 2019 22:36
Snippet of providing parmeter values
# if you wish to pass information about the triggering event as a parameter,
# simply add that to the inputs dictionary under the parameters key,
# whose value should be a dictionary of PARAMETER_NAME -> PARAMETER_VALUE
# pass the full event
inputs['parameters'] = dict(event=event)
# or just the bucket name
inputs['parameters'] = dict(bucket_name=event['Records'][0]['s3']['bucket']['name'])