Last active
April 29, 2025 09:15
-
-
Save icebob/733b7c31845276d6de1bd795635be0e3 to your computer and use it in GitHub Desktop.
Moleculer Workflow service schema concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// users.service.js | |
module.exports = { | |
name: "users", | |
// Define workflows | |
workflows: { | |
// User signup workflow. | |
signupWorkflow: { | |
// Workflow execution timeout | |
timeout: "1 day", | |
// Workflow event history retention | |
retention: "3 days", | |
// Retry policy | |
retryPolicy: {}, | |
// Parameter validation | |
params: {}, | |
// Workflow handler | |
async handler(ctx) { | |
// Check the e-mail address is not exists | |
const isExist = await ctx.wf.call("users.getByEmail", { email: ctx.params.email }); | |
if (isExist) { | |
throw new MoleculerClientError("E-mail address is already signed up. Use the login button"); | |
} | |
// Check the e-mail address is valid and not a temp mail address | |
await ctx.wf.call("utils.isTemporaryEmail", { email: ctx.params.email }); | |
// Register (max execution is 10 sec) | |
const user = await ctx.wf.call("users.register", ctx.params, { | |
timeout: 10, | |
// Set the workflow state to this before call the action | |
beforeSetState: "REGISTERING", | |
// Set the workflow state to this after the action called | |
afterSetState: "SENDING_EMAIL", | |
// For Saga, define the compensation action in case of failure | |
compensation: "users.remove" | |
}); | |
// Send verification | |
await ctx.wf.call("mail.send", { type: "verification", user }); | |
// Wait for verification (max 1 hour) | |
await ctx.wf.setState("WAIT_VERIFICATION"); | |
try { | |
await ctx.wf.waitForSignal("email.verification", { | |
key: user.id, | |
timeout: "1 hour" | |
}); | |
await ctx.wf.setState("VERIFIED"); | |
} catch(err) { | |
if (err.name == "WorkflowTaskTimeoutError") { | |
// Registraion not verified in 1 hour, remove the user | |
await ctx.wf.call("user.remove", { id: user.id }); | |
return null; | |
} | |
// Other error is thrown further | |
throw err; | |
} | |
// Set user verified and save | |
user.verified = true; | |
await ctx.wf.call("users.update", user); | |
// Send welcome email | |
await ctx.wf.call("mail.send", { type: "welcome", user }); | |
// Set the workflow state to done (It can be a string, number, or object) | |
await ctx.wf.setState("DONE"); | |
// It will be stored as a result value to the workflow in event history | |
return user; | |
} | |
} | |
}, | |
actions: { | |
signup: { | |
rest: "POST /register", | |
async handler(ctx) { | |
const res = await ctx.wf.run("users.signupWorkflow", ctx.params, { | |
workflowId: ctx.requestID // optional | |
/* other workflow run options */ | |
}); | |
// Here the workflow is running, the res is a state object | |
return { | |
// With the workflowId, you can call the `checkSignupState` REST action | |
// to get the state of the execution on the frontend. | |
workflowId: res.workflowId | |
} | |
// or wait for the execution and return the result | |
return await res.result(); | |
} | |
}, | |
verify: { | |
rest: "POST /verify/:token", | |
handler(ctx) { | |
// Check the validity | |
const user = ctx.call("users.find", { verificationToken: ctx.params.token }); | |
if (user) { | |
ctx.wf.sendSignal("email.verification", { key: user.id }); | |
} | |
} | |
}, | |
checkSignupState: { | |
rest: "GET /state/:workflowId", | |
async handler(ctx) { | |
const res = await ctx.wf.getState({ workflowId: ctx.params.workflowId }); | |
if (res.state == "DONE") { | |
return { user: res.result }; | |
} else { | |
return { state: res.state, startedAt: res.startedAt, duration: res.duration }; | |
} | |
} | |
} | |
}, | |
started() { | |
this.broker.wf.run("notifyNonLoggedUsers", {}, { | |
workflowId: "midnight-notify", // Only start a new schedule if not exists with the same workflowId | |
// Delayed run | |
startDelay: "1 hour", | |
// Recurring run | |
schedule: { | |
cron: "0 0 * * *" // run every midnight | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment