Created
July 19, 2024 17:26
-
-
Save koblas/26ad55243ec98dcdd4fcdd67a22df54c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func natsToHTTP(msg jetstream.Msg) *http.Request { | |
hdrs := msg.Headers() | |
request := http.Request{ | |
URL: &url.URL{ | |
Scheme: "queue", | |
Path: hdrs.Get(jetURLPath), | |
}, | |
Method: http.MethodPost, | |
Header: http.Header(hdrs), | |
Body: io.NopCloser(bytes.NewReader(msg.Data())), | |
} | |
return &request | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func messageProcessor(ctx context.Context, mux *http.ServeMux, worker chan jetstream.Msg) { | |
log := logkit.FromContext(ctx) | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case msg := <-worker: | |
req := natsToHTTP(msg) | |
w := httptest.NewRecorder() | |
mux.ServeHTTP(w, req.WithContext(ctx)) | |
res := w.Result() | |
defer res.Body.Close() | |
if res.StatusCode != http.StatusOK { | |
buf, _ := io.ReadAll(io.LimitReader(res.Body, 1024)) | |
log.With("statusCode", res.StatusCode).With("statusMsg", string(buf)).Info("nats consumer error") | |
} | |
_ = msg.Ack() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment