Created
September 8, 2023 11:12
-
-
Save mrknmc/2345dc17aac64830cb34a13fa99ee688 to your computer and use it in GitHub Desktop.
workflow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from uuid import UUID

from temporalio import activity
from temporalio import workflow

from parrot.core.logging import get_logger
# Module-level logger: the project logger for this module wrapped in
# Temporal's workflow.LoggerAdapter (no extra context dict is supplied).
logger = workflow.LoggerAdapter(get_logger(__name__), None)
@dataclass
class UploadDocumentActivityParams:
    """Parameter bundle holding a document type and a booking id.

    NOTE(review): no consumer of this class is visible in this file;
    presumably it is the input of an upload-document activity -- confirm.
    """

    document_type: str
    booking_id: UUID
@dataclass
class ClaimedSubtaskWorkflowParams:
    """Input parameters for ``ClaimedSubtaskWorkflow.run``."""

    # Identifier handed to the claimed / final-state checks.
    subtask_id: str
@dataclass
class SubtasksWorkflowParams:
    """Input parameters for ``SubtasksWorkflow.run``."""

    # When true, the workflow additionally runs the ``notify_surge`` activity.
    surge: bool
@dataclass
class CreateAgreementActivityParams:
    """Parameter bundle holding a document type, a document id and a booking id.

    NOTE(review): no consumer of this class is visible in this file;
    presumably it is the input of a create-agreement activity -- confirm.
    """

    document_type: str
    document_id: str
    booking_id: UUID
@dataclass
class ManualTranscriptionWorkflowParams:
    """Input parameters for the manual transcription workflow's ``run`` method."""

    # Both fields are interpolated into the workflow's start-up log message.
    document_type: str
    booking_id: UUID
def is_subtask_claimed(subtask_id: str) -> bool:
    """Report whether the given subtask has been claimed.

    Placeholder implementation: ``subtask_id`` is currently ignored and the
    function always returns ``False``.
    """
    return False
def is_subtask_in_final_state(subtask_id: str) -> bool:
    """Report whether the given subtask has reached a final state.

    Placeholder implementation: ``subtask_id`` is currently ignored and the
    function always returns ``False``.
    """
    return False
@activity.defn
def create_subtask_media_resources():
    """Activity stub for producing the media resources used by subtasks.

    Not implemented yet: the body does nothing and returns ``None``.
    """
    # TODO: produce & store media resources in S3
    pass
@activity.defn
def notify_surge(media_resources=None):
    """Activity stub for the surge notification.

    Not implemented yet: the body does nothing and returns ``None``.

    Args:
        media_resources: Value forwarded by ``SubtasksWorkflow``, which calls
            this activity with the result of ``create_subtask_media_resources``.
            Optional so that argument-less invocations keep working; currently
            unused.
    """
    # TODO: send email to reviewers
    pass
@activity.defn
def create_subtasks(media_resources=None) -> list:
    """Activity stub for creating subtasks.

    Not implemented yet. Returns an empty list rather than ``None`` because
    ``SubtasksWorkflow`` iterates over the result.

    Args:
        media_resources: Value forwarded by ``SubtasksWorkflow``, which calls
            this activity with the result of ``create_subtask_media_resources``.
            Optional so that argument-less invocations keep working; currently
            unused.

    Returns:
        The created subtasks; always ``[]`` until the TODO is implemented.
    """
    # TODO: create subtasks and save them in DB
    return []
@workflow.defn
class ClaimedSubtaskWorkflow:
    """Workflow that follows one subtask from being claimed to a final state."""

    @workflow.run
    async def run(self, params: ClaimedSubtaskWorkflowParams):
        """Wait until the subtask is claimed and then finished.

        Each round waits (without limit) for the subtask to be claimed, then
        gives it one day to reach a final state. If the day elapses first the
        loop starts over; otherwise the workflow completes.

        NOTE(review): the condition callbacks call module-level helpers that
        are still placeholders. If they end up reading external state, that
        lookup probably has to move into an activity or a signal handler to
        keep the workflow deterministic -- confirm.
        """
        while True:
            await workflow.wait_condition(
                lambda: is_subtask_claimed(params.subtask_id)
            )
            try:
                # wait_condition returns None on success and raises
                # asyncio.TimeoutError when the timeout expires; there is no
                # result object to inspect.
                await workflow.wait_condition(
                    lambda: is_subtask_in_final_state(params.subtask_id),
                    timeout=timedelta(days=1),
                )
            except asyncio.TimeoutError:
                # TODO: either unclaim if timeout or go back to waiting for it to be claimed
                continue
            return
@workflow.defn
class SubtasksWorkflow:
    """Workflow that creates subtasks and waits for all of them to finish."""

    @workflow.run
    async def run(self, params: SubtasksWorkflowParams):
        """Create media resources and subtasks, then run one child workflow per subtask.

        Args:
            params: ``params.surge`` controls whether the ``notify_surge``
                activity is executed after the subtasks are created.
        """
        # execute_activity needs a start-to-close or schedule-to-close
        # timeout. NOTE(review): ten minutes is a placeholder -- tune per
        # activity once the activities are implemented.
        activity_timeout = timedelta(minutes=10)

        media_resources = await workflow.execute_activity(
            create_subtask_media_resources,
            start_to_close_timeout=activity_timeout,
        )
        subtasks = await workflow.execute_activity(
            create_subtasks,
            media_resources,
            start_to_close_timeout=activity_timeout,
        )
        if params.surge:
            # The notification result is deliberately not stored: assigning
            # it to `subtasks` would discard the subtasks created above.
            await workflow.execute_activity(
                notify_surge,
                media_resources,
                start_to_close_timeout=activity_timeout,
            )

        # wait for all subtasks to be done
        # The child-workflow coroutines must actually be awaited; a list of
        # un-awaited coroutine objects is always truthy, so checking it with
        # all() would return immediately without running any child.
        # NOTE(review): ClaimedSubtaskWorkflow.run is annotated to take a
        # ClaimedSubtaskWorkflowParams; the shape of each `subtask` is not
        # visible here -- confirm it matches or wrap it accordingly.
        await asyncio.gather(
            *(
                workflow.execute_child_workflow(ClaimedSubtaskWorkflow.run, subtask)
                for subtask in (subtasks or ())
            )
        )
@workflow.defn
class ReviewWorkflow:
    """Workflow stub for the review stage; not implemented yet."""

    @workflow.run
    async def run(self):
        """Do nothing and return ``None`` (placeholder)."""
        # TODO: same idea as above
        pass
@workflow.defn
class ManualTranscriptionWorkflo:
    """Top-level workflow: runs the subtasks workflow, then the review workflow.

    NOTE(review): the class name looks like a typo of
    ``ManualTranscriptionWorkflow``. It is kept unchanged because the name is
    what callers and worker registrations reference.
    """

    @workflow.run
    async def run(self, params: ManualTranscriptionWorkflowParams):
        """Log the start, then run both child workflows to completion in order.

        Args:
            params: Supplies the document type and booking id used in the
                start-up log message.

        Returns:
            Always ``None``.
        """
        # TODO : fix wrong enum serialization
        logger.info(
            f"Document signature workflow for {params.document_type}-{params.booking_id} started"
        )
        # execute_child_workflow waits for the child's result, whereas
        # start_child_workflow only waits for the child to start; returning
        # right after a start would end this workflow while the children are
        # still running.
        # SubtasksWorkflow.run reads `params.surge`, so it needs a real
        # params object rather than None.
        # NOTE(review): surge=False is a placeholder -- confirm where the
        # surge flag should come from.
        await workflow.execute_child_workflow(
            SubtasksWorkflow.run, SubtasksWorkflowParams(surge=False)
        )
        # ReviewWorkflow.run takes no input argument.
        await workflow.execute_child_workflow(ReviewWorkflow.run)
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment