Created
September 21, 2020 14:19
-
-
Save creyke/ab3c22b829ba2603dce7e48532c125e8 to your computer and use it in GitHub Desktop.
Orleans Event Hub Custom Data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class CustomEventHubDataAdapter : EventHubDataAdapter | |
{ | |
private readonly SerializationManager serializationManager; | |
public CustomEventHubDataAdapter(SerializationManager serializationManager) | |
: base(serializationManager) | |
{ | |
this.serializationManager = serializationManager; | |
} | |
public override StreamPosition GetStreamPosition(string partition, Microsoft.Azure.EventHubs.EventData queueMessage) | |
{ | |
// TODO: Update to new way of using non guid streams!!!! | |
var streamGuid = new Guid(Convert.ToInt32(queueMessage.Properties["CustomPartitionKey"]), 0, 0, new byte[8]); | |
string streamNamespace = StreamNamespaces.Raw; | |
IStreamIdentity stremIdentity = new StreamIdentity(streamGuid, streamNamespace); | |
StreamSequenceToken token = | |
new EventHubSequenceTokenV2(queueMessage.SystemProperties.Offset, queueMessage.SystemProperties.SequenceNumber, 0); | |
return new StreamPosition(stremIdentity, token); | |
} | |
public override CachedMessage FromQueueMessage(StreamPosition streamPosition, EventData queueMessage, DateTime dequeueTime, Func<int, ArraySegment<byte>> getSegment) | |
{ | |
var events = GetEvents(queueMessage); | |
var eventData = EventHubBatchContainer.ToEventData( | |
serializationManager, | |
streamPosition.StreamIdentity.Guid, | |
streamPosition.StreamIdentity.Namespace, | |
events, | |
null); | |
eventData.SystemProperties = queueMessage.SystemProperties; | |
foreach (var kvp in queueMessage.Properties) | |
{ | |
eventData.Properties[kvp.Key] = queueMessage.Properties[kvp.Key]; | |
} | |
return base.FromQueueMessage( | |
streamPosition, | |
eventData, | |
dequeueTime, | |
getSegment); | |
} | |
private IEnumerable<object> GetEvents(EventData eventData) | |
{ | |
var e = JsonConvert.DeserializeObject<CustomPayload>(Encoding.ASCII.GetString(eventData.Body)); | |
return new object[] | |
{ | |
e | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment