The Azure Messaging client libraries endeavor to provide an API surface that is protocol-agnostic and shaped such that it would be suitable for use with a variety of protocols and transport formats. Developers using the client libraries should not have to understand or be aware of the details related to AMQP, MQTT, STOMP, HTTP, JMS, or other protocols and transport formats that may be supported by the service - now or in the future; these details are considered the responsibility of the library.
Another goal of the Messaging client libraries is to support heterogeneous environments; when using the official Azure client libraries, data that was published with an SDK for one language should be understood when consumed with an SDK for any of the other supported languages.
There are, however, specialized scenarios in which developers need to participate in the preparation of transport messages. One such example is ensuring interoperability with other producers/consumers that do not use the Azure client libraries and instead interact with the service directly via protocol. This document outlines a proposed approach for supporting participation in the formatting of events/messages for transport.
- This document is focused on transport message formatting; while the concept of a generic serializer based on the Azure.Core type is included here for illustration, the intent of this document is not to propose a design for serializer integration. Instead, a simple and straightforward integration is assumed.
- Examples in this document are based on types from the Service Bus client library; in the interest of avoiding repetition and conserving space, the Event Hubs equivalents are not shown. All concepts of the design can be adapted and applied to the Event Hubs types.
- When receiving messages from Service Bus, the read-only type ServiceBusReceivedMessage is used. Creation and manipulation of that type is restricted to callers with internal scope. If this design is to be adopted, an approach will be needed that allows a received message to be created and populated during transport formatting and made immutable once formatting is complete. This challenge is not accounted for in this design and will need to be covered under a separate work stream.
- The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.
- Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.
- Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Service Bus or Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.
- A Body Serializer is a type that holds responsibility for serializing the body of an event/message to and from its native form and ReadOnlyMemory<byte>.
- A Message Formatter is a type that holds responsibility for transforming a ServiceBusMessage, ServiceBusReceivedMessage, or EventData instance to and from the type used for communication with the service for the active protocol. For example, an AmqpMessage.
The Azure Messaging team has positioned their services as being friendly to interoperability with non-Azure services and clients outside of the official Azure client libraries. Such scenarios require that callers understand and manipulate the protocol format for data directly.
One such protocol, widely adopted in the messaging ecosystem and often used for interoperability, is AMQP. Unlike some protocols, AMQP allows a great deal of flexibility in message format. For example, AMQP allows the body of a message to be specified in several different ways, with the data appearing in distinct parts of the message structure.
In order to support interoperability with heterogeneous clients and services that may form AMQP messages in varied ways that are difficult to process without specific knowledge, it is important that customers are able to influence how model types are formatted for transport and vice-versa.
A local tomato farm monitors the need to irrigate crops based on observations made about the ambient temperature and moisture in the environment by IoT devices deployed to strategic parts of their crop fields. Observations are communicated to an edge gateway in the farm's local office using a messaging pattern over the MQTT protocol. Once received, the observations are aggregated and time-sequenced across the different devices that collected the data and then published to Service Bus for further processing.
Because the edge gateway is a stand-alone hardware appliance, there are limited opportunities to extend its behavior and the farm must, instead, rely on the built-in query language and publishing connector. As a result, controlling the format of messages being published from the gateway to Service Bus is difficult, costly, and not something that the farm wishes to take on.
Unfortunately, while the gateway is able to publish messages using the AMQP format recognized by Azure Service Bus, it forms the message using a different set of fields than the Azure client library, so the message body data cannot be accessed when receiving the Service Bus message. To support this scenario, the developers consuming the Service Bus message require the ability to process the incoming message in AMQP form, extract the body, and populate it in the ServiceBusReceivedMessage returned by the client.
An auto dealer has contracted a security firm to perform central monitoring of their lots to combat theft. The monitoring service uses security professionals to observe a bank of cameras for the different lots from a central location. In the event that suspicious activity is observed, the security professional is required to trigger an alarm in two phases.
In the first phase, banks of bright lights are turned on across the lot, providing better visibility to those monitoring the cameras and a deterrent for potential thieves. In the case that suspicious activity is confirmed and is continuing, the second phase is triggered which results in a loud alarm and automated gates closing.
The alarm is powered by a series of edge devices on the lot which control the automation. The edge devices read messages using the Apache Qpid library and have specific expectations on how a message is formatted. Because these devices are a boxed product, the option to customize their code is not available.
When the security office triggers an alarm, it communicates with the lot by publishing a message to a Service Bus queue. In order for that message to be understood by the edge device consuming it, the message must use a transport format that isn't normally used by the Service Bus client library. To support this scenario, developers creating the security application need the ability to control the formatting of the transport message before it is published.
- Translation of an event/message between the client model type and the transport format will be broken into two stages: creation of the transport format and population of the resulting type.
- The body serializer is an open type that allows developers to take ownership of how the event/message body becomes a set of bytes for transport and vice-versa. This is based on the generic serializer and, as such, the integration shown here will need to adapt to the actual serializer integration design.
- The transport formatter is an open type that allows developers to take ownership of how an event/message is translated into a protocol message format and vice-versa. A custom formatter will have access to the default formatter for the active protocol, allowing a pattern of "delegate to the default formatter and then customize the output."
- The options for the Messaging client types will allow a custom body serializer and custom message formatter, which will be applied to the operations of that client only. This is intended to allow developers fine-grained control over creation of clients for specialized purposes.
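
The two-stage translation and the "delegate, then customize" pattern described in the list above can be sketched in isolation. This is only a minimal illustration under stated assumptions: every type here (SketchMessage, SketchTransportMessage, SketchFormatter, StampingFormatter) is a hypothetical stand-in invented for this sketch and is not part of the Service Bus or Event Hubs libraries, and the "formatted-by" annotation is likewise made up.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a client model type such as ServiceBusMessage.
public sealed class SketchMessage
{
    public byte[] Body { get; set; } = Array.Empty<byte>();
    public IDictionary<string, object> Properties { get; } = new Dictionary<string, object>();
}

// Hypothetical stand-in for a protocol-level message such as an AMQP message.
public sealed class SketchTransportMessage
{
    public object BodyValue { get; set; }
    public IDictionary<string, object> Annotations { get; } = new Dictionary<string, object>();
}

// Hypothetical default formatter showing the two stages.
public class SketchFormatter
{
    // Stage 1: create the transport message and decide where the body lives.
    public virtual SketchTransportMessage Create(SketchMessage message) =>
        new SketchTransportMessage { BodyValue = message.Body };

    // Stage 2: populate the remaining fields from the source message.
    public virtual void Populate(SketchTransportMessage transport, SketchMessage source)
    {
        foreach (var pair in source.Properties)
        {
            transport.Annotations[pair.Key] = pair.Value;
        }
    }

    // The order in which a client might drive the two stages.
    public SketchTransportMessage Format(SketchMessage message)
    {
        var transport = Create(message);
        Populate(transport, message);
        return transport;
    }
}

// A custom formatter that delegates to the default behavior and then customizes the output.
public sealed class StampingFormatter : SketchFormatter
{
    public override void Populate(SketchTransportMessage transport, SketchMessage source)
    {
        base.Populate(transport, source);
        transport.Annotations["formatted-by"] = nameof(StampingFormatter);
    }
}
```

Calling Format on a StampingFormatter runs the default creation stage unchanged and then applies the customized population stage, which is the split that lets a developer override only the part of the translation they care about.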
The following example creates a client using AMQP over TCP and registers a custom message formatter. Once specified as part of the options, the formatter is automatically used for operations on the receiver. Callers do not need to be aware of it or change the way they interact with the API.

    var clientOptions = new ServiceBusClientOptions
    {
        TransportType = ServiceBusTransportType.AmqpTcp
    };

    var receiverOptions = new ServiceBusReceiverOptions
    {
        // MessageFormatter is the option proposed by this design.
        MessageFormatter = new CustomAmqpMessageFormatter()
    };

    await using var client = new ServiceBusClient("<< CONNECTION STRING >>", clientOptions);
    await using var receiver = client.CreateReceiver("<< QUEUE NAME >>", receiverOptions);

The following example creates a client using AMQP over TCP and registers a body serializer and message formatter. Note that the serializer design has not yet been completed and the details are expected to change; this snippet is intended only to illustrate the side-by-side independence of the serializer and formatter.

    var clientOptions = new ServiceBusClientOptions
    {
        TransportType = ServiceBusTransportType.AmqpTcp
    };

    var senderOptions = new ServiceBusSenderOptions
    {
        // Both options are proposed by this design and act independently.
        BodySerializer = new CustomBodySerializer(),
        MessageFormatter = new CustomAmqpMessageFormatter()
    };

    await using var client = new ServiceBusClient("<< CONNECTION STRING >>", clientOptions);
    await using var sender = client.CreateSender("<< QUEUE NAME >>", senderOptions);

The body serializer is based on the generic ObjectSerializer abstraction, for which it is expected that there will be offerings for common formats, such as Avro and JSON. This example is intended for simple illustration and would not likely be needed for a JSON body.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Text.Json;
    using System.Threading.Tasks;

    public class CustomBodySerializer : ObjectSerializer
    {
        private static readonly HashSet<Type> KnownTypes = new HashSet<Type>(new[] { typeof(TelemetryObservation) });

        private static void AssertKnownType(Type inputType)
        {
            if (!KnownTypes.Contains(inputType))
            {
                // ArgumentException takes the message first and the parameter name second.
                throw new ArgumentException("Unsupported type", nameof(inputType));
            }
        }

        public override void Serialize(Stream stream, object value, Type inputType)
        {
            Assert.NotNull(value, nameof(value));
            AssertKnownType(inputType);

            stream.Write(JsonSerializer.SerializeToUtf8Bytes(value, inputType));
        }

        public override async ValueTask SerializeAsync(Stream stream, object value, Type inputType)
        {
            Assert.NotNull(value, nameof(value));
            AssertKnownType(inputType);

            await stream.WriteAsync(JsonSerializer.SerializeToUtf8Bytes(value, inputType));
        }

        public override object Deserialize(Stream stream, Type returnType)
        {
            Assert.NotNull(stream, nameof(stream));
            AssertKnownType(returnType);

            // Copy into a buffer; a single Read call is not guaranteed to fill an array.
            using var buffer = new MemoryStream();
            stream.CopyTo(buffer);

            return JsonSerializer.Deserialize(buffer.ToArray(), returnType);
        }

        public override async ValueTask<object> DeserializeAsync(Stream stream, Type returnType)
        {
            Assert.NotNull(stream, nameof(stream));
            AssertKnownType(returnType);

            return await JsonSerializer.DeserializeAsync(stream, returnType);
        }
    }

The message formatter is based on a new TransportMessageFormatter abstraction, which is part of this design. As a result, this implementation would need to evolve to adapt to revisions of the base type made as a result of review and discussion.
This custom formatter is interested in ensuring that the body of outgoing AMQP messages is always set as an AMQP value, that incoming messages use the same body format, and that incoming messages have an artificial sequence number applied when a message is received.

    using System;
    using System.IO;
    using System.Threading;

    public class CustomAmqpMessageFormatter : TransportMessageFormatter
    {
        private long _receiveSequence = 0;

        private static void AssertSupportedTransportType(Type transportMessageType)
        {
            if (transportMessageType != typeof(AmqpMessage))
            {
                throw new ArgumentException("Unsupported type", nameof(transportMessageType));
            }
        }

        protected override object CreateTransportMessage(ServiceBusMessage message, Type transportMessageType)
        {
            AssertSupportedTransportType(transportMessageType);

            using var stream = new MemoryStream();
            BodySerializer.Serialize(stream, message.Body, typeof(string));

            // Rewind before reading; otherwise the reader starts at the end of the stream.
            stream.Position = 0;
            using var reader = new StreamReader(stream);

            // Wrap the body in an AmqpValue so that it is sent as an AMQP value section.
            return AmqpMessage.Create(new AmqpValue { Value = reader.ReadToEnd() });
        }

        protected override void PopulateServiceBusMessage<T>(ServiceBusMessage message, T sourceTransportMessage, bool wasPeekUsed)
        {
            AssertSupportedTransportType(typeof(T));

            // Delegate to the default formatter, then customize the result.
            DefaultFormatter.PopulateServiceBusMessage(message, sourceTransportMessage, wasPeekUsed);
            message.UserProperties["receive-sequence"] = Interlocked.Increment(ref _receiveSequence);
        }
    }


    // Method bodies are omitted; only the shape of the API is shown.
    public abstract class TransportMessageFormatter
    {
        // State set internally by the client responsible for a service operation. One
        // alternative approach would be to pass these properties as parameters for each call.
        public ObjectSerializer BodySerializer { get; }
        public TransportMessageFormatter DefaultFormatter { get; }

        // Outgoing formatting
        protected virtual object CreateTransportBatchMessage<T>(IEnumerable<T> transportMessages, Type transportBatchMessageType);
        protected virtual object CreateTransportMessage(ServiceBusMessage message, Type transportMessageType);
        protected virtual void PopulateTransportMessage<T>(T transportMessage, ServiceBusMessage sourceMessage);

        // Incoming formatting
        protected virtual ServiceBusMessage CreateServiceBusMessage<T>(T transportMessage, bool wasPeekUsed);
        protected virtual void PopulateServiceBusMessage<T>(ServiceBusMessage message, T sourceTransportMessage, bool wasPeekUsed);
    }

It would be great if you could provide an example of how the custom formatter could be used to set/get the body in AmqpValue vs AmqpSequence vs Data.
According to the spec, the body can have one of these three fields populated. AmqpSequence is actually defined as a List<List>, and Data is List<byte[]>. However, in Track 1, the body can only be set as byte[]. Since we are delegating this to the user, I would think we should be able to support the spec exactly.
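
As a rough illustration of the kind of example requested above, the sketch below models the three body forms and shows how incoming-side logic might branch on whichever one is present. It rests on several assumptions: BodyKind, SketchAmqpBody, and SketchBodyReader are hypothetical stand-ins written for this sketch, not types from the AMQP library or the client libraries, and the way value and sequence bodies are flattened into bytes (UTF-8 text, comma-joined items) is an arbitrary choice that a real formatter would replace with whatever the interoperating producer expects.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// The three mutually exclusive body forms described in the comment above.
public enum BodyKind { Data, Sequence, Value }

// Hypothetical model of an AMQP message body; exactly one form is populated.
public sealed class SketchAmqpBody
{
    public BodyKind Kind { get; }
    public IReadOnlyList<byte[]> Data { get; }                     // one or more data sections
    public IReadOnlyList<IReadOnlyList<object>> Sequence { get; }  // one or more sequence sections
    public object Value { get; }                                   // a single value section

    private SketchAmqpBody(
        BodyKind kind,
        IReadOnlyList<byte[]> data,
        IReadOnlyList<IReadOnlyList<object>> sequence,
        object value)
    {
        Kind = kind;
        Data = data;
        Sequence = sequence;
        Value = value;
    }

    public static SketchAmqpBody FromData(params byte[][] sections) =>
        new SketchAmqpBody(BodyKind.Data, sections, null, null);

    public static SketchAmqpBody FromSequence(params IReadOnlyList<object>[] sections) =>
        new SketchAmqpBody(BodyKind.Sequence, null, sections, null);

    public static SketchAmqpBody FromValue(object value) =>
        new SketchAmqpBody(BodyKind.Value, null, null, value);
}

public static class SketchBodyReader
{
    // Flattens whichever body form is present into the bytes a client model could expose.
    public static byte[] ToBytes(SketchAmqpBody body)
    {
        switch (body.Kind)
        {
            case BodyKind.Data:
                // Concatenate the data sections in order.
                return body.Data.SelectMany(section => section).ToArray();

            case BodyKind.Value:
                // Pass raw bytes through; otherwise encode the value's text form (an assumption).
                return body.Value is byte[] raw
                    ? raw
                    : Encoding.UTF8.GetBytes(body.Value?.ToString() ?? string.Empty);

            case BodyKind.Sequence:
                // Join the items of every sequence section with commas (an assumption).
                return Encoding.UTF8.GetBytes(
                    string.Join(",", body.Sequence.SelectMany(section => section)));

            default:
                throw new ArgumentOutOfRangeException(nameof(body));
        }
    }
}
```

For instance, SketchBodyReader.ToBytes(SketchAmqpBody.FromValue("lights-on")) and SketchBodyReader.ToBytes(SketchAmqpBody.FromData(Encoding.UTF8.GetBytes("lights"), Encoding.UTF8.GetBytes("-on"))) both yield the UTF-8 bytes of "lights-on". A custom formatter following the design in this document would apply the same kind of branch inside its populate step, and the inverse choice inside its create step.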