Last active
February 12, 2026 19:23
-
-
Save slinkydeveloper/d929a67027a7192e4829bc7ece7cd64c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import * as restate from "@restatedev/restate-sdk"; | |
| import {RestatePromise, TerminalError} from "@restatedev/restate-sdk"; | |
| import {readFile} from "fs/promises"; | |
| import {join} from "path"; | |
| import type * as dosswf from "./workflow_dsl_types" | |
| import * as dosswfruntime from "./workflow_interpreter_runtime" | |
| type ActionStackElement = { id: number, stepIds: string[] }; | |
| type ExecuteRequest = { | |
| steps: dosswf.ActionStep[]; | |
| actionStack: ActionStackElement[]; | |
| thisStackId: number; | |
| // Filled only if this is a subworkflow within MAP! | |
| iterationIndex?: number; | |
| }; | |
| const TRIGGER_KEY = "trigger"; | |
| const workflowRunner = restate.service({ | |
| name: "WorkflowRunner", | |
| handlers: { | |
| start: async (ctx: restate.Context, {workflowDefinition, trigger}: { | |
| workflowDefinition: string, | |
| trigger: any | |
| }) => { | |
| const workflow = await ctx.run(`load ${workflowDefinition} workflow`, () => loadWorkflowFromFile(workflowDefinition)); | |
| const workflowKey = ctx.rand.uuidv4(); | |
| // Init the WorkflowResults tree | |
| await ctx.objectClient(workflowResultsTree, workflowKey).init(trigger); | |
| // Generate root workflow key and start | |
| const workflowPromise = ctx.objectClient(workflowInterpreter, workflowKey).interpret({ | |
| steps: workflow.steps, | |
| actionStack: [], | |
| thisStackId: 0, | |
| }); | |
| const invocationId = await workflowPromise.invocationId; | |
| try { | |
| // Await completion of workflow | |
| await workflowPromise; | |
| } finally { | |
| // Schedule cleanup in 24hrs from when workflow promise completes | |
| ctx.objectSendClient(workflowResultsTree, workflowKey).cleanup(restate.rpc.sendOpts({delay: {hours: 24}})); | |
| } | |
| return { | |
| workflowKey, | |
| invocationId | |
| } | |
| } | |
| } | |
| }) | |
| async function getAndFilter(ctx: restate.ObjectContext, key: string, innerExpression?: string) { | |
| let value = await ctx.get(key); | |
| if (!value) { | |
| throw new TerminalError(`Key ${key} not found`, {errorCode: 404}) | |
| } | |
| if (innerExpression) { | |
| value = dosswfruntime.expressionResult(value, innerExpression) | |
| } | |
| return value | |
| } | |
| function stepResultStateKey(stack: number[], stepId: string) { | |
| return `${["stepResults"].concat(stack.map(n => n.toString())).join("/")}/${stepId}`; | |
| } | |
| function dataStoreStateKey(dataStoreKey: string) { | |
| return `datastore/${dataStoreKey}`; | |
| } | |
| const workflowResultsTree = restate.object({ | |
| name: "WorkflowResultsTree", | |
| handlers: { | |
| // --- Readers --- | |
| getTrigger: (ctx: restate.ObjectContext, {innerExpression}: { innerExpression?: string }) => | |
| getAndFilter(ctx, TRIGGER_KEY, innerExpression), | |
| getStepResult: async (ctx: restate.ObjectContext, {stack, stepId, innerExpression}: { | |
| stack: number[], | |
| stepId: string, | |
| innerExpression?: string | |
| }) => | |
| getAndFilter(ctx, stepResultStateKey(stack, stepId), innerExpression), | |
| getDataStoreValue: (ctx: restate.ObjectContext, {dataStoreKey, innerExpression}: { | |
| dataStoreKey: string, | |
| innerExpression?: string | |
| }) => | |
| getAndFilter(ctx, dataStoreStateKey(dataStoreKey), innerExpression), | |
| // --- Writers --- | |
| init: async (ctx: restate.ObjectContext, {trigger}: { trigger: any }) => { | |
| ctx.set(TRIGGER_KEY, trigger); | |
| }, | |
| setStepResult: async (ctx: restate.ObjectContext, {stack, stepId, value}: { | |
| stack: number[], | |
| stepId: string, | |
| value: any | |
| }) => { | |
| ctx.set(stepResultStateKey(stack, stepId), value); | |
| }, | |
| setDataStoreValue: async (ctx: restate.ObjectContext, {dataStoreKey, value}: { | |
| dataStoreKey: string, | |
| value: any | |
| }) => { | |
| // TODO ask @dbt merging rules for data store values | |
| ctx.set(dataStoreStateKey(dataStoreKey), value); | |
| }, | |
| cleanup: async (ctx: restate.ObjectContext) => { | |
| ctx.clearAll(); | |
| }, | |
| }, | |
| options: { | |
| // An optimization to avoid loading all state entries (in all the handlers we usually just need to read/write one) | |
| enableLazyState: true, | |
| // This service should just be called by WorkflowRunner | |
| ingressPrivate: true | |
| } | |
| }) | |
| const workflowInterpreter = restate.object({ | |
| name: "WorkflowInterpreter", | |
| handlers: { | |
| interpret: restate.createObjectSharedHandler(async (ctx: restate.ObjectSharedContext, req: ExecuteRequest) => { | |
| // Step results | |
| const stepResults: Record<string, dosswf.StepResult> = {}; | |
| const currentStack = req.actionStack.map(s => s.id).concat([req.thisStackId]); | |
| // Will be incremented to build the stack | |
| let childrenCount = 0; | |
| // Factory for the step invocation context | |
| const stepExecutionContext = <T>(stepDefinition: T): dosswfruntime.StepExecutionContext<T> => ({ | |
| stepDefinition, | |
| restate: ctx, | |
| mapIterationIndex: req.iterationIndex, | |
| getTrigger: | |
| async (innerExpression) => ctx.objectClient(workflowResultsTree, ctx.key).getTrigger({innerExpression}), | |
| getStepResult: async (stepId, innerExpression) => { | |
| // Let's first check if we have this locally | |
| let value = stepResults[stepId]; | |
| if (value !== undefined) { | |
| // Found it locally, let's apply the expression and we're good to go | |
| if (innerExpression !== undefined && value.type === "success") { | |
| // Apply filter | |
| value.result = dosswfruntime.expressionResult(value.result, innerExpression); | |
| } | |
| } else { | |
| // Find this in the stack | |
| const stack = []; | |
| let found = false; | |
| for (const stackElement of req.actionStack) { | |
| stack.push(stackElement.id); | |
| if (stackElement.stepIds.find(stackStepId => stackStepId === stepId)) { | |
| // Found it | |
| found = true; | |
| break; | |
| } | |
| } | |
| if (!found) { | |
| throw new TerminalError(`Step ${stepId} not found in action stack, looks like a bug in the workflow definition?`); | |
| } | |
| // Ok we've found who in the stack has this step result, let's query it | |
| value = ctx.objectClient(workflowResultsTree, ctx.key).getStepResult({stack, stepId, innerExpression}) as unknown as dosswf.StepResult; | |
| } | |
| return value; | |
| }, | |
| getDataStoreValue: async (dataStoreKey: string, innerExpression?: string) => ctx.objectClient(workflowResultsTree, ctx.key).getDataStoreValue({dataStoreKey, innerExpression}), | |
| }) | |
| for (const step of req.steps) { | |
| // Prepare the stack element to propagate, and the child key | |
| const getActionStack = () => { | |
| const stackElement: ActionStackElement = { | |
| id: req.thisStackId, | |
| stepIds: Object.keys(stepResults) | |
| }; | |
| return req.actionStack.concat([stackElement]); | |
| } | |
| switch (step.type) { | |
| case "query": { | |
| stepResults[step.id] = await dosswfruntime.tableQuery(stepExecutionContext(step)); | |
| ctx.objectSendClient(workflowResultsTree, ctx.key).setStepResult({stack: currentStack, stepId: step.id, value: stepResults[step.id]}); | |
| break; | |
| } | |
| case "select": { | |
| // Evaluate condition and decide which branch to go to | |
| const conditionEvaluationResult = await dosswfruntime.evaluateConditionExpression(stepExecutionContext(step)); | |
| const branch = conditionEvaluationResult ? step.trueBranch : step.falseBranch; | |
| const newStackElementId = childrenCount; | |
| childrenCount++; | |
| await ctx.objectClient(workflowInterpreter, ctx.key).interpret({ | |
| steps: branch, | |
| actionStack:getActionStack(), | |
| thisStackId: newStackElementId | |
| }) | |
| break; | |
| } | |
| case "map": { | |
| const numberOfIterations = await dosswfruntime.evaluateInputExpressionArraySize(stepExecutionContext(step)); | |
| const actionStack = getActionStack(); | |
| // Create various child invocations | |
| const childInterpreterPromises: RestatePromise<void>[] = []; | |
| for (let iterationIndex = 0; iterationIndex < numberOfIterations; iterationIndex++) { | |
| const newStackElementId = childrenCount; | |
| childrenCount++; | |
| childInterpreterPromises.push( | |
| ctx.objectClient(workflowInterpreter, ctx.key).interpret({ | |
| steps: step.sub, | |
| actionStack, | |
| thisStackId: newStackElementId, | |
| iterationIndex | |
| }) | |
| ); | |
| } | |
| // Now await all child invocations to complete | |
| await RestatePromise.all(childInterpreterPromises); | |
| break; | |
| } | |
| } | |
| } | |
| }), | |
| }, | |
| options: { | |
| // This service should just get called by WorkflowRunner | |
| ingressPrivate: true | |
| } | |
| }); | |
| async function loadWorkflowFromFile(id: string): Promise<dosswf.ActionDefinition> { | |
| const filePath = join(process.cwd(), 'workflow', `${id}.json`); | |
| const fileContent = await readFile(filePath, 'utf-8'); | |
| return JSON.parse(fileContent) as dosswf.ActionDefinition; | |
| } | |
| restate.serve({ | |
| services: [workflowRunner, workflowInterpreter, workflowResultsTree], | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment