Skip to content

Instantly share code, notes, and snippets.

@ferromir
Created April 2, 2025 20:17
Show Gist options
  • Save ferromir/405adecd7567b636a90fe4c1f91e5120 to your computer and use it in GitHub Desktop.
Save ferromir/405adecd7567b636a90fe4c1f91e5120 to your computer and use it in GitHub Desktop.
Lidex translated to Go using Claude 3.7
package workflow
import (
"encoding/json"
"errors"
"fmt"
"time"
)
const (
DEFAULT_MAX_FAILURES = 3
DEFAULT_TIMEOUT_MS = 60_000 // 1m
DEFAULT_POLL_MS = 1_000 // 1s
)
// Status represents the current state of a workflow
type Status string
const (
StatusIdle Status = "idle"
StatusRunning Status = "running"
StatusFailed Status = "failed"
StatusFinished Status = "finished"
StatusAborted Status = "aborted"
)
// Context provides methods for workflow execution
type Context struct {
step func(string, func() (interface{}, error)) (interface{}, error)
sleep func(string, int) error
start func(string, string, interface{}) (bool, error)
}
// Step executes a workflow step
func (c *Context) Step(id string, fn func() (interface{}, error)) (interface{}, error) {
return c.step(id, fn)
}
// Sleep puts the workflow to sleep
func (c *Context) Sleep(id string, ms int) error {
return c.sleep(id, ms)
}
// Start starts a new workflow
func (c *Context) Start(id string, handler string, input interface{}) (bool, error) {
return c.start(id, handler, input)
}
// Handler defines a function that executes a workflow
type Handler func(ctx *Context, input interface{}) error
// Client provides methods to interact with workflows
type Client struct {
start func(string, string, interface{}) (bool, error)
wait func(string, []Status, int, int) (Status, error)
poll func(func() bool) error
}
// Start begins a workflow
func (c *Client) Start(id string, handler string, input interface{}) (bool, error) {
return c.start(id, handler, input)
}
// Wait waits for a workflow to reach a specific status
func (c *Client) Wait(id string, status []Status, times int, ms int) (Status, error) {
return c.wait(id, status, times, ms)
}
// Poll starts polling for workflows to execute
func (c *Client) Poll(shouldStop func() bool) error {
return c.poll(shouldStop)
}
// Config holds the configuration for the workflow system
type Config struct {
Handlers map[string]Handler
Persistence Persistence
MaxFailures *int
TimeoutIntervalMs *int
PollIntervalMs *int
}
// RunData contains information needed to run a workflow
type RunData struct {
Handler string `json:"handler"`
Input interface{} `json:"input"`
Failures int `json:"failures,omitempty"`
}
// Persistence defines methods for storing workflow state
type Persistence interface {
// Init initializes the persistence provider
Init() error
// Insert adds a new workflow
Insert(workflowId string, handler string, input interface{}) (bool, error)
// Claim finds and claims a workflow that is ready to run
Claim(now time.Time, timeoutAt time.Time) (string, error)
// FindOutput retrieves stored output for a workflow step
FindOutput(workflowId string, stepId string) (interface{}, error)
// FindWakeUpAt retrieves the wake-up time for a sleep operation
FindWakeUpAt(workflowId string, napId string) (time.Time, bool, error)
// FindRunData gets information needed to run a workflow
FindRunData(workflowId string) (RunData, bool, error)
// SetAsFinished marks a workflow as completed
SetAsFinished(workflowId string) error
// FindStatus gets the current status of a workflow
FindStatus(workflowId string) (Status, bool, error)
// UpdateStatus updates the status and related fields of a workflow
UpdateStatus(workflowId string, status Status, timeoutAt time.Time, failures int, lastError string) error
// UpdateOutput stores the output of a workflow step
UpdateOutput(workflowId string, stepId string, output interface{}, timeoutAt time.Time) error
// UpdateWakeUpAt stores the wake-up time for a sleep operation
UpdateWakeUpAt(workflowId string, napId string, wakeUpAt time.Time, timeoutAt time.Time) error
}
// Sleep pauses execution for the specified duration
func goSleep(ms int) error {
time.Sleep(time.Duration(ms) * time.Millisecond)
return nil
}
// makeClaim creates a function that claims a workflow ready to run
func makeClaim(persistence Persistence, timeoutIntervalMs int) func() (string, error) {
return func() (string, error) {
now := time.Now()
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond)
return persistence.Claim(now, timeoutAt)
}
}
// makeMakeStep creates a function that executes workflow steps
func makeMakeStep(persistence Persistence, timeoutIntervalMs int) func(string) func(string, func() (interface{}, error)) (interface{}, error) {
return func(workflowId string) func(string, func() (interface{}, error)) (interface{}, error) {
return func(stepId string, fn func() (interface{}, error)) (interface{}, error) {
output, err := persistence.FindOutput(workflowId, stepId)
if err == nil && output != nil {
return output, nil
}
output, err = fn()
if err != nil {
return nil, err
}
now := time.Now()
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond)
err = persistence.UpdateOutput(workflowId, stepId, output, timeoutAt)
if err != nil {
return nil, err
}
return output, nil
}
}
}
// makeMakeSleep creates a function that handles workflow sleep operations
func makeMakeSleep(persistence Persistence, timeoutIntervalMs int) func(string) func(string, int) error {
return func(workflowId string) func(string, int) error {
return func(napId string, ms int) error {
wakeUpAt, found, err := persistence.FindWakeUpAt(workflowId, napId)
if err != nil {
return err
}
now := time.Now()
if found {
remainingMs := wakeUpAt.Sub(now).Milliseconds()
if remainingMs > 0 {
return goSleep(int(remainingMs))
}
return nil
}
wakeUpAt = now.Add(time.Duration(ms) * time.Millisecond)
timeoutAt := wakeUpAt.Add(time.Duration(timeoutIntervalMs) * time.Millisecond)
err = persistence.UpdateWakeUpAt(workflowId, napId, wakeUpAt, timeoutAt)
if err != nil {
return err
}
return goSleep(ms)
}
}
}
// makeRun creates a function that executes a workflow
func makeRun(
persistence Persistence,
handlers map[string]Handler,
makeStep func(string) func(string, func() (interface{}, error)) (interface{}, error),
makeSleep func(string) func(string, int) error,
start func(string, string, interface{}) (bool, error),
maxFailures int,
timeoutIntervalMs int,
) func(string) error {
return func(workflowId string) error {
runData, found, err := persistence.FindRunData(workflowId)
if err != nil {
return err
}
if !found {
return fmt.Errorf("workflow not found: %s", workflowId)
}
fn, ok := handlers[runData.Handler]
if !ok {
return fmt.Errorf("handler not found: %s", runData.Handler)
}
ctx := &Context{
step: makeStep(workflowId),
sleep: makeSleep(workflowId),
start: start,
}
err = fn(ctx, runData.Input)
if err != nil {
lastError := err.Error()
failures := runData.Failures + 1
status := StatusFailed
if failures >= maxFailures {
status = StatusAborted
}
now := time.Now()
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond)
return persistence.UpdateStatus(
workflowId,
status,
timeoutAt,
failures,
lastError,
)
}
return persistence.SetAsFinished(workflowId)
}
}
// makeStart creates a function that starts a workflow
func makeStart(persistence Persistence) func(string, string, interface{}) (bool, error) {
return func(workflowId string, handler string, input interface{}) (bool, error) {
return persistence.Insert(workflowId, handler, input)
}
}
// makeWait creates a function that waits for a workflow to reach a specific status
func makeWait(persistence Persistence) func(string, []Status, int, int) (Status, error) {
return func(workflowId string, statuses []Status, times int, ms int) (Status, error) {
for i := 0; i < times; i++ {
found, exists, err := persistence.FindStatus(workflowId)
if err != nil {
return "", err
}
if exists {
for _, status := range statuses {
if found == status {
return found, nil
}
}
}
err = goSleep(ms)
if err != nil {
return "", err
}
}
return "", nil
}
}
// makePoll creates a function that polls for workflows to execute
func makePoll(
claim func() (string, error),
run func(string) error,
pollIntervalMs int,
) func(func() bool) error {
return func(shouldStop func() bool) error {
for !shouldStop() {
workflowId, err := claim()
if err != nil {
return err
}
if workflowId != "" {
// Intentionally not waiting for completion
go func(id string) {
_ = run(id)
}(workflowId)
} else {
err = goSleep(pollIntervalMs)
if err != nil {
return err
}
}
}
return nil
}
}
// MakeClient creates a new workflow client based on the provided configuration
func MakeClient(config Config) (*Client, error) {
err := config.Persistence.Init()
if err != nil {
return nil, err
}
maxFailures := DEFAULT_MAX_FAILURES
if config.MaxFailures != nil {
maxFailures = *config.MaxFailures
}
timeoutIntervalMs := DEFAULT_TIMEOUT_MS
if config.TimeoutIntervalMs != nil {
timeoutIntervalMs = *config.TimeoutIntervalMs
}
pollIntervalMs := DEFAULT_POLL_MS
if config.PollIntervalMs != nil {
pollIntervalMs = *config.PollIntervalMs
}
start := makeStart(config.Persistence)
wait := makeWait(config.Persistence)
claim := makeClaim(config.Persistence, timeoutIntervalMs)
makeStep := makeMakeStep(config.Persistence, timeoutIntervalMs)
makeSleep := makeMakeSleep(config.Persistence, timeoutIntervalMs)
run := makeRun(
config.Persistence,
config.Handlers,
makeStep,
makeSleep,
start,
maxFailures,
timeoutIntervalMs,
)
poll := makePoll(claim, run, pollIntervalMs)
return &Client{
start: start,
wait: wait,
poll: poll,
}, nil
}
// InternalTesting exposes internal functions for testing purposes
var InternalTesting = struct {
MakeClaim func(Persistence, int) func() (string, error)
MakeMakeStep func(Persistence, int) func(string) func(string, func() (interface{}, error)) (interface{}, error)
MakeMakeSleep func(Persistence, int) func(string) func(string, int) error
MakeRun func(Persistence, map[string]Handler, func(string) func(string, func() (interface{}, error)) (interface{}, error), func(string) func(string, int) error, func(string, string, interface{}) (bool, error), int, int) func(string) error
MakeStart func(Persistence) func(string, string, interface{}) (bool, error)
MakeWait func(Persistence) func(string, []Status, int, int) (Status, error)
MakePoll func(func() (string, error), func(string) error, int) func(func() bool) error
}{
MakeClaim: makeClaim,
MakeMakeStep: makeMakeStep,
MakeMakeSleep: makeMakeSleep,
MakeRun: makeRun,
MakeStart: makeStart,
MakeWait: makeWait,
MakePoll: makePoll,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment