Skip to content

Instantly share code, notes, and snippets.

@percybolmer
Last active March 3, 2022 06:14
Show Gist options
  • Save percybolmer/22e1e40814d6dfb3f6db73c71666b64f to your computer and use it in GitHub Desktop.
Save percybolmer/22e1e40814d6dfb3f6db73c71666b64f to your computer and use it in GitHub Desktop.
Cadence how to restart workflow
package orders
import (
"time"
"go.uber.org/cadence/workflow"
"go.uber.org/zap"
)
// Order describes a single purchase. It is the payload decoded from each
// signal received by the order workflow.
type Order struct {
	// Item is the name of the thing being ordered.
	Item string `json:"item"`
	// Price is the price attached to the order.
	Price float32 `json:"price"`
	// By identifies who placed the order.
	By string `json:"by"`
}
// init registers workflowOrder with the Cadence workflow package when this
// package is loaded.
func init() {
workflow.Register(workflowOrder)
}
// MaxSignalsAmount is the number of signals workflowOrder accepts before it
// restarts itself as a new run.
// Per the original author's note, Cadence recommends that a production
// workflow handle fewer than 1000 signals per run (TODO: confirm against the
// Cadence documentation).
const MaxSignalsAmount = 3
// workflowOrder handles incoming Orders delivered as "order" signals.
//
// Every received Order is logged and counted. Once MaxSignalsAmount signals
// have been handled, a default branch is added to the selector; when that
// branch runs (i.e. no signal is ready to be received), the workflow returns
// a ContinueAsNew error so that it is restarted as a fresh run.
//
// The loop only exits through that path, so the returned error is always the
// value produced by workflow.NewContinueAsNewError.
func workflowOrder(ctx workflow.Context) error {
	// NOTE(review): HeartbeatTimeout (20h) is longer than StartToCloseTimeout
	// (60m) — confirm this is intended. No activity is executed in this
	// function, so the values are kept as they were.
	ao := workflow.ActivityOptions{
		ScheduleToStartTimeout: time.Minute * 60,
		StartToCloseTimeout:    time.Minute * 60,
		HeartbeatTimeout:       time.Hour * 20,
		// Retry policies etc. will be added here later.
	}
	// Attach the options to the context so they apply to anything run with it.
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger := workflow.GetLogger(ctx)
	logger.Info("Waiting for Orders")

	// signalCount is the number of orders handled by this run.
	signalCount := 0
	// restartWorkflow is set by the selector's default branch.
	restartWorkflow := false

	// Register the receive branch exactly once. Registering it inside the
	// loop would append another identical case to the selector on every
	// iteration.
	signalChan := workflow.GetSignalChannel(ctx, "order")
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
		// Decode the signal payload into an Order.
		var order Order
		c.Receive(ctx, &order)
		signalCount++
		logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price))
	})

	// defaultAdded guards against adding the default branch more than once.
	defaultAdded := false
	for {
		if !defaultAdded && signalCount >= MaxSignalsAmount {
			// Time to restart: the default branch is registered after the
			// receive branch, so signals that are already waiting are still
			// handled before the restart is requested.
			selector.AddDefault(func() {
				restartWorkflow = true
			})
			defaultAdded = true
		}

		selector.Select(ctx)

		if restartWorkflow {
			return workflow.NewContinueAsNewError(ctx, workflowOrder)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment