// Lidex translated to Scala by Claude 3.7.
// Source: GitHub gist ferromir/c27e8c2cd21f24f553d663a882159cff
// (created by @ferromir on April 2, 2025, 20:30).
import scala.concurrent.{Future, Promise, ExecutionContext}
import scala.concurrent.duration._
import scala.util.{Success, Failure, Try}
import java.time.{Instant, Duration => JDuration}
import scala.collection.mutable.Map
/**
 * Durable workflow runner (Scala port of Lidex).
 *
 * Handlers are registered by name in [[Workflow.Config]]. Step outputs and nap wake-up times
 * are recorded through [[Workflow.Persistence]]. When a workflow is re-run, a step whose output
 * is already stored returns that output without executing its function again, and a nap whose
 * wake-up time is already stored only sleeps for the remaining time.
 */
object Workflow {
  /** Default failure count at which a workflow becomes "aborted" instead of "failed". */
  val DEFAULT_MAX_FAILURES = 3
  /** Default milliseconds added to the current time to compute a workflow's `timeoutAt`. */
  val DEFAULT_TIMEOUT_MS = 60000 // 1m
  /** Default milliseconds the poll loop waits when no workflow could be claimed. */
  val DEFAULT_POLL_MS = 1000 // 1s

  /** Workflow status; one of the `Status*` constants below. */
  type Status = String // "idle" | "running" | "failed" | "finished" | "aborted"
  val StatusIdle: Status = "idle"
  val StatusRunning: Status = "running"
  val StatusFailed: Status = "failed"
  val StatusFinished: Status = "finished"
  val StatusAborted: Status = "aborted"

  /** Operations available to a running workflow handler. */
  trait Context {
    /**
     * Executes a step, or returns its stored output if the step already completed.
     * @param id The id of the step, unique within the workflow.
     * @param fn The function to be executed.
     */
    def step[T](id: String, fn: () => Future[T]): Future[T]

    /**
     * Puts the workflow to sleep.
     * @param id The id of the nap, unique within the workflow.
     * @param ms The amount of milliseconds to sleep.
     */
    def sleep(id: String, ms: Long): Future[Unit]

    /**
     * Starts a new workflow.
     * @param id The id of the workflow.
     * @param handler The handler name to execute the workflow.
     * @param input The input to the workflow.
     * @return True if the workflow was created, false if it already existed.
     */
    def start[T](id: String, handler: String, input: T): Future[Boolean]
  }

  /** A workflow body: receives the execution context and the workflow's stored input. */
  type Handler = (Context, Any) => Future[Unit]

  /** Public entry point returned by [[makeClient]]. */
  trait Client {
    /**
     * Starts a workflow.
     * @param id The id of the workflow.
     * @param handler The handler name of the workflow.
     * @param input The input of the workflow; it must be serializable into JSON.
     * @return True if the workflow is created, false if the workflow already existed.
     */
    def start[T](id: String, handler: String, input: T): Future[Boolean]

    /**
     * Returns a matching workflow status if found. It retries the specified number of times
     * and pauses in between.
     * @param id The id of the workflow.
     * @param status The statuses to match.
     * @param times Number of attempts.
     * @param ms Milliseconds to wait between attempts.
     * @return The first matching status, or None if none matched within `times` attempts.
     */
    def wait(
      id: String,
      status: Seq[Status],
      times: Int,
      ms: Long
    ): Future[Option[Status]]

    /**
     * Starts polling workflows. The returned future completes once `shouldStop` returns true.
     * @param shouldStop Circuit breaker for the polling loop.
     */
    def poll(shouldStop: () => Boolean): Future[Unit]
  }

  /**
   * Client configuration. Options left as None fall back to the `DEFAULT_*` constants.
   * @param handlers Workflow handlers keyed by handler name.
   * @param persistence Storage backend.
   * @param maxFailures Failure count at which a workflow is aborted.
   * @param timeoutIntervalMs Milliseconds added to "now" to compute `timeoutAt`.
   * @param pollIntervalMs Milliseconds to wait when nothing can be claimed.
   */
  case class Config(
    handlers: Map[String, Handler],
    persistence: Persistence,
    maxFailures: Option[Int] = None,
    timeoutIntervalMs: Option[Long] = None,
    pollIntervalMs: Option[Long] = None
  )

  /**
   * Data needed to run a stored workflow.
   * @param handler Name of the handler to execute.
   * @param input Stored workflow input.
   * @param failures Number of failed runs so far.
   */
  case class RunData(
    handler: String,
    input: Any,
    failures: Int = 0
  )

  /** Storage backend for workflows, step outputs and nap wake-up times. */
  trait Persistence {
    /**
     * Initializes the persistence provider.
     */
    def init(): Future[Unit]

    /**
     * Inserts a workflow.
     * @param workflowId The id of the workflow.
     * @param handler The name of the handler.
     * @param input The input for the workflow.
     * @return True if the workflow was inserted, false if the workflow already exists.
     */
    def insert(workflowId: String, handler: String, input: Any): Future[Boolean]

    /**
     * It consists of two actions:
     * 1. Find a workflow that is ready to run.
     * 2. Update the timeout and set the status to "running".
     * These 2 steps have to be performed atomically.
     *
     * A "ready to run" workflow matches the following condition:
     * (status is "idle") OR
     * (status is "running" AND timeoutAt < CURRENT_TIME) OR
     * (status is "failed" AND timeoutAt < CURRENT_TIME)
     * @param now The current time.
     * @param timeoutAt The workflow timeout.
     * @return The workflow id, or None if no workflow is ready.
     */
    def claim(now: Instant, timeoutAt: Instant): Future[Option[String]]

    /**
     * Finds the stored output for the given workflow and step.
     * @param workflowId Id of the workflow.
     * @param stepId Id of the step.
     * @return The output, or None if not found.
     */
    def findOutput(workflowId: String, stepId: String): Future[Option[Any]]

    /**
     * Finds the stored wake-up time for the given workflow and nap.
     * @param workflowId Id of the workflow.
     * @param napId Id of the nap.
     * @return The wake-up time, or None if not found.
     */
    def findWakeUpAt(workflowId: String, napId: String): Future[Option[Instant]]

    /**
     * Finds information about the workflow required to run it.
     * @param workflowId Id of the workflow.
     * @return The run data, or None if the workflow does not exist.
     */
    def findRunData(workflowId: String): Future[Option[RunData]]

    /**
     * Sets the status of the workflow to "finished".
     * @param workflowId Id of the workflow.
     */
    def setAsFinished(workflowId: String): Future[Unit]

    /**
     * Finds the status of a workflow.
     * @param workflowId Id of the workflow.
     * @return The status if found, otherwise None.
     */
    def findStatus(workflowId: String): Future[Option[Status]]

    /**
     * Updates the status, timeoutAt, failures and lastError.
     * @param workflowId Id of the workflow.
     * @param status Status of the workflow.
     * @param timeoutAt The workflow timeout.
     * @param failures The number of failures.
     * @param lastError Last error message.
     */
    def updateStatus(
      workflowId: String,
      status: Status,
      timeoutAt: Instant,
      failures: Int,
      lastError: String
    ): Future[Unit]

    /**
     * Stores the step's output and updates timeoutAt.
     * @param workflowId Id of the workflow.
     * @param stepId Id of the step.
     * @param output Output of the step.
     * @param timeoutAt The workflow timeout.
     */
    def updateOutput(
      workflowId: String,
      stepId: String,
      output: Any,
      timeoutAt: Instant
    ): Future[Unit]

    /**
     * Stores the nap's wake-up time and updates timeoutAt.
     * @param workflowId Id of the workflow.
     * @param napId Id of the nap.
     * @param wakeUpAt Wake-up time of the nap.
     * @param timeoutAt The workflow timeout.
     */
    def updateWakeUpAt(
      workflowId: String,
      napId: String,
      wakeUpAt: Instant,
      timeoutAt: Instant
    ): Future[Unit]
  }

  /**
   * Single shared timer thread for [[goSleep]]. The previous implementation created a new
   * thread pool per call and never shut it down. This one is created once, and its thread is
   * a daemon so pending sleeps do not keep the JVM alive.
   */
  private lazy val sleepScheduler: java.util.concurrent.ScheduledExecutorService =
    java.util.concurrent.Executors.newSingleThreadScheduledExecutor(
      new java.util.concurrent.ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val thread = new Thread(r, "workflow-sleep")
          thread.setDaemon(true)
          thread
        }
      }
    )

  /**
   * Returns a future that completes after `ms` milliseconds, without blocking a pool thread.
   * @param ms Delay in milliseconds; a non-positive value completes as soon as the timer runs.
   * @param ec Unused by this implementation.
   */
  def goSleep(ms: Long)(implicit ec: ExecutionContext): Future[Unit] = {
    val promise = Promise[Unit]()
    sleepScheduler.schedule(
      new Runnable {
        def run(): Unit = {
          promise.success(())
          ()
        }
      },
      ms,
      java.util.concurrent.TimeUnit.MILLISECONDS
    )
    promise.future
  }

  /**
   * Builds the claim function, which asks persistence for a runnable workflow and gives it a
   * timeout of now + `timeoutIntervalMs`.
   */
  def makeClaim(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): () => Future[Option[String]] =
    () => {
      val now = Instant.now()
      persistence.claim(now, now.plusMillis(timeoutIntervalMs))
    }

  /**
   * Builds the per-workflow step function. A step with a stored output returns that output.
   * Otherwise `fn` runs, and its output is stored together with a refreshed timeout.
   */
  def makeMakeStep[T](persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, () => Future[T]) => Future[T]) =
    workflowId => (stepId, fn) =>
      persistence.findOutput(workflowId, stepId).flatMap {
        case Some(output) =>
          // Output already recorded for this step: return it without re-running fn.
          Future.successful(output.asInstanceOf[T])
        case None =>
          fn().flatMap { output =>
            val timeoutAt = Instant.now().plusMillis(timeoutIntervalMs)
            persistence.updateOutput(workflowId, stepId, output, timeoutAt).map(_ => output)
          }
      }

  /**
   * Builds the per-workflow sleep function.
   *
   * On the first call for a nap, the wake-up time is stored, with the workflow timeout pushed
   * past it, and then the function sleeps. On later calls it only sleeps for whatever time is
   * left until the stored wake-up time.
   */
  def makeMakeSleep(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, Long) => Future[Unit]) =
    workflowId => (napId, ms) =>
      persistence.findWakeUpAt(workflowId, napId).flatMap {
        case Some(wakeUpAt) =>
          val remainingMs = JDuration.between(Instant.now(), wakeUpAt).toMillis
          if (remainingMs > 0) goSleep(remainingMs) else Future.successful(())
        case None =>
          val wakeUpAt = Instant.now().plusMillis(ms)
          // Timeout is measured from the wake-up time so the nap itself cannot trigger a re-claim.
          val timeoutAt = wakeUpAt.plusMillis(timeoutIntervalMs)
          persistence.updateWakeUpAt(workflowId, napId, wakeUpAt, timeoutAt).flatMap(_ => goSleep(ms))
      }

  /**
   * Builds the function that executes one workflow by id.
   *
   * If the handler succeeds, the workflow is marked finished. If it fails (including a
   * synchronous throw), the failure count is incremented. The status becomes "failed" while
   * failures < `maxFailures`, and "aborted" otherwise. A missing workflow or handler yields a
   * failed future.
   */
  def makeRun(
    persistence: Persistence,
    handlers: Map[String, Handler],
    makeStep: String => ((String, () => Future[Any]) => Future[Any]),
    makeSleep: String => ((String, Long) => Future[Unit]),
    start: (String, String, Any) => Future[Boolean],
    maxFailures: Int,
    timeoutIntervalMs: Long
  )(implicit ec: ExecutionContext): String => Future[Unit] = {
    // Alias the parameter: inside `new Context { ... }` the bare name `start` resolves to the
    // Context member being defined, which would call itself forever.
    val startWorkflow = start

    workflowId =>
      persistence.findRunData(workflowId).flatMap {
        case None =>
          Future.failed(new RuntimeException(s"workflow not found: $workflowId"))
        case Some(runData) =>
          handlers.get(runData.handler) match {
            case None =>
              Future.failed(new RuntimeException(s"handler not found: ${runData.handler}"))
            case Some(handlerFn) =>
              val ctx = new Context {
                def step[T](id: String, fn: () => Future[T]): Future[T] =
                  makeStep(workflowId)(id, fn).asInstanceOf[Future[T]]
                def sleep(id: String, ms: Long): Future[Unit] =
                  makeSleep(workflowId)(id, ms)
                def start[T](id: String, handler: String, input: T): Future[Boolean] =
                  startWorkflow(id, handler, input)
              }
              // Future.unit.flatMap converts a synchronous throw from the handler into a failure.
              Future.unit.flatMap(_ => handlerFn(ctx, runData.input)).transformWith {
                case Success(_) =>
                  persistence.setAsFinished(workflowId)
                case Failure(error) =>
                  // getMessage may be null; fall back to the exception's toString.
                  val lastError = Option(error.getMessage).getOrElse(error.toString)
                  val failures = runData.failures + 1
                  val status = if (failures < maxFailures) StatusFailed else StatusAborted
                  val timeoutAt = Instant.now().plusMillis(timeoutIntervalMs)
                  persistence.updateStatus(workflowId, status, timeoutAt, failures, lastError)
              }
          }
      }
  }

  /** Builds the start function, which inserts a new workflow through persistence. */
  def makeStart(persistence: Persistence)(implicit ec: ExecutionContext): (String, String, Any) => Future[Boolean] =
    (workflowId, handler, input) => persistence.insert(workflowId, handler, input)

  /**
   * Builds the wait function. It checks the workflow status up to `times` times and sleeps `ms`
   * after every unmatched check, including the last one.
   */
  def makeWait(persistence: Persistence)(implicit ec: ExecutionContext): (String, Seq[Status], Int, Long) => Future[Option[Status]] =
    (workflowId, status, times, ms) => {
      def attempt(remaining: Int): Future[Option[Status]] =
        if (remaining <= 0) Future.successful(None)
        else
          persistence.findStatus(workflowId).flatMap {
            case Some(found) if status.contains(found) => Future.successful(Some(found))
            case _ => goSleep(ms).flatMap(_ => attempt(remaining - 1))
          }
      attempt(times)
    }

  /**
   * Builds the poll loop.
   *
   * Each iteration checks `shouldStop`, then tries to claim a workflow. A claimed workflow is
   * started without waiting for it (fire-and-forget), and the loop immediately tries again.
   * When nothing is claimed, the loop sleeps `pollIntervalMs`. Failures of `run` are not
   * observed here; a failure of `claim` ends the loop with that failure.
   */
  def makePoll(
    claim: () => Future[Option[String]],
    run: String => Future[Unit],
    pollIntervalMs: Long
  )(implicit ec: ExecutionContext): (() => Boolean) => Future[Unit] =
    shouldStop => {
      def loop(): Future[Unit] =
        if (shouldStop()) Future.successful(())
        else
          claim().flatMap {
            case Some(workflowId) =>
              // Intentionally not awaiting
              run(workflowId)
              loop()
            case None =>
              goSleep(pollIntervalMs).flatMap(_ => loop())
          }
      loop()
    }

  /**
   * Creates a client based on the given configuration. Optional configuration fields that are
   * not set fall back to the library defaults.
   * @param config The configuration object.
   * @return The client instance, once `persistence.init()` has completed.
   */
  def makeClient(config: Config)(implicit ec: ExecutionContext): Future[Client] =
    config.persistence.init().map { _ =>
      val maxFailures = config.maxFailures.getOrElse(DEFAULT_MAX_FAILURES)
      // .toLong keeps these Long; getOrElse with an Int default would widen them to AnyVal.
      val timeoutIntervalMs = config.timeoutIntervalMs.getOrElse(DEFAULT_TIMEOUT_MS.toLong)
      val pollIntervalMs = config.pollIntervalMs.getOrElse(DEFAULT_POLL_MS.toLong)
      // Distinct names so the anonymous Client's members do not shadow (and recurse into) them.
      val startFn = makeStart(config.persistence)
      val waitFn = makeWait(config.persistence)
      val claimFn = makeClaim(config.persistence, timeoutIntervalMs)
      val stepFactory = makeMakeStep[Any](config.persistence, timeoutIntervalMs)
      val sleepFactory = makeMakeSleep(config.persistence, timeoutIntervalMs)
      val runFn = makeRun(
        config.persistence,
        config.handlers,
        stepFactory,
        sleepFactory,
        startFn,
        maxFailures,
        timeoutIntervalMs
      )
      val pollFn = makePoll(claimFn, runFn, pollIntervalMs)
      new Client {
        def start[T](id: String, handler: String, input: T): Future[Boolean] =
          startFn(id, handler, input)
        def wait(
          id: String,
          status: Seq[Status],
          times: Int,
          ms: Long
        ): Future[Option[Status]] =
          waitFn(id, status, times, ms)
        def poll(shouldStop: () => Boolean): Future[Unit] =
          pollFn(shouldStop)
      }
    }

  /**
   * For internal testing: groups the internal factories under one name. Every member forwards
   * to the method of the same name on [[Workflow]]. The calls are qualified because an
   * unqualified name here would resolve to the forwarder itself.
   */
  object forInternalTesting {
    def makeClaim(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): () => Future[Option[String]] =
      Workflow.makeClaim(persistence, timeoutIntervalMs)

    def makeMakeStep[T](persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, () => Future[T]) => Future[T]) =
      Workflow.makeMakeStep[T](persistence, timeoutIntervalMs)

    def makeMakeSleep(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, Long) => Future[Unit]) =
      Workflow.makeMakeSleep(persistence, timeoutIntervalMs)

    def makeRun(
      persistence: Persistence,
      handlers: Map[String, Handler],
      makeStep: String => ((String, () => Future[Any]) => Future[Any]),
      makeSleep: String => ((String, Long) => Future[Unit]),
      start: (String, String, Any) => Future[Boolean],
      maxFailures: Int,
      timeoutIntervalMs: Long
    )(implicit ec: ExecutionContext): String => Future[Unit] =
      Workflow.makeRun(persistence, handlers, makeStep, makeSleep, start, maxFailures, timeoutIntervalMs)

    def makeStart(persistence: Persistence)(implicit ec: ExecutionContext): (String, String, Any) => Future[Boolean] =
      Workflow.makeStart(persistence)

    def makeWait(persistence: Persistence)(implicit ec: ExecutionContext): (String, Seq[Status], Int, Long) => Future[Option[Status]] =
      Workflow.makeWait(persistence)

    def makePoll(
      claim: () => Future[Option[String]],
      run: String => Future[Unit],
      pollIntervalMs: Long
    )(implicit ec: ExecutionContext): (() => Boolean) => Future[Unit] =
      Workflow.makePoll(claim, run, pollIntervalMs)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment