The application SDK MUST propagate the traceparent part of the context (version, traceId, parentId and traceFlags) in the message's Diagnostic-Id application property, following the W3C trace-context format for traceparent encoding.
Also, propagate W3C trace context using W3C Trace Context Propagator from OpenTelemetry. It should populate traceparent and tracestate application properties.
traceparent must match Diagnostic-Id value.
When extracting the context, first get W3C trace-context and then (if missing) read Diagnostic-Id.
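The injection and extraction rules above can be sketched with a plain property map. This is a minimal illustration, assuming application properties are exposed as a `Map<String, Object>`; the class and method names are hypothetical and not part of any SDK.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper; names are illustrative only. */
final class MessageTraceContext {
    static final String DIAGNOSTIC_ID = "Diagnostic-Id";
    static final String TRACEPARENT = "traceparent";
    static final String TRACESTATE = "tracestate";

    private MessageTraceContext() { }

    /** Stamps the same traceparent value under both property names so they always match. */
    static void inject(Map<String, Object> properties, String traceparent, String tracestate) {
        properties.put(DIAGNOSTIC_ID, traceparent);
        properties.put(TRACEPARENT, traceparent);
        if (tracestate != null && !tracestate.isEmpty()) {
            properties.put(TRACESTATE, tracestate);
        }
    }

    /** Reads traceparent first and falls back to Diagnostic-Id only when it is missing. */
    static Optional<String> extractTraceparent(Map<String, Object> properties) {
        Object value = properties.get(TRACEPARENT);
        if (value == null) {
            value = properties.get(DIAGNOSTIC_ID);
        }
        return Optional.ofNullable(value).map(Object::toString);
    }
}
```

A message produced by an older SDK that only carries Diagnostic-Id is still picked up by the fallback branch.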
Sending messages
Every message MUST carry trace context and trace context MUST be unique per message if tracing is enabled.
SDK MUST generate context for message when adding message to a batch OR when sending it.
SDK MUST create a span for each user send call and add links to each message's context. SDK MUST set links before starting the span.
If message span is created in the same scope as send span, they will be siblings.
SDK MUST set attributes and other properties on spans if they are sampled in
Send span:
span name: EventHubs.send - matches class/method pattern, but does not have to be precise
kind: client
az.namespace attribute : Microsoft.EventHub
messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
messaging.system attribute should match eventhubs or servicebus
messaging.operation: publish
messaging.batch.message_count: in case more than one message is sent, should match number of messages in a batch
status
message span:
span name: EventHubs.message
kind: producer
az.namespace attribute : Microsoft.EventHub
messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
messaging.system attribute should match eventhubs or servicebus
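As a rough sketch, the send span attributes listed above could be assembled as follows. The `SendSpanAttributes` class is hypothetical and uses a plain map instead of a tracing API; the attribute names come from the lists above, and the batch count is only set when more than one message is sent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collects send span attributes into a map. */
final class SendSpanAttributes {
    private SendSpanAttributes() { }

    static Map<String, Object> build(String entityName, String fullyQualifiedNamespace, int messageCount) {
        Map<String, Object> attributes = new LinkedHashMap<>();
        attributes.put("az.namespace", "Microsoft.EventHub");
        // entity name without partitionId or subscription id
        attributes.put("messaging.destination.name", entityName);
        // e.g. <name>.servicebus.windows.net
        attributes.put("net.peer.name", fullyQualifiedNamespace);
        attributes.put("messaging.system", "eventhubs");
        attributes.put("messaging.operation", "publish");
        if (messageCount > 1) {
            attributes.put("messaging.batch.message_count", (long) messageCount);
        }
        return attributes;
    }
}
```

The message span would carry the same az.namespace, destination, peer and system attributes, without the operation and batch count.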
Receiving messages
SDK MUST create span for receive calls (when called by users and not processor client).
SDK should remember timestamp when receive method was started and then go ahead and receive messages.
Once messages are received, it should start a span, passing the recorded timestamp to OTel span builder.
It should also extract context from all the received messages and set these contexts as links on the receive span.
Each link MUST have enqueuedTime attribute with unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).
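One possible way to derive the enqueuedTime link attribute is sketched below. It assumes the AMQP library surfaces x-opt-enqueued-time as a `java.util.Date`, a `java.time.Instant`, or a number that is already in epoch milliseconds; that is an assumption, and the helper name is hypothetical.

```java
import java.time.Instant;
import java.util.Date;

/** Hypothetical conversion of the x-opt-enqueued-time annotation to a link attribute value. */
final class EnqueuedTimeAttribute {
    private EnqueuedTimeAttribute() { }

    /**
     * Returns a Long with epoch milliseconds when the raw value can be converted,
     * the string form of the value otherwise, or null when there is no value.
     */
    static Object fromAnnotation(Object raw) {
        if (raw == null) {
            return null;
        }
        if (raw instanceof Date) {
            return ((Date) raw).getTime();
        }
        if (raw instanceof Instant) {
            return ((Instant) raw).toEpochMilli();
        }
        if (raw instanceof Number) {
            // assumption: numeric values are already epoch milliseconds
            return ((Number) raw).longValue();
        }
        // long type not possible: fall back to string as the spec allows
        return String.valueOf(raw);
    }
}
```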
Receive span:
span name: EventHubs.receive - matches class/method pattern, but does not have to be precise
kind: client
az.namespace attribute : Microsoft.EventHub
messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
messaging.system attribute should match eventhubs or servicebus
messaging.operation: receive
messaging.batch.message_count: in case more than one message is received, should match number of messages in a batch
status
Processing messages
If SDK supports event processing loop it MUST create span for message processing.
If Receiver/Consumer client has callback mode, and callback is called when messages are received, it should follow processing semantics defined here.
If SDK executes user callback per single message, it MUST wrap this callback with a processing span and make it direct child of context in the message.
If SDK gives an array of messages to user callback, SDK MUST read trace context from each message, create a processing span and link all the messages context to the new span.
SDK MUST add links before starting span.
Each link MUST have enqueuedTime attribute with unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).
If implicit context is supported, SDK MUST set processing span on it.
If SDK calls user code in fire-and-forget way, SDK MUST still create a span for processing (and end it immediately after invoking user code).
SDK MUST set attributes on the processing span:
span name: EventHubs.process
kind: consumer
az.namespace attribute : Microsoft.EventHub
messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
messaging.system attribute should match eventhubs or servicebus
messaging.operation: process
messaging.batch.message_count: in case more than one message is processed, should match number of messages in a batch
status
Other API calls that involve communication with service
SDK MUST instrument calls to the service that settle the message.
If such calls happen within the scope of processing span or user span, they MUST become children of this span.
If specific message is being settled, settle span should be linked to that message trace context.
Settle span:
span name: ServiceBus.complete, ServiceBus.abandon, ServiceBus.deadLetter, etc. (strict match is not required)
kind: client
az.namespace attribute : Microsoft.ServiceBus
messaging.destination.name attribute: ServiceBus/EventHub entity name (without partitionId or subscription id)
net.peer.name attribute: Fully qualified EventHub name such as <name>.servicebus.windows.net
messaging.system attribute should match eventhubs or servicebus
SDK MUST propagate trace parent part of the context (version, traceId, parentId and traceFlags) in the message in Diagnostic-Id property following W3C trace-context format for traceparent encoding.
SDK MUST ignore tracestate at this point.
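For illustration, the Diagnostic-Id value in traceparent encoding can be built from hex ids as sketched below. The `TraceParent` class is hypothetical, works on plain strings rather than a tracing API, and only models the sampled bit of traceFlags.

```java
/** Hypothetical formatter for the W3C traceparent value (version 00). */
final class TraceParent {
    private TraceParent() { }

    static String format(String traceIdHex, String spanIdHex, boolean sampled) {
        if (!traceIdHex.matches("[0-9a-f]{32}") || !spanIdHex.matches("[0-9a-f]{16}")) {
            throw new IllegalArgumentException(
                "trace id must be 32 and span id 16 lowercase hex characters");
        }
        // version - traceId - parentId - traceFlags
        return "00-" + traceIdHex + "-" + spanIdHex + "-" + (sampled ? "01" : "00");
    }
}
```

The result is always 55 characters long, which matches the fixed-size buffer used in the `getDiagnosticId` example later in this document.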
Motivation
EventHubs messages are propagated through the EventHubs service as is. The trace context MUST be stamped on the message when instrumentation is enabled, regardless of the message span sampling decision.
Propagation format through EventHubs is proposed here. Note this is proposal against proposal.
Following this proposal has the following drawbacks and risks:
If the proposal changes, we need to take care of backward compatibility
It requires additional work in EventHubs SDK to support byte[] payloads
It requires additional work in OpenTelemetry to support different encodings for traceparent and tracestate (byte[] and string).
There is also an existing header baked into the EventHubs SDK: Diagnostic-Id.
So we have a couple of options:
1. Keep using Diagnostic-Id to propagate W3C context for now. Switch to the W3C AMQP proposal when it is more mature.
  Propagate 00-traceid-spanid-0<?> as string in Diagnostic-Id
  On the receiving side, in .NET pass Diagnostic-Id to Activity - it will figure out the format. In other languages - ignore ids in the old format
  Do not support tracestate propagation
2. Implement the new AMQP proposal
  Propagate Traceparent as binary
  Propagate Tracestate as string
  Implement backward compatibility for Diagnostic-Id in .NET
Pros (Option 1):
do not introduce a new thing (header) that might change in future.
Save us in future from another layer of backward-compatibility
No need to start supporting binary payloads
No need to start supporting different encodings for traceparent and tracestate in OpenTelemetry
Cons:
Tracestate is not propagated
Different client libraries don't share the same protocol (Kafka to EH), which is reasonable in the absence of a standard; there are no other official instrumentations.
So we will pass traceparent through the existing Diagnostic-Id and wait for the standard to mature. Azure Monitor will keep pushing for AMQP protocol standardization.
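On the receiving side, the rule "ignore ids in the old format" could look roughly like the sketch below. It is a simplification that accepts only version 00 of the W3C format and treats everything else (including legacy hierarchical ids) as missing context; the `DiagnosticIdParser` class and its nested type are hypothetical.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for Diagnostic-Id values in W3C traceparent encoding. */
final class DiagnosticIdParser {
    // version "00", 32 hex trace id, 16 hex parent id, 2 hex flags
    private static final Pattern W3C_V0 =
        Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    static final class ParsedContext {
        final String traceId;
        final String spanId;
        final boolean sampled;

        ParsedContext(String traceId, String spanId, boolean sampled) {
            this.traceId = traceId;
            this.spanId = spanId;
            this.sampled = sampled;
        }
    }

    private DiagnosticIdParser() { }

    static Optional<ParsedContext> parse(String diagnosticId) {
        if (diagnosticId == null) {
            return Optional.empty();
        }
        Matcher matcher = W3C_V0.matcher(diagnosticId);
        if (!matcher.matches()) {
            return Optional.empty(); // legacy or malformed id: ignore it
        }
        String traceId = matcher.group(1);
        String spanId = matcher.group(2);
        if (isAllZeros(traceId) || isAllZeros(spanId)) {
            return Optional.empty(); // all-zero ids are invalid in W3C trace-context
        }
        boolean sampled = (Integer.parseInt(matcher.group(3), 16) & 0x01) != 0;
        return Optional.of(new ParsedContext(traceId, spanId, sampled));
    }

    private static boolean isAllZeros(String hex) {
        return hex.chars().allMatch(c -> c == '0');
    }
}
```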
Tracing calls to EventHub service
It could be useful to trace send/receive calls to the EventHub service via EventHubs diagnostics logs. The AMQP trace-context proposal does not give any guidance on this.
This spec also does not cover such scenarios at this point.
Batching and OpenTelemetry links
EventHubs SDKs (ServiceBus, Kafka and other messaging SDKs) support sending, receiving and processing batches of messages.
Tracing of batched operations requires a new kind of relationship between spans that allows associating one span with many others.
Let's say a producer created several messages and injected a unique tracing context into each message. Now a consumer has received the batch. Each message has its own context. Let's imagine the consumer calculates an average over something in the batch and publishes the result to storage. For this, the consumer will create a new span for the processing part, do the aggregation and call storage (which will lead to other spans being created). The consumer will use the OpenTelemetry mechanism called link to represent the relation to the messages.
Link is basically a span context (traceId, spanId and flags that were propagated over the wire).
Processing span will have a list of such links so UX will be able to show the relationship between traces.
Send messages
EventHubs SDK API allows sending single messages or batches.
Every message MUST carry trace context and trace context MUST be unique per message.
It must be possible to trace messages separately. Creating new context requires new span to be created and reported as each new context should be logged to maintain causation between spans.
SDK MUST generate context for message when creating message OR when sending it
If SDK generates context when message is constructed, messages become related to the scope where they were created and if messages are sent in the different scope (background thread) they still carry proper context.
There is an assumption that users create and send messages in the same scope. In this case, SDK may stamp context when message is sent (if it does not have context).
If SDK provides helpers to create batches (such as EventDataBatch), such helpers can take care of creating and stamping context. Users can also manually stamp context if needed during creation.
So SDK may use the first strategy (stamp context in the message constructor) or a combination of the second and third (stamp on send and in batch helpers).
SDK MUST create a span for each send call from user and add links to each message's context
Context may be created for the message before the send happens. It is possible to extract the context from the message to create a link, but this is not efficient. As an optimization, SDK should store the message span context instance somewhere (as an internal property of the EventData class or a weak reference) to simplify linking and make it efficient.
SDK MUST set links before starting the send span. This enables sampling based on links sampling decisions.
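To illustrate why links need to be known before the span starts, here is a sketch of one possible link-aware sampling policy: sample the send span when any linked message context is sampled. This policy is an assumption for illustration only, not something this spec mandates, and the `LinkBasedSampling` class is hypothetical. Links are represented by their traceparent strings.

```java
import java.util.List;

/** Hypothetical sampling policy that looks at the sampled flag of linked contexts. */
final class LinkBasedSampling {
    private LinkBasedSampling() { }

    /** Returns true when at least one linked traceparent has the sampled bit set. */
    static boolean anyLinkSampled(List<String> linkedTraceparents) {
        for (String traceparent : linkedTraceparents) {
            if (isSampled(traceparent)) {
                return true;
            }
        }
        return false;
    }

    /** Expects "00-<32 hex>-<16 hex>-<2 hex flags>"; anything else counts as not sampled. */
    static boolean isSampled(String traceparent) {
        if (traceparent == null || traceparent.length() != 55) {
            return false;
        }
        try {
            int flags = Integer.parseInt(traceparent.substring(53), 16);
            return (flags & 0x01) != 0;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

A sampler can only apply such a policy if the links are supplied to the span builder, since the sampling decision is made when the span starts.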
SDK MUST set attributes and other properties on spans if they are sampled in
Send span:
span name: Azure.EventHubs.send
kind: client
az.namespace attribute : Microsoft.EventHub
message_bus.destination attribute: EventHub entity name
peer.address attribute: Fully qualified EventHub service endpoint such as <name>.servicebus.windows.net
status
message span:
span name: Azure.EventHubs.message
kind: producer
az.namespace attribute : Microsoft.EventHub
message_bus.destination attribute: EventHub entity name
peer.address attribute: Fully qualified EventHub service endpoint such as <name>.servicebus.windows.net
If message span is created in the same scope as send span, they MUST be siblings.
OpenTelemetry example
public static void sendSync(Iterable<EventData> messages, Context context) {
    // check if we have context passed explicitly or implicitly
    Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());
    Span.Builder builder = tracer
        .spanBuilder("Azure.EventHubs.send")
        .setParent(parentSpan)
        .setSpanKind(Kind.CLIENT);
    // add links and generate context
    for (EventData msg : messages) {
        if (msg.SpanContext != null) {
            // if message has context, link it to the span
            // it may have context from a previous retry or it was set when creating EventData or EventDataBatch
            builder.addLink((SpanContext) msg.SpanContext);
        } else {
            // otherwise, create a context. No need to link it because messages are created in the same scope as
            // send happens and they are children of the same span
            Span msgSpan = tracer
                .spanBuilder("Azure.EventHubs.message")
                .setSpanKind(Kind.PRODUCER) // Producer not available on OpenCensus, set internal for census
                .setParent(parentSpan)
                .startSpan();
            String traceparent = getDiagnosticId(msgSpan.getContext());
            msg.getProperties().putIfAbsent("Diagnostic-Id", traceparent);
            msgSpan.end();
            msg.SpanContext = msgSpan.getContext();
        }
    }
    Span span = builder.startSpan();
    if (span.isRecordingEvents()) {
        span.setAttribute("az.namespace", "Microsoft.EventHub");
        span.setAttribute("message_bus.destination", this.entityPath);
        span.setAttribute("peer.address", this.endpoint);
    }
    try {
        // do send ...
        ehClient.sendSyncWithRetries(messages);
    } catch (EventHubException ex) {
        span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
    }
    span.end();
}

// builds the W3C traceparent value: 00-<traceId>-<spanId>-<traceOptions>
private static final String getDiagnosticId(SpanContext spanContext) {
    char[] chars = new char[55];
    chars[0] = '0';
    chars[1] = '0';
    chars[2] = '-';
    spanContext.getTraceId().copyLowerBase16To(chars, 3);
    chars[35] = '-';
    spanContext.getSpanId().copyLowerBase16To(chars, 36);
    chars[52] = '-';
    spanContext.getTraceOptions().copyLowerBase16To(chars, 53);
    return new String(chars);
}
C# example
.NET has primitives to work with distributed tracing - System.Diagnostics.Activity and System.Diagnostics.DiagnosticSource. They allow decoupling library instrumentation from the listener without adding external dependencies.
Example below demonstrates Activity usage (and intentionally skips DiagnosticSource subscription/notification details - check out this article for more info)
In this example links are sent as DiagnosticSource payload (as Activity does not support them directly).
public async Task SendMessagesAsync(IEnumerable<EventData> messages)
{
    // check if DiagnosticSource is enabled
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("send");
    if (!isInstrumentationEnabled)
    {
        // do send ...
        return;
    }

    // and if it is, do the instrumentation:
    var sendActivity = new Activity("send");
    var eventDatas = messages as EventData[] ?? messages.ToArray();
    List<Activity> links = null;
    foreach (var msg in eventDatas)
    {
        // if message happens to have context already
        // (it might also be that user added context manually)
        if (msg.SpanContext != null)
        {
            if (links == null) links = new List<Activity>();
            links.Add(msg.SpanContext);
        }
        else
        {
            var msgActivity = new Activity("message");
            diagnosticSource.StartActivity(msgActivity, null);
            // WARNING: byte[] properties are not supported yet by EventHub .NET SDK
            msg.Properties["Diagnostic-Id"] = msgActivity.Id;
            diagnosticSource.StopActivity(msgActivity, null);
            msg.SpanContext = msgActivity;
        }
    }

    diagnosticSource.StartActivity(sendActivity, null);
    if (sendActivity.Recorded)
    {
        sendActivity.AddTag("az.namespace", "Microsoft.EventHub");
        sendActivity.AddTag("message_bus.destination", this.entityPath);
        sendActivity.AddTag("peer.address", this.endpoint);
    }

    // do send
    // ...
    diagnosticSource.StopActivity(sendActivity, new { Links = links });
}

private static readonly DiagnosticListener diagnosticSource = new DiagnosticListener("Azure.EventHubs");
Process messages
Message processing could be handled by user code or SDK. EventHubs SDKs provide EventHub processor host in (at least) some languages and the processor host gives user messages one-by-one (Java, track2) or list of the messages that likely carry different tracing contexts.
If SDK supports event processing loop and passes a single message to user, it MUST create span for message processing and make it a child of context from message (or orphan if there is no context).
If SDK supports event processing loop and passes array of messages to user, it MUST create span for message processing and link context from messages.
Before processing events, SDK must read trace context from the messages, create a processing span and link all the messages context to the new span. SDK MUST add links before starting span.
If SDK does not await completion of user code, it still MUST create a processing span, attribute it with parent or links and MUST end it after user code is invoked
SDK MUST set attributes on each link
enqueuedTime attribute: unix epoch time with milliseconds precision representing when message was enqueued (x-opt-enqueued-time system property). Attribute value SHOULD have long type (if not possible, use string).
SDK MUST set attributes on the processing span
span name: Azure.EventHubs.process
message_bus.destination attribute: EventHub entity name
peer.address attribute: Fully qualified EventHub service endpoint such as sb://<name>.servicebus.windows.net/
kind: consumer
status
If users are interested in individual message tracing, they MUST create spans manually as children of each particular message.
On the SDK level we do not know how users treat messages: aggregate or process individually.
In case of aggregation, multiple traces from different messages are merging into a new trace.
In case of individual processing (e.g. store content of each message in Cosmos), it could make sense to continue each transaction individually.
OpenTelemetry example
public void onEvents(PartitionContext partitionContext, Iterable<EventData> messages, Context context) {
    // check if we have context passed explicitly or implicitly
    // parent span is most likely null and it's ok
    Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());
    Span.Builder builder = tracer
        .spanBuilder("Azure.EventHubs.process")
        .setParent(parentSpan)
        .setSpanKind(Kind.CONSUMER); // use SERVER with OpenCensus, CONSUMER with opentelemetry
    for (EventData msg : messages) {
        // TODO: optimization: If receive is instrumented, we might have extracted context then
        // and we should have cached it there
        SpanContext msgContext = AmqpPropagationFormat.extractContext(msg);
        // if message has context, link it to the span
        if (msgContext.isValid()) {
            builder.addLink(msgContext);
        }
    }
    Span span = builder.startSpan();
    Scope scope = tracer.withSpan(span);
    if (span.isRecordingEvents()) {
        span.setAttribute("az.namespace", "Microsoft.EventHub");
        span.setAttribute("message_bus.destination", this.entityPath);
        span.setAttribute("peer.address", this.endpoint);
    }
    try {
        // call users impl of IEventProcessor.onEvents
    } catch (Exception ex) {
        span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
    } finally {
        scope.close();
    }
    span.end();
}
Process single message
public void onEvent(PartitionContext context, EventData message) {
    Span.Builder builder = tracer
        .spanBuilder("Azure.EventHubs.process")
        .setSpanKind(Kind.CONSUMER);
    SpanContext msgContext = AmqpPropagationFormat.extractContext(message);
    // if message has context, make it parent of the span
    if (msgContext.isValid()) {
        builder.setParent(msgContext);
    }
    Span span = builder.startSpan();
    Scope scope = tracer.withSpan(span);
    if (span.isRecordingEvents()) {
        span.setAttribute("az.namespace", "Microsoft.EventHub");
        span.setAttribute("message_bus.destination", this.entityPath);
        span.setAttribute("peer.address", this.endpoint);
    }
    try {
        // call user's callback
    } catch (Exception ex) {
        span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
    } finally {
        scope.close();
    }
    span.end();
}
C# example
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    // check if DiagnosticSource is enabled
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("process");
    if (!isInstrumentationEnabled)
    {
        // do the processing...
        // ...
        return;
    }

    var activity = new Activity("process");
    List<Activity> links = null;
    foreach (var msg in messages)
    {
        if (msg.SpanContext != null)
        {
            if (links == null) links = new List<Activity>();
            links.Add(msg.SpanContext);
        }
    }

    diagnosticSource.StartActivity(activity, null);
    if (activity.Recorded)
    {
        activity.AddTag("az.namespace", "Microsoft.EventHub");
        activity.AddTag("message_bus.destination", this.entityPath);
        activity.AddTag("peer.address", this.endpoint);
    }

    // do the processing...
    // ...
    diagnosticSource.StopActivity(activity, new { Links = links });
}
Receive messages
EventHubs SDK API allows receiving batches of messages.
SDK MAY instrument receive call
EventHubs prefetches messages and receiving on the heavily loaded service is likely a local operation. If there are no messages for a long time, receiving will result in timeout - which should not be treated as failure.
The sampling decision is made on the receive span before it knows which messages it received; on a high-scale service with aggressive sampling, the chance that a receive call is sampled in could be low.
Presumably majority of users don't call receive directly and use event processor host instead.
Considering all this, instrumenting receive call brings low value.
If SDK decides to instrument it, SDK MUST create a span for receive call and, if this span is sampled in, SDK MUST add links to each received message's context.
Linking should be skipped if there is no context in the message.
Linking may still happen if receive span is sampled out - it will happen anyway when message batch is processed (and SDK may try to optimize it further).
Parsed span context could be stored on the EventData as internal property for optimization purposes (we'll need it again for message processing).
SDK MUST set attributes and other properties on spans if they are sampled in
span name: Azure.EventHubs.receive
kind: client
az.namespace attribute : Microsoft.EventHub
message_bus.destination attribute: EventHubs entity name
peer.address attribute: Fully qualified EventHubs service endpoint such as <name>.servicebus.windows.net
status
OpenTelemetry example
public static Iterable<EventData> receiveSync(int count, Context context) {
    // check if we have context passed explicitly or implicitly
    // parent Span is most likely null and it's ok
    Span parentSpan = (Span) context.getData(OPENTELEMETRY_SPAN_KEY).orElse(TRACER.getCurrentSpan());
    Span span = tracer.spanBuilder("Azure.EventHubs.receive")
        .setSpanKind(Kind.CLIENT)
        .setParent(parentSpan)
        .startSpan();
    Iterable<EventData> messages = null;
    try {
        // do receive ...
        messages = ehReceiver.receiveSync(count);
    } catch (EventHubException ex) {
        span.setStatus(Status.UNKNOWN.withDescription(ex.toString()));
        // end the span on the failure path too, otherwise it is never reported
        span.end();
        return messages;
    }
    for (EventData msg : messages) {
        SpanContext msgContext = AmqpPropagationFormat.extractContext(msg);
        if (msgContext.isValid() && span.isRecordingEvents()) {
            span.addLink(msgContext);
        }
        // we will need context again when processing event - let's cache it
        msg.SpanContext = msgContext;
    }
    if (span.isRecordingEvents()) {
        span.setAttribute("az.namespace", "Microsoft.EventHub");
        span.setAttribute("message_bus.destination", this.entityPath);
        span.setAttribute("peer.address", this.endpoint);
    }
    span.end();
    return messages;
}
C# example
public async Task<IEnumerable<EventData>> ReceiveAsync(int count)
{
    // check if DiagnosticSource is enabled
    bool isInstrumentationEnabled = diagnosticSource.IsEnabled() && diagnosticSource.IsEnabled("receive");
    if (!isInstrumentationEnabled)
    {
        return await this.ReceiveInternalAsync(count);
    }

    var activity = new Activity("receive");
    diagnosticSource.StartActivity(activity, null);
    IEnumerable<EventData> receivedMessages = await this.ReceiveInternalAsync(count);
    List<Activity> links = null;
    foreach (var msg in receivedMessages)
    {
        if (msg.Properties.TryGetValue("Diagnostic-Id", out var traceparent))
        {
            if (links == null) links = new List<Activity>();
            var ctx = new Activity("message").SetParentId((string)traceparent);
            links.Add(ctx);
            msg.SpanContext = ctx;
        }
    }

    if (activity.Recorded)
    {
        activity.AddTag("az.namespace", "Microsoft.EventHub");
        activity.AddTag("message_bus.destination", this.entityPath);
        activity.AddTag("peer.address", this.endpoint);
    }

    diagnosticSource.StopActivity(activity, new { Links = links });
    return receivedMessages;
}
Other operations
SDK MUST instrument calls to the service that update checkpoint or remote state of the message.
If such calls happen within the scope of processing span or user span, they MUST become children of this span.
All service calls should be instrumented and MUST carry following info
span name: Azure.EventHubs.<operation-name>
kind: client
az.namespace attribute : Microsoft.EventHub
message_bus.destination attribute: EventHubs entity name
peer.address attribute: Fully qualified EventHubs service endpoint such as <name>.servicebus.windows.net
status
It may be useful to add other attributes such as partition or offset.
Extract/Inject Helpers
Advanced scenarios may involve creating context manually and injecting it into the message: e.g. service is a proxy that transforms one kind of message into Event Hub event and sends it.
A more typical scenario is when users want to have more control over message processing, e.g. when they process messages individually and want to extract context and start a new span from it.
SDK MUST provide a helper method to extract context and MAY provide a helper method to inject context.
While extraction and injection could be handled by OpenTelemetry propagation formats and inject/extract methods, that makes users write non-trivial code and know exactly which format is used for trace context by EventHubs SDK (and we now use something very custom).
Such methods may be shipped in OpenTelemetry plugin packages or (in case of .NET) could be exposed on the SDK layer.
Proposed public methods to extract context from the message
Usage example:
Users who do not use Event Processor Host should be able to write code similar to the one below to process messages one-by-one or do batching depending on their scenario.
public static class AmqpPropagationFormat
{
    public static Activity ExtractActivity(EventData message)
    {
        var activity = new Activity("message");
        if (message.Properties.TryGetValue("Diagnostic-Id", out var diagnosticId))
        {
            // activity will validate it
            activity.SetParentId(diagnosticId.ToString());
        }
        return activity;
    }
}
Usage example:
static async Task ManualProcessingLoop(EventHubClient eventHubClient, string partitionId)
{
    var tracer = Tracing.Tracer;
    var receiver = eventHubClient.CreateReceiver("$Default", partitionId, DateTime.UtcNow);
    while (true)
    {
        IEnumerable<EventData> messages = await receiver.ReceiveAsync(5);
        foreach (EventData message in messages)
        {
            var span = tracer.SpanBuilder("process message").SetParent(AmqpPropagationFormat.ExtractActivity(message)).StartSpan();
            // process messages
            span.End();
        }
    }
}
customMeasurements["timeSinceEnqueued"] - mean time (in ms) among linked messages:
go through the span.Links - each of them has enqueuedTime attribute. Calculate the difference between span start time (UTC) and this timestamp. Put mean value in custom measurements. Value could be negative, if it is, set it to 0.
links (see below)
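The timeSinceEnqueued calculation described above can be sketched as follows. The sketch assumes the clamp to 0 applies to the mean rather than to each individual difference, which is one reading of the text; the `TimeSinceEnqueued` class is hypothetical and takes the enqueuedTime link attributes as a list of epoch milliseconds.

```java
import java.util.List;
import java.util.OptionalDouble;

/** Hypothetical helper computing the mean time since enqueued over linked messages. */
final class TimeSinceEnqueued {
    private TimeSinceEnqueued() { }

    /**
     * Mean of (span start - enqueuedTime) in milliseconds, clamped to 0 when negative.
     * Returns empty when the span has no links.
     */
    static OptionalDouble meanMillis(long spanStartEpochMillis, List<Long> enqueuedEpochMillis) {
        if (enqueuedEpochMillis.isEmpty()) {
            return OptionalDouble.empty();
        }
        long total = 0;
        for (long enqueued : enqueuedEpochMillis) {
            total += spanStartEpochMillis - enqueued;
        }
        double mean = (double) total / enqueuedEpochMillis.size();
        return OptionalDouble.of(Math.max(mean, 0.0));
    }
}
```

A negative mean can occur when clocks on the producer side, the service and the consumer are not synchronized, which is why the value is clamped.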
All spans (not EventHub-specific):
if links are present, put them into custom properties _MS.links in json blob [{"operation_Id":link0.trace_id, "id":link0.span_id}], e.g. [{"operation_Id":"5eca8b153632494ba00f619d6877b134","id":"d4c1279b6e7b7c47"}, {"operation_Id":"ff28988d0776b44f9ca93352da126047","id":"bf4fa4855d161141"}]. Ignore tracestate, attributes and options
ResultCode : status.CanonicalCode
Success : status.isOk
name : span name
duration : span duration
INTERNAL spans: if az.namespace attribute is present, translate into DependencyTelemetry with type = InProc | attributes["az.namespace"]
INTERNAL spans: if az.namespace is not present, translate into DependencyTelemetry with type = InProc
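As an illustration of the _MS.links custom property described above, the sketch below serializes links into the JSON blob shape shown in the example. The `MsLinks` class and its nested `Link` type are hypothetical; the code relies on trace and span ids being plain hex strings, so no JSON escaping is performed.

```java
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical serializer for the _MS.links custom property. */
final class MsLinks {
    static final class Link {
        final String traceId;
        final String spanId;

        Link(String traceId, String spanId) {
            this.traceId = traceId;
            this.spanId = spanId;
        }
    }

    private MsLinks() { }

    /** Produces [{"operation_Id":"<traceId>","id":"<spanId>"},...]; tracestate, attributes and options are ignored. */
    static String toJson(List<Link> links) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (Link link : links) {
            joiner.add("{\"operation_Id\":\"" + link.traceId + "\",\"id\":\"" + link.spanId + "\"}");
        }
        return joiner.toString();
    }
}
```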
From an outside discussion, when you want to create child spans of the operation span you still want to have a minimal set of attributes:
messaging.destination.name
server.address