Skip to content

Instantly share code, notes, and snippets.

@mimol91
Created February 16, 2019 10:00
Show Gist options
  • Save mimol91/4f75de25465b6cd7e4041a60bce6692f to your computer and use it in GitHub Desktop.
Save mimol91/4f75de25465b6cd7e4041a60bce6692f to your computer and use it in GitHub Desktop.
// Questions is a GraphQL resolver that lists questionnaire questions as a
// Relay connection.
//
// It publishes a proc.List request for the "default" bucket on the
// questionnaire topic and then returns a thunk instead of a value. The thunk
// blocks on the matching response, so the request is already in flight while
// the executor continues with other fields.
//
// It returns an error immediately if the request cannot be sent. The thunk
// returns an error if waiting for the response fails, or the result of
// CastError if the response is not a questionnaire message.
func Questions(p graphql.ResolveParams) (interface{}, error) {
	r := NewResolver(&p, producer.Producer)
	err, subscriber := r.SendEvent(topic.QuestionnaireTopicName, &protobuf.Message{
		Locale: "en-US",
		Topic:  topic.QuestionnaireTopicName,
		Type:   proc.List,
		Payload: protobuf.CreateQuestionnaire(&questionnaire.QuestionnairePayload_List{
			List: &questionnaire.List{
				Bucket: "default",
			},
		}),
	}, proc.List)
	if err != nil {
		return nil, err
	}

	// Make the resolver concurrent by returning a function (thunk).
	return func() (interface{}, error) {
		data, err := r.WaitForResponse(subscriber)
		if err != nil {
			return nil, err
		}

		// A typed-nil pointer satisfies the type assertion, so reject it here
		// rather than dereferencing it below.
		payload, ok := data.(*protobuf.Message_Questionnaire)
		if !ok || payload == nil {
			return r.CastError()
		}

		// NOTE(review): GetListed looks like a generated protobuf oneof getter,
		// which returns nil when the payload holds a different variant —
		// confirm. Guarding it yields an empty connection instead of a nil
		// pointer panic.
		var connection []interface{}
		if listed := payload.Questionnaire.GetListed(); listed != nil {
			connection = make([]interface{}, 0, len(listed.Questions))
			for _, question := range listed.Questions {
				connection = append(connection, question)
			}
		}

		return relay.ConnectionFromArray(connection, relay.NewConnectionArguments(p.Args)), nil
	}, nil
}
// NewResolver builds a resolver bound to the given GraphQL resolve
// parameters and Kafka producer. The event id is left empty until SendEvent
// assigns one.
func NewResolver(resolveParams *graphql.ResolveParams, producer *kafka.Producer) *resolver {
	r := new(resolver)
	r.resolveParams = resolveParams
	r.producer = producer
	return r
}
// resolver holds the state for one request/response exchange started from a
// GraphQL field resolver: it sends an event through the producer and later
// waits for the matching result.
type resolver struct {
// resolveParams are the GraphQL resolve parameters; their Context is used
// for the event headers and as the parent of the wait timeout.
resolveParams *graphql.ResolveParams
// producer publishes the outgoing event.
producer *kafka.Producer
// uuid is the id of the event most recently sent by SendEvent; it is the key
// WaitForResponse uses to unregister the subscriber.
uuid string
}
// SendEvent publishes message on topic and registers a subscriber that will
// receive the matching result.
//
// A fresh id is generated, stored on the resolver, written to message.Id,
// passed to proc.SetHeaders together with eventType, and used as the key under
// which the subscriber is registered.
//
// The error is returned first (non-idiomatic, kept for existing callers). The
// returned subscriber is never nil, but when the error is non-nil it is not
// registered and will never receive a result:
//   - if the id cannot be generated, nothing is registered or sent;
//   - if publishing fails, the registration is removed before returning, since
//     the caller is not expected to call WaitForResponse (which would otherwise
//     be the only place the subscriber gets unregistered).
func (r *resolver) SendEvent(topic string, message *protobuf.Message, eventType string) (error, *wmodel.Subscriber) {
	// Buffer of one so the result can be delivered without a waiting receiver.
	subscriber := &wmodel.Subscriber{Result: make(chan *model.Result, 1)}

	uuid, err := guid.NewString()
	if err != nil {
		return err, subscriber
	}
	r.uuid = uuid
	message.Id = uuid

	// Register before publishing so a fast response cannot arrive ahead of
	// the subscriber.
	wsubscriber.Add(uuid, subscriber)

	if err := r.producer.SendEvent(topic, message, proc.SetHeaders(r.resolveParams.Context, eventType, uuid)); err != nil {
		wsubscriber.Delete(uuid)
		return err, subscriber
	}
	return nil, subscriber
}
// WaitForResponse blocks until a result arrives on subscriber.Result or the
// wait context is done, whichever happens first.
//
// The wait context is derived from the resolve parameters' Context with a
// timeout of conf.Main.Subscriber.TimeoutMs milliseconds, so it ends on
// timeout and also when the parent context ends. Either case is reported as
// the GoRoutineTimeoutError error.
//
// On a result, it returns the result's Data, or its Error if one is set. The
// subscriber registered under the resolver's event id is always removed
// before returning.
func (r *resolver) WaitForResponse(subscriber *wmodel.Subscriber) (interface{}, error) {
	limit := time.Duration(conf.Main.Subscriber.TimeoutMs) * time.Millisecond
	waitCtx, stop := context.WithTimeout(r.resolveParams.Context, limit)
	defer stop()
	// Deferred calls run last-in-first-out: the subscriber is unregistered
	// first, then the timeout context is released.
	defer wsubscriber.Delete(r.uuid)

	select {
	case <-waitCtx.Done():
		timeoutErr := werror.NewError(werror.Errors[werror.GoRoutineTimeoutError])
		log.Warn("time out in resolver", werror.GoRoutineTimeoutError, timeoutErr, zap.String("event Id", r.uuid))
		return nil, timeoutErr
	case result := <-subscriber.Result:
		if result.Error == nil {
			return result.Data, nil
		}
		log.DebugF(result.Error.Error())
		return nil, result.Error
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment