Created April 2, 2025 20:21
-
-
Save ferromir/130c0819e3eb091dfafaee5d5ad47dc2 to your computer and use it in GitHub Desktop.
Lidex translated to Python using Claude 3.7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import inspect
import json
import time
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, List, Mapping, Optional, TypeVar, Union
# Library fallback values used by make_client.
DEFAULT_MAX_FAILURES = 3  # failure count at which make_run aborts a workflow
DEFAULT_TIMEOUT_MS = 60 * 1_000  # one minute
DEFAULT_POLL_MS = 1 * 1_000  # one second

# Generic type variable.
T = TypeVar("T")

# A workflow status is a plain string; the known values are the STATUS_*
# constants below.
Status = str

STATUS_IDLE = "idle"
STATUS_RUNNING = "running"  # set by Persistence.claim
STATUS_FAILED = "failed"  # set by make_run while failures < max_failures
STATUS_FINISHED = "finished"  # set by Persistence.set_as_finished
STATUS_ABORTED = "aborted"  # set by make_run once failures reach max_failures
class Context:
    """Handle given to a workflow handler for its workflow operations.

    The object holds no state of its own: each method forwards its
    arguments to the callable supplied at construction time and awaits it.
    """

    def __init__(self, step_fn, sleep_fn, start_fn):
        # Backing coroutine functions (make_run builds them per workflow).
        self._step_fn, self._sleep_fn, self._start_fn = step_fn, sleep_fn, start_fn

    async def step(self, id: str, fn: Callable[[], Any]) -> Any:
        """Execute a step and return its output.

        Args:
            id: The id of the step.
            fn: The function that performs the step's work.

        Returns:
            Whatever the backing step function returns.
        """
        result = await self._step_fn(id, fn)
        return result

    async def sleep(self, id: str, ms: int) -> None:
        """Put the workflow to sleep.

        Args:
            id: The id of the nap.
            ms: How many milliseconds to sleep.
        """
        # Any value returned by the backing function is deliberately dropped.
        await self._sleep_fn(id, ms)

    async def start(self, id: str, handler: str, input: Any) -> bool:
        """Start another workflow from inside this one.

        Args:
            id: The id of the new workflow.
            handler: Name of the handler that will execute it.
            input: The input to the new workflow.

        Returns:
            Whatever the backing start function returns.
        """
        started = await self._start_fn(id, handler, input)
        return started


# Signature of a workflow handler: it receives the Context and the workflow
# input; make_run awaits the value it returns.
Handler = Callable[[Context, Any], Any]
class Client:
    """Public entry point for starting, waiting on and polling workflows.

    Each method awaits the corresponding callable given to the constructor.
    """

    def __init__(self, start_fn, wait_fn, poll_fn):
        self._start_fn = start_fn
        self._wait_fn = wait_fn
        self._poll_fn = poll_fn

    async def start(self, id: str, handler: str, input: Any) -> bool:
        """Start a workflow.

        Args:
            id: The id of the workflow.
            handler: Name of the handler that executes the workflow.
            input: The workflow input; it must be JSON-serializable.

        Returns:
            True if the workflow was created, False if it already existed.
        """
        created = await self._start_fn(id, handler, input)
        return created

    async def wait(
        self, id: str, status: "List[Status]", times: int, ms: int
    ) -> "Optional[Status]":
        """Wait for the workflow to reach one of the given statuses.

        The status is checked up to ``times`` times with a pause between
        checks.

        Args:
            id: The id of the workflow.
            status: The statuses to match.
            times: How many times to check.
            ms: Milliseconds to pause between checks.

        Returns:
            The matching status, or None if none matched.
        """
        matched = await self._wait_fn(id, status, times, ms)
        return matched

    async def poll(self, should_stop: Callable[[], bool]) -> None:
        """Start polling for workflows to run.

        Args:
            should_stop: Circuit breaker checked by the polling loop.
        """
        # The poll function's own return value is not exposed.
        await self._poll_fn(should_stop)
class Config:
    """Settings consumed by make_client.

    Args:
        handlers: Maps handler names to handler callables.
        persistence: The persistence provider that stores workflow state.
        max_failures: Optional override of DEFAULT_MAX_FAILURES.
        timeout_interval_ms: Optional override of DEFAULT_TIMEOUT_MS.
        poll_interval_ms: Optional override of DEFAULT_POLL_MS.
    """

    def __init__(
        self,
        handlers: "Dict[str, Handler]",
        persistence: "Persistence",
        max_failures: Optional[int] = None,
        timeout_interval_ms: Optional[int] = None,
        poll_interval_ms: Optional[int] = None,
    ):
        # Required collaborators.
        self.handlers, self.persistence = handlers, persistence
        # Optional tuning knobs; make_client substitutes the library defaults.
        self.max_failures = max_failures
        self.timeout_interval_ms = timeout_interval_ms
        self.poll_interval_ms = poll_interval_ms
class RunData:
    """The information make_run needs to execute a workflow.

    Attributes:
        handler: Name of the handler to invoke.
        input: Input passed to the handler.
        failures: Number of failed runs recorded so far.
    """

    def __init__(self, handler: str, input: Any, failures: int = 0):
        self.handler, self.input, self.failures = handler, input, failures
class Persistence:
    """Interface that persistence providers implement to store workflow state.

    Every method here raises ``NotImplementedError``; concrete providers
    override them.
    """

    async def init(self) -> None:
        """Initialize the persistence provider.

        make_client awaits this before wiring up the client.
        """
        raise NotImplementedError

    async def insert(self, workflow_id: str, handler: str, input: Any) -> bool:
        """Insert a new workflow.

        Args:
            workflow_id: The id of the workflow.
            handler: The name of the handler.
            input: The input for the workflow.

        Returns:
            True if the workflow was inserted, False if it already exists.
        """
        raise NotImplementedError

    async def claim(self, now: datetime, timeout_at: datetime) -> Optional[str]:
        """Find a workflow that is ready to run and lease it.

        This consists of two actions that must be performed atomically:

        1. Find a workflow that is ready to run.
        2. Set its status to "running" and its timeout to ``timeout_at``.

        A workflow is ready to run when any of these hold:

        - its status is "idle";
        - its status is "running" and its timeout is earlier than ``now``;
        - its status is "failed" and its timeout is earlier than ``now``.

        Args:
            now: The current time.
            timeout_at: The new timeout for the claimed workflow.

        Returns:
            The id of the claimed workflow, or None if no workflow is ready.
        """
        raise NotImplementedError

    async def find_output(self, workflow_id: str, step_id: str) -> Any:
        """Find the stored output for the given workflow and step.

        Args:
            workflow_id: Id of the workflow.
            step_id: Id of the step.

        Returns:
            The output, or None if not found. Callers treat None as "not
            recorded", so a step whose real output is None is re-executed.
        """
        raise NotImplementedError

    async def find_wake_up_at(self, workflow_id: str, nap_id: str) -> Optional[datetime]:
        """Find the stored wake-up time for the given workflow and nap.

        Args:
            workflow_id: Id of the workflow.
            nap_id: Id of the nap.

        Returns:
            The wake-up time, or None if not found.
        """
        raise NotImplementedError

    async def find_run_data(self, workflow_id: str) -> "Optional[RunData]":
        """Find the information required to run a workflow.

        Args:
            workflow_id: Id of the workflow.

        Returns:
            The run data, or None if the workflow does not exist.
        """
        raise NotImplementedError

    async def set_as_finished(self, workflow_id: str) -> None:
        """Set the status of the workflow to "finished".

        Args:
            workflow_id: Id of the workflow.
        """
        raise NotImplementedError

    async def find_status(self, workflow_id: str) -> "Optional[Status]":
        """Find the status of a workflow.

        Args:
            workflow_id: Id of the workflow.

        Returns:
            The status if found, otherwise None.
        """
        raise NotImplementedError

    async def update_status(
        self,
        workflow_id: str,
        status: "Status",
        timeout_at: datetime,
        failures: int,
        last_error: str,
    ) -> None:
        """Update the status, timeout, failure count and last error.

        Args:
            workflow_id: Id of the workflow.
            status: New status of the workflow.
            timeout_at: The new workflow timeout.
            failures: The number of failures.
            last_error: Message of the most recent error.
        """
        raise NotImplementedError

    async def update_output(
        self,
        workflow_id: str,
        step_id: str,
        output: Any,
        timeout_at: datetime,
    ) -> None:
        """Record a step's output and update the workflow timeout.

        Args:
            workflow_id: Id of the workflow.
            step_id: Id of the step.
            output: Output of the step.
            timeout_at: The new workflow timeout.
        """
        raise NotImplementedError

    async def update_wake_up_at(
        self,
        workflow_id: str,
        nap_id: str,
        wake_up_at: datetime,
        timeout_at: datetime,
    ) -> None:
        """Record a nap's wake-up time and update the workflow timeout.

        Args:
            workflow_id: Id of the workflow.
            nap_id: Id of the nap.
            wake_up_at: Wake-up time of the nap.
            timeout_at: The new workflow timeout.
        """
        raise NotImplementedError
async def go_sleep(ms: int) -> None:
    """Suspend the current coroutine for ``ms`` milliseconds."""
    seconds = ms / 1000
    await asyncio.sleep(seconds)
def make_claim(persistence: "Persistence", timeout_interval_ms: int):
    """Build the coroutine function that claims the next runnable workflow.

    Each call passes the current time, and a timeout ``timeout_interval_ms``
    milliseconds later, to ``persistence.claim`` and returns its result.
    """

    async def claim_fn() -> Optional[str]:
        current = datetime.now()
        lease_until = current + timedelta(milliseconds=timeout_interval_ms)
        claimed = await persistence.claim(current, lease_until)
        return claimed

    return claim_fn
def make_make_step(persistence: "Persistence", timeout_interval_ms: int):
    """Build a factory of per-workflow step functions.

    Args:
        persistence: Store used to look up and record step outputs.
        timeout_interval_ms: How far past "now" the workflow timeout is moved
            each time a step output is recorded.

    Returns:
        ``make_step(workflow_id)``, which returns the coroutine function that
        backs ``Context.step`` for that workflow.
    """

    def make_step(workflow_id: str):
        async def step_fn(step_id: str, fn: Callable[[], Any]) -> Any:
            # Replay: a recorded output means the step already ran, so it is
            # returned without running ``fn`` again.
            output = await persistence.find_output(workflow_id, step_id)
            if output is not None:
                return output
            # NOTE: a step whose real output is None cannot be told apart from
            # "not recorded" and is re-executed on every replay.
            output = fn()
            # ``fn`` is typed as a plain callable: accept sync functions as well
            # as coroutine functions (awaiting a plain value would crash).
            if inspect.isawaitable(output):
                output = await output
            timeout_at = datetime.now() + timedelta(milliseconds=timeout_interval_ms)
            await persistence.update_output(workflow_id, step_id, output, timeout_at)
            return output

        return step_fn

    return make_step
def make_make_sleep(persistence: "Persistence", timeout_interval_ms: int):
    """Build a factory of per-workflow sleep functions.

    Args:
        persistence: Store used to look up and record nap wake-up times.
        timeout_interval_ms: Added to the wake-up time to form the workflow
            timeout recorded for a new nap.

    Returns:
        ``make_sleep(workflow_id)``, which returns the coroutine function that
        backs ``Context.sleep`` for that workflow.
    """

    def make_sleep(workflow_id: str):
        async def sleep_fn(nap_id: str, ms: int) -> None:
            recorded = await persistence.find_wake_up_at(workflow_id, nap_id)
            now = datetime.now()
            if not recorded:
                # First time this nap is reached: record when it ends. The
                # timeout is measured from the wake-up time so the lease
                # outlasts the nap (claim re-runs a "running" workflow once its
                # timeout has passed).
                wake_up_at = now + timedelta(milliseconds=ms)
                await persistence.update_wake_up_at(
                    workflow_id,
                    nap_id,
                    wake_up_at,
                    wake_up_at + timedelta(milliseconds=timeout_interval_ms),
                )
                await go_sleep(ms)
                return
            # Replay: only sleep for whatever part of the nap is left.
            left_ms = (recorded - now).total_seconds() * 1000
            if left_ms > 0:
                await go_sleep(left_ms)

        return sleep_fn

    return make_sleep
def make_run(
    persistence: "Persistence",
    handlers: "Dict[str, Handler]",
    make_step,
    make_sleep,
    start,
    max_failures: int,
    timeout_interval_ms: int,
):
    """Build the coroutine function that executes one workflow by id.

    On success the workflow is marked finished. If the handler raises, the
    failure count is incremented and the status becomes "failed", or
    "aborted" once the count reaches ``max_failures``.

    Raises (from the returned function):
        ValueError: if the workflow or its handler cannot be found.
    """

    async def run_fn(workflow_id: str) -> None:
        run_data = await persistence.find_run_data(workflow_id)
        if not run_data:
            raise ValueError(f"workflow not found: {workflow_id}")

        handler_fn = handlers.get(run_data.handler)
        if not handler_fn:
            raise ValueError(f"handler not found: {run_data.handler}")

        ctx = Context(
            step_fn=make_step(workflow_id),
            sleep_fn=make_sleep(workflow_id),
            start_fn=start,
        )
        try:
            await handler_fn(ctx, run_data.input)
        except Exception as error:
            attempts = (run_data.failures or 0) + 1
            await persistence.update_status(
                workflow_id,
                STATUS_ABORTED if attempts >= max_failures else STATUS_FAILED,
                datetime.now() + timedelta(milliseconds=timeout_interval_ms),
                attempts,
                str(error),
            )
        else:
            # Errors raised here are not caught by the handler above.
            await persistence.set_as_finished(workflow_id)

    return run_fn
def make_start(persistence: "Persistence"):
    """Build the coroutine function that starts (inserts) a workflow.

    The returned function returns ``persistence.insert``'s result: True if
    the workflow was created, False if it already existed.
    """

    async def start_fn(workflow_id: str, handler: str, input: Any) -> bool:
        inserted = await persistence.insert(workflow_id, handler, input)
        return inserted

    return start_fn
def make_wait(persistence: "Persistence"):
    """Build the coroutine function behind ``Client.wait``.

    Args:
        persistence: Store used to read the workflow status.

    Returns:
        ``wait_fn(workflow_id, status, times, ms)``. It checks the status up to
        ``times`` times, pausing ``ms`` milliseconds between checks. It returns
        the first status found in ``status``, or None if none matched.
    """

    async def wait_fn(
        workflow_id: str, status: "List[Status]", times: int, ms: int
    ) -> "Optional[Status]":
        for attempt in range(times):
            found = await persistence.find_status(workflow_id)
            if found and found in status:
                return found
            # Pause only *between* checks; sleeping after the last miss would
            # just delay the None result by ``ms``.
            if attempt + 1 < times:
                await go_sleep(ms)
        return None

    return wait_fn
def make_poll(claim, run, poll_interval_ms: int):
    """Build the polling loop behind ``Client.poll``.

    Args:
        claim: Coroutine function returning a claimed workflow id, or a falsy
            value when nothing is ready.
        run: Coroutine function that executes a workflow by id.
        poll_interval_ms: Pause between claim attempts when nothing was claimed.

    Returns:
        ``poll_fn(should_stop)``, which loops until ``should_stop()`` is true.
    """
    # The event loop keeps only weak references to tasks, so a task nobody
    # references can be garbage-collected before it finishes. Hold every
    # in-flight run here; each removes itself when done.
    in_flight = set()

    async def poll_fn(should_stop: Callable[[], bool]) -> None:
        while not should_stop():
            workflow_id = await claim()
            if not workflow_id:
                await go_sleep(poll_interval_ms)
                continue
            # Run concurrently; the loop goes straight back to claiming.
            task = asyncio.create_task(run(workflow_id))
            in_flight.add(task)
            task.add_done_callback(in_flight.discard)

    return poll_fn
async def make_client(config: Config) -> Client:
    """Create a client from the given configuration.

    The persistence provider is initialized first. The internal start, wait,
    claim, step, sleep, run and poll functions are then wired together.

    Args:
        config: The configuration object. If its ``max_failures``,
            ``timeout_interval_ms`` or ``poll_interval_ms`` is None, the value
            falls back to DEFAULT_MAX_FAILURES, DEFAULT_TIMEOUT_MS or
            DEFAULT_POLL_MS respectively.

    Returns:
        The client instance.
    """
    persistence = config.persistence
    await persistence.init()

    # ``is None`` rather than ``or``: an explicit 0 (e.g. a zero poll
    # interval) is a deliberate setting and must not be replaced by a default.
    max_failures = (
        DEFAULT_MAX_FAILURES if config.max_failures is None else config.max_failures
    )
    timeout_interval_ms = (
        DEFAULT_TIMEOUT_MS
        if config.timeout_interval_ms is None
        else config.timeout_interval_ms
    )
    poll_interval_ms = (
        DEFAULT_POLL_MS if config.poll_interval_ms is None else config.poll_interval_ms
    )

    start = make_start(persistence)
    wait = make_wait(persistence)
    claim = make_claim(persistence, timeout_interval_ms)
    make_step = make_make_step(persistence, timeout_interval_ms)
    make_sleep = make_make_sleep(persistence, timeout_interval_ms)
    # Handlers start child workflows (Context.start) through the same start
    # function that Client.start uses.
    run = make_run(
        persistence,
        config.handlers,
        make_step,
        make_sleep,
        start,
        max_failures,
        timeout_interval_ms,
    )
    poll = make_poll(claim, run, poll_interval_ms)
    return Client(start, wait, poll)
# Internal factories, exposed so tests can exercise them individually.
for_internal_testing = dict(
    make_claim=make_claim,
    make_make_step=make_make_step,
    make_make_sleep=make_make_sleep,
    make_run=make_run,
    make_start=make_start,
    make_wait=make_wait,
    make_poll=make_poll,
)
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.