Last active
March 3, 2022 06:14
-
-
Save percybolmer/22e1e40814d6dfb3f6db73c71666b64f to your computer and use it in GitHub Desktop.
Cadence how to restart workflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package orders | |
import ( | |
"time" | |
"go.uber.org/cadence/workflow" | |
"go.uber.org/zap" | |
) | |
// Order represents a single order delivered to the workflow as the payload
// of an "order" signal.
type Order struct {
	// Item is the name of the ordered item.
	Item string `json:"item"`
	// Price is the price of the ordered item.
	Price float32 `json:"price"`
	// By identifies who placed the order.
	By string `json:"by"`
}
func init() { | |
workflow.Register(workflowOrder) | |
} | |
// MaxSignalsAmount is the number of signals a single workflow run accepts
// before it restarts itself via ContinueAsNew.
// It is kept deliberately small here; Cadence recommends that a production
// workflow stays below 1000.
const MaxSignalsAmount = 3
// workflowOrder will handle incomming Orders | |
func workflowOrder(ctx workflow.Context) error { | |
ao := workflow.ActivityOptions{ | |
ScheduleToStartTimeout: time.Minute * 60, | |
StartToCloseTimeout: time.Minute * 60, | |
HeartbeatTimeout: time.Hour * 20, | |
// Here we will Add Retry policies etc later | |
} | |
// Add the Options to Context to apply configurations | |
ctx = workflow.WithActivityOptions(ctx, ao) | |
logger := workflow.GetLogger(ctx) | |
logger.Info("Waiting for Orders") | |
// restartWorkflow | |
var restartWorkflow bool | |
// signalCounter | |
signalCount := 0 | |
// Grab the Selector from the workflow Context, | |
selector := workflow.NewSelector(ctx) | |
// For ever running loop | |
for { | |
// Get the Signal used to identify an Event, we named our Order event into order | |
signalChan := workflow.GetSignalChannel(ctx, "order") | |
// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved | |
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) { | |
// Create the Order to marshal the Input into | |
var order Order | |
// Receive will read input data into the struct | |
c.Receive(ctx, &order) | |
// increment signal counter | |
signalCount++ | |
logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price)) | |
}) | |
if signalCount >= MaxSignalsAmount { | |
// We should restart | |
// Add a Default to the selector, which will make sure that this is triggered once all jobs in queue are done | |
selector.AddDefault(func() { | |
restartWorkflow = true | |
}) | |
} | |
selector.Select(ctx) | |
// If its time to restart, return the ContinueAsNew | |
if restartWorkflow { | |
return workflow.NewContinueAsNewError(ctx, workflowOrder) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment