Skip to content

Instantly share code, notes, and snippets.

@creyke
Created September 21, 2020 14:19
Show Gist options
  • Save creyke/ab3c22b829ba2603dce7e48532c125e8 to your computer and use it in GitHub Desktop.
Save creyke/ab3c22b829ba2603dce7e48532c125e8 to your computer and use it in GitHub Desktop.
Orleans Event Hub Custom Data
/// <summary>
/// Event Hub data adapter that routes each incoming message to a stream chosen by an
/// integer application property, and re-wraps the message's JSON body as a
/// <c>CustomPayload</c> event before handing it to the base adapter for caching.
/// </summary>
public class CustomEventHubDataAdapter : EventHubDataAdapter
{
    /// <summary>Name of the application property that carries the integer stream key.</summary>
    private const string PartitionKeyProperty = "CustomPartitionKey";

    private readonly SerializationManager serializationManager;

    /// <summary>
    /// Creates the adapter and keeps a reference to the serialization manager, which is
    /// needed again when the message is re-encoded in <see cref="FromQueueMessage"/>.
    /// </summary>
    /// <param name="serializationManager">Serialization manager passed to the base adapter and reused here.</param>
    public CustomEventHubDataAdapter(SerializationManager serializationManager)
        : base(serializationManager)
    {
        this.serializationManager = serializationManager;
    }

    /// <summary>
    /// Builds the stream position (stream identity plus sequence token) for a message.
    /// </summary>
    /// <param name="partition">Event Hub partition the message was read from (not used here).</param>
    /// <param name="queueMessage">The raw Event Hub message.</param>
    /// <returns>
    /// A position whose stream GUID carries the integer value of the
    /// <c>CustomPartitionKey</c> property in its first component (all other components zero),
    /// in the <c>StreamNamespaces.Raw</c> namespace, with a token built from the message's
    /// offset and sequence number.
    /// </returns>
    /// <exception cref="KeyNotFoundException">The message has no <c>CustomPartitionKey</c> property.</exception>
    public override StreamPosition GetStreamPosition(string partition, Microsoft.Azure.EventHubs.EventData queueMessage)
    {
        // Look the key up explicitly so a missing property produces an actionable message
        // instead of the dictionary's generic one. The exception type is unchanged.
        if (!queueMessage.Properties.TryGetValue(PartitionKeyProperty, out object rawKey))
        {
            throw new KeyNotFoundException(
                "Event Hub message is missing the '" + PartitionKeyProperty + "' application property.");
        }

        // Parse with the invariant culture: the key is machine-generated data and must not
        // depend on the culture of whichever thread happens to process the message.
        int streamKey = Convert.ToInt32(rawKey, System.Globalization.CultureInfo.InvariantCulture);

        // TODO: Update to new way of using non guid streams!!!!
        // Until then the integer key is embedded in the first component of an otherwise-zero GUID.
        var streamGuid = new Guid(streamKey, 0, 0, new byte[8]);
        IStreamIdentity streamIdentity = new StreamIdentity(streamGuid, StreamNamespaces.Raw);

        StreamSequenceToken token = new EventHubSequenceTokenV2(
            queueMessage.SystemProperties.Offset,
            queueMessage.SystemProperties.SequenceNumber,
            0);

        return new StreamPosition(streamIdentity, token);
    }

    /// <summary>
    /// Converts a raw Event Hub message into a cached message. The JSON body is first
    /// deserialized and re-encoded through <c>EventHubBatchContainer.ToEventData</c>; the
    /// original system and application properties are then copied onto the re-encoded
    /// message before delegating to the base implementation.
    /// </summary>
    /// <param name="streamPosition">Position previously computed for this message.</param>
    /// <param name="queueMessage">The raw Event Hub message.</param>
    /// <param name="dequeueTime">Time the message was dequeued; forwarded unchanged.</param>
    /// <param name="getSegment">Buffer allocator; forwarded unchanged.</param>
    /// <returns>The cached message produced by the base adapter from the re-encoded message.</returns>
    public override CachedMessage FromQueueMessage(StreamPosition streamPosition, EventData queueMessage, DateTime dequeueTime, Func<int, ArraySegment<byte>> getSegment)
    {
        var events = GetEvents(queueMessage);

        var eventData = EventHubBatchContainer.ToEventData(
            serializationManager,
            streamPosition.StreamIdentity.Guid,
            streamPosition.StreamIdentity.Namespace,
            events,
            null);

        // Carry the original metadata over to the re-encoded message.
        eventData.SystemProperties = queueMessage.SystemProperties;
        foreach (var property in queueMessage.Properties)
        {
            // Use the enumerated value directly rather than looking the key up a second time.
            eventData.Properties[property.Key] = property.Value;
        }

        return base.FromQueueMessage(streamPosition, eventData, dequeueTime, getSegment);
    }

    /// <summary>
    /// Deserializes the message body as a single JSON-encoded <c>CustomPayload</c>.
    /// </summary>
    /// <param name="eventData">Message whose body holds the JSON document.</param>
    /// <returns>A one-element sequence containing the deserialized payload.</returns>
    private IEnumerable<object> GetEvents(EventData eventData)
    {
        // Decode as UTF-8: JSON is UTF-8 on the wire, and ASCII decoding would replace
        // every non-ASCII byte with '?', silently corrupting the payload.
        string json = Encoding.UTF8.GetString(eventData.Body);
        var payload = JsonConvert.DeserializeObject<CustomPayload>(json);

        return new object[] { payload };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment