Skip to content

Instantly share code, notes, and snippets.

View discdiver's full-sized avatar

Jeff Hale discdiver

View GitHub Profile
Found flow 'Previously unreliable pipeline'
Deployment YAML created at '/Users/jeffhale/Desktop/prefect/prefect-101/pipeline4-deployment.yaml'.
@discdiver
discdiver / ex5.py
Created August 2, 2022 18:21
ex5.py
import random
from prefect import flow, task
@task(name="Get data from API", retries=4, retry_delay_seconds=2) # NEW ****
def call_unreliable_api():
choices = [{"data": 42}, "failure"]
res = random.choice(choices)
if res == "failure":
raise Exception("Our unreliable service failed")
16:15:20.543 | INFO | prefect.engine - Created flow run 'blue-flounder' for flow 'Previously unreliable pipeline'
16:15:20.612 | INFO | Flow run 'blue-flounder' - Created task run 'Get data from API-466f2784-0' for task 'Get data from API'
16:15:20.613 | INFO | Flow run 'blue-flounder' - Executing 'Get data from API-466f2784-0' immediately...
16:15:20.625 | ERROR | Task run 'Get data from API-466f2784-0' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/jeffhale/miniforge3/lib/python3.10/site-packages/prefect/engine.py", line 1048, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "/Users/jeffhale/miniforge3/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/jeffhale/miniforge3/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
@discdiver
discdiver / ex4.py
Created August 2, 2022 18:10
ex4.py
import random
from prefect import flow, task
@task(name="Get data from API", retries=4, retry_delay_seconds=2) # NEW ****
def call_unreliable_api():
choices = [{"data": 42}, "failure"]
res = random.choice(choices)
if res == "failure":
raise Exception("Our unreliable service failed")
else:
09:31:52.689 | INFO | prefect.engine - Created flow run 'elite-hedgehog' for flow 'pipeline3'
09:31:52.748 | INFO | Flow run 'elite-hedgehog' - Created task run 'call_unreliable_api-466f2784-0' for task 'call_unreliable_api'
09:31:52.749 | INFO | Flow run 'elite-hedgehog' - Executing 'call_unreliable_api-466f2784-0' immediately...
09:31:52.771 | INFO | Task run 'call_unreliable_api-466f2784-0' - Finished in state Completed()
09:31:52.782 | INFO | Flow run 'elite-hedgehog' - Created task run 'augment_data-960bb844-0' for task 'augment_data'
09:31:52.782 | INFO | Flow run 'elite-hedgehog' - Executing 'augment_data-960bb844-0' immediately...
09:31:52.801 | INFO | Task run 'augment_data-960bb844-0' - Finished in state Completed()
09:31:52.810 | INFO | Flow run 'elite-hedgehog' - Created task run 'write_results_to_database-fbbf5571-0' for task 'write_results_to_database'
09:31:52.811 | INFO | Flow run 'elite-hedgehog' - Executing 'write_results_to_database-fbbf5571-0' immediately...
09:3
@discdiver
discdiver / ex3.py
Last active August 2, 2022 18:09
ex3.py
import random
from prefect import flow, task # NEW ****
@task # NEW ****
def call_unreliable_api():
choices = [{"data": 42}, "failure"]
res = random.choice(choices)
if res == "failure":
raise Exception("Our unreliable service failed")
else:
09:31:12.195 | INFO | prefect.engine - Created flow run 'scarlet-avocet' for flow 'pipeline2'
09:31:12.256 | INFO | Flow run 'scarlet-avocet' - Finished in state Completed()
Wrote {'data': 42, 'message': 'Trying a flow!'} to database successfully!
import random
from prefect import flow # NEW ****
def call_unreliable_api() -> dict[str, int]:
    """Simulate a call to a flaky service.

    Picks uniformly at random between a successful payload and a failure
    marker, so roughly half of the calls fail.

    Returns:
        The payload ``{"data": 42}`` when the simulated call succeeds.

    Raises:
        Exception: With the message "Our unreliable service failed" when
            the failure marker is drawn.
    """
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    # Guard clause: fail fast so the success path needs no ``else`` branch.
    if res == "failure":
        raise Exception("Our unreliable service failed")
    return res
# Call `pipeline` with a custom message (`pipeline` is defined outside this excerpt).
pipeline(msg="Super Special Message")
import random
def call_unreliable_api() -> dict[str, int]:
    """Return a canned payload or raise, chosen at random.

    Stands in for an unreliable remote service: each call draws uniformly
    from one successful payload and one failure marker.

    Returns:
        The payload ``{"data": 42}`` when the success option is drawn.

    Raises:
        Exception: With the message "Our unreliable service failed" when
            the failure option is drawn.
    """
    choices = [{"data": 42}, "failure"]
    res = random.choice(choices)
    # Raise first; the success path then falls through without an ``else``.
    if res == "failure":
        raise Exception("Our unreliable service failed")
    return res