Created
June 2, 2025 22:01
-
-
Save mwufi/daa203f6623101ed6d2b04de0e55ce47 to your computer and use it in GitHub Desktop.
Service example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import * as restate from "@restatedev/restate-sdk"; | |
| // Job interface | |
| interface Job { | |
| id: string; | |
| type: string; | |
| payload: any; | |
| priority?: number; | |
| createdAt: Date; | |
| maxRetries?: number; | |
| retryCount?: number; | |
| } | |
| interface JobResult { | |
| success: boolean; | |
| result?: any; | |
| error?: string; | |
| } | |
| // Virtual Object for managing jobs per organization | |
| const organizationJobController = restate.object({ | |
| name: "OrganizationJobController", | |
| handlers: { | |
| // Add a job to the organization's queue | |
| async enqueueJob(ctx: restate.ObjectContext, job: Job): Promise<void> { | |
| const jobs = (await ctx.get<Job[]>("job_queue")) || []; | |
| const currentJob = await ctx.get<Job | null>("current_job"); | |
| // Add job to queue | |
| jobs.push({ | |
| ...job, | |
| createdAt: new Date(), | |
| retryCount: 0, | |
| maxRetries: job.maxRetries || 3 | |
| }); | |
| // Sort by priority (higher numbers = higher priority) | |
| jobs.sort((a, b) => (b.priority || 0) - (a.priority || 0)); | |
| await ctx.set("job_queue", jobs); | |
| console.log(`Job ${job.id} enqueued for organization ${ctx.key}`); | |
| // If no job is currently processing, start processing | |
| if (!currentJob) { | |
| ctx.send(organizationJobController).processNextJob(); | |
| } | |
| }, | |
| // Process the next job in the queue | |
| async processNextJob(ctx: restate.ObjectContext): Promise<void> { | |
| const currentJob = await ctx.get<Job | null>("current_job"); | |
| // If already processing a job, skip | |
| if (currentJob) { | |
| console.log(`Organization ${ctx.key} is already processing job ${currentJob.id}`); | |
| return; | |
| } | |
| const jobs = (await ctx.get<Job[]>("job_queue")) || []; | |
| if (jobs.length === 0) { | |
| console.log(`No jobs in queue for organization ${ctx.key}`); | |
| return; | |
| } | |
| // Get the next job | |
| const nextJob = jobs.shift()!; | |
| await ctx.set("job_queue", jobs); | |
| await ctx.set("current_job", nextJob); | |
| console.log(`Starting job ${nextJob.id} for organization ${ctx.key}`); | |
| try { | |
| // Process the job with retry logic | |
| const result = await ctx.run(`process-job-${nextJob.id}`, async () => { | |
| return await this.executeJob(nextJob); | |
| }); | |
| await this.handleJobCompletion(ctx, nextJob, result); | |
| } catch (error) { | |
| console.error(`Job ${nextJob.id} failed:`, error); | |
| await this.handleJobFailure(ctx, nextJob, error as Error); | |
| } | |
| // Clear current job and process next | |
| await ctx.set("current_job", null); | |
| // Schedule next job processing | |
| ctx.send(organizationJobController).processNextJob(); | |
| }, | |
| // Handle successful job completion | |
| async handleJobCompletion( | |
| ctx: restate.ObjectContext, | |
| job: Job, | |
| result: JobResult | |
| ): Promise<void> { | |
| const completedJobs = (await ctx.get<Job[]>("completed_jobs")) || []; | |
| completedJobs.push({ | |
| ...job, | |
| completedAt: new Date(), | |
| result: result.result | |
| } as any); | |
| // Keep only last 100 completed jobs | |
| if (completedJobs.length > 100) { | |
| completedJobs.splice(0, completedJobs.length - 100); | |
| } | |
| await ctx.set("completed_jobs", completedJobs); | |
| console.log(`Job ${job.id} completed successfully for organization ${ctx.key}`); | |
| // Emit completion event | |
| ctx.send(jobEventHandler).jobCompleted(ctx.key, job.id, result); | |
| }, | |
| // Handle job failure with retry logic | |
| async handleJobFailure( | |
| ctx: restate.ObjectContext, | |
| job: Job, | |
| error: Error | |
| ): Promise<void> { | |
| job.retryCount = (job.retryCount || 0) + 1; | |
| if (job.retryCount < (job.maxRetries || 3)) { | |
| console.log(`Retrying job ${job.id} (attempt ${job.retryCount + 1})`); | |
| // Add back to queue for retry with exponential backoff | |
| const retryDelay = Math.min(1000 * Math.pow(2, job.retryCount), 30000); | |
| ctx.sendDelayed( | |
| organizationJobController, | |
| retryDelay | |
| ).retryJob(job); | |
| } else { | |
| // Max retries reached, move to failed jobs | |
| const failedJobs = (await ctx.get<Job[]>("failed_jobs")) || []; | |
| failedJobs.push({ | |
| ...job, | |
| failedAt: new Date(), | |
| error: error.message | |
| } as any); | |
| await ctx.set("failed_jobs", failedJobs); | |
| console.log(`Job ${job.id} failed permanently for organization ${ctx.key}`); | |
| // Emit failure event | |
| ctx.send(jobEventHandler).jobFailed(ctx.key, job.id, error.message); | |
| } | |
| }, | |
| // Retry a failed job | |
| async retryJob(ctx: restate.ObjectContext, job: Job): Promise<void> { | |
| const jobs = (await ctx.get<Job[]>("job_queue")) || []; | |
| jobs.unshift(job); // Add to front of queue for immediate processing | |
| await ctx.set("job_queue", jobs); | |
| // Process immediately if no current job | |
| const currentJob = await ctx.get<Job | null>("current_job"); | |
| if (!currentJob) { | |
| ctx.send(organizationJobController).processNextJob(); | |
| } | |
| }, | |
| // Get organization job status | |
| async getStatus(ctx: restate.ObjectContext): Promise<{ | |
| currentJob: Job | null; | |
| queueLength: number; | |
| completedCount: number; | |
| failedCount: number; | |
| }> { | |
| const currentJob = await ctx.get<Job | null>("current_job"); | |
| const jobs = (await ctx.get<Job[]>("job_queue")) || []; | |
| const completedJobs = (await ctx.get<Job[]>("completed_jobs")) || []; | |
| const failedJobs = (await ctx.get<Job[]>("failed_jobs")) || []; | |
| return { | |
| currentJob, | |
| queueLength: jobs.length, | |
| completedCount: completedJobs.length, | |
| failedCount: failedJobs.length | |
| }; | |
| }, | |
| // Cancel a specific job | |
| async cancelJob(ctx: restate.ObjectContext, jobId: string): Promise<boolean> { | |
| const jobs = (await ctx.get<Job[]>("job_queue")) || []; | |
| const initialLength = jobs.length; | |
| const filteredJobs = jobs.filter(job => job.id !== jobId); | |
| await ctx.set("job_queue", filteredJobs); | |
| const currentJob = await ctx.get<Job | null>("current_job"); | |
| if (currentJob?.id === jobId) { | |
| console.log(`Cannot cancel currently processing job ${jobId}`); | |
| return false; | |
| } | |
| return filteredJobs.length < initialLength; | |
| }, | |
| // Private method to execute the actual job | |
| async executeJob(job: Job): Promise<JobResult> { | |
| // This is where you'd implement your actual job processing logic | |
| // For this example, we'll simulate different job types | |
| switch (job.type) { | |
| case "data_export": | |
| // Simulate data export | |
| await new Promise(resolve => setTimeout(resolve, 2000)); | |
| return { | |
| success: true, | |
| result: { exportedRows: Math.floor(Math.random() * 10000) } | |
| }; | |
| case "email_campaign": | |
| // Simulate email sending | |
| await new Promise(resolve => setTimeout(resolve, 5000)); | |
| return { | |
| success: true, | |
| result: { emailsSent: job.payload.recipients?.length || 0 } | |
| }; | |
| case "report_generation": | |
| // Simulate report generation | |
| await new Promise(resolve => setTimeout(resolve, 3000)); | |
| return { | |
| success: true, | |
| result: { reportUrl: `https://reports.example.com/${job.id}.pdf` } | |
| }; | |
| default: | |
| throw new Error(`Unknown job type: ${job.type}`); | |
| } | |
| } | |
| } | |
| }); | |
| // Service for handling job events | |
| const jobEventHandler = restate.service({ | |
| name: "JobEventHandler", | |
| handlers: { | |
| async jobCompleted( | |
| ctx: restate.Context, | |
| organizationId: string, | |
| jobId: string, | |
| result: JobResult | |
| ): Promise<void> { | |
| console.log(`Job event: ${jobId} completed for org ${organizationId}`); | |
| // Send webhooks, update metrics, etc. | |
| }, | |
| async jobFailed( | |
| ctx: restate.Context, | |
| organizationId: string, | |
| jobId: string, | |
| error: string | |
| ): Promise<void> { | |
| console.log(`Job event: ${jobId} failed for org ${organizationId}: ${error}`); | |
| // Send alerts, update metrics, etc. | |
| } | |
| } | |
| }); | |
| // Example usage service | |
| const jobManager = restate.service({ | |
| name: "JobManager", | |
| handlers: { | |
| // Submit a new job for an organization | |
| async submitJob( | |
| ctx: restate.Context, | |
| organizationId: string, | |
| jobType: string, | |
| payload: any, | |
| priority?: number | |
| ): Promise<string> { | |
| const jobId = `job_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; | |
| const job: Job = { | |
| id: jobId, | |
| type: jobType, | |
| payload, | |
| priority, | |
| createdAt: new Date() | |
| }; | |
| // Send to the organization's job controller | |
| ctx.send(organizationJobController, organizationId).enqueueJob(job); | |
| return jobId; | |
| }, | |
| // Get status for all organizations or a specific one | |
| async getOrganizationStatus( | |
| ctx: restate.Context, | |
| organizationId: string | |
| ): Promise<any> { | |
| return await ctx.call(organizationJobController, organizationId).getStatus(); | |
| } | |
| } | |
| }); | |
| // Export the Restate app | |
| export default restate.app({ | |
| services: [jobEventHandler, jobManager], | |
| objects: [organizationJobController] | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment