Skip to content

Instantly share code, notes, and snippets.

@jodoherty
Created March 26, 2025 02:02
Show Gist options
  • Select an option

  • Save jodoherty/4422d24cf64db5d2fb1e5d7a755fd99a to your computer and use it in GitHub Desktop.

Select an option

Save jodoherty/4422d24cf64db5d2fb1e5d7a755fd99a to your computer and use it in GitHub Desktop.
Prefect extra loggers with threading example.
"""
Prefect extra loggers with threading example.
Run it like this:
PREFECT_LOGGING_EXTRA_LOGGERS=__main__ PREFECT_API_URL=http://127.0.0.1:4200/api python main.py
You should see the plain Python logging for the '__main__' package in the
Prefect UI.
Pay attention to bind_run_context(). This function takes a function that is
to be executed inside another thread and wraps it in a closure over the current
flow or task run context.
Without propagating this context into child threads, you will lose logging and
instead see the following message:
prefect.exceptions.MissingContextError: No run context available. You are not in a flow or task run context.
"""
import concurrent.futures
import logging
import threading
from prefect import flow, task, get_run_logger
from prefect.context import get_run_context
# Plain stdlib logger for this module. When the file is run as a script its
# name is "__main__", which is the logger name the usage notes above pass in
# PREFECT_LOGGING_EXTRA_LOGGERS.
logger = logging.getLogger(__name__)
def hello():
    """Emit a warning-level record through the module-level stdlib logger."""
    logger.log(logging.WARNING, "hello() called")
def work():
    """Emit a warning-level record; used as the callable run in worker threads."""
    logger.log(logging.WARNING, "work() called")
def bind_run_context(fn):
    """Wrap *fn* so it executes inside the caller's current run context.

    The run context is captured (as a copy) at the moment this function is
    called, so it must be called from a thread that is already inside a flow
    or task run. The returned callable can then be handed to another thread
    (e.g. ``threading.Thread`` or an executor), where it re-enters the
    captured context before invoking *fn*.

    Args:
        fn: The callable to run in another thread.

    Returns:
        A callable accepting the same arguments as *fn* and returning
        whatever *fn* returns.
    """
    # Capture on the calling thread: the child thread has no run context of
    # its own to look up (see the MissingContextError note in the module
    # docstring).
    context = get_run_context().model_copy()

    def wrapped_fn(*args, **kwargs):
        with context:
            # Propagate the result so callers using future.result() (or any
            # direct call) receive fn's return value instead of None.
            return fn(*args, **kwargs)

    return wrapped_fn
# Example task: logs through the run logger, then through the plain module
# logger both directly (hello) and from a pool thread (work) that has the
# task's run context bound to it.
# NOTE(review): this definition rebinds the module-level name `task`, so after
# this point `task` no longer refers to the decorator imported from prefect.
@task
def task():
    run_logger = get_run_logger()
    run_logger.warning("Hello Task")
    hello()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # bind_run_context() is evaluated here, on the task's own thread, so
        # the wrapper carries this task run's context into the pool thread.
        pool.submit(bind_run_context(work)).result()
# Example flow: exercises the plain module logger from the flow body, from a
# pool thread, from a task, and from a manually created thread that itself
# spawns a pool thread.
@flow
def main():
    def run_work_in_pool():
        # Bind the run context on the calling thread, then wait for the
        # pooled call to finish.
        with concurrent.futures.ThreadPoolExecutor() as pool:
            pool.submit(bind_run_context(work)).result()

    def run():
        # Body of the manually created thread below; it runs inside the
        # context restored by bind_run_context(run).
        hello()
        run_work_in_pool()

    hello()
    run_work_in_pool()

    flow_logger = get_run_logger()
    flow_logger.warning("Hello main")
    task()
    flow_logger.info("Now creating a thread and calling hello and work")

    worker = threading.Thread(target=bind_run_context(run))
    worker.start()
    worker.join()
# Script entry point: start the flow only when this file is executed directly.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment