Last active
January 30, 2023 18:36
-
-
Save nathanielobrown/77e9d7500e317317a8ee3ffc59ea35dc to your computer and use it in GitHub Desktop.
Testing delivery of activity cancellation exceptions (CacelledError) for the Temporal Python SDK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import concurrent.futures | |
import time | |
from datetime import timedelta | |
import requests | |
import rich | |
from temporalio import activity, workflow | |
from temporalio.client import WorkflowFailureError | |
from temporalio.exceptions import CancelledError | |
from temporalio.testing import WorkflowEnvironment | |
from temporalio.worker import UnsandboxedWorkflowRunner, Worker | |
def print(*args, **kwargs): | |
rich.print(*args, **kwargs) | |
@activity.defn | |
def slow_activity(arg: str): | |
print(f"\nStarted activity {arg}") | |
start_time = time.time() | |
try: | |
# Big file that will be slow to download (~4 seconds). I used this instead of | |
# time.sleep just to make **sure** that HTTP requests would be cancelled | |
requests.get( | |
"https://github.com/docker/docker-ce/archive/refs/tags/v19.03.14.zip" | |
) | |
except CancelledError as e: | |
run_time = time.time() - start_time | |
print( | |
f"\n[yellow]CancelledError in activity {arg} after {run_time:.2f}s:[/yellow] {e!s}" | |
) | |
return | |
print(f"\n[blue]Completed activity {arg}[/blue]") | |
return f"result {arg}" | |
@workflow.defn | |
class Workflow: | |
@workflow.run | |
async def run(self, arg: str): | |
return await workflow.start_activity( | |
slow_activity, arg, schedule_to_close_timeout=timedelta(seconds=1.2) | |
) | |
async def test_define_activity__handles_timeouts__worker_edition(): | |
task_queue = "test" | |
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: | |
async with await WorkflowEnvironment.start_time_skipping() as env: | |
async with Worker( | |
client=env.client, | |
activity_executor=executor, | |
task_queue=task_queue, | |
activities=[slow_activity], | |
workflows=[Workflow], | |
workflow_runner=UnsandboxedWorkflowRunner(), | |
max_concurrent_activities=5, | |
): | |
handles = [] | |
for i in range(1, 6): | |
handle = await env.client.start_workflow( | |
Workflow.run, | |
f"workflow{i}", | |
task_queue=task_queue, | |
id=f"test{i}", | |
) | |
handles.append(handle) | |
for handle in handles: | |
try: | |
result = await handle.result() | |
except WorkflowFailureError as e: | |
print(f"\n[red]Workflow {handle.id} failed:[/red] {e.cause!s}") | |
else: | |
print(f"\n[green]{result}[/green]") | |
time.sleep(5) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment