Created
April 2, 2025 20:30
-
-
Save ferromir/c27e8c2cd21f24f553d663a882159cff to your computer and use it in GitHub Desktop.
Lidex translated to Scala by Claude 3.7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scala.concurrent.{Future, Promise, ExecutionContext} | |
| import scala.concurrent.duration._ | |
| import scala.util.{Success, Failure, Try} | |
| import java.time.{Instant, Duration => JDuration} | |
| import scala.collection.mutable.Map | |
| object Workflow { | |
/** Fallback for `Config.maxFailures` when the configuration leaves it unset. */
val DEFAULT_MAX_FAILURES: Int = 3

/** Fallback for `Config.timeoutIntervalMs`, in milliseconds (one minute). */
val DEFAULT_TIMEOUT_MS: Int = 60000

/** Fallback for `Config.pollIntervalMs`, in milliseconds (one second). */
val DEFAULT_POLL_MS: Int = 1000

/** Workflow status, one of: "idle", "running", "failed", "finished", "aborted". */
type Status = String

val StatusIdle: Status = "idle"
val StatusRunning: Status = "running"
val StatusFailed: Status = "failed"
val StatusFinished: Status = "finished"
val StatusAborted: Status = "aborted"
/** Operations made available to a workflow handler while it runs. */
trait Context {

  /**
   * Runs a step of the workflow.
   *
   * @param id identifier of the step
   * @param fn the work to perform for this step
   * @return the output of the step
   */
  def step[T](
    id: String,
    fn: () => Future[T]
  ): Future[T]

  /**
   * Suspends the workflow for a while.
   *
   * @param id identifier of the nap
   * @param ms how long to sleep, in milliseconds
   */
  def sleep(
    id: String,
    ms: Long
  ): Future[Unit]

  /**
   * Launches another workflow.
   *
   * @param id      identifier of the new workflow
   * @param handler name of the handler that will execute it
   * @param input   input handed to the new workflow
   */
  def start[T](
    id: String,
    handler: String,
    input: T
  ): Future[Boolean]
}

/** A workflow body: receives the [[Context]] and the workflow input. */
type Handler = (Context, Any) => Future[Unit]
/** Entry point used by applications to start, observe and execute workflows. */
trait Client {

  /**
   * Starts a workflow.
   *
   * @param id      identifier of the workflow
   * @param handler name of the handler that executes the workflow
   * @param input   workflow input; it must be serializable into JSON
   * @return true when the workflow was created, false when it already existed
   */
  def start[T](id: String, handler: String, input: T): Future[Boolean]

  /**
   * Looks up the status of a workflow until it matches one of the expected
   * values, retrying a limited number of times with a pause in between.
   *
   * @param id     identifier of the workflow
   * @param status the statuses that count as a match
   * @param times  number of retries
   * @param ms     pause between retries, in milliseconds
   * @return the matching status if one was found
   */
  def wait(id: String, status: Seq[Status], times: Int, ms: Long): Future[Option[Status]]

  /**
   * Starts polling workflows.
   *
   * @param shouldStop circuit breaker for the polling loop
   */
  def poll(shouldStop: () => Boolean): Future[Unit]
}
/**
 * Settings used to build a client.
 *
 * @param handlers          workflow handlers, keyed by handler name
 * @param persistence       the persistence provider
 * @param maxFailures       optional override of DEFAULT_MAX_FAILURES
 * @param timeoutIntervalMs optional override of DEFAULT_TIMEOUT_MS, in milliseconds
 * @param pollIntervalMs    optional override of DEFAULT_POLL_MS, in milliseconds
 */
case class Config(
  handlers: Map[String, Handler],
  persistence: Persistence,
  maxFailures: Option[Int] = None,
  timeoutIntervalMs: Option[Long] = None,
  pollIntervalMs: Option[Long] = None
)
/**
 * What is needed to run a stored workflow.
 *
 * @param handler  name of the handler that executes the workflow
 * @param input    input passed to the handler
 * @param failures failure count recorded so far (0 unless specified)
 */
case class RunData(handler: String, input: Any, failures: Int = 0)
/** Storage operations the workflow engine relies on. */
trait Persistence {

  /** Initializes the persistence provider. */
  def init(): Future[Unit]

  /**
   * Inserts a workflow.
   *
   * @param workflowId identifier of the workflow
   * @param handler    name of the handler
   * @param input      input for the workflow
   * @return true if the workflow was inserted, false if it already exists
   */
  def insert(
    workflowId: String,
    handler: String,
    input: Any
  ): Future[Boolean]

  /**
   * Claims a workflow that is ready to run. Two actions are involved and they
   * have to be performed atomically:
   *   1. find a workflow that is ready to run;
   *   2. update its timeout and set its status to "running".
   *
   * A workflow is "ready to run" when
   *   (status is "idle") OR
   *   (status is "running" AND timeoutAt < CURRENT_TIME) OR
   *   (status is "failed" AND timeoutAt < CURRENT_TIME).
   *
   * @param now       the current time
   * @param timeoutAt the workflow timeout
   * @return the id of the claimed workflow
   */
  def claim(
    now: Instant,
    timeoutAt: Instant
  ): Future[Option[String]]

  /**
   * Looks up the stored output of a step.
   *
   * @param workflowId identifier of the workflow
   * @param stepId     identifier of the step
   * @return the output, or an empty Option when nothing is stored
   */
  def findOutput(
    workflowId: String,
    stepId: String
  ): Future[Option[Any]]

  /**
   * Looks up the stored wake up time of a nap.
   *
   * @param workflowId identifier of the workflow
   * @param napId      identifier of the nap
   * @return the wake up time, or an empty Option when nothing is stored
   */
  def findWakeUpAt(
    workflowId: String,
    napId: String
  ): Future[Option[Instant]]

  /**
   * Looks up the information required to run a workflow.
   *
   * @param workflowId identifier of the workflow
   * @return the run data
   */
  def findRunData(workflowId: String): Future[Option[RunData]]

  /**
   * Sets the status of the workflow to "finished".
   *
   * @param workflowId identifier of the workflow
   */
  def setAsFinished(workflowId: String): Future[Unit]

  /**
   * Looks up the status of a workflow.
   *
   * @param workflowId identifier of the workflow
   * @return the status, or an empty Option when the workflow is not found
   */
  def findStatus(workflowId: String): Future[Option[Status]]

  /**
   * Updates the status, timeoutAt, failures and lastError of a workflow.
   *
   * @param workflowId identifier of the workflow
   * @param status     status of the workflow
   * @param timeoutAt  the workflow timeout
   * @param failures   the amount of failures
   * @param lastError  last error message
   */
  def updateStatus(
    workflowId: String,
    status: Status,
    timeoutAt: Instant,
    failures: Int,
    lastError: String
  ): Future[Unit]

  /**
   * Updates the output of a step together with the workflow's timeoutAt.
   *
   * @param workflowId identifier of the workflow
   * @param stepId     identifier of the step
   * @param output     output of the step
   * @param timeoutAt  the workflow timeout
   */
  def updateOutput(
    workflowId: String,
    stepId: String,
    output: Any,
    timeoutAt: Instant
  ): Future[Unit]

  /**
   * Updates the wake up time of a nap together with the workflow's timeoutAt.
   *
   * @param workflowId identifier of the workflow
   * @param napId      identifier of the nap
   * @param wakeUpAt   wake up time of the nap
   * @param timeoutAt  the workflow timeout
   */
  def updateWakeUpAt(
    workflowId: String,
    napId: String,
    wakeUpAt: Instant,
    timeoutAt: Instant
  ): Future[Unit]
}
/**
 * Returns a future that completes once `ms` milliseconds have elapsed,
 * without blocking the calling thread.
 *
 * @param ms delay in milliseconds
 * @param ec execution context (kept for interface compatibility; not used here)
 * @return a future completed with unit after the delay
 */
def goSleep(ms: Long)(implicit ec: ExecutionContext): Future[Unit] = {
  val promise = Promise[Unit]()
  val scheduler = java.util.concurrent.Executors.newScheduledThreadPool(1)
  val wakeUp = new Runnable {
    def run(): Unit = {
      promise.trySuccess(())
      ()
    }
  }
  try {
    scheduler.schedule(wakeUp, ms, java.util.concurrent.TimeUnit.MILLISECONDS)
  } finally {
    // Without this the pool's worker thread lives forever, leaking one
    // non-daemon thread per call. By default a ScheduledThreadPoolExecutor still
    // runs already-scheduled delayed tasks after shutdown(), so the wake-up
    // fires and the thread then terminates.
    scheduler.shutdown()
  }
  promise.future
}
/**
 * Builds the claim function used by the poller.
 *
 * @param persistence       the persistence provider
 * @param timeoutIntervalMs how far in the future the claimed workflow's timeout is set, in milliseconds
 * @return a function that, each time it is called, asks the persistence layer to
 *         claim a workflow using the current time and the derived timeout
 */
def makeClaim(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): () => Future[Option[String]] =
  () => {
    val claimedAt = Instant.now()
    persistence.claim(claimedAt, claimedAt.plusMillis(timeoutIntervalMs))
  }
/**
 * Builds the factory of step functions.
 *
 * Given a workflow id, the result is a function `(stepId, fn)` that returns the
 * stored output of the step when there is one; otherwise it runs `fn`, stores
 * its output along with a refreshed timeout, and returns that output.
 *
 * @param persistence       the persistence provider
 * @param timeoutIntervalMs interval added to the current time to compute the new timeout, in milliseconds
 */
def makeMakeStep[T](persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, () => Future[T]) => Future[T]) = {
  // Executes the step and records its result before handing it back.
  def runAndStore(workflowId: String, stepId: String, fn: () => Future[T]): Future[T] =
    for {
      result <- fn()
      _ <- persistence.updateOutput(
        workflowId,
        stepId,
        result,
        Instant.now().plusMillis(timeoutIntervalMs)
      )
    } yield result

  workflowId => (stepId, fn) =>
    persistence.findOutput(workflowId, stepId).flatMap { stored =>
      stored.fold(runAndStore(workflowId, stepId, fn)) { previous =>
        // The stored value is untyped; the caller's T is trusted here.
        Future.successful(previous.asInstanceOf[T])
      }
    }
}
/**
 * Builds the factory of sleep functions.
 *
 * Given a workflow id, the result is a function `(napId, ms)`. When a wake up
 * time is already stored for the nap, it sleeps only for the time remaining
 * until then (not at all if that time has passed). Otherwise it stores a wake
 * up time `ms` from now, with a timeout `timeoutIntervalMs` after the wake up
 * time, and then sleeps for `ms`.
 *
 * @param persistence       the persistence provider
 * @param timeoutIntervalMs interval added to the wake up time to compute the timeout, in milliseconds
 */
def makeMakeSleep(persistence: Persistence, timeoutIntervalMs: Long)(implicit ec: ExecutionContext): String => ((String, Long) => Future[Unit]) = {
  // Sleeps for whatever is left until a previously recorded wake up time.
  def resume(wakeUpAt: Instant): Future[Unit] = {
    val leftMs = JDuration.between(Instant.now(), wakeUpAt).toMillis
    if (leftMs <= 0) Future.successful(()) else goSleep(leftMs)
  }

  // Records a brand-new nap and then sleeps for its full duration.
  def begin(workflowId: String, napId: String, ms: Long): Future[Unit] = {
    val wakeUpAt = Instant.now().plusMillis(ms)
    val timeoutAt = wakeUpAt.plusMillis(timeoutIntervalMs)
    persistence
      .updateWakeUpAt(workflowId, napId, wakeUpAt, timeoutAt)
      .flatMap(_ => goSleep(ms))
  }

  workflowId => (napId, ms) =>
    persistence.findWakeUpAt(workflowId, napId).flatMap { stored =>
      stored.fold(begin(workflowId, napId, ms))(resume)
    }
}
/**
 * Builds the function that runs a claimed workflow.
 *
 * The returned function looks up the run data and the handler of the workflow,
 * invokes the handler with a [[Context]] bound to that workflow, and then:
 *  - on success, marks the workflow as finished;
 *  - on failure, increments the failure count and stores the status
 *    ("failed" while the count is below `maxFailures`, otherwise "aborted"),
 *    a refreshed timeout and the error message. The workflow is NOT marked as
 *    finished in this case and the returned future completes successfully once
 *    the failure has been recorded.
 *
 * The returned future fails with a RuntimeException when the workflow or its
 * handler cannot be found.
 *
 * @param persistence       the persistence provider
 * @param handlers          workflow handlers, keyed by handler name
 * @param makeStep          factory of step functions, per workflow id
 * @param makeSleep         factory of sleep functions, per workflow id
 * @param start             function used by handlers to start other workflows
 * @param maxFailures       failure count at which a workflow is aborted
 * @param timeoutIntervalMs interval added to the current time to compute the timeout, in milliseconds
 */
def makeRun(
  persistence: Persistence,
  handlers: Map[String, Handler],
  makeStep: String => ((String, () => Future[Any]) => Future[Any]),
  makeSleep: String => ((String, Long) => Future[Unit]),
  start: (String, String, Any) => Future[Boolean],
  maxFailures: Int,
  timeoutIntervalMs: Long
)(implicit ec: ExecutionContext): String => Future[Unit] = {
  // Inside the anonymous Context below, the name `start` resolves to the
  // Context's own method, so calling `start(...)` there would recurse forever.
  // The alias keeps the injected function reachable.
  val startWorkflow = start

  // Context handed to the handler, with every operation bound to one workflow.
  def contextFor(workflowId: String): Context = new Context {
    def step[T](id: String, fn: () => Future[T]): Future[T] =
      // The step factory is untyped (Any); the caller's T is trusted here.
      makeStep(workflowId)(id, fn).asInstanceOf[Future[T]]

    def sleep(id: String, ms: Long): Future[Unit] =
      makeSleep(workflowId)(id, ms)

    def start[T](id: String, handler: String, input: T): Future[Boolean] =
      startWorkflow(id, handler, input)
  }

  // Stores the outcome of a failed run.
  def recordFailure(workflowId: String, runData: RunData, error: Throwable): Future[Unit] = {
    val failures = runData.failures + 1
    val status = if (failures < maxFailures) StatusFailed else StatusAborted
    // getMessage may be null (e.g. a bare NullPointerException).
    val lastError = Option(error.getMessage).getOrElse(error.toString)
    val timeoutAt = Instant.now().plusMillis(timeoutIntervalMs)
    persistence.updateStatus(workflowId, status, timeoutAt, failures, lastError)
  }

  workflowId =>
    persistence.findRunData(workflowId).flatMap {
      case None =>
        Future.failed(new RuntimeException(s"workflow not found: $workflowId"))
      case Some(runData) =>
        handlers.get(runData.handler) match {
          case None =>
            Future.failed(new RuntimeException(s"handler not found: ${runData.handler}"))
          case Some(handler) =>
            // Try(...) turns an exception thrown synchronously by the handler
            // into a failed future, so it is accounted for like any other failure.
            Future
              .fromTry(Try(handler(contextFor(workflowId), runData.input)))
              .flatten
              .transformWith {
                case Success(_) => persistence.setAsFinished(workflowId)
                case Failure(error) => recordFailure(workflowId, runData, error)
              }
        }
    }
}
/**
 * Builds the function that starts workflows.
 *
 * @param persistence the persistence provider
 * @return a function `(workflowId, handler, input)` that inserts the workflow
 *         through the persistence provider and yields the result of that insert
 */
def makeStart(persistence: Persistence)(implicit ec: ExecutionContext): (String, String, Any) => Future[Boolean] =
  persistence.insert(_, _, _)
/**
 * Builds the function that waits for a workflow to reach a status.
 *
 * The returned function `(workflowId, status, times, ms)` reads the workflow
 * status up to `times` times. It yields the status as soon as it is one of
 * `status`; after every non-matching read it sleeps for `ms` milliseconds.
 * It yields None once the attempts are used up.
 *
 * @param persistence the persistence provider
 */
def makeWait(persistence: Persistence)(implicit ec: ExecutionContext): (String, Seq[Status], Int, Long) => Future[Option[Status]] = {
  (workflowId, status, times, ms) =>
    def attempt(left: Int): Future[Option[Status]] =
      if (left > 0) {
        persistence.findStatus(workflowId).flatMap { current =>
          current.filter(s => status.contains(s)) match {
            case hit @ Some(_) => Future.successful(hit)
            case None => goSleep(ms).flatMap(_ => attempt(left - 1))
          }
        }
      } else {
        Future.successful(None)
      }

    attempt(times)
}
/**
 * Builds the polling loop.
 *
 * The returned function takes a `shouldStop` predicate. While it returns false,
 * the loop tries to claim a workflow: a claimed workflow is handed to `run`
 * without waiting for it to complete and the loop continues right away; when
 * nothing is claimed the loop sleeps for `pollIntervalMs` milliseconds first.
 *
 * @param claim          function that claims a workflow id, if any is ready
 * @param run            function that runs a workflow given its id
 * @param pollIntervalMs pause after an empty claim, in milliseconds
 */
def makePoll(
  claim: () => Future[Option[String]],
  run: String => Future[Unit],
  pollIntervalMs: Long
)(implicit ec: ExecutionContext): (() => Boolean) => Future[Unit] = { shouldStop =>
  def tick(): Future[Unit] =
    if (shouldStop()) {
      Future.successful(())
    } else {
      claim().flatMap {
        case None =>
          goSleep(pollIntervalMs).flatMap(_ => tick())
        case Some(workflowId) =>
          // Fire and forget: the run is intentionally not awaited.
          run(workflowId)
          tick()
      }
    }

  tick()
}
/**
 * Creates a client based on the given configuration. Settings left unset in the
 * configuration fall back to the library defaults. The persistence provider is
 * initialized before the client is returned.
 *
 * @param config The configuration object.
 * @return The client instance.
 */
def makeClient(config: Config)(implicit ec: ExecutionContext): Future[Client] = {
  config.persistence.init().map { _ =>
    val maxFailures: Int = config.maxFailures.getOrElse(DEFAULT_MAX_FAILURES)
    // The defaults are Ints; widen them explicitly so the results are Long.
    val timeoutIntervalMs: Long = config.timeoutIntervalMs.getOrElse(DEFAULT_TIMEOUT_MS.toLong)
    val pollIntervalMs: Long = config.pollIntervalMs.getOrElse(DEFAULT_POLL_MS.toLong)

    // These locals deliberately do NOT share names with the Client members:
    // inside `new Client { ... }` a bare `start`/`wait`/`poll` resolves to the
    // member itself, which would make each method call itself forever.
    val startFn = makeStart(config.persistence)
    val waitFn = makeWait(config.persistence)
    val claimFn = makeClaim(config.persistence, timeoutIntervalMs)
    // The explicit [Any] matches makeRun's parameter type; without it T would
    // be inferred as Nothing.
    val makeStep = makeMakeStep[Any](config.persistence, timeoutIntervalMs)
    val makeSleep = makeMakeSleep(config.persistence, timeoutIntervalMs)
    val runFn = makeRun(
      config.persistence,
      config.handlers,
      makeStep,
      makeSleep,
      startFn,
      maxFailures,
      timeoutIntervalMs
    )
    val pollFn = makePoll(claimFn, runFn, pollIntervalMs)

    new Client {
      def start[T](id: String, handler: String, input: T): Future[Boolean] =
        startFn(id, handler, input)

      def wait(
        id: String,
        status: Seq[Status],
        times: Int,
        ms: Long
      ): Future[Option[Status]] =
        waitFn(id, status, times, ms)

      def poll(shouldStop: () => Boolean): Future[Unit] =
        pollFn(shouldStop)
    }
  }
}
// For internal testing: the factory functions above, keyed by their name.
// NOTE(review): every `_` below eta-expands a method that has an implicit
// ExecutionContext parameter list; this likely needs an implicit
// ExecutionContext in scope at this point to compile — confirm.
val forInternalTesting = Map(
  ("makeClaim", makeClaim _),
  ("makeMakeStep", makeMakeStep _),
  ("makeMakeSleep", makeMakeSleep _),
  ("makeRun", makeRun _),
  ("makeStart", makeStart _),
  ("makeWait", makeWait _),
  ("makePoll", makePoll _)
)
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment