Created
February 16, 2019 10:00
-
-
Save mimol91/4f75de25465b6cd7e4041a60bce6692f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func Questions(p graphql.ResolveParams) (interface{}, error) { | |
resolver := NewResolver(&p, producer.Producer) | |
err, subscriber := resolver.SendEvent(topic.QuestionnaireTopicName, &protobuf.Message{ | |
Locale: "en-US", | |
Topic: topic.QuestionnaireTopicName, | |
Type: proc.List, | |
Payload: protobuf.CreateQuestionnaire(&questionnaire.QuestionnairePayload_List{ | |
List: &questionnaire.List{ | |
Bucket: "default", | |
}, | |
}), | |
}, proc.List) | |
if err != nil { | |
return nil, err | |
} | |
// making the resolver concurrently by returning a function (thunk). | |
return func() (interface{}, error) { | |
data, err := resolver.WaitForResponse(subscriber) | |
if err != nil { | |
return nil, err | |
} | |
payload, ok := data.(*protobuf.Message_Questionnaire) | |
if !ok { | |
return resolver.CastError() | |
} | |
var connection []interface{} | |
for _, question := range payload.Questionnaire.GetListed().Questions { | |
connection = append(connection, question) | |
} | |
return relay.ConnectionFromArray(connection, relay.NewConnectionArguments(p.Args)), nil | |
}, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func NewResolver(resolveParams *graphql.ResolveParams, producer *kafka.Producer) *resolver { | |
return &resolver{resolveParams: resolveParams, producer: producer} | |
} | |
type resolver struct { | |
resolveParams *graphql.ResolveParams | |
producer *kafka.Producer | |
uuid string | |
} | |
func (r *resolver) SendEvent(topic string, message *protobuf.Message, eventType string) (error, *wmodel.Subscriber) { | |
uuid, _ := guid.NewString() | |
r.uuid = uuid | |
subscriber := wmodel.Subscriber{Result: make(chan *model.Result, 1)} | |
wsubscriber.Add(uuid, &subscriber) | |
message.Id = uuid | |
return r.producer.SendEvent(topic, message, proc.SetHeaders(r.resolveParams.Context, eventType, uuid)), &subscriber | |
} | |
func (r *resolver) WaitForResponse(subscriber *wmodel.Subscriber) (interface{}, error) { | |
timeOutCtx, cancel := context.WithTimeout(r.resolveParams.Context, time.Duration(conf.Main.Subscriber.TimeoutMs)*time.Millisecond) | |
defer cancel() | |
defer wsubscriber.Delete(r.uuid) | |
select { | |
case res := <-subscriber.Result: | |
if res.Error != nil { | |
log.DebugF(res.Error.Error()) | |
return nil, res.Error | |
} | |
return res.Data, nil | |
case <-timeOutCtx.Done(): | |
timeoutErr := werror.NewError(werror.Errors[werror.GoRoutineTimeoutError]) | |
log.Warn("time out in resolver", werror.GoRoutineTimeoutError, timeoutErr, zap.String("event Id", r.uuid)) | |
return nil, timeoutErr | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment