
@jsquire
Last active May 15, 2020 17:42
Azure Messaging: Allow control of the transport formatting

The Azure Messaging client libraries endeavor to provide an API surface that is protocol-agnostic and shaped such that it would be suitable for use with a variety of protocols and transport formats. Developers using the client libraries should not have to understand or be aware of the details related to AMQP, MQTT, STOMP, HTTP, JMS, or other protocols and transport formats that may be supported by the service - now or in the future; these details are considered the responsibility of the library.

Another goal of the Messaging client libraries is to support heterogeneous environments; when using the official Azure client libraries, data that was published with an SDK for one language should be understood when consumed with an SDK for any of the other supported languages.

There are, however, specialized scenarios in which developers need to participate in the preparation of transport messages. One such example is ensuring interoperability with other producers/consumers that do not use the Azure client libraries and instead interact with the service directly over the protocol. This document outlines a proposed approach for supporting participation in the formatting of events/messages for transport.

Things to know before reading

  • This document is focused on transport message formatting; while the concept of a generic serializer based on the Azure.Core ObjectSerializer type is included here for illustration, the intent of this document is not to propose a design for serializer integration. Instead, a simple and straightforward integration is assumed.

  • Examples in this document are based on types from the Service Bus client library; in the interest of avoiding repetition and conserving space, the Event Hubs equivalents are not shown. All concepts of the design can be adapted and applied to the Event Hubs types.

  • When receiving messages from Service Bus, the read-only type ServiceBusReceivedMessage is used. Creation and manipulation of that type are restricted to callers with internal scope. If this design is adopted, an approach will be needed that allows a received message to be created and populated during transport formatting and then made immutable once formatting is complete. This challenge is not addressed by this design and will need to be covered under a separate work stream.

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level concept are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Service Bus or Event Hubs types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Terminology

  • A Body Serializer is a type that holds responsibility for serializing the body of an event/message to and from its native form and ReadOnlyMemory<byte>.

  • A Message Formatter is a type that holds responsibility for transforming a ServiceBusMessage, ServiceBusReceivedMessage, or EventData instance to and from the type used for communication with the service over the active protocol; for example, an AmqpMessage.

Why this is needed

The Azure Messaging team has positioned their services as being friendly to interoperability with non-Azure services and clients outside of the official Azure client libraries. Such scenarios require that callers understand and manipulate the protocol format for data directly.

One protocol that has been widely adopted in the messaging ecosystem, and that is often used for interoperability, is AMQP. Unlike some protocols, AMQP allows a great deal of flexibility in message format. For example, AMQP allows the body of a message to be specified in several different ways, with the data appearing in distinct parts of the message structure.
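The sketch below makes the body flexibility concrete. It models the three mutually exclusive body forms that the AMQP 1.0 message format permits: one or more data sections, one or more amqp-sequence sections, or a single amqp-value section. BodySection, BodySectionKind, and AmqpBodyRules are hypothetical names used only for illustration. They are not the Microsoft.Azure.Amqp framing types. Concatenating data sections in ReadDataBody is an assumption about what a formatter might reasonably do, not behavior defined by the specification.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical section model; not the Microsoft.Azure.Amqp Data, AmqpSequence, or AmqpValue types.
public enum BodySectionKind { Data, AmqpSequence, AmqpValue }

public sealed class BodySection
{
    public BodySection(BodySectionKind kind, object payload) { Kind = kind; Payload = payload; }

    public BodySectionKind Kind { get; }
    public object Payload { get; }
}

public static class AmqpBodyRules
{
    // AMQP 1.0 allows one or more data sections, one or more amqp-sequence
    // sections, or exactly one amqp-value section; the forms cannot be mixed.
    // This sketch additionally rejects an empty section list.
    public static BodySectionKind Classify(IReadOnlyList<BodySection> sections)
    {
        if (sections.Count == 0)
        {
            throw new ArgumentException("The body has no sections.", nameof(sections));
        }

        var kind = sections[0].Kind;

        if (sections.Any(section => section.Kind != kind))
        {
            throw new ArgumentException("Body section kinds cannot be mixed.", nameof(sections));
        }

        if (kind == BodySectionKind.AmqpValue && sections.Count > 1)
        {
            throw new ArgumentException("Only one amqp-value section is allowed.", nameof(sections));
        }

        return kind;
    }

    // Assumed strategy: read a data body by concatenating every section's bytes, in order.
    public static byte[] ReadDataBody(IReadOnlyList<BodySection> sections)
    {
        if (Classify(sections) != BodySectionKind.Data)
        {
            throw new InvalidOperationException("The body is not made of data sections.");
        }

        return sections.SelectMany(section => (byte[])section.Payload).ToArray();
    }
}
```

A formatter that receives messages from a peer that always uses amqp-value bodies would branch on Classify before deciding how to extract the payload.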

In order to support interoperability with heterogeneous clients and services which may form AMQP messages in varied ways which are difficult to process without specific knowledge, it is important that customers are able to influence how model types are formatted for transport and vice-versa.

High level scenarios

Processing of IoT device observations

A local tomato farm monitors the need to irrigate crops based on observations made about the ambient temperature and moisture in the environment by IoT devices deployed to strategic parts of their crop fields. Observations are communicated to an edge gateway in the farm's local office using a messaging pattern over the MQTT protocol. Once received, the observations are aggregated and time-sequenced across the different devices that collected the data and then published to Service Bus for further processing.

Because the edge gateway is a stand-alone hardware appliance, there are limited opportunities to extend its behavior and the farm must, instead, rely on the built-in query language and publishing connector. As a result, controlling the format of messages being published from the gateway to Service Bus is difficult, costly, and not something that the farm wishes to take on.

Unfortunately, while the gateway is able to publish messages using the AMQP format recognized by Azure Service Bus, it forms the message using a different set of fields than the Azure client library, which makes the body data inaccessible when the Service Bus message is received. To support this scenario, the developers consuming the Service Bus message need the ability to process the incoming message in AMQP form, extract the body, and populate it in the ServiceBusReceivedMessage returned by the client.

Automated triggering of an alarm system

An auto dealer has contracted a security firm to perform central monitoring of their lots to combat theft. The monitoring service uses security professionals to observe a bank of cameras for the different lots from a central location. In the event that suspicious activity is observed, the security professional is required to trigger an alarm in two phases.

In the first phase, banks of bright lights are turned on across the lot, providing better visibility to those monitoring the cameras and a deterrent for potential thieves. In the case that suspicious activity is confirmed and is continuing, the second phase is triggered which results in a loud alarm and automated gates closing.

The alarm is powered by a series of edge devices on the lot which control the automation. The edge devices read messages using the Apache Qpid library and have specific expectations about how a message is formatted. Because these devices are a boxed product, the option to customize their code is not available.

When the security office triggers an alarm, it communicates with the lot by publishing a message to a Service Bus queue. In order for that message to be understood by the edge device consuming it, the message must use a transport format that isn't normally used by the Service Bus client library. To support this scenario, developers creating the security application need the ability to control the formatting of the transport message before it is published.

Proposed approach

  • Translation of an event/message between the client model type and the transport format will be broken into two stages - creation of the transport format and population of the resulting type.

  • The body serializer is an open type that allows developers to take ownership of how the event/message body becomes a set of bytes for transport and vice-versa. This is based on the generic serializer and, as such, the integration shown here will need to adapt to the actual serializer integration design.

  • The transport formatter is an open type that allows developers to take ownership of how an event/message is translated into a protocol message format and vice-versa. A custom formatter will have access to the default formatter for the active protocol, allowing a pattern of "delegate to the default formatter and then customize the output."

  • The options for the Messaging client types will allow a custom body serializer and custom message formatter, which will be applied to the operations of that client only. This is intended to allow developers fine-grained control over creation of clients for specialized purposes.
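The following is a minimal, library-independent sketch of two ideas from the approach above: the two-stage "create, then populate" translation and the "delegate to the default formatter and then customize" pattern. All names here are hypothetical stand-ins. ModelMessage stands in for ServiceBusMessage, WireMessage for AmqpMessage, and FormatterSketch for TransportMessageFormatter. The body serializer is reduced to a Func<string, byte[]> for brevity.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical stand-in for the client model type.
public sealed class ModelMessage
{
    public string Body { get; set; } = "";
    public Dictionary<string, object> Properties { get; } = new Dictionary<string, object>();
}

// Hypothetical stand-in for the transport (wire) type.
public sealed class WireMessage
{
    public byte[] Payload { get; set; } = Array.Empty<byte>();
    public Dictionary<string, object> ApplicationProperties { get; } = new Dictionary<string, object>();
}

public abstract class FormatterSketch
{
    // Optional body serializer, reduced to a delegate for this sketch.
    public Func<string, byte[]>? BodySerializer { get; init; }

    // Stage 1 creates the transport type; stage 2 populates it.
    public WireMessage Format(ModelMessage message)
    {
        var wire = CreateTransportMessage(message);
        PopulateTransportMessage(wire, message);
        return wire;
    }

    protected internal abstract WireMessage CreateTransportMessage(ModelMessage message);
    protected internal abstract void PopulateTransportMessage(WireMessage wire, ModelMessage source);
}

public sealed class DefaultFormatterSketch : FormatterSketch
{
    // Uses the body serializer when one was supplied, UTF-8 text otherwise.
    protected internal override WireMessage CreateTransportMessage(ModelMessage message) => new WireMessage
    {
        Payload = BodySerializer?.Invoke(message.Body) ?? Encoding.UTF8.GetBytes(message.Body)
    };

    // Copies application properties across unchanged.
    protected internal override void PopulateTransportMessage(WireMessage wire, ModelMessage source)
    {
        foreach (var pair in source.Properties)
        {
            wire.ApplicationProperties[pair.Key] = pair.Value;
        }
    }
}

public sealed class StampingFormatterSketch : FormatterSketch
{
    private readonly FormatterSketch _defaultFormatter;

    public StampingFormatterSketch(FormatterSketch defaultFormatter) => _defaultFormatter = defaultFormatter;

    // Creation is delegated entirely to the default formatter.
    protected internal override WireMessage CreateTransportMessage(ModelMessage message) =>
        _defaultFormatter.CreateTransportMessage(message);

    // Delegate first, then customize the output.
    protected internal override void PopulateTransportMessage(WireMessage wire, ModelMessage source)
    {
        _defaultFormatter.PopulateTransportMessage(wire, source);
        wire.ApplicationProperties["formatted-by"] = nameof(StampingFormatterSketch);
    }
}
```

In this sketch, a serializer assigned to the default formatter takes effect whenever the custom formatter delegates to it. The members are declared protected internal rather than protected for a reason. C# does not let a derived class call a protected member through a reference typed as the base class, and delegating to a separate default-formatter instance requires exactly that.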

Usage examples

Register a custom formatter

The following example creates a client using AMQP over WebSockets and registers a custom message formatter for a receiver. Once specified as part of the options, the formatter is automatically used for operations on the receiver. Callers do not need to be aware of it or change the way they interact with the API.

var clientOptions = new ServiceBusClientOptions
{
    TransportType = ServiceBusTransportType.AmqpWebSockets
};

var receiverOptions = new ServiceBusReceiverOptions
{
    MessageFormatter = new CustomAmqpMessageFormatter()
};

await using var client = new ServiceBusClient("<< CONNECTION STRING >>", clientOptions);
await using var receiver = client.CreateReceiver("<< QUEUE NAME >>", receiverOptions);

Register a custom serializer and formatter

The following example creates a client using AMQP over TCP and registers a body serializer and message formatter. Note that the serializer design has not yet been completed and the details are expected to change; this snippet is intended only to illustrate the side-by-side independence of the serializer and formatter.

var clientOptions = new ServiceBusClientOptions
{
    TransportType = ServiceBusTransportType.AmqpTcp
};

var senderOptions = new ServiceBusSenderOptions
{
    BodySerializer = new CustomBodySerializer(),
    MessageFormatter = new CustomAmqpMessageFormatter()
};

await using var client = new ServiceBusClient("<< CONNECTION STRING >>", clientOptions);
await using var sender = client.CreateSender("<< QUEUE NAME >>", senderOptions);

Create a custom body serializer

The body serializer is based on the generic ObjectSerializer abstraction, for which it is expected that there will be offerings for common formats, such as Avro and JSON. This example is intended for simple illustration and would likely not be needed for a JSON body.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Core.Serialization;

public class CustomBodySerializer : ObjectSerializer
{
    // TelemetryObservation is an illustrative application model type.
    private static readonly HashSet<Type> KnownTypes = new HashSet<Type>(new[] { typeof(TelemetryObservation) });
    
    private static void AssertKnownType(Type type)
    {
        if (!KnownTypes.Contains(type))
        {
            throw new ArgumentException($"Unsupported type: { type }", nameof(type));
        }
    }
    
    public override void Serialize(Stream stream, object value, Type inputType)
    {
        if (value == null) throw new ArgumentNullException(nameof(value));
        AssertKnownType(inputType);
        stream.Write(JsonSerializer.SerializeToUtf8Bytes(value, inputType));
    }
    
    public override async ValueTask SerializeAsync(Stream stream, object value, Type inputType)
    {
        if (value == null) throw new ArgumentNullException(nameof(value));
        AssertKnownType(inputType);
        await JsonSerializer.SerializeAsync(stream, value, inputType);
    }
    
    public override object Deserialize(Stream stream, Type returnType)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        AssertKnownType(returnType);
        
        // Copy the whole stream; a single Read call is not guaranteed to fill a buffer.
        using var buffer = new MemoryStream();
        stream.CopyTo(buffer);
        return JsonSerializer.Deserialize(buffer.ToArray(), returnType);
    }
    
    public override async ValueTask<object> DeserializeAsync(Stream stream, Type returnType)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        AssertKnownType(returnType);
        return await JsonSerializer.DeserializeAsync(stream, returnType);
    }
}

Create a message formatter

The message formatter is based on a new TransportMessageFormatter abstraction, which is part of this design. As a result, this implementation would need to evolve to adapt to revisions of the base type made as a result of review and discussion.

This custom formatter is interested in ensuring that the body of outgoing AMQP messages is always set as an AMQP value, that incoming messages use the same body format, and that incoming messages have an artificial sequence number applied when a message is received.

using System;
using System.IO;
using System.Threading;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Framing;

public class CustomAmqpMessageFormatter : TransportMessageFormatter
{
    private long _receiveSequence = 0;
    
    private static void AssertSupportedTransportType(Type transportMessageType)
    {
        if (transportMessageType != typeof(AmqpMessage))
        {
            throw new ArgumentException("Unsupported type", nameof(transportMessageType));
        }
    }
    
    protected override object CreateTransportMessage(ServiceBusMessage message, Type transportMessageType)
    {
        AssertSupportedTransportType(transportMessageType);
        
        using var stream = new MemoryStream();
        BodySerializer.Serialize(stream, message.Body, typeof(string));
        
        // Rewind before reading; serializing leaves the position at the end of the data.
        stream.Position = 0;
        using var reader = new StreamReader(stream);
        
        // Always send the body as an AMQP value section.
        return AmqpMessage.Create(new AmqpValue { Value = reader.ReadToEnd() });
    }
    
    protected override void PopulateServiceBusMessage<T>(ServiceBusMessage message, T sourceTransportMessage, bool wasPeekUsed)
    {
        AssertSupportedTransportType(typeof(T));
        
        DefaultFormatter.PopulateServiceBusMessage(message, sourceTransportMessage, wasPeekUsed);
        
        // Interlocked keeps the artificial sequence number unique across concurrent receives.
        message.UserProperties["receive-sequence"] = Interlocked.Increment(ref _receiveSequence);
    }
}

API Skeleton

TransportMessageFormatter

public abstract class TransportMessageFormatter
{
    // State set internally by the client responsible for a service operation.  One  
    // alternative approach would be to pass these properties as parameters for each call.
    public ObjectSerializer BodySerializer { get; }
    public TransportMessageFormatter DefaultFormatter { get; }

    // Outgoing formatting
    protected virtual object CreateTransportBatchMessage<T>(IEnumerable<T> transportMessages, Type transportBatchMessageType);
    protected virtual object CreateTransportMessage(ServiceBusMessage message, Type transportMessageType);
    protected virtual void PopulateTransportMessage<T>(T transportMessage, ServiceBusMessage sourceMessage);
    
    // Incoming formatting
    protected virtual ServiceBusMessage CreateServiceBusMessage<T>(T transportMessage, bool wasPeekUsed);
    protected virtual void PopulateServiceBusMessage<T>(ServiceBusMessage message, T sourceTransportMessage, bool wasPeekUsed);
}
@JoshLove-msft

It would be great if you could provide an example of how the custom formatter could be used to set/get the body in AmqpValue vs AmqpSequence vs Data.
According to the spec, the body can have one of these three fields populated. AmqpSequence is actually defined as a List<List>, and Data is List<byte[]>. However in Track 1, they only allow the body to be settable as byte[]. Since we are delegating this to the user, I would think we should be able to support the spec exactly.

@JoshLove-msft

Are the serializer and body formatter options independent, i.e. I can provide a custom serializer without custom formatter and vice versa?

@jsquire
Author

jsquire commented May 14, 2020

Are the serializer and body formatter options independent, i.e. I can provide a custom serializer without custom formatter and vice versa?

Independent; I added another snippet to help clarify.

@jsquire
Author

jsquire commented May 14, 2020

It would be great if you could provide an example of how the custom formatter could be used to set/get the body in AmqpValue vs AmqpSequence vs Data.
According to the spec, the body can have one of these three fields populated. AmqpSequence is actually defined as a List<List>, and Data is List<byte[]>. However in Track 1, they only allow the body to be settable as byte[]. Since we are delegating this to the user, I would think we should be able to support the spec exactly.

Agreed. It will definitely be something we want to cover and be sure that support is solid. I'm going to defer for the moment until we reach agreement that this is the path that we want to take forward. Assuming we reach consensus on the general approach, I want to do an actual working prototype and will test out the body types as part of that work, then use them for additional examples for the design.

The existing sample covers "how do I set a value body", and the other types are just overloads of AmqpMessage.Create. Reading back should be very similar to this block in the AmqpMessageConverter, I'd expect. Serialization and generic body typing potentially come into play to answer the question of "how do I set this on the ServiceBusMessage body", which is why I'm somewhat bundling them together for examples. I think that we'll need to have the serializer + ServiceBusMessage<T> support before the message formatter really makes sense.

@JoshLove-msft

JoshLove-msft commented May 15, 2020

In PopulateServiceBusMessage, I think the only thing we want to expose as settable would be the message body.
The type would be ServiceBusReceivedMessage rather than ServiceBusMessage.
Even then, the approach to allow customers direct access to a property on the ServiceBusReceivedMessage seems misleading since this type is meant to be immutable. Could we instead just have the parameter be only the sourceTransportType and have the return type be ReadOnlyMemory<byte>?

@JoshLove-msft

JoshLove-msft commented May 15, 2020

Are the serializer and body formatter options independent, i.e. I can provide a custom serializer without custom formatter and vice versa?

Independent; I added another snippet to help clarify.

So if a BodySerializer is specified by itself, we will invoke the custom body serializer. But what if they specify a BodySerializer and a TransportMessageFormatter? Would we skip invoking the BodySerializer methods because we assume the customer will use the BodySerializer themselves in their CreateTransportMessage, as in the code snippet? If so, that seems like it could be a bit misleading. I would think that if these steps are independent they would both get executed and the user wouldn't be expected to do anything with the BodySerializer in CreateTransportMessage. Instead they would be working with the already serialized body.

@jsquire
Author

jsquire commented May 15, 2020

The type would be ServiceBusReceivedMessage rather than ServiceBusMessage.

Please see Things to know before reading, #3. 😄

(I promise that I didn't sneak that in after your comment just to leave a cheeky reply)

@jsquire
Author

jsquire commented May 15, 2020

In PopulateServiceBusMessage, I think the only thing we want to expose as settable would be the message body.

I disagree; we want the formatter to have responsibility for the entire AMQP <=> ServiceBusReceivedMessage translation. The goal of this model is to ensure that developers have access to the underlying protocol and library types (in full) and that our library does not include any "black box magic" that isn't overridable. That includes being able to lie about things considered system properties, should they wish to do so.

As an example, let's say a hospital has medical devices sending telemetry to Service Bus. These are specialized hardware that the hospital cannot program. One such device sends all of the metadata in the AMQP body as a JSON payload. The hospital would like to have a normalized message returned from Receive calls to simplify their logic. So, they write a custom formatter to inspect a message after our default formatter processes it and detect when some system properties such as Subject and CorrelationId are not present. In that case, they read them from the body and "fix up" the ServiceBusReceivedMessage so that it matches the pattern used by the others.
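Below is a rough sketch of the hospital fix-up described above, using System.Text.Json. It assumes two things: a mutable "received message under construction" (the open question from Things to know before reading, #3) and a JSON object body. ReceivedMessageDraft and TelemetryFixUp are hypothetical names. The "subject" and "correlationId" JSON property names are assumptions about the device payload.

```csharp
#nullable enable
using System.Text.Json;

// Hypothetical mutable draft of a received message; the real shape is undecided.
public sealed class ReceivedMessageDraft
{
    public string Body { get; set; } = "";
    public string? Subject { get; set; }
    public string? CorrelationId { get; set; }
}

public static class TelemetryFixUp
{
    // Intended to run after the default formatter has populated the draft.
    // Only properties the device left unset are filled from the JSON body.
    public static void FillMissingSystemProperties(ReceivedMessageDraft draft)
    {
        if (draft.Subject != null && draft.CorrelationId != null)
        {
            return;
        }

        using var document = JsonDocument.Parse(draft.Body);
        var root = document.RootElement;

        if (root.ValueKind != JsonValueKind.Object)
        {
            return;
        }

        if (draft.Subject == null
            && root.TryGetProperty("subject", out var subject)
            && subject.ValueKind == JsonValueKind.String)
        {
            draft.Subject = subject.GetString();
        }

        if (draft.CorrelationId == null
            && root.TryGetProperty("correlationId", out var correlationId)
            && correlationId.ValueKind == JsonValueKind.String)
        {
            draft.CorrelationId = correlationId.GetString();
        }
    }
}
```

Values the device did set are never overwritten, so the default formatter's output still wins whenever it has the information.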

A bit contrived, yes, but not outside the realm of possibility. Could this particular scenario be handled downstream in the system rather than in the message formatting? Sure. The counter question that I'd pose is: what would we gain from opening up one part of the messaging protocol for developers to control but locking down others? Also, what happens in the case where we potentially support a different protocol with a wire format different than AMQP?

Even then, the approach to allow customers direct access to a property on the ServiceBusReceivedMessage seems misleading since this type is meant to be immutable. Could we instead just have the parameter be only the sourceTransportType and have the return type be ReadOnlyMemory<byte>?

Please see Things to know before reading, #3. In summary, the return type would be the immutable ServiceBusReceivedMessage but we would need to consider how to provide the concept of a received message under construction that doesn't become fully immutable until the formatting process is complete. Possibly through a child type or a set of localized methods that can manipulate the internal bits.
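One way a "received message under construction" could work is as a builder. Formatters mutate the builder, and it is sealed into an immutable message once formatting completes. The sketch below assumes that approach. ReceivedMessageBuilder and ReceivedMessageSketch are hypothetical names. The real shape (child type, localized methods, or something else) is still undecided.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

// Hypothetical immutable received message: no public setters or constructor.
public sealed class ReceivedMessageSketch
{
    internal ReceivedMessageSketch(string body, string? subject, IDictionary<string, object> properties)
    {
        Body = body;
        Subject = subject;
        // Copy so later changes to the source dictionary cannot leak in.
        Properties = new ReadOnlyDictionary<string, object>(new Dictionary<string, object>(properties));
    }

    public string Body { get; }
    public string? Subject { get; }
    public IReadOnlyDictionary<string, object> Properties { get; }
}

// Hypothetical mutable form handed to formatters; Seal() freezes it.
public sealed class ReceivedMessageBuilder
{
    private readonly Dictionary<string, object> _properties = new Dictionary<string, object>();
    private bool _sealed;

    public string Body { get; private set; } = "";
    public string? Subject { get; private set; }

    public ReceivedMessageBuilder SetBody(string body) { ThrowIfSealed(); Body = body; return this; }
    public ReceivedMessageBuilder SetSubject(string? subject) { ThrowIfSealed(); Subject = subject; return this; }
    public ReceivedMessageBuilder SetProperty(string key, object value) { ThrowIfSealed(); _properties[key] = value; return this; }

    // Called by the client once the formatting pipeline has finished.
    public ReceivedMessageSketch Seal()
    {
        ThrowIfSealed();
        _sealed = true;
        return new ReceivedMessageSketch(Body, Subject, _properties);
    }

    private void ThrowIfSealed()
    {
        if (_sealed)
        {
            throw new InvalidOperationException("Formatting is complete; the message is immutable.");
        }
    }
}
```

Keeping the immutable type's constructor internal means only the client, not application code, can produce a finished message from a builder.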

@jsquire
Author

jsquire commented May 15, 2020

So if a BodySerializer is specified by itself, we will invoke the custom body serializer. But what if they specify a BodySerializer and a TransportMessageFormatter? Would we skip invoking the BodySerializer methods because we assume the customer will use the BodySerializer themselves in their CreateTransportMessage, as in the code snippet?

When a custom formatter is specified, transforming between transport and model types (AmqpMessage and ServiceBusMessage, for example) is delegated to the custom formatter. We do not take any action unless requested. The custom formatter has access to our default formatter, which would do all of the things that our AmqpMessageConverter does today but broken into the "Create the type" and "Populate the type" semantics.

If a body serializer is specified and the custom formatter calls the default formatter, the default formatter will use it. If not, then it won't. At any point, the custom formatter has access to the serializer; if they choose to let the default set the initial value and then override it - cool. If they choose to own creation and not delegate to the default - cool. That's the reason why I broke things into "Create" and "Populate" stages.

You could certainly make the case (and should) that the naming pattern CreateXXX doesn't indicate whether we create with a body or not. That's something we should talk through and decide on. Does "Create" own setting the body (which is how the AMQP library works when creating AmqpMessage), does "Populate", do we want to make assumptions there in general or just document what each protocol's default formatter does? I don't have a fully formed plan there.

If so, that seems like it could be a bit misleading. I would think that if these steps are independent they would both get executed and the user wouldn't be expected to do anything with the BodySerializer in CreateTransportMessage. Instead they would be working with the already serialized body.

Curious. Why would you think that? For example, AmqpMessage::Create requires the body to be specified because it is a factory for a bunch of AmqpMessage subtypes. In some cases, the formatter may need to serialize the body to put it into the correct form. In other cases, it may not. I'm not sure that we can make a reasonable generalization there that would serve all cases and still meet the goal of "allow developers access to the underlying protocol and control over transformation."

@JoshLove-msft

Curious, will this be introduced for EH as well or would we wait until there is a customer request?

@JoshLove-msft

The type would be ServiceBusReceivedMessage rather than ServiceBusMessage.

Please see Things to know before reading, #3. 😄

(I promise that I didn't sneak that in after your comment just to leave a cheeky reply)

Ah, sorry about that 😞

@JoshLove-msft

If a body serializer is specified and the custom formatter calls the default formatter, the default formatter will use it. If not, then it won't. At any point, the custom formatter has access to the serializer; if they choose to let the default set the initial value and then override it - cool. If they choose to own creation and not delegate to the default - cool. That's the reason why I broke things into "Create" and "Populate" stages.

Gotcha, so if both are overridden, the serializer won't be called automatically. The serializer entry point is via the formatter, whether the formatter is overridden or not, correct?

@JoshLove-msft

BTW the high level scenarios in this are great, they really help to motivate the design.
