Skip to content

Instantly share code, notes, and snippets.

@sgsharma
Created March 21, 2022 05:11
Show Gist options
  • Save sgsharma/ed07b72a99377234a7bbac3a16df11af to your computer and use it in GitHub Desktop.
Save sgsharma/ed07b72a99377234a7bbac3a16df11af to your computer and use it in GitHub Desktop.
Node get/set context
require("./tracing.js");
const { SpanKind } = require("@opentelemetry/api");
const api = require("@opentelemetry/api");
const axios = require('axios');
const { kafka } = require("./kafka.js");
const tracer = api.trace.getTracer("message-consumer");
async function handleMessage(message){
const webpage = await axios.get('https://docs.honeycomb.io/getting-data-in/opentelemetry/javascript/')
const msgspan = tracer.startSpan("message-work");
const {value, timestamp} = message
console.log({
value: value.toString(),
timestamp: timestamp.toString(),
});
msgspan.setAttributes({ "message": value.toString() });
msgspan.end()
return webpage;
}
const consume = async () => {
const consumer = kafka.consumer({ groupId: "test-group" });
await consumer.connect();
await consumer.subscribe({ topic: "test-topic", fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const span = tracer.startSpan(
"consume_message",
{
kind: SpanKind.CONSUMER,
},
api.context.active()
);
const ctx = api.trace.setSpan(api.context.active(), span);
// get any web page
const webpage = await api.context.with(
ctx,
handleMessage,
undefined,
message)
span.end()
},
});
};
consume().then(console.log("All done consuming"));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment