Last active
July 17, 2025 19:19
-
-
Save EronWright/8c0c77bde51105a6bbace09dcf5f3c57 to your computer and use it in GitHub Desktop.
Restate.dev "Work Queue" service
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workqueue | |
import ( | |
restate "github.com/restatedev/sdk-go" | |
) | |
// WorkQueueExample is an example workflow that demonstrates how to use the FixedWorkQueue as a blocking queue. | |
// The workflow enqueues an abstract "work item" to the queue, waits to be scheduled, does some work, and then | |
// notifies the queue. The "work" in this case is simply waiting for a callback to be invoked. | |
// | |
// Try running two instances of this workflow, and observe how the second one waits until the first one | |
// completes before it enters the "working" state and does any "work". Invoke the callback with an arbitrary message | |
// to allow the workflow to complete. | |
type WorkQueueExample struct { | |
} | |
func (w *WorkQueueExample) ServiceName() string { | |
return "workqueue.WorkQueueExample" | |
} | |
func (w *WorkQueueExample) Run(ctx restate.WorkflowContext) error { | |
queueKey := "example" | |
queueClient := NewFixedWorkQueueClient(ctx, queueKey) | |
// Schedule a request through the work queue | |
continuationSignal := restate.Awakeable[string](ctx) | |
queueClient.Enqueue(). | |
Send(continuationSignal.Id(), restate.WithIdempotencyKey(continuationSignal.Id())) | |
ctx.Log().Info("Waiting for a continuation signal to begin work", "id", continuationSignal.Id()) | |
// Wait for the work queue to schedule the work item and give you a completion signal | |
restate.Set(ctx, "status", "queued") | |
completionId, err := continuationSignal.Result() | |
if err != nil { | |
return err | |
} | |
ctx.Log().Info("Got a completion signal", "id", completionId) | |
// Wait for the work to be completed, e.g. to recieve a message via a callback | |
restate.Set(ctx, "status", "working") | |
message, err := restate.Promise[string](ctx, "callback").Result() | |
if err != nil { | |
return err | |
} | |
ctx.Log().Info("My work here is done", "message", message) | |
// Signal completion of the work item to the work queue, to allow for more items to be scheduled | |
restate.Set(ctx, "status", "done") | |
restate.ResolveAwakeable(ctx, completionId, restate.Void{}) | |
return nil | |
} | |
type CallbackRequest struct { | |
Message string `json:"message"` | |
} | |
func (w *WorkQueueExample) Callback(ctx restate.WorkflowSharedContext, request CallbackRequest) error { | |
return restate.Promise[string](ctx, "callback").Resolve(request.Message) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workqueue | |
import ( | |
restate "github.com/restatedev/sdk-go" | |
) | |
// FixedWorkQueueClient is the client API for workqueue.FixedWorkQueue service. | |
type FixedWorkQueueClient interface { | |
Configure(opts ...restate.ClientOption) restate.Client[*ConfigureRequest, restate.Void] | |
// Enqueue adds a new work item to the queue. | |
// id is the awakeable ID to be signaled when the work item can proceed. | |
// The awakeable should accept a string result, which is the id of an awakeable | |
// to be signaled by the caller when the work item is complete. | |
// For example, the caller can create an awakeable like this: | |
// | |
// continuationSignal := restate.Awakeable[string](ctx) | |
// queueClient := workqueue.NewFixedWorkQueueClient(ctx, queueKey) | |
// queueClient.Enqueue().Send(continuationSignal.Id()) | |
// // Wait for the work queue to schedule the work item | |
// completionId, err := continuationSignal.Result() | |
// if err != nil { | |
// return err | |
// } | |
// | |
// // Do the work... | |
// // Signal completion of the work item | |
// restate.ResolveAwakeable(ctx, completionId, restate.Void{}) | |
Enqueue(opts ...restate.ClientOption) restate.Client[string, restate.Void] | |
} | |
type fixedWorkQueueClient struct { | |
ctx restate.Context | |
key string | |
options []restate.ClientOption | |
} | |
func NewFixedWorkQueueClient(ctx restate.Context, key string, opts ...restate.ClientOption) FixedWorkQueueClient { | |
// cOpts := append([]restate.ClientOption{restate.WithProtoJSON}, opts...) | |
return &fixedWorkQueueClient{ctx, key, opts} | |
} | |
func (c *fixedWorkQueueClient) Configure(opts ...restate.ClientOption) restate.Client[*ConfigureRequest, restate.Void] { | |
cOpts := c.options | |
if len(opts) > 0 { | |
cOpts = append(append([]restate.ClientOption{}, cOpts...), opts...) | |
} | |
return restate.WithRequestType[*ConfigureRequest](restate.Object[restate.Void](c.ctx, "workqueue.FixedWorkQueue", c.key, "Configure", cOpts...)) | |
} | |
func (c *fixedWorkQueueClient) Enqueue(opts ...restate.ClientOption) restate.Client[string, restate.Void] { | |
cOpts := c.options | |
if len(opts) > 0 { | |
cOpts = append(append([]restate.ClientOption{}, cOpts...), opts...) | |
} | |
return restate.WithRequestType[string](restate.Object[restate.Void](c.ctx, "workqueue.FixedWorkQueue", c.key, "Enqueue", cOpts...)) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workqueue | |
import ( | |
"slices" | |
restate "github.com/restatedev/sdk-go" | |
) | |
const ( | |
defaultCapacity = 1 | |
) | |
// FixedWorkQueue implements a simple fixed-capacity work queue. | |
// The queue processes work items in the order they are received, | |
// up to the configured capacity. | |
// | |
// Callers can enqueue work items by calling Enqueue with an awakeable id. | |
// At execution time, the queue will signal the awakeable to proceed. | |
// When the work item is complete, it must signal back to the queue to mark itself complete. | |
type FixedWorkQueue struct { | |
} | |
func NewFixedWorkQueue() *FixedWorkQueue { | |
return &FixedWorkQueue{} | |
} | |
func (q *FixedWorkQueue) ServiceName() string { | |
return "workqueue.FixedWorkQueue" | |
} | |
type ConfigureRequest struct { | |
Capacity int `json:"capacity"` | |
} | |
// Initialize sets up the workspace pool with configuration | |
func (q *FixedWorkQueue) Configure(ctx restate.ObjectContext, config ConfigureRequest) error { | |
restate.Set(ctx, "capacity", config.Capacity) | |
// Dequeue the next item(s) if there is increased capacity | |
restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Dequeue").Send(restate.Void{}) | |
return nil | |
} | |
// Enqueue adds a new work item to the queue. | |
// id is the awakeable ID to be signaled when the work item can proceed. | |
// The awakeable should accept a string result, which is the id of an awakeable | |
// to be signaled by the caller when the work item is complete. | |
// For example, the caller can create an awakeable like this: | |
// | |
// continuationSignal := restate.Awakeable[string](ctx) | |
// restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", queueKey, "Enqueue"). | |
// Send(continuationSignal.Id()) | |
// | |
// // Wait for the work queue to schedule the work item | |
// completionId, err := continuationSignal.Result() | |
// if err != nil { | |
// return err | |
// } | |
// | |
// // Do the work... | |
// // Signal completion of the work item | |
// restate.ResolveAwakeable(ctx, completionId, restate.Void{}) | |
func (q *FixedWorkQueue) Enqueue(ctx restate.ObjectContext, id string) error { | |
// Append the request to the queue | |
queue, err := restate.Get[[]string](ctx, "queue") | |
if err != nil { | |
queue = []string{} | |
} | |
queue = append(queue, id) | |
restate.Set(ctx, "queue", queue) | |
ctx.Log().Info("Enqueued a work item", "id", id) | |
restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Dequeue").Send(restate.Void{}) | |
return nil | |
} | |
func (q *FixedWorkQueue) Dequeue(ctx restate.ObjectContext) error { | |
capacity := getCapacity(ctx) | |
current, err := restate.Get[[]string](ctx, "current") | |
if err != nil || current == nil { | |
current = []string{} | |
} | |
queue, err := restate.Get[[]string](ctx, "queue") | |
if err != nil || queue == nil { | |
queue = []string{} | |
} | |
for len(current) < capacity && len(queue) > 0 { | |
// Dequeue the next item | |
id := queue[0] | |
queue = queue[1:] | |
current = append(current, id) | |
restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Do"). | |
Send(id, restate.WithIdempotencyKey(id)) | |
ctx.Log().Info("Dequeued a work item", "id", id) | |
} | |
restate.Set(ctx, "current", current) | |
restate.Set(ctx, "queue", queue) | |
return nil | |
} | |
func getCapacity(ctx restate.ObjectContext) int { | |
capacity, err := restate.Get[*int](ctx, "capacity") | |
if err != nil || capacity == nil { | |
return defaultCapacity | |
} | |
return *capacity | |
} | |
func (q *FixedWorkQueue) Do(ctx restate.ObjectSharedContext, id string) error { | |
// signal the work item to proceed | |
completionSignal := restate.Awakeable[restate.Void](ctx) | |
restate.ResolveAwakeable(ctx, id, completionSignal.Id()) | |
// wait for completion | |
_, err := completionSignal.Result() | |
if err != nil { | |
return err | |
} | |
// Update the work queue to mark this item as complete | |
restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "OnComplete"). | |
Send(id, restate.WithIdempotencyKey(id)) | |
return nil | |
} | |
func (q *FixedWorkQueue) OnComplete(ctx restate.ObjectContext, id string) error { | |
// Remove the completed item from the set of current work items | |
current, _ := restate.Get[[]string](ctx, "current") | |
current = slices.DeleteFunc(current, func(item string) bool { return item == id }) | |
restate.Set(ctx, "current", current) | |
ctx.Log().Info("Completed a work item", "id", id) | |
// Dequeue the next item(s) if there is capacity | |
restate.ObjectSend(ctx, "workqueue.FixedWorkQueue", restate.Key(ctx), "Dequeue").Send(restate.Void{}) | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment