Skip to content

Instantly share code, notes, and snippets.

@ferromir
Created April 2, 2025 20:21
Show Gist options
  • Save ferromir/130c0819e3eb091dfafaee5d5ad47dc2 to your computer and use it in GitHub Desktop.
Lidex translated to Python using Claude 3.7
import asyncio
import json
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Callable, Any, TypeVar, Optional, Union, Mapping
DEFAULT_MAX_FAILURES = 3
DEFAULT_TIMEOUT_MS = 60_000 # 1m
DEFAULT_POLL_MS = 1_000 # 1s
# Type aliases
T = TypeVar('T')
Status = str # "idle" | "running" | "failed" | "finished" | "aborted"
# Status constants
STATUS_IDLE = "idle"
STATUS_RUNNING = "running"
STATUS_FAILED = "failed"
STATUS_FINISHED = "finished"
STATUS_ABORTED = "aborted"
class Context:
    """Execution context handed to a workflow handler.

    Bundles the three capabilities a running workflow may use: executing a
    memoized step, sleeping durably, and starting another workflow. The
    actual implementations are injected as async callables.
    """

    def __init__(self, step_fn, sleep_fn, start_fn):
        # Injected async callables backing the public methods below.
        self._step_fn = step_fn
        self._sleep_fn = sleep_fn
        self._start_fn = start_fn

    async def step(self, id: str, fn: Callable[[], Any]) -> Any:
        """Execute the step named *id* via *fn* and return its output.

        Args:
            id: The id of the step.
            fn: The function to be executed.
        """
        return await self._step_fn(id, fn)

    async def sleep(self, id: str, ms: int) -> None:
        """Put the workflow to sleep.

        Args:
            id: The id of the nap.
            ms: The amount of milliseconds to sleep.
        """
        await self._sleep_fn(id, ms)

    async def start(self, id: str, handler: str, input: Any) -> bool:
        """Start a new workflow, delegating to the injected start function.

        Args:
            id: The id of the workflow.
            handler: The handler name to execute the workflow.
            input: The input to the workflow.
        """
        return await self._start_fn(id, handler, input)
# Type definitions
# A workflow handler: an async callable invoked as handler(ctx, input).
Handler = Callable[[Context, Any], Any]
class Client:
    """Public facade for workflow operations.

    The three operations (start, wait, poll) are injected as async callables
    by make_client; this class only forwards to them.
    """

    def __init__(self, start_fn, wait_fn, poll_fn):
        # Injected async callables backing the public methods below.
        self._start_fn = start_fn
        self._wait_fn = wait_fn
        self._poll_fn = poll_fn

    async def start(self, id: str, handler: str, input: Any) -> bool:
        """Start a workflow.

        Args:
            id: The id of the workflow.
            handler: The handler name of the workflow.
            input: The input of the workflow, it must be serializable into JSON.

        Returns:
            True if the workflow is created, false if the workflow already existed.
        """
        return await self._start_fn(id, handler, input)

    async def wait(self, id: str, status: List['Status'], times: int, ms: int) -> Optional['Status']:
        """Wait until the workflow reaches one of the given statuses.

        Retries the lookup the specified number of times, pausing between
        attempts, and returns the first matching status found (or None).

        Args:
            id: The id of workflow.
            status: A list of status to match.
            times: Amount of retries.
            ms: Amount of milliseconds to wait between retries.
        """
        return await self._wait_fn(id, status, times, ms)

    async def poll(self, should_stop: Callable[[], bool]) -> None:
        """Start polling workflows.

        Args:
            should_stop: Circuit breaker for the polling loop.
        """
        await self._poll_fn(should_stop)
class Config:
    """Settings bundle consumed by make_client.

    Optional values are stored as-is (possibly None); make_client substitutes
    the library defaults for any that are unset.
    """

    def __init__(
        self,
        handlers: Dict[str, 'Handler'],
        persistence: 'Persistence',
        max_failures: Optional[int] = None,
        timeout_interval_ms: Optional[int] = None,
        poll_interval_ms: Optional[int] = None
    ):
        # Map of handler name -> async handler callable.
        self.handlers = handlers
        # Persistence provider implementing the Persistence interface.
        self.persistence = persistence
        # Failure limit before aborting; None means "use default".
        self.max_failures = max_failures
        # Workflow timeout window in ms; None means "use default".
        self.timeout_interval_ms = timeout_interval_ms
        # Polling back-off in ms; None means "use default".
        self.poll_interval_ms = poll_interval_ms
@dataclass
class RunData:
    """Data needed to run a workflow.

    Attributes:
        handler: Name of the handler that executes the workflow.
        input: Input passed to the handler.
        failures: Number of failed attempts so far (defaults to 0).
    """
    handler: str
    input: Any
    failures: int = 0
class Persistence:
    """Interface for workflow persistence providers.

    Concrete providers must implement every method; this base class only
    raises NotImplementedError.
    """

    async def init(self) -> None:
        """Initializes the persistence provider."""
        raise NotImplementedError

    async def insert(self, workflow_id: str, handler: str, input: Any) -> bool:
        """
        Inserts a workflow.

        Args:
            workflow_id: The id of the workflow.
            handler: The name of the handler.
            input: The input for the workflow.

        Returns:
            True if the workflow was inserted. False if the workflow already exists.
        """
        raise NotImplementedError

    async def claim(self, now: datetime, timeout_at: datetime) -> Optional[str]:
        """
        Atomically claims a workflow that is ready to run.

        It consists of two actions:
        1. Find a workflow that is ready to run.
        2. Update the timeout and set the status to "running".
        These 2 steps have to be performed atomically.

        A "ready to run" workflow matches the following condition:
        (status is "idle") OR
        (status is "running" AND timeoutAt < CURRENT_TIME) OR
        (status is "failed" AND timeoutAt < CURRENT_TIME)

        Args:
            now: The current time.
            timeout_at: The workflow timeout.

        Returns:
            The workflow id, or None if no workflow is ready.
        """
        raise NotImplementedError

    async def find_output(self, workflow_id: str, step_id: str) -> Any:
        """
        Finds the stored output for the given workflow and step.

        Args:
            workflow_id: Id of the workflow.
            step_id: Id of the step.

        Returns:
            The output. Returns None if not found.
        """
        raise NotImplementedError

    async def find_wake_up_at(self, workflow_id: str, nap_id: str) -> Optional[datetime]:
        """
        Finds the stored wake up time for the given workflow and nap.

        Args:
            workflow_id: Id of the workflow.
            nap_id: Id of the nap.

        Returns:
            The wake up time. Returns None if not found.
        """
        raise NotImplementedError

    async def find_run_data(self, workflow_id: str) -> Optional['RunData']:
        """
        Finds information about the workflow required to run it.

        Args:
            workflow_id: Id of the workflow.

        Returns:
            The run data, or None if the workflow does not exist.
        """
        raise NotImplementedError

    async def set_as_finished(self, workflow_id: str) -> None:
        """
        It sets the status of the workflow to "finished".

        Args:
            workflow_id: Id of the workflow.
        """
        raise NotImplementedError

    async def find_status(self, workflow_id: str) -> Optional['Status']:
        """
        Finds the status of a workflow.

        Args:
            workflow_id: Id of the workflow.

        Returns:
            The status if found, otherwise None.
        """
        raise NotImplementedError

    async def update_status(
        self,
        workflow_id: str,
        status: 'Status',
        timeout_at: datetime,
        failures: int,
        last_error: str,
    ) -> None:
        """
        Updates the status, timeoutAt, failures and lastError.

        Args:
            workflow_id: Id of the workflow.
            status: Status of the workflow.
            timeout_at: The workflow timeout.
            failures: The amount of failures.
            last_error: Last error message.
        """
        raise NotImplementedError

    async def update_output(
        self,
        workflow_id: str,
        step_id: str,
        output: Any,
        timeout_at: datetime,
    ) -> None:
        """
        Updates the step's output and timeoutAt.

        Args:
            workflow_id: Id of the workflow.
            step_id: Id of the step.
            output: Output of the step.
            timeout_at: The workflow timeout.
        """
        raise NotImplementedError

    async def update_wake_up_at(
        self,
        workflow_id: str,
        nap_id: str,
        wake_up_at: datetime,
        timeout_at: datetime,
    ) -> None:
        """
        Updates the nap's wake up time and the workflow's timeoutAt.

        Args:
            workflow_id: Id of the workflow.
            nap_id: Id of the nap.
            wake_up_at: Wake up time of the nap.
            timeout_at: The workflow timeout.
        """
        raise NotImplementedError
async def go_sleep(ms: int) -> None:
    """Asynchronously pause for *ms* milliseconds."""
    seconds = ms / 1000.0
    await asyncio.sleep(seconds)
def make_claim(persistence: 'Persistence', timeout_interval_ms: int):
    """Build an async claimer that grabs one runnable workflow.

    The returned coroutine computes the new timeout deadline
    (now + timeout_interval_ms) and delegates the atomic claim to the
    persistence provider.
    """
    async def claim_fn() -> Optional[str]:
        # NOTE(review): naive local-time datetime; assumes the persistence
        # layer compares against the same clock — confirm.
        current = datetime.now()
        deadline = current + timedelta(milliseconds=timeout_interval_ms)
        return await persistence.claim(current, deadline)
    return claim_fn
def make_make_step(persistence: 'Persistence', timeout_interval_ms: int):
    """Create a factory producing the per-workflow ``step`` implementation.

    Steps are memoized through persistence: on replay, a previously stored
    output is returned without re-running the step function.
    """
    def make_step(workflow_id: str):
        async def step_fn(step_id: str, fn: Callable[[], Any]) -> Any:
            # Replay path: reuse the persisted output if present.
            # NOTE(review): a legitimate None output is indistinguishable
            # from "not found", so such a step would re-run on every replay —
            # confirm handlers never produce None step outputs.
            cached = await persistence.find_output(workflow_id, step_id)
            if cached is not None:
                return cached
            result = await fn()
            # Extend the workflow timeout along with recording the output.
            deadline = datetime.now() + timedelta(milliseconds=timeout_interval_ms)
            await persistence.update_output(workflow_id, step_id, result, deadline)
            return result
        return step_fn
    return make_step
def make_make_sleep(persistence: 'Persistence', timeout_interval_ms: int):
    """Create a factory producing the per-workflow ``sleep`` implementation.

    A nap is durable: its wake-up time is recorded in persistence, so a
    replayed workflow only sleeps for whatever time remains (possibly none).
    """
    def make_sleep(workflow_id: str):
        async def sleep_fn(nap_id: str, ms: int) -> None:
            recorded = await persistence.find_wake_up_at(workflow_id, nap_id)
            now = datetime.now()
            if recorded:
                # Replay path: honor the originally recorded wake-up time.
                left_ms = (recorded - now).total_seconds() * 1000
                if left_ms > 0:
                    await go_sleep(left_ms)
                return
            # First execution: record the wake-up time, extend the timeout
            # past it, then actually sleep.
            wake_at = now + timedelta(milliseconds=ms)
            deadline = wake_at + timedelta(milliseconds=timeout_interval_ms)
            await persistence.update_wake_up_at(workflow_id, nap_id, wake_at, deadline)
            await go_sleep(ms)
        return sleep_fn
    return make_sleep
def make_run(
    persistence: 'Persistence',
    handlers: Dict[str, 'Handler'],
    make_step,
    make_sleep,
    start,
    max_failures: int,
    timeout_interval_ms: int,
):
    """Create the coroutine that executes a single claimed workflow.

    The returned run_fn loads the workflow's run data, invokes its handler
    with a fresh Context, and records the outcome: "finished" on success,
    or "failed"/"aborted" (once max_failures is reached) on exception.

    Raises:
        ValueError: If the workflow or its handler cannot be found.
    """
    async def run_fn(workflow_id: str) -> None:
        data = await persistence.find_run_data(workflow_id)
        if not data:
            raise ValueError(f"workflow not found: {workflow_id}")
        handler = handlers.get(data.handler)
        if not handler:
            raise ValueError(f"handler not found: {data.handler}")
        ctx = Context(
            step_fn=make_step(workflow_id),
            sleep_fn=make_sleep(workflow_id),
            start_fn=start,
        )
        try:
            await handler(ctx, data.input)
        except Exception as err:
            # Any handler failure is recorded; the workflow is retried
            # (status "failed") until max_failures, then aborted.
            failure_count = (data.failures or 0) + 1
            new_status = STATUS_FAILED if failure_count < max_failures else STATUS_ABORTED
            deadline = datetime.now() + timedelta(milliseconds=timeout_interval_ms)
            await persistence.update_status(
                workflow_id,
                new_status,
                deadline,
                failure_count,
                str(err),
            )
            return
        await persistence.set_as_finished(workflow_id)
    return run_fn
def make_start(persistence: 'Persistence'):
    """Create the ``start`` operation backed by *persistence*.

    The returned coroutine inserts the workflow and reports whether it was
    newly created (False when the id already existed).
    """
    async def start_fn(workflow_id: str, handler: str, input: Any) -> bool:
        inserted = await persistence.insert(workflow_id, handler, input)
        return inserted
    return start_fn
def make_wait(persistence: 'Persistence'):
    """Create a waiter that polls for a workflow to reach a wanted status."""
    async def wait_fn(workflow_id: str, status: List['Status'], times: int, ms: int) -> Optional['Status']:
        # Poll up to *times* times, pausing *ms* milliseconds between
        # attempts; return the first matching status, or None if exhausted.
        attempts = 0
        while attempts < times:
            current = await persistence.find_status(workflow_id)
            if current and current in status:
                return current
            await go_sleep(ms)
            attempts += 1
        return None
    return wait_fn
def make_poll(claim, run, poll_interval_ms: int):
    """Create the polling loop that claims workflows and launches them.

    Args:
        claim: Async callable returning a claimed workflow id or None.
        run: Async callable executing a workflow given its id.
        poll_interval_ms: Back-off between unsuccessful claim attempts.
    """
    async def poll_fn(should_stop: Callable[[], bool]) -> None:
        while True:
            if should_stop():
                return
            claimed = await claim()
            if not claimed:
                # Nothing runnable right now; back off before retrying.
                await go_sleep(poll_interval_ms)
            else:
                # Fire-and-forget: the workflow runs concurrently with polling.
                asyncio.create_task(run(claimed))
    return poll_fn
async def make_client(config: Config) -> Client:
    """Create a Client from the given configuration.

    Initializes the persistence provider, substitutes library defaults for
    any unset optional settings, then wires the start/wait/claim/run/poll
    pieces together.

    Args:
        config: The configuration object.

    Returns:
        The client instance.
    """
    await config.persistence.init()
    # Fall back to library defaults for unset options.
    failure_limit = config.max_failures or DEFAULT_MAX_FAILURES
    timeout_ms = config.timeout_interval_ms or DEFAULT_TIMEOUT_MS
    poll_ms = config.poll_interval_ms or DEFAULT_POLL_MS
    start = make_start(config.persistence)
    wait = make_wait(config.persistence)
    claim = make_claim(config.persistence, timeout_ms)
    run = make_run(
        config.persistence,
        config.handlers,
        make_make_step(config.persistence, timeout_ms),
        make_make_sleep(config.persistence, timeout_ms),
        start,
        failure_limit,
        timeout_ms,
    )
    poll = make_poll(claim, run, poll_ms)
    return Client(start, wait, poll)
# Export functions for internal testing
# Factory functions exposed so tests can construct each piece in isolation;
# not part of the public API.
for_internal_testing = {
    "make_claim": make_claim,
    "make_make_step": make_make_step,
    "make_make_sleep": make_make_sleep,
    "make_run": make_run,
    "make_start": make_start,
    "make_wait": make_wait,
    "make_poll": make_poll,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment