Created
March 26, 2025 02:02
-
-
Save jodoherty/4422d24cf64db5d2fb1e5d7a755fd99a to your computer and use it in GitHub Desktop.
Prefect extra loggers with threading example.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Prefect extra loggers with threading example. | |
| Run it like this: | |
| PREFECT_LOGGING_EXTRA_LOGGERS=__main__ PREFECT_API_URL=http://127.0.0.1:4200/api python main.py | |
| You should see the plain Python log records emitted by the '__main__' logger | |
| in the Prefect UI. | |
| Pay attention to bind_run_context(). This function takes a function that is | |
| to be executed inside another thread and wraps it with a closure over the | |
| current flow or task run context. | |
| Without propagating this context into child threads, you will lose logging and | |
| instead see the following message: | |
| prefect.exceptions.MissingContextError: No run context available. You are not in a flow or task run context. | |
| """ | |
| import concurrent.futures | |
| import logging | |
| import threading | |
| from prefect import flow, task, get_run_logger | |
| from prefect.context import get_run_context | |
# Plain standard-library logger named after this module. When the script is
# run directly that name is "__main__", which is the name the usage example
# in the module docstring passes via PREFECT_LOGGING_EXTRA_LOGGERS.
logger = logging.getLogger(__name__)
def hello():
    """Emit a warning record through the module-level stdlib ``logger``.

    Deliberately uses plain ``logging`` rather than ``get_run_logger()`` so
    the example can show these records being picked up as an "extra logger".
    """
    message = "hello() called"
    logger.warning(message)
def work():
    """Emit a warning record through the module-level stdlib ``logger``.

    This is the callable the example hands to worker threads (wrapped by
    ``bind_run_context``) to demonstrate logging from a child thread.
    """
    message = "work() called"
    logger.warning(message)
def bind_run_context(fn):
    """Wrap *fn* so it runs inside the currently active Prefect run context.

    Call this from the thread that is executing the flow or task. The run
    context is looked up and copied at that moment; the returned wrapper
    re-enters the copy around every call to *fn*, so *fn* can be handed to
    another thread and still log against the run.

    Args:
        fn: The callable to execute later, typically in a different thread.

    Returns:
        A callable with the same signature as *fn* that returns whatever
        *fn* returns.

    Raises:
        Whatever ``get_run_context()`` raises when no run context is active
        (per the module docstring: ``prefect.exceptions.MissingContextError``).
    """
    import functools  # function-scope import keeps this block self-contained

    # Capture in the *calling* thread: a freshly started worker thread has no
    # run context of its own to look up.
    context = get_run_context().model_copy()

    @functools.wraps(fn)
    def wrapped_fn(*args, **kwargs):
        # NOTE(review): the same context copy is entered on every call, so
        # concurrent calls of one wrapper may not be safe; confirm against
        # Prefect's context-manager semantics before sharing a wrapper.
        with context:
            # Propagate the result so callers (e.g. ``future.result()``)
            # receive fn's return value instead of always getting None.
            return fn(*args, **kwargs)

    return wrapped_fn
@task
def task():
    """Prefect task: log via the run logger, then via the plain logger.

    The plain-logger calls happen once in the task's own thread (``hello``)
    and once in a pool worker thread (``work``), with the run context bound
    to the worker through ``bind_run_context``.
    """
    # NOTE: this definition rebinds the module-level name ``task``, hiding the
    # decorator imported from prefect for any code that follows it.
    run_logger = get_run_logger()
    run_logger.warning("Hello Task")

    hello()

    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Bind the context here, in the task thread, before handing off.
        pending = pool.submit(bind_run_context(work))
        # Block until the worker finishes; re-raises any worker exception.
        pending.result()
@flow
def main():
    """Prefect flow exercising plain-logger calls from several threads.

    Steps, in order:
      1. ``hello()`` in the flow thread and ``work()`` in a pool worker.
      2. Log through the Prefect run logger and invoke ``task()``.
      3. Start a dedicated ``threading.Thread`` that itself calls ``hello()``
         and dispatches ``work()`` to a nested thread pool.

    Every callable handed to another thread is first wrapped with
    ``bind_run_context`` so the run context travels with it.
    """
    hello()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # result() blocks until done and re-raises any worker exception.
        pool.submit(bind_run_context(work)).result()

    flow_logger = get_run_logger()
    flow_logger.warning("Hello main")
    task()
    flow_logger.info("Now creating a thread and calling hello and work")

    def run():
        # Executes inside the dedicated thread started below; the context
        # bound to ``run`` is what lets the nested bind_run_context() call
        # find a run context here.
        hello()
        with concurrent.futures.ThreadPoolExecutor() as inner_pool:
            inner_pool.submit(bind_run_context(work)).result()

    worker = threading.Thread(target=bind_run_context(run))
    worker.start()
    worker.join()
# Script entry point: run the flow only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment