Created
March 3, 2022 06:26
-
-
Save percybolmer/ad0bf0986f5adf9568b8376c7e06d7e7 to your computer and use it in GitHub Desktop.
Cadence order workflow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package orders | |
import ( | |
"context" | |
"errors" | |
"programmingpercy/cadence-tavern/customer" | |
"time" | |
"go.uber.org/cadence/activity" | |
"go.uber.org/cadence/workflow" | |
"go.uber.org/zap" | |
) | |
// Order describes a single order placed in the tavern. It is the payload
// decoded from the "order" signal and handed to the order-processing child
// workflow.
type Order struct {
	// Item is the name of the thing being ordered.
	Item string `json:"item"`
	// Price is what the item costs.
	Price float32 `json:"price"`
	// By is the name of the customer placing the order; it is used to look
	// the customer up before the order is accepted.
	By string `json:"by"`
}
func init() { | |
workflow.Register(workflowOrder) | |
workflow.Register(workflowProcessOrder) | |
activity.Register(activityIsCustomerLegal) | |
activity.Register(activitiyFindCustomerByName) | |
} | |
const (
	// MaxSignalsAmount is the number of order signals workflowOrder handles
	// before it restarts itself via continue-as-new. It is kept deliberately
	// small here; Cadence recommends that a production workflow stays below
	// 1000 signals per run.
	MaxSignalsAmount = 3
)
// workflowOrder will handle incomming Orders | |
func workflowOrder(ctx workflow.Context) error { | |
ao := workflow.ActivityOptions{ | |
ScheduleToStartTimeout: time.Minute * 60, | |
StartToCloseTimeout: time.Minute * 60, | |
HeartbeatTimeout: time.Hour * 20, | |
// Here we will Add Retry policies etc later | |
} | |
// Add the Options to Context to apply configurations | |
ctx = workflow.WithActivityOptions(ctx, ao) | |
logger := workflow.GetLogger(ctx) | |
logger.Info("Waiting for Orders") | |
// restartWorkflow | |
var restartWorkflow bool | |
// signalCounter | |
signalCount := 0 | |
// Preconfigure ChildWorkflow Options | |
orderWaiterCfg := workflow.ChildWorkflowOptions{ | |
ExecutionStartToCloseTimeout: time.Minute * 2, // Each Order can tops take 2 min | |
} | |
// Grab the Selector from the workflow Context, | |
selector := workflow.NewSelector(ctx) | |
// For ever running loop | |
for { | |
// Get the Signal used to identify an Event, we named our Order event into order | |
signalChan := workflow.GetSignalChannel(ctx, "order") | |
// We add a "Receiver" to the Selector, The receiver is a function that will trigger once a new Signal is recieved | |
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) { | |
// Create the Order to marshal the Input into | |
var order Order | |
// Receive will read input data into the struct | |
c.Receive(ctx, &order) | |
// increment signal counter | |
signalCount++ | |
// Create ctx for Child flow | |
orderCtx := workflow.WithChildOptions(ctx, orderWaiterCfg) | |
// Trigger the child workflow | |
waiter := workflow.ExecuteChildWorkflow(orderCtx, workflowProcessOrder, order) | |
if err := waiter.Get(ctx, nil); err != nil { | |
workflow.GetLogger(ctx).Error("Order has failed.", zap.Error(err)) | |
} | |
}) | |
if signalCount >= MaxSignalsAmount { | |
// We should restart | |
// Add a Default to the selector, which will make sure that this is triggered once all jobs in queue are done | |
selector.AddDefault(func() { | |
restartWorkflow = true | |
}) | |
} | |
selector.Select(ctx) | |
// If its time to restart, return the ContinueAsNew | |
if restartWorkflow { | |
return workflow.NewContinueAsNewError(ctx, workflowOrder) | |
} | |
} | |
} | |
// workflowProcessOrder is used to handle orders and will be ran as a CHILD | |
func workflowProcessOrder(ctx workflow.Context, order Order) error { | |
logger := workflow.GetLogger(ctx) | |
logger.Info("process order workflow started") | |
ao := workflow.ActivityOptions{ | |
ScheduleToStartTimeout: time.Minute, | |
StartToCloseTimeout: time.Minute, | |
HeartbeatTimeout: time.Second * 20, | |
// Here we will Add Retry policies etc later | |
} | |
// Add the Options to Context to apply configurations | |
ctx = workflow.WithActivityOptions(ctx, ao) | |
// Find Customer from Repo | |
var cust customer.Customer | |
err := workflow.ExecuteActivity(ctx, activitiyFindCustomerByName, order.By).Get(ctx, &cust) | |
if err != nil { | |
logger.Error("Customer is not in the Tavern", zap.Error(err)) | |
return err | |
} | |
var allowed bool | |
err = workflow.ExecuteActivity(ctx, activityIsCustomerLegal, cust).Get(ctx, &allowed) | |
if err != nil { | |
logger.Error("Customer is not of age", zap.Error(err)) | |
return err | |
} | |
logger.Info("Order made", zap.String("item", order.Item), zap.Float32("price", order.Price)) | |
return nil | |
} | |
// activityFindCustomerByName is used to find the Customer is in the Tavern | |
func activitiyFindCustomerByName(ctx context.Context, name string) (customer.Customer, error) { | |
return customer.Database.Get(name) | |
} | |
// activityIsCustomerLegal is used to check the age of the customer | |
func activityIsCustomerLegal(ctx context.Context, visitor customer.Customer) (bool, error) { | |
if visitor.Age < 18 { | |
return false, errors.New("customer is not old enough, dont serve him") | |
} | |
return true, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment