Skip to content

Instantly share code, notes, and snippets.

@EronWright
Last active July 17, 2025 19:19
Show Gist options
  • Save EronWright/8c0c77bde51105a6bbace09dcf5f3c57 to your computer and use it in GitHub Desktop.
Save EronWright/8c0c77bde51105a6bbace09dcf5f3c57 to your computer and use it in GitHub Desktop.
Restate.dev "Work Queue" service
package workqueue
import (
restate "github.com/restatedev/sdk-go"
)
// WorkQueueExample is a sample workflow showing how FixedWorkQueue can be used
// as a blocking queue.
//
// Each run enqueues an abstract "work item", waits until the queue schedules
// it, performs its "work", and finally notifies the queue that it is done.
// The "work" here is nothing more than waiting for the Callback handler to be
// invoked.
//
// To see the queue in action, start two instances of this workflow: the second
// one stays in the "queued" state until the first one completes, and only then
// moves to "working". Invoke Callback with an arbitrary message to let a
// workflow finish.
type WorkQueueExample struct{}

// ServiceName returns the name under which this workflow is registered.
func (w *WorkQueueExample) ServiceName() string {
	return "workqueue.WorkQueueExample"
}
// Run is the workflow body. It takes a turn in the "example" work queue, waits
// for the callback promise to be resolved, and then releases its turn.
//
// The workflow publishes its progress under the "status" state key, which
// moves through "queued", "working" and "done".
//
// NOTE(review): if waiting on the "callback" promise returns an error, the
// function returns before the completion awakeable is resolved. Confirm
// whether the queue slot should also be released on that path.
func (w *WorkQueueExample) Run(ctx restate.WorkflowContext) error {
	const queueKey = "example"
	queue := NewFixedWorkQueueClient(ctx, queueKey)

	// Ask the work queue for a turn. The awakeable is the handle the queue
	// resolves once this item is allowed to proceed; its id doubles as the
	// idempotency key of the enqueue request.
	turn := restate.Awakeable[string](ctx)
	turnID := turn.Id()
	queue.Enqueue().Send(turnID, restate.WithIdempotencyKey(turnID))
	ctx.Log().Info("Waiting for a continuation signal to begin work", "id", turnID)

	// Block until the queue schedules this item. The value delivered is the id
	// of a second awakeable that must be resolved when the work is finished.
	restate.Set(ctx, "status", "queued")
	completionID, err := turn.Result()
	if err != nil {
		return err
	}
	ctx.Log().Info("Got a completion signal", "id", completionID)

	// Do the "work": wait to receive a message through the Callback handler.
	restate.Set(ctx, "status", "working")
	callback := restate.Promise[string](ctx, "callback")
	message, err := callback.Result()
	if err != nil {
		return err
	}
	ctx.Log().Info("My work here is done", "message", message)

	// Tell the queue this item is complete so that further items can be
	// scheduled.
	restate.Set(ctx, "status", "done")
	restate.ResolveAwakeable(ctx, completionID, restate.Void{})
	return nil
}
// CallbackRequest is the payload accepted by WorkQueueExample.Callback.
type CallbackRequest struct {
	// Message is the value the workflow's "callback" promise is resolved with.
	Message string `json:"message"`
}
// Callback resolves the workflow's "callback" promise with the message carried
// by the request, which unblocks Run while it is in the "working" phase.
// It returns whatever error resolving the promise reports.
func (w *WorkQueueExample) Callback(ctx restate.WorkflowSharedContext, request CallbackRequest) error {
	callback := restate.Promise[string](ctx, "callback")
	return callback.Resolve(request.Message)
}
package workqueue
import (
restate "github.com/restatedev/sdk-go"
)
// FixedWorkQueueClient is the typed client API for the
// workqueue.FixedWorkQueue service.
type FixedWorkQueueClient interface {
	// Configure returns a client for the queue's Configure handler, which
	// takes a ConfigureRequest. Any opts are applied in addition to the
	// options the client was created with.
	Configure(opts ...restate.ClientOption) restate.Client[*ConfigureRequest, restate.Void]

	// Enqueue returns a client for the queue's Enqueue handler, which adds a
	// new work item to the queue.
	//
	// The request value is the id of an awakeable that the queue signals when
	// the work item may proceed. That awakeable must accept a string result:
	// the id of a second awakeable which the caller signals once the work item
	// is complete. For example:
	//
	//	continuationSignal := restate.Awakeable[string](ctx)
	//	queueClient := workqueue.NewFixedWorkQueueClient(ctx, queueKey)
	//	queueClient.Enqueue().Send(continuationSignal.Id())
	//
	//	// Wait for the work queue to schedule the work item
	//	completionId, err := continuationSignal.Result()
	//	if err != nil {
	//		return err
	//	}
	//
	//	// Do the work...
	//
	//	// Signal completion of the work item
	//	restate.ResolveAwakeable(ctx, completionId, restate.Void{})
	Enqueue(opts ...restate.ClientOption) restate.Client[string, restate.Void]
}
// fixedWorkQueueClient is the default FixedWorkQueueClient implementation. It
// is bound to one Restate context and one queue key.
type fixedWorkQueueClient struct {
	// ctx is the context every call is issued through.
	ctx restate.Context
	// key identifies the queue instance being addressed.
	key string
	// options are applied to every client this value hands out.
	options []restate.ClientOption
}

// NewFixedWorkQueueClient creates a client for the FixedWorkQueue instance
// identified by key. The given opts become the base options of every call made
// through the returned client.
func NewFixedWorkQueueClient(ctx restate.Context, key string, opts ...restate.ClientOption) FixedWorkQueueClient {
	return &fixedWorkQueueClient{
		ctx:     ctx,
		key:     key,
		options: opts,
	}
}
// Configure returns a typed client for the "Configure" handler of the
// workqueue.FixedWorkQueue object addressed by this client's key.
//
// The client's base options are used as-is when no opts are given; otherwise
// the base options followed by opts are combined into a fresh slice, so the
// stored base options are never modified.
func (c *fixedWorkQueueClient) Configure(opts ...restate.ClientOption) restate.Client[*ConfigureRequest, restate.Void] {
	merged := c.options
	if len(opts) != 0 {
		merged = make([]restate.ClientOption, 0, len(c.options)+len(opts))
		merged = append(merged, c.options...)
		merged = append(merged, opts...)
	}

	untyped := restate.Object[restate.Void](c.ctx, "workqueue.FixedWorkQueue", c.key, "Configure", merged...)
	return restate.WithRequestType[*ConfigureRequest](untyped)
}
// Enqueue returns a typed client for the "Enqueue" handler of the
// workqueue.FixedWorkQueue object addressed by this client's key. The request
// value is the id of the awakeable to signal when the work item may proceed.
//
// The client's base options are used as-is when no opts are given; otherwise
// the base options followed by opts are combined into a fresh slice, so the
// stored base options are never modified.
func (c *fixedWorkQueueClient) Enqueue(opts ...restate.ClientOption) restate.Client[string, restate.Void] {
	merged := c.options
	if len(opts) != 0 {
		merged = make([]restate.ClientOption, 0, len(c.options)+len(opts))
		merged = append(merged, c.options...)
		merged = append(merged, opts...)
	}

	untyped := restate.Object[restate.Void](c.ctx, "workqueue.FixedWorkQueue", c.key, "Enqueue", merged...)
	return restate.WithRequestType[string](untyped)
}
package workqueue
import (
"slices"
restate "github.com/restatedev/sdk-go"
)
// defaultCapacity is the number of concurrently running work items allowed
// when no capacity has been stored for a queue.
const defaultCapacity = 1
// FixedWorkQueue is a simple work queue with a fixed capacity. Work items are
// processed in the order they were received, with at most "capacity" items
// running at the same time.
//
// A caller enqueues a work item by invoking Enqueue with the id of an
// awakeable. When the item's turn comes, the queue signals that awakeable so
// the caller can proceed. Once the work is finished, the caller must signal
// back to the queue so the item is marked complete.
type FixedWorkQueue struct{}

// NewFixedWorkQueue returns a new FixedWorkQueue service instance.
func NewFixedWorkQueue() *FixedWorkQueue {
	return new(FixedWorkQueue)
}

// ServiceName returns the name under which the queue is registered.
func (q *FixedWorkQueue) ServiceName() string {
	return "workqueue.FixedWorkQueue"
}
// ConfigureRequest is the payload of the FixedWorkQueue Configure handler.
type ConfigureRequest struct {
	// Capacity is the value stored under the queue's "capacity" state key; it
	// bounds how many work items Dequeue lets run at once.
	Capacity int `json:"capacity"`
}
// Configure stores the requested capacity under the "capacity" state key and
// then triggers a Dequeue on this same queue instance, so that items already
// waiting can start right away if the capacity was raised.
//
// The capacity is stored as given. Dequeue only starts items while the number
// of running items is below it, so a value of zero or less starts nothing.
func (q *FixedWorkQueue) Configure(ctx restate.ObjectContext, config ConfigureRequest) error {
	restate.Set(ctx, "capacity", config.Capacity)

	// One-way call to ourselves: schedule the next item(s) under the new
	// capacity.
	self := restate.Key(ctx)
	restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", self, "Dequeue").Send(restate.Void{})
	return nil
}
// Enqueue adds a new work item to the queue.
//
// id is the awakeable ID to be signaled when the work item can proceed.
// The awakeable should accept a string result, which is the id of an awakeable
// to be signaled by the caller when the work item is complete.
// For example, the caller can create an awakeable like this:
//
//	continuationSignal := restate.Awakeable[string](ctx)
//	restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", queueKey, "Enqueue").
//		Send(continuationSignal.Id())
//
//	// Wait for the work queue to schedule the work item
//	completionId, err := continuationSignal.Result()
//	if err != nil {
//		return err
//	}
//
//	// Do the work...
//
//	// Signal completion of the work item
//	restate.ResolveAwakeable(ctx, completionId, restate.Void{})
//
// Enqueue returns an error if the current queue cannot be read from state. In
// that case nothing is written, so items that are already queued are kept.
func (q *FixedWorkQueue) Enqueue(ctx restate.ObjectContext, id string) error {
	// Load the pending items. A failed read must not be treated as an empty
	// queue: the list is written back below, which would silently drop every
	// item that is already waiting.
	queue, err := restate.Get[[]string](ctx, "queue")
	if err != nil {
		return err
	}

	// Append the request to the queue. A queue that was never stored reads as
	// nil, which append handles.
	queue = append(queue, id)
	restate.Set(ctx, "queue", queue)
	ctx.Log().Info("Enqueued a work item", "id", id)

	// Let the queue start the item right away if there is spare capacity.
	restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Dequeue").Send(restate.Void{})
	return nil
}
// Dequeue moves items from the head of the pending queue into the set of
// running items until the configured capacity is reached or the queue is
// empty. For every item it starts, it sends a one-way "Do" call to this same
// queue instance, using the item id as the idempotency key.
//
// Both the "current" and "queue" state keys are written back on every call.
// For that reason a failed state read is returned as an error instead of being
// treated as empty state: continuing would overwrite the stored lists and lose
// track of running or pending items.
func (q *FixedWorkQueue) Dequeue(ctx restate.ObjectContext) error {
	capacity := getCapacity(ctx)

	current, err := restate.Get[[]string](ctx, "current")
	if err != nil {
		return err
	}
	if current == nil {
		// Never stored: start from an empty, non-nil list.
		current = []string{}
	}

	queue, err := restate.Get[[]string](ctx, "queue")
	if err != nil {
		return err
	}
	if queue == nil {
		// Never stored: start from an empty, non-nil list.
		queue = []string{}
	}

	for len(current) < capacity && len(queue) > 0 {
		// Dequeue the next item
		id := queue[0]
		queue = queue[1:]
		current = append(current, id)

		restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Do").
			Send(id, restate.WithIdempotencyKey(id))
		ctx.Log().Info("Dequeued a work item", "id", id)
	}

	restate.Set(ctx, "current", current)
	restate.Set(ctx, "queue", queue)
	return nil
}
// getCapacity returns the capacity stored under the "capacity" state key. It
// falls back to defaultCapacity when no value is stored or when the value
// cannot be read.
func getCapacity(ctx restate.ObjectContext) int {
	// Reading through a pointer distinguishes "not set" (nil) from a stored
	// zero.
	if stored, err := restate.Get[*int](ctx, "capacity"); err == nil && stored != nil {
		return *stored
	}
	return defaultCapacity
}
// Do runs one scheduled work item. It resolves the item's awakeable (id) with
// the id of a fresh completion awakeable, waits for the caller to resolve that
// completion awakeable, and then sends a one-way "OnComplete" call to this
// same queue instance so the item is removed from the running set.
//
// If waiting for the completion signal fails, the error is returned and
// OnComplete is not sent.
func (q *FixedWorkQueue) Do(ctx restate.ObjectSharedContext, id string) error {
	// Hand the work item the id it has to signal when it is finished; this is
	// also what lets the work item proceed.
	done := restate.Awakeable[restate.Void](ctx)
	restate.ResolveAwakeable(ctx, id, done.Id())

	// Block until the work item reports completion.
	if _, err := done.Result(); err != nil {
		return err
	}

	// Update the work queue to mark this item as complete.
	onComplete := restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "OnComplete")
	onComplete.Send(id, restate.WithIdempotencyKey(id))
	return nil
}
// OnComplete removes the work item id from the set of running items stored
// under the "current" state key and then triggers Dequeue on this same queue
// instance so that waiting items can use the freed capacity.
//
// It returns an error if the running set cannot be read from state. In that
// case nothing is written: continuing with an empty list would overwrite the
// stored set and make the queue forget the other items that are still running.
func (q *FixedWorkQueue) OnComplete(ctx restate.ObjectContext, id string) error {
	current, err := restate.Get[[]string](ctx, "current")
	if err != nil {
		return err
	}

	// Remove the completed item from the set of current work items. A set that
	// was never stored reads as nil, which DeleteFunc handles.
	current = slices.DeleteFunc(current, func(item string) bool { return item == id })
	restate.Set(ctx, "current", current)
	ctx.Log().Info("Completed a work item", "id", id)

	// Dequeue the next item(s) if there is capacity
	restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Dequeue").Send(restate.Void{})
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment