Created
November 24, 2025 14:05
-
-
Save graysonhicks/aa33caa507ca62add7f616e29abdd086 to your computer and use it in GitHub Desktop.
Nested Workflows or Sequential Steps Within Branches
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { createStep, createWorkflow } from '@mastra/core/workflows'; | |
| import { z } from 'zod'; | |
// ===== SECURITY ISSUE BRANCH WORKFLOW =====

/**
 * Step: record a security incident for a flagged email.
 *
 * Input:  { email, issues } - the email and the list of detected issue labels.
 * Output: { logged, incidentId } - `logged` is always true; `incidentId` has
 *         the form `SEC-<millisecond timestamp>`.
 */
const logSecurityIncident = createStep({
  id: 'logSecurityIncident',
  inputSchema: z.object({
    email: z.string(),
    issues: z.array(z.string()),
  }),
  outputSchema: z.object({
    logged: z.boolean(),
    incidentId: z.string(),
  }),
  execute: async ({ inputData }) => {
    // Stand-in for a real security-system call: the incident is only written
    // to the console.
    const incidentId = `SEC-${Date.now()}`;
    const { issues } = inputData;
    console.log(`Logged security incident ${incidentId}:`, issues);
    return { logged: true, incidentId };
  },
});
/**
 * Step: quarantine the email that belongs to a logged incident.
 *
 * Input:  { email, incidentId }
 * Output: { quarantined } - always true in this example.
 */
const quarantineEmail = createStep({
  id: 'quarantineEmail',
  inputSchema: z.object({
    email: z.string(),
    incidentId: z.string(),
  }),
  outputSchema: z.object({ quarantined: z.boolean() }),
  execute: async ({ inputData }) => {
    // Stand-in for moving the email to quarantine: only logs the incident id.
    const { incidentId } = inputData;
    console.log(`Quarantined email with incident ${incidentId}`);
    return { quarantined: true };
  },
});
/**
 * Step: notify the security team about an incident.
 *
 * Input:  { incidentId }
 * Output: { alertSent } - always true in this example.
 */
const sendSecurityAlert = createStep({
  id: 'sendSecurityAlert',
  inputSchema: z.object({ incidentId: z.string() }),
  outputSchema: z.object({ alertSent: z.boolean() }),
  execute: async ({ inputData }) => {
    // Stand-in for a real alerting channel: only logs the incident id.
    const { incidentId } = inputData;
    console.log(`Sent security alert for ${incidentId}`);
    return { alertSent: true };
  },
});
// Create security issue workflow
/**
 * Workflow for emails with detected security issues:
 * log the incident -> quarantine the email -> alert the security team.
 *
 * Input:  { email, issues }
 * Output: { handled, incidentId } - `handled` is always true once the chain
 *         completes; `incidentId` is the id produced by logSecurityIncident.
 *
 * Each `.map()` reshapes the data so it matches the next step's inputSchema.
 */
export const securityIssueWorkflow = createWorkflow({
  id: 'security-issue-workflow',
  inputSchema: z.object({
    email: z.string(),
    issues: z.array(z.string()),
  }),
  outputSchema: z.object({
    handled: z.boolean(),
    incidentId: z.string(),
  }),
})
  .then(logSecurityIncident)
  // logSecurityIncident outputs only { logged, incidentId }, so `email` is not
  // on inputData here; take it from the workflow's initial input instead.
  .map(async ({ inputData, getInitData }) => ({
    email: getInitData().email,
    incidentId: inputData.incidentId,
  }))
  .then(quarantineEmail)
  // quarantineEmail outputs only { quarantined }; look the incident id up
  // from the logging step's result.
  .map(async ({ getStepResult }) => ({
    incidentId: getStepResult(logSecurityIncident).incidentId,
  }))
  .then(sendSecurityAlert)
  // Build the workflow's declared output.
  .map(async ({ getStepResult }) => ({
    handled: true,
    incidentId: getStepResult(logSecurityIncident).incidentId,
  }))
  .commit();
// ===== CLEAN EMAIL BRANCH WORKFLOW =====

/**
 * Step: derive the sender's domain from the sender address.
 *
 * Input:  { email, sender }
 * Output: { valid, senderInfo: { domain } }
 *         `domain` is the text after the first '@' (up to the next '@', if
 *         any), or '' when the address contains no '@'. `valid` is true only
 *         when a non-empty domain was found.
 */
const validateSender = createStep({
  id: 'validateSender',
  inputSchema: z.object({ email: z.string(), sender: z.string() }),
  outputSchema: z.object({
    valid: z.boolean(),
    senderInfo: z.object({ domain: z.string() }),
  }),
  execute: async ({ inputData }) => {
    // split('@')[1] is undefined for an address without '@'; fall back to ''
    // so the result always satisfies the declared `domain: z.string()`.
    const domain = inputData.sender.split('@')[1] ?? '';
    return { valid: domain !== '', senderInfo: { domain } };
  },
});
/**
 * Step: extract subject, body and attachments from an email.
 *
 * Input:  { email }
 * Output: { subject, body, attachments }
 *
 * Example stub: the input is not inspected and fixed sample content is
 * returned.
 */
const extractEmailContent = createStep({
  id: 'extractContent',
  inputSchema: z.object({ email: z.string() }),
  outputSchema: z.object({
    subject: z.string(),
    body: z.string(),
    attachments: z.array(z.string()),
  }),
  execute: async () => {
    const content = {
      subject: 'Example Subject',
      body: 'Email body...',
      attachments: [],
    };
    return content;
  },
});
/**
 * Step: classify an email by its subject and body.
 *
 * Input:  { subject, body }
 * Output: { category, priority }
 *
 * Example stub: always returns category 'support' with priority 2.
 */
const classifyEmailType = createStep({
  id: 'classifyType',
  inputSchema: z.object({
    subject: z.string(),
    body: z.string(),
  }),
  outputSchema: z.object({
    category: z.string(),
    priority: z.number(),
  }),
  execute: async () => {
    const classification = { category: 'support', priority: 2 };
    return classification;
  },
});
/**
 * Step: process the email content according to its category.
 *
 * Input:  { category, body }
 * Output: { processed, assignedTo }
 *
 * Example stub: always reports success and assigns to 'support-team'.
 */
const processContent = createStep({
  id: 'processContent',
  inputSchema: z.object({
    category: z.string(),
    body: z.string(),
  }),
  outputSchema: z.object({
    processed: z.boolean(),
    assignedTo: z.string(),
  }),
  execute: async () => {
    const outcome = { processed: true, assignedTo: 'support-team' };
    return outcome;
  },
});
/**
 * Step: persist the processing result.
 *
 * Input:  { category, assignedTo }
 * Output: { updated, recordId } - `updated` is always true; `recordId` has
 *         the form `REC-<millisecond timestamp>`.
 *
 * Example stub: no database is touched; only a record id is generated.
 */
const updateDatabase = createStep({
  id: 'updateDatabase',
  inputSchema: z.object({
    category: z.string(),
    assignedTo: z.string(),
  }),
  outputSchema: z.object({
    updated: z.boolean(),
    recordId: z.string(),
  }),
  execute: async () => ({
    updated: true,
    recordId: `REC-${Date.now()}`,
  }),
});
// Create clean email workflow
/**
 * Workflow for emails that passed the security check:
 * validate sender -> extract content -> classify -> process -> update database.
 *
 * Input:  { email, sender }
 * Output: { processed, recordId } - `processed` is always true once the chain
 *         completes; `recordId` comes from updateDatabase.
 *
 * Each `.map()` reshapes the data so it matches the next step's inputSchema.
 * Where the previous step's output does not carry a needed field, the value
 * is read from the workflow's initial input or from an earlier step's result.
 */
export const cleanEmailWorkflow = createWorkflow({
  id: 'clean-email-workflow',
  inputSchema: z.object({
    email: z.string(),
    sender: z.string(),
  }),
  outputSchema: z.object({
    processed: z.boolean(),
    recordId: z.string(),
  }),
})
  .then(validateSender)
  // validateSender outputs { valid, senderInfo } and no `email`; take the
  // email from the workflow's initial input.
  .map(async ({ getInitData }) => ({
    email: getInitData().email,
  }))
  .then(extractEmailContent)
  // Drop `attachments`; classifyEmailType takes only subject and body.
  .map(async ({ inputData }) => ({
    subject: inputData.subject,
    body: inputData.body,
  }))
  .then(classifyEmailType)
  // classifyEmailType outputs { category, priority } and no `body`; reuse the
  // body produced by extractEmailContent.
  .map(async ({ inputData, getStepResult }) => ({
    category: inputData.category,
    body: getStepResult(extractEmailContent).body,
  }))
  .then(processContent)
  // processContent outputs { processed, assignedTo } and no `category`; reuse
  // the category produced by classifyEmailType.
  .map(async ({ inputData, getStepResult }) => ({
    category: getStepResult(classifyEmailType).category,
    assignedTo: inputData.assignedTo,
  }))
  .then(updateDatabase)
  // Build the workflow's declared output.
  .map(async ({ inputData }) => ({
    processed: true,
    recordId: inputData.recordId,
  }))
  .commit();
// ===== MAIN EMAIL PROCESSING WORKFLOW =====

/**
 * Step: turn the raw workflow input into an { email, sender } pair.
 *
 * Input:  { emailData }
 * Output: { email, sender } - `email` is `emailData` passed through unchanged;
 *         `sender` is a fixed example address.
 */
const receiveEmail = createStep({
  id: 'receiveEmail',
  inputSchema: z.object({ emailData: z.string() }),
  outputSchema: z.object({ email: z.string(), sender: z.string() }),
  execute: async ({ inputData }) => {
    // Example stub: no real parsing happens. The sender must be a well-formed
    // address (contain an '@') because validateSender derives the domain from it.
    return {
      email: inputData.emailData,
      sender: 'sender@example.com',
    };
  },
});
/**
 * Step: run a (simulated) security check on an email.
 *
 * Input:  { email, sender }
 * Output: { hasIssues, issues, email, sender } - `email` and `sender` are
 *         passed through unchanged so later steps can use them.
 */
const securityCheck = createStep({
  id: 'securityCheck',
  inputSchema: z.object({
    email: z.string(),
    sender: z.string(),
  }),
  outputSchema: z.object({
    hasIssues: z.boolean(),
    issues: z.array(z.string()),
    email: z.string(),
    sender: z.string(),
  }),
  execute: async ({ inputData }) => {
    // Simulated detection: a random draw above 0.7 counts as "issues found".
    const hasIssues = Math.random() > 0.7;

    let issues = [];
    if (hasIssues) {
      issues = ['suspicious-link', 'unknown-sender'];
    }

    const { email, sender } = inputData;
    return { hasIssues, issues, email, sender };
  },
});
/**
 * Step: send a confirmation once an email has been handled.
 *
 * Input:  { handled?, processed? } - both optional; neither is read here.
 * Output: { sent } - always true in this example.
 */
const sendConfirmation = createStep({
  id: 'sendConfirmation',
  inputSchema: z.object({
    handled: z.boolean().optional(),
    processed: z.boolean().optional(),
  }),
  outputSchema: z.object({ sent: z.boolean() }),
  execute: async () => {
    // Stand-in for a real mail send: only logs.
    console.log('Sending confirmation email...');
    const confirmation = { sent: true };
    return confirmation;
  },
});
// Main workflow with branching
/**
 * Top-level workflow: receive the email, run the security check, then route
 * to one of two nested workflows and finally send a confirmation.
 *
 * Input:  { emailData }
 * Output: { sent }
 */
export const emailProcessingWorkflow = createWorkflow({
  id: 'email-processing-workflow',
  inputSchema: z.object({ emailData: z.string() }),
  outputSchema: z.object({ sent: z.boolean() }),
})
  .then(receiveEmail)
  .then(securityCheck)
  .branch([
    // Security issues detected branch
    [
      async ({ inputData }) => inputData.hasIssues === true,
      securityIssueWorkflow, // Entire nested workflow as a branch
    ],
    // No security issues branch
    [
      async ({ inputData }) => inputData.hasIssues === false,
      cleanEmailWorkflow, // Entire nested workflow as a branch
    ],
  ])
  // The branch result is an object keyed by the id of the branch that ran
  // (the other key is absent). Flatten it into the { handled?, processed? }
  // shape that sendConfirmation declares as its input.
  .map(async ({ inputData }) => ({
    handled: inputData['security-issue-workflow']?.handled,
    processed: inputData['clean-email-workflow']?.processed,
  }))
  .then(sendConfirmation)
  .commit();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment