Created
April 2, 2025 20:17
-
-
Save ferromir/405adecd7567b636a90fe4c1f91e5120 to your computer and use it in GitHub Desktop.
Lidex translated to Go using Claude 3.7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workflow | |
import ( | |
"encoding/json" | |
"errors" | |
"fmt" | |
"time" | |
) | |
const ( | |
DEFAULT_MAX_FAILURES = 3 | |
DEFAULT_TIMEOUT_MS = 60_000 // 1m | |
DEFAULT_POLL_MS = 1_000 // 1s | |
) | |
// Status represents the current state of a workflow | |
type Status string | |
const ( | |
StatusIdle Status = "idle" | |
StatusRunning Status = "running" | |
StatusFailed Status = "failed" | |
StatusFinished Status = "finished" | |
StatusAborted Status = "aborted" | |
) | |
// Context provides methods for workflow execution | |
type Context struct { | |
step func(string, func() (interface{}, error)) (interface{}, error) | |
sleep func(string, int) error | |
start func(string, string, interface{}) (bool, error) | |
} | |
// Step executes a workflow step | |
func (c *Context) Step(id string, fn func() (interface{}, error)) (interface{}, error) { | |
return c.step(id, fn) | |
} | |
// Sleep puts the workflow to sleep | |
func (c *Context) Sleep(id string, ms int) error { | |
return c.sleep(id, ms) | |
} | |
// Start starts a new workflow | |
func (c *Context) Start(id string, handler string, input interface{}) (bool, error) { | |
return c.start(id, handler, input) | |
} | |
// Handler defines a function that executes a workflow | |
type Handler func(ctx *Context, input interface{}) error | |
// Client provides methods to interact with workflows | |
type Client struct { | |
start func(string, string, interface{}) (bool, error) | |
wait func(string, []Status, int, int) (Status, error) | |
poll func(func() bool) error | |
} | |
// Start begins a workflow | |
func (c *Client) Start(id string, handler string, input interface{}) (bool, error) { | |
return c.start(id, handler, input) | |
} | |
// Wait waits for a workflow to reach a specific status | |
func (c *Client) Wait(id string, status []Status, times int, ms int) (Status, error) { | |
return c.wait(id, status, times, ms) | |
} | |
// Poll starts polling for workflows to execute | |
func (c *Client) Poll(shouldStop func() bool) error { | |
return c.poll(shouldStop) | |
} | |
// Config holds the configuration for the workflow system | |
type Config struct { | |
Handlers map[string]Handler | |
Persistence Persistence | |
MaxFailures *int | |
TimeoutIntervalMs *int | |
PollIntervalMs *int | |
} | |
// RunData contains information needed to run a workflow | |
type RunData struct { | |
Handler string `json:"handler"` | |
Input interface{} `json:"input"` | |
Failures int `json:"failures,omitempty"` | |
} | |
// Persistence defines methods for storing workflow state | |
type Persistence interface { | |
// Init initializes the persistence provider | |
Init() error | |
// Insert adds a new workflow | |
Insert(workflowId string, handler string, input interface{}) (bool, error) | |
// Claim finds and claims a workflow that is ready to run | |
Claim(now time.Time, timeoutAt time.Time) (string, error) | |
// FindOutput retrieves stored output for a workflow step | |
FindOutput(workflowId string, stepId string) (interface{}, error) | |
// FindWakeUpAt retrieves the wake-up time for a sleep operation | |
FindWakeUpAt(workflowId string, napId string) (time.Time, bool, error) | |
// FindRunData gets information needed to run a workflow | |
FindRunData(workflowId string) (RunData, bool, error) | |
// SetAsFinished marks a workflow as completed | |
SetAsFinished(workflowId string) error | |
// FindStatus gets the current status of a workflow | |
FindStatus(workflowId string) (Status, bool, error) | |
// UpdateStatus updates the status and related fields of a workflow | |
UpdateStatus(workflowId string, status Status, timeoutAt time.Time, failures int, lastError string) error | |
// UpdateOutput stores the output of a workflow step | |
UpdateOutput(workflowId string, stepId string, output interface{}, timeoutAt time.Time) error | |
// UpdateWakeUpAt stores the wake-up time for a sleep operation | |
UpdateWakeUpAt(workflowId string, napId string, wakeUpAt time.Time, timeoutAt time.Time) error | |
} | |
// Sleep pauses execution for the specified duration | |
func goSleep(ms int) error { | |
time.Sleep(time.Duration(ms) * time.Millisecond) | |
return nil | |
} | |
// makeClaim creates a function that claims a workflow ready to run | |
func makeClaim(persistence Persistence, timeoutIntervalMs int) func() (string, error) { | |
return func() (string, error) { | |
now := time.Now() | |
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond) | |
return persistence.Claim(now, timeoutAt) | |
} | |
} | |
// makeMakeStep creates a function that executes workflow steps | |
func makeMakeStep(persistence Persistence, timeoutIntervalMs int) func(string) func(string, func() (interface{}, error)) (interface{}, error) { | |
return func(workflowId string) func(string, func() (interface{}, error)) (interface{}, error) { | |
return func(stepId string, fn func() (interface{}, error)) (interface{}, error) { | |
output, err := persistence.FindOutput(workflowId, stepId) | |
if err == nil && output != nil { | |
return output, nil | |
} | |
output, err = fn() | |
if err != nil { | |
return nil, err | |
} | |
now := time.Now() | |
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond) | |
err = persistence.UpdateOutput(workflowId, stepId, output, timeoutAt) | |
if err != nil { | |
return nil, err | |
} | |
return output, nil | |
} | |
} | |
} | |
// makeMakeSleep creates a function that handles workflow sleep operations | |
func makeMakeSleep(persistence Persistence, timeoutIntervalMs int) func(string) func(string, int) error { | |
return func(workflowId string) func(string, int) error { | |
return func(napId string, ms int) error { | |
wakeUpAt, found, err := persistence.FindWakeUpAt(workflowId, napId) | |
if err != nil { | |
return err | |
} | |
now := time.Now() | |
if found { | |
remainingMs := wakeUpAt.Sub(now).Milliseconds() | |
if remainingMs > 0 { | |
return goSleep(int(remainingMs)) | |
} | |
return nil | |
} | |
wakeUpAt = now.Add(time.Duration(ms) * time.Millisecond) | |
timeoutAt := wakeUpAt.Add(time.Duration(timeoutIntervalMs) * time.Millisecond) | |
err = persistence.UpdateWakeUpAt(workflowId, napId, wakeUpAt, timeoutAt) | |
if err != nil { | |
return err | |
} | |
return goSleep(ms) | |
} | |
} | |
} | |
// makeRun creates a function that executes a workflow | |
func makeRun( | |
persistence Persistence, | |
handlers map[string]Handler, | |
makeStep func(string) func(string, func() (interface{}, error)) (interface{}, error), | |
makeSleep func(string) func(string, int) error, | |
start func(string, string, interface{}) (bool, error), | |
maxFailures int, | |
timeoutIntervalMs int, | |
) func(string) error { | |
return func(workflowId string) error { | |
runData, found, err := persistence.FindRunData(workflowId) | |
if err != nil { | |
return err | |
} | |
if !found { | |
return fmt.Errorf("workflow not found: %s", workflowId) | |
} | |
fn, ok := handlers[runData.Handler] | |
if !ok { | |
return fmt.Errorf("handler not found: %s", runData.Handler) | |
} | |
ctx := &Context{ | |
step: makeStep(workflowId), | |
sleep: makeSleep(workflowId), | |
start: start, | |
} | |
err = fn(ctx, runData.Input) | |
if err != nil { | |
lastError := err.Error() | |
failures := runData.Failures + 1 | |
status := StatusFailed | |
if failures >= maxFailures { | |
status = StatusAborted | |
} | |
now := time.Now() | |
timeoutAt := now.Add(time.Duration(timeoutIntervalMs) * time.Millisecond) | |
return persistence.UpdateStatus( | |
workflowId, | |
status, | |
timeoutAt, | |
failures, | |
lastError, | |
) | |
} | |
return persistence.SetAsFinished(workflowId) | |
} | |
} | |
// makeStart creates a function that starts a workflow | |
func makeStart(persistence Persistence) func(string, string, interface{}) (bool, error) { | |
return func(workflowId string, handler string, input interface{}) (bool, error) { | |
return persistence.Insert(workflowId, handler, input) | |
} | |
} | |
// makeWait creates a function that waits for a workflow to reach a specific status | |
func makeWait(persistence Persistence) func(string, []Status, int, int) (Status, error) { | |
return func(workflowId string, statuses []Status, times int, ms int) (Status, error) { | |
for i := 0; i < times; i++ { | |
found, exists, err := persistence.FindStatus(workflowId) | |
if err != nil { | |
return "", err | |
} | |
if exists { | |
for _, status := range statuses { | |
if found == status { | |
return found, nil | |
} | |
} | |
} | |
err = goSleep(ms) | |
if err != nil { | |
return "", err | |
} | |
} | |
return "", nil | |
} | |
} | |
// makePoll creates a function that polls for workflows to execute | |
func makePoll( | |
claim func() (string, error), | |
run func(string) error, | |
pollIntervalMs int, | |
) func(func() bool) error { | |
return func(shouldStop func() bool) error { | |
for !shouldStop() { | |
workflowId, err := claim() | |
if err != nil { | |
return err | |
} | |
if workflowId != "" { | |
// Intentionally not waiting for completion | |
go func(id string) { | |
_ = run(id) | |
}(workflowId) | |
} else { | |
err = goSleep(pollIntervalMs) | |
if err != nil { | |
return err | |
} | |
} | |
} | |
return nil | |
} | |
} | |
// MakeClient creates a new workflow client based on the provided configuration | |
func MakeClient(config Config) (*Client, error) { | |
err := config.Persistence.Init() | |
if err != nil { | |
return nil, err | |
} | |
maxFailures := DEFAULT_MAX_FAILURES | |
if config.MaxFailures != nil { | |
maxFailures = *config.MaxFailures | |
} | |
timeoutIntervalMs := DEFAULT_TIMEOUT_MS | |
if config.TimeoutIntervalMs != nil { | |
timeoutIntervalMs = *config.TimeoutIntervalMs | |
} | |
pollIntervalMs := DEFAULT_POLL_MS | |
if config.PollIntervalMs != nil { | |
pollIntervalMs = *config.PollIntervalMs | |
} | |
start := makeStart(config.Persistence) | |
wait := makeWait(config.Persistence) | |
claim := makeClaim(config.Persistence, timeoutIntervalMs) | |
makeStep := makeMakeStep(config.Persistence, timeoutIntervalMs) | |
makeSleep := makeMakeSleep(config.Persistence, timeoutIntervalMs) | |
run := makeRun( | |
config.Persistence, | |
config.Handlers, | |
makeStep, | |
makeSleep, | |
start, | |
maxFailures, | |
timeoutIntervalMs, | |
) | |
poll := makePoll(claim, run, pollIntervalMs) | |
return &Client{ | |
start: start, | |
wait: wait, | |
poll: poll, | |
}, nil | |
} | |
// InternalTesting exposes internal functions for testing purposes | |
var InternalTesting = struct { | |
MakeClaim func(Persistence, int) func() (string, error) | |
MakeMakeStep func(Persistence, int) func(string) func(string, func() (interface{}, error)) (interface{}, error) | |
MakeMakeSleep func(Persistence, int) func(string) func(string, int) error | |
MakeRun func(Persistence, map[string]Handler, func(string) func(string, func() (interface{}, error)) (interface{}, error), func(string) func(string, int) error, func(string, string, interface{}) (bool, error), int, int) func(string) error | |
MakeStart func(Persistence) func(string, string, interface{}) (bool, error) | |
MakeWait func(Persistence) func(string, []Status, int, int) (Status, error) | |
MakePoll func(func() (string, error), func(string) error, int) func(func() bool) error | |
}{ | |
MakeClaim: makeClaim, | |
MakeMakeStep: makeMakeStep, | |
MakeMakeSleep: makeMakeSleep, | |
MakeRun: makeRun, | |
MakeStart: makeStart, | |
MakeWait: makeWait, | |
MakePoll: makePoll, | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment