Skip to content

Instantly share code, notes, and snippets.

@graysonhicks
Created November 24, 2025 14:05
Show Gist options
  • Select an option

  • Save graysonhicks/aa33caa507ca62add7f616e29abdd086 to your computer and use it in GitHub Desktop.

Select an option

Save graysonhicks/aa33caa507ca62add7f616e29abdd086 to your computer and use it in GitHub Desktop.
Nested Workflows or Sequential Steps Within Branches
import { createStep, createWorkflow } from '@mastra/core/workflows';
import { z } from 'zod';
// ===== SECURITY ISSUE BRANCH WORKFLOW =====
const logSecurityIncident = createStep({
id: 'logSecurityIncident',
inputSchema: z.object({ email: z.string(), issues: z.array(z.string()) }),
outputSchema: z.object({ logged: z.boolean(), incidentId: z.string() }),
execute: async ({ inputData }) => {
// Log to security system
const incidentId = `SEC-${Date.now()}`;
console.log(`Logged security incident ${incidentId}:`, inputData.issues);
return { logged: true, incidentId };
},
});
const quarantineEmail = createStep({
id: 'quarantineEmail',
inputSchema: z.object({ email: z.string(), incidentId: z.string() }),
outputSchema: z.object({ quarantined: z.boolean() }),
execute: async ({ inputData }) => {
// Move email to quarantine
console.log(`Quarantined email with incident ${inputData.incidentId}`);
return { quarantined: true };
},
});
const sendSecurityAlert = createStep({
id: 'sendSecurityAlert',
inputSchema: z.object({ incidentId: z.string() }),
outputSchema: z.object({ alertSent: z.boolean() }),
execute: async ({ inputData }) => {
// Send alert to security team
console.log(`Sent security alert for ${inputData.incidentId}`);
return { alertSent: true };
},
});
// Create security issue workflow
export const securityIssueWorkflow = createWorkflow({
id: 'security-issue-workflow',
inputSchema: z.object({
email: z.string(),
issues: z.array(z.string())
}),
outputSchema: z.object({
handled: z.boolean(),
incidentId: z.string()
}),
})
.then(logSecurityIncident)
.map(async ({ inputData }) => ({
email: inputData.email,
incidentId: inputData.incidentId,
}))
.then(quarantineEmail)
.map(async ({ getStepResult }) => ({
incidentId: getStepResult(logSecurityIncident).incidentId,
}))
.then(sendSecurityAlert)
.map(async ({ getStepResult }) => ({
handled: true,
incidentId: getStepResult(logSecurityIncident).incidentId,
}))
.commit();
// ===== CLEAN EMAIL BRANCH WORKFLOW =====
const validateSender = createStep({
id: 'validateSender',
inputSchema: z.object({ email: z.string(), sender: z.string() }),
outputSchema: z.object({ valid: z.boolean(), senderInfo: z.object({ domain: z.string() }) }),
execute: async ({ inputData }) => {
// Validate sender
const domain = inputData.sender.split('@')[1];
return { valid: true, senderInfo: { domain } };
},
});
const extractEmailContent = createStep({
id: 'extractContent',
inputSchema: z.object({ email: z.string() }),
outputSchema: z.object({ subject: z.string(), body: z.string(), attachments: z.array(z.string()) }),
execute: async ({ inputData }) => {
// Extract content
return {
subject: 'Example Subject',
body: 'Email body...',
attachments: []
};
},
});
const classifyEmailType = createStep({
id: 'classifyType',
inputSchema: z.object({ subject: z.string(), body: z.string() }),
outputSchema: z.object({ category: z.string(), priority: z.number() }),
execute: async ({ inputData }) => {
// Classify email
return { category: 'support', priority: 2 };
},
});
const processContent = createStep({
id: 'processContent',
inputSchema: z.object({ category: z.string(), body: z.string() }),
outputSchema: z.object({ processed: z.boolean(), assignedTo: z.string() }),
execute: async ({ inputData }) => {
// Process based on category
return { processed: true, assignedTo: 'support-team' };
},
});
const updateDatabase = createStep({
id: 'updateDatabase',
inputSchema: z.object({ category: z.string(), assignedTo: z.string() }),
outputSchema: z.object({ updated: z.boolean(), recordId: z.string() }),
execute: async ({ inputData }) => {
// Update database
const recordId = `REC-${Date.now()}`;
return { updated: true, recordId };
},
});
// Create clean email workflow
export const cleanEmailWorkflow = createWorkflow({
id: 'clean-email-workflow',
inputSchema: z.object({
email: z.string(),
sender: z.string()
}),
outputSchema: z.object({
processed: z.boolean(),
recordId: z.string()
}),
})
.then(validateSender)
.map(async ({ inputData }) => ({
email: inputData.email,
}))
.then(extractEmailContent)
.map(async ({ inputData }) => ({
subject: inputData.subject,
body: inputData.body,
}))
.then(classifyEmailType)
.map(async ({ inputData }) => ({
category: inputData.category,
body: inputData.body,
}))
.then(processContent)
.map(async ({ inputData }) => ({
category: inputData.category,
assignedTo: inputData.assignedTo,
}))
.then(updateDatabase)
.map(async ({ inputData }) => ({
processed: true,
recordId: inputData.recordId,
}))
.commit();
// ===== MAIN EMAIL PROCESSING WORKFLOW =====
const receiveEmail = createStep({
id: 'receiveEmail',
inputSchema: z.object({ emailData: z.string() }),
outputSchema: z.object({ email: z.string(), sender: z.string() }),
execute: async ({ inputData }) => {
// Parse email
return {
email: inputData.emailData,
sender: '[email protected]'
};
},
});
const securityCheck = createStep({
id: 'securityCheck',
inputSchema: z.object({ email: z.string(), sender: z.string() }),
outputSchema: z.object({
hasIssues: z.boolean(),
issues: z.array(z.string()),
email: z.string(),
sender: z.string()
}),
execute: async ({ inputData }) => {
// Perform security checks
const hasIssues = Math.random() > 0.7; // Simulate detection
const issues = hasIssues ? ['suspicious-link', 'unknown-sender'] : [];
return {
hasIssues,
issues,
email: inputData.email,
sender: inputData.sender
};
},
});
const sendConfirmation = createStep({
id: 'sendConfirmation',
inputSchema: z.object({
handled: z.boolean().optional(),
processed: z.boolean().optional()
}),
outputSchema: z.object({ sent: z.boolean() }),
execute: async ({ inputData }) => {
console.log('Sending confirmation email...');
return { sent: true };
},
});
// Main workflow with branching
export const emailProcessingWorkflow = createWorkflow({
id: 'email-processing-workflow',
inputSchema: z.object({ emailData: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
})
.then(receiveEmail)
.then(securityCheck)
.branch([
// Security issues detected branch
[
async ({ inputData }) => inputData.hasIssues === true,
securityIssueWorkflow, // Entire nested workflow as a branch
],
// No security issues branch
[
async ({ inputData }) => inputData.hasIssues === false,
cleanEmailWorkflow, // Entire nested workflow as a branch
],
])
.then(sendConfirmation)
.commit();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment