Skip to content

Instantly share code, notes, and snippets.

@mwufi
Created June 2, 2025 22:01
Show Gist options
  • Save mwufi/daa203f6623101ed6d2b04de0e55ce47 to your computer and use it in GitHub Desktop.
Save mwufi/daa203f6623101ed6d2b04de0e55ce47 to your computer and use it in GitHub Desktop.
Service example
import * as restate from "@restatedev/restate-sdk";
/**
 * A unit of work submitted to an organization's job queue.
 */
interface Job {
  /** Identifier used in log messages, cancellation, and event notifications. */
  id: string;

  /** Selects which processing branch runs the job (e.g. "data_export"). */
  type: string;

  /** Job-type-specific input; its shape is not validated here. */
  payload: any;

  /** Queue ordering weight: higher numbers run first; a missing value sorts as 0. */
  priority?: number;

  /** Timestamp of the job; the controller overwrites it when the job is enqueued. */
  createdAt: Date;

  /** Retry budget for the job; the controller falls back to 3 when omitted. */
  maxRetries?: number;

  /** Failed attempts so far; reset to 0 on enqueue and bumped on each failure. */
  retryCount?: number;
}
/**
 * Outcome reported by a job execution.
 */
interface JobResult {
  /** Whether the job finished successfully. */
  success: boolean;

  /** Job-type-specific output, present on success. */
  result?: any;

  /** Human-readable failure description, when one is available. */
  error?: string;
}
// Virtual Object for managing jobs per organization.
//
// State keys used by the handlers below:
//   "job_queue"       pending jobs, sorted by descending priority on enqueue
//   "current_job"     the job currently being executed, or null when idle
//   "completed_jobs"  the most recent completed jobs (bounded)
//   "failed_jobs"     the most recent permanently failed jobs (bounded)
//
// NOTE(review): `ctx.send(...)` / `ctx.sendDelayed(...)` are kept exactly as
// written. Confirm they exist in the SDK version in use: the current Restate
// TypeScript docs describe `ctx.objectSendClient(definition, key)` and
// `ctx.serviceSendClient(definition)` instead, and calls to a virtual object
// are addressed by key there.
// NOTE(review): `handleJobCompletion`, `handleJobFailure` and `executeJob` are
// registered as handlers but are only used as internal helpers through `this`.
// Confirm the SDK binds `this` to this handlers object and accepts these
// signatures.
// NOTE(review): `new Date()` inside a handler is not deterministic across
// re-executions, and `Date` values presumably come back from state as
// serialized strings — confirm before relying on the timestamps.

/** Maximum number of records kept in each of the completed/failed history lists. */
const MAX_JOB_HISTORY = 100;

/** Retry budget applied when a job does not specify `maxRetries`. */
const DEFAULT_MAX_RETRIES = 3;

/** A job as stored in the "completed_jobs" history. */
type CompletedJobRecord = Job & { completedAt: Date; result?: JobResult["result"] };

/** A job as stored in the "failed_jobs" history. */
type FailedJobRecord = Job & { failedAt: Date; error: string };

const organizationJobController = restate.object({
  name: "OrganizationJobController",
  handlers: {
    /**
     * Adds a job to the organization's queue and starts processing if idle.
     *
     * The stored copy gets a fresh `createdAt`, `retryCount` 0, and a
     * defaulted `maxRetries`.
     *
     * @param ctx Object context; `ctx.key` is used as the organization id.
     * @param job Job to enqueue.
     */
    async enqueueJob(ctx: restate.ObjectContext, job: Job): Promise<void> {
      const jobs = (await ctx.get<Job[]>("job_queue")) ?? [];
      const currentJob = await ctx.get<Job | null>("current_job");

      jobs.push({
        ...job,
        createdAt: new Date(),
        retryCount: 0,
        // `??` rather than `||`: an explicit maxRetries of 0 must not become 3.
        maxRetries: job.maxRetries ?? DEFAULT_MAX_RETRIES
      });

      // Sort by priority (higher numbers = higher priority). `|| 0` is kept on
      // purpose so that a NaN priority is also treated as 0.
      jobs.sort((a, b) => (b.priority || 0) - (a.priority || 0));
      await ctx.set("job_queue", jobs);
      console.log(`Job ${job.id} enqueued for organization ${ctx.key}`);

      // If no job is currently processing, start processing
      if (!currentJob) {
        ctx.send(organizationJobController).processNextJob();
      }
    },

    /**
     * Takes the next job off the queue, executes it, records the outcome, and
     * schedules processing of the following job.
     *
     * Returns without doing anything if a job is already in progress or the
     * queue is empty.
     *
     * @param ctx Object context for the organization.
     */
    async processNextJob(ctx: restate.ObjectContext): Promise<void> {
      const currentJob = await ctx.get<Job | null>("current_job");
      // If already processing a job, skip
      if (currentJob) {
        console.log(`Organization ${ctx.key} is already processing job ${currentJob.id}`);
        return;
      }

      const jobs = (await ctx.get<Job[]>("job_queue")) ?? [];
      const nextJob = jobs.shift();
      if (nextJob === undefined) {
        console.log(`No jobs in queue for organization ${ctx.key}`);
        return;
      }

      await ctx.set("job_queue", jobs);
      await ctx.set("current_job", nextJob);
      console.log(`Starting job ${nextJob.id} for organization ${ctx.key}`);

      // Only the execution itself is guarded. Bookkeeping errors must not be
      // mistaken for a job failure, otherwise a job that already finished
      // would be scheduled to run again.
      let outcome: { ok: true; result: JobResult } | { ok: false; error: Error };
      try {
        const result: JobResult = await ctx.run(`process-job-${nextJob.id}`, async () => {
          return await this.executeJob(nextJob);
        });
        outcome = result.success
          ? { ok: true, result }
          : { ok: false, error: new Error(result.error ?? `Job ${nextJob.id} reported failure`) };
      } catch (error) {
        // Anything can be thrown; normalize to an Error before using `.message`.
        outcome = {
          ok: false,
          error: error instanceof Error ? error : new Error(String(error))
        };
      }

      if (outcome.ok) {
        await this.handleJobCompletion(ctx, nextJob, outcome.result);
      } else {
        console.error(`Job ${nextJob.id} failed:`, outcome.error);
        await this.handleJobFailure(ctx, nextJob, outcome.error);
      }

      // Clear current job and process next
      await ctx.set("current_job", null);
      // Schedule next job processing
      ctx.send(organizationJobController).processNextJob();
    },

    /**
     * Records a successful job in the bounded "completed_jobs" history and
     * emits a completion event.
     *
     * @param ctx Object context for the organization.
     * @param job The job that finished.
     * @param result The result returned by the job execution.
     */
    async handleJobCompletion(
      ctx: restate.ObjectContext,
      job: Job,
      result: JobResult
    ): Promise<void> {
      const completedJobs = (await ctx.get<CompletedJobRecord[]>("completed_jobs")) ?? [];
      completedJobs.push({
        ...job,
        completedAt: new Date(),
        result: result.result
      });

      // Keep only the most recent records; drop the oldest from the front.
      if (completedJobs.length > MAX_JOB_HISTORY) {
        completedJobs.splice(0, completedJobs.length - MAX_JOB_HISTORY);
      }
      await ctx.set("completed_jobs", completedJobs);
      console.log(`Job ${job.id} completed successfully for organization ${ctx.key}`);

      // Emit completion event
      ctx.send(jobEventHandler).jobCompleted(ctx.key, job.id, result);
    },

    /**
     * Handles a failed execution: schedules a delayed retry while the retry
     * budget allows it, otherwise records the job in the bounded
     * "failed_jobs" history and emits a failure event.
     *
     * The `job` argument is not modified; an updated copy is forwarded.
     *
     * @param ctx Object context for the organization.
     * @param job The job whose execution failed.
     * @param error The error describing the failure.
     */
    async handleJobFailure(
      ctx: restate.ObjectContext,
      job: Job,
      error: Error
    ): Promise<void> {
      const retryCount = (job.retryCount ?? 0) + 1;
      // `??` rather than `||`: maxRetries of 0 means "never retry".
      const maxRetries = job.maxRetries ?? DEFAULT_MAX_RETRIES;
      const attemptedJob: Job = { ...job, retryCount };

      if (retryCount < maxRetries) {
        console.log(`Retrying job ${job.id} (attempt ${retryCount + 1})`);
        // Exponential backoff: 2s, 4s, 8s, ... capped at 30s.
        const retryDelay = Math.min(1000 * Math.pow(2, retryCount), 30000);
        ctx.sendDelayed(
          organizationJobController,
          retryDelay
        ).retryJob(attemptedJob);
        return;
      }

      // Max retries reached, move to failed jobs
      const failedJobs = (await ctx.get<FailedJobRecord[]>("failed_jobs")) ?? [];
      failedJobs.push({
        ...attemptedJob,
        failedAt: new Date(),
        error: error.message
      });

      // Bound the history the same way as "completed_jobs" so state cannot
      // grow without limit.
      if (failedJobs.length > MAX_JOB_HISTORY) {
        failedJobs.splice(0, failedJobs.length - MAX_JOB_HISTORY);
      }
      await ctx.set("failed_jobs", failedJobs);
      console.log(`Job ${job.id} failed permanently for organization ${ctx.key}`);

      // Emit failure event
      ctx.send(jobEventHandler).jobFailed(ctx.key, job.id, error.message);
    },

    /**
     * Puts a previously failed job at the front of the queue and starts
     * processing if no job is currently running.
     *
     * @param ctx Object context for the organization.
     * @param job The job to retry.
     */
    async retryJob(ctx: restate.ObjectContext, job: Job): Promise<void> {
      const jobs = (await ctx.get<Job[]>("job_queue")) ?? [];
      jobs.unshift(job); // Add to front of queue for immediate processing
      await ctx.set("job_queue", jobs);

      // Process immediately if no current job
      const currentJob = await ctx.get<Job | null>("current_job");
      if (!currentJob) {
        ctx.send(organizationJobController).processNextJob();
      }
    },

    /**
     * Reports the current job plus the sizes of the queue and history lists.
     *
     * @param ctx Object context for the organization.
     * @returns Snapshot of the organization's job state.
     */
    async getStatus(ctx: restate.ObjectContext): Promise<{
      currentJob: Job | null;
      queueLength: number;
      completedCount: number;
      failedCount: number;
    }> {
      const currentJob = await ctx.get<Job | null>("current_job");
      const jobs = (await ctx.get<Job[]>("job_queue")) ?? [];
      const completedJobs = (await ctx.get<CompletedJobRecord[]>("completed_jobs")) ?? [];
      const failedJobs = (await ctx.get<FailedJobRecord[]>("failed_jobs")) ?? [];
      return {
        currentJob,
        queueLength: jobs.length,
        completedCount: completedJobs.length,
        failedCount: failedJobs.length
      };
    },

    /**
     * Removes a queued job by id.
     *
     * @param ctx Object context for the organization.
     * @param jobId Id of the job to cancel.
     * @returns `true` if a queued job was removed; `false` if nothing matched
     *          or the job is the one currently being processed.
     */
    async cancelJob(ctx: restate.ObjectContext, jobId: string): Promise<boolean> {
      const jobs = (await ctx.get<Job[]>("job_queue")) ?? [];
      const remainingJobs = jobs.filter((queued) => queued.id !== jobId);
      const removed = remainingJobs.length < jobs.length;

      // Skip the state write when the queue did not change.
      if (removed) {
        await ctx.set("job_queue", remainingJobs);
      }

      const currentJob = await ctx.get<Job | null>("current_job");
      if (currentJob?.id === jobId) {
        console.log(`Cannot cancel currently processing job ${jobId}`);
        return false;
      }
      return removed;
    },

    /**
     * Executes a job according to its `type`. The branches below only
     * simulate work with timers.
     *
     * NOTE(review): an unknown type throws a plain `Error`; confirm whether
     * the runtime retries such errors inside `ctx.run` before they reach the
     * caller's `catch`.
     *
     * @param job The job to execute.
     * @returns The job's result.
     * @throws Error if `job.type` is not one of the known types.
     */
    async executeJob(job: Job): Promise<JobResult> {
      // This is where you'd implement your actual job processing logic
      // For this example, we'll simulate different job types
      switch (job.type) {
        case "data_export":
          // Simulate data export
          await new Promise(resolve => setTimeout(resolve, 2000));
          return {
            success: true,
            result: { exportedRows: Math.floor(Math.random() * 10000) }
          };
        case "email_campaign":
          // Simulate email sending
          await new Promise(resolve => setTimeout(resolve, 5000));
          return {
            success: true,
            // Null-safe: a job submitted without a payload reports 0 emails
            // instead of throwing a TypeError.
            result: { emailsSent: job.payload?.recipients?.length ?? 0 }
          };
        case "report_generation":
          // Simulate report generation
          await new Promise(resolve => setTimeout(resolve, 3000));
          return {
            success: true,
            result: { reportUrl: `https://reports.example.com/${job.id}.pdf` }
          };
        default:
          throw new Error(`Unknown job type: ${job.type}`);
      }
    }
  }
});
// Service that receives job lifecycle notifications. Both handlers currently
// only log; webhooks, metrics, or alerts would be added here.
// NOTE(review): these handlers take several positional inputs after `ctx` —
// confirm the SDK version in use supports that handler shape.
const jobEventHandler = restate.service({
  name: "JobEventHandler",
  handlers: {
    /**
     * Logs that a job completed.
     *
     * @param ctx Service context (unused).
     * @param organizationId Organization the job belongs to.
     * @param jobId Id of the completed job.
     * @param result Result of the job (unused).
     */
    async jobCompleted(
      ctx: restate.Context,
      organizationId: string,
      jobId: string,
      result: JobResult
    ): Promise<void> {
      const message = `Job event: ${jobId} completed for org ${organizationId}`;
      console.log(message);
      // Send webhooks, update metrics, etc.
    },

    /**
     * Logs that a job failed permanently.
     *
     * @param ctx Service context (unused).
     * @param organizationId Organization the job belongs to.
     * @param jobId Id of the failed job.
     * @param error Failure message.
     */
    async jobFailed(
      ctx: restate.Context,
      organizationId: string,
      jobId: string,
      error: string
    ): Promise<void> {
      const message = `Job event: ${jobId} failed for org ${organizationId}: ${error}`;
      console.log(message);
      // Send alerts, update metrics, etc.
    }
  }
});
// Example usage service: entry points for submitting jobs and querying an
// organization's job status.
// NOTE(review): `ctx.send(definition, key)` and `ctx.call(definition, key)` are
// kept as written. Confirm them against the SDK version in use: the current
// Restate TypeScript docs describe `ctx.objectSendClient(definition, key)` and
// `ctx.objectClient(definition, key)`.
const jobManager = restate.service({
  name: "JobManager",
  handlers: {
    /**
     * Builds a job with a generated id and sends it to the organization's
     * job controller.
     *
     * NOTE(review): `Date.now()` / `Math.random()` yield a different id each
     * time this handler body runs; confirm that is acceptable if the
     * invocation can be re-executed.
     *
     * @param ctx Service context.
     * @param organizationId Key of the target organization's controller.
     * @param jobType Value for the job's `type`.
     * @param payload Job-type-specific input.
     * @param priority Optional queue priority.
     * @returns The generated job id, in the form `job_<millis>_<random base36>`.
     */
    async submitJob(
      ctx: restate.Context,
      organizationId: string,
      jobType: string,
      payload: any,
      priority?: number
    ): Promise<string> {
      // slice(2, 11) takes the same characters as the deprecated substr(2, 9).
      const randomSuffix = Math.random().toString(36).slice(2, 11);
      const jobId = `job_${Date.now()}_${randomSuffix}`;
      const job: Job = {
        id: jobId,
        type: jobType,
        payload,
        priority,
        createdAt: new Date()
      };

      // Send to the organization's job controller
      ctx.send(organizationJobController, organizationId).enqueueJob(job);
      return jobId;
    },

    /**
     * Fetches the job status of one organization from its controller.
     *
     * @param ctx Service context.
     * @param organizationId Key of the organization's controller.
     * @returns Whatever the controller's `getStatus` handler returns.
     */
    async getOrganizationStatus(
      ctx: restate.Context,
      organizationId: string
    ): Promise<any> {
      return await ctx.call(organizationJobController, organizationId).getStatus();
    }
  }
});
// Export the Restate app: the two plain services plus the keyed job controller.
// NOTE(review): confirm `restate.app` exists in the SDK version in use.
const registeredServices = [jobEventHandler, jobManager];
const registeredObjects = [organizationJobController];

export default restate.app({
  services: registeredServices,
  objects: registeredObjects
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment