Created
December 6, 2021 09:31
-
-
Save mrsufgi/99ad8f697c2d2894b9904067d28223e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func newEventCreatedTask(ctx context.Context, s domain.Event) (*asynq.Task, error) { | |
payload := map[string]interface{}{"event_id": s.ID, "payload": string(s.Payload)} // some type for payload | |
task, err := domain.NewTask(ctx, payload) | |
if err != nil { | |
log.Error("unable to create new task: %s", err.Error()) | |
return nil, err | |
} | |
// transform to raw data | |
data, err := json.Marshal(*task) | |
if err != nil { | |
log.Errorf("unable to unmarshal task: %s", err.Error()) | |
return nil, err | |
} | |
return asynq.NewTask("event:created", data), nil | |
} | |
func (es *EventsService) CreateEvent(ctx context.Context, event SomeInput) error { | |
task, err := newEventCreatedTask(ctx, domain.Event{ID: uuid.NewString(), Payload: json.RawMessage(*event)}) | |
if err != nil { | |
log.Errorf("could not create task: %v", err) | |
return err | |
} | |
res, err := es.ac.EnqueueContext(ctx, task, "some-queue") | |
if err != nil { | |
log.Errorf("could not enqueue task: %v", err) | |
return err | |
} | |
log.Debugf("new event created task enqueued: %#v", res.ID) | |
return nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// CustomClientOpt holds the settings used to open a Redis connection: network
// and address, credentials, database index, timeouts, pool size and optional
// TLS configuration.
type CustomClientOpt struct {
	// Network type to use, either tcp or unix.
	// Default is tcp.
	Network string

	// Redis server address in "host:port" format.
	Addr string

	// Username to authenticate the current connection when Redis ACLs are used.
	// See: https://redis.io/commands/auth.
	Username string

	// Password to authenticate the current connection.
	// See: https://redis.io/commands/auth.
	Password string

	// Redis DB to select after connecting to a server.
	// See: https://redis.io/commands/select.
	DB int

	// Dial timeout for establishing new connections.
	// Default is 5 seconds.
	DialTimeout time.Duration

	// Timeout for socket reads.
	// If timeout is reached, read commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is 3 seconds.
	ReadTimeout time.Duration

	// Timeout for socket writes.
	// If timeout is reached, write commands will fail with a timeout error
	// instead of blocking.
	//
	// Use value -1 for no timeout and 0 for default.
	// Default is ReadTimeout.
	WriteTimeout time.Duration

	// Maximum number of socket connections.
	// Default is 10 connections per every CPU as reported by runtime.NumCPU.
	PoolSize int

	// TLS Config used to connect to a server.
	// TLS will be negotiated only if this field is set.
	TLSConfig *tls.Config
}
// nolint | |
func (opt CustomClientOpt) MakeRedisClient() interface{} { | |
rdb := redis.NewClient(&redis.Options{ | |
Network: opt.Network, | |
Addr: opt.Addr, | |
Username: opt.Username, | |
Password: opt.Password, | |
DB: opt.DB, | |
DialTimeout: opt.DialTimeout, | |
ReadTimeout: opt.ReadTimeout, | |
WriteTimeout: opt.WriteTimeout, | |
PoolSize: opt.PoolSize, | |
TLSConfig: opt.TLSConfig, | |
}) | |
rdb.AddHook(redisotel.TracingHook{}) | |
return rdb | |
} | |
type Task struct { | |
OtelContext propagation.MapCarrier `json:"otel_context"` | |
Payload string `json:"payload"` | |
} | |
func NewTask(ctx context.Context, p interface{}) (*Task, error) { | |
payload, err := json.Marshal(p) | |
if err != nil { | |
return nil, err | |
} | |
task := &Task{ | |
Payload: string(payload), | |
OtelContext: make(propagation.MapCarrier), | |
} | |
otel.GetTextMapPropagator().Inject(ctx, &task.OtelContext) | |
return task, nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func NewEventHandler(m *asynq.ServeMux, s someService) *SnapshotsHandler { | |
handler := &Handler{ | |
Service: s, | |
} | |
// register handlers | |
m.HandleFunc("event:created", handler.HandleEvent) | |
return handler | |
} | |
func (vh *SnapshotsHandler) HandleEventCreatedTask(ctx context.Context, t *asynq.Task) error { | |
var payload domain.Task | |
if err := json.Unmarshal(t.Payload(), &payload); err != nil { | |
return err | |
} | |
var taskPayload map[string]interface{} // TODO: use type! | |
if err := json.Unmarshal([]byte(payload.Payload), &taskPayload); err != nil { | |
return err | |
} | |
// do something with payload... | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Client wiring: build the Redis connection options, create the asynq client
// from them and close the client when the enclosing function returns.
r := setupRedis()
rc := asynq.NewClient(r)
defer rc.Close()
...
// Mux wiring: create the serve mux and install the tracing middleware on it.
mux := asynq.NewServeMux()
mux.Use(TaskTracingMiddleware)
...
func setupRedis() asynq.RedisConnOpt { | |
return domain.CustomClientOpt{Addr: fmt.Sprintf("%s:%s", viper.GetString("REDIS_HOST"), viper.GetString("REDIS_PORT"))} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var tracer = otel.Tracer("asynq/tasks") | |
func TaskTracingMiddleware(h asynq.Handler) asynq.Handler { | |
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { | |
var task domain.Task | |
err := json.Unmarshal(t.Payload(), &task) | |
if err != nil { | |
log.Fatalf("error with unmarshaling:", err.Error()) | |
return err | |
} | |
// extract the context from the task | |
ctx = otel.GetTextMapPropagator().Extract(ctx, &task.OtelContext) | |
ctx, span := tracer.Start(ctx, fmt.Sprintf("middleware-task-%s", t.Type())) | |
defer span.End() | |
return h.ProcessTask(ctx, t) | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment