Created
January 13, 2017 15:59
-
-
Save Horusiath/262eb5ee0fde5df24b8d4031de2ff595 to your computer and use it in GitHub Desktop.
DData serializer on proto-buf
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
internal sealed class ReplicatorMessageSerializer : SerializerWithStringManifest | |
{ | |
#region internal classes | |
/// <summary> | |
/// A cache that is designed for a small number (<= 32) of | |
/// entries. It is using instance equality. | |
/// Adding new entry overwrites oldest. It is | |
/// thread safe but duplicates of same entry may occur. | |
/// | |
/// <see cref="Evict"/> must be called from the outside, i.e. the | |
/// cache will not cleanup itself. | |
/// </summary> | |
private sealed class SmallCache<TKey, TVal> | |
where TKey : class | |
where TVal : class | |
{ | |
private readonly TimeSpan ttl; | |
private readonly Func<TKey, TVal> getOrAddFactory; | |
private readonly AtomicCounter n = new AtomicCounter(0); | |
private readonly int mask; | |
private readonly KeyValuePair<TKey, TVal>[] elements; | |
private DateTime lastUsed; | |
public SmallCache(int capacity, TimeSpan ttl, Func<TKey, TVal> getOrAddFactory) | |
{ | |
mask = capacity - 1; | |
if ((capacity & mask) != 0) throw new ArgumentException("Capacity must be power of 2 and less than or equal 32", nameof(capacity)); | |
if (capacity > 32) throw new ArgumentException("Capacity must be less than or equal 32", nameof(capacity)); | |
this.ttl = ttl; | |
this.getOrAddFactory = getOrAddFactory; | |
this.elements = new KeyValuePair<TKey, TVal>[capacity]; | |
this.lastUsed = DateTime.UtcNow; | |
} | |
public TVal this[TKey key] | |
{ | |
get { return Get(key, n.Current); } | |
set { Add(key, value); } | |
} | |
/// <summary> | |
/// Add value under specified key. Overrides existing entry. | |
/// </summary> | |
public void Add(TKey key, TVal value) => Add(new KeyValuePair<TKey, TVal>(key, value)); | |
/// <summary> | |
/// Add an entry to the cache. Overrides existing entry. | |
/// </summary> | |
public void Add(KeyValuePair<TKey, TVal> entry) | |
{ | |
var i = n.IncrementAndGet(); | |
elements[i & mask] = entry; | |
lastUsed = DateTime.UtcNow; | |
} | |
public TVal GetOrAdd(TKey key) | |
{ | |
var position = n.Current; | |
var c = Get(key, position); | |
if (!ReferenceEquals(c, null)) return c; | |
var b2 = getOrAddFactory(key); | |
if (position == n.Current) | |
{ | |
// no change, add the new value | |
Add(key, b2); | |
return b2; | |
} | |
else | |
{ | |
// some other thread added, try one more time | |
// to reduce duplicates | |
var c2 = Get(key, n.Current); | |
if (!ReferenceEquals(c2, null)) return c2; | |
else | |
{ | |
Add(key, b2); | |
return b2; | |
} | |
} | |
} | |
/// <summary> | |
/// Remove all elements if the if cache has not been used within <see cref="ttl"/>. | |
/// </summary> | |
public void Evict() | |
{ | |
if (DateTime.UtcNow - lastUsed > ttl) | |
{ | |
elements.Initialize(); | |
} | |
} | |
private TVal Get(TKey key, int startIndex) | |
{ | |
var end = startIndex + elements.Length; | |
lastUsed = DateTime.UtcNow; | |
var i = startIndex; | |
while (end - i == 0) | |
{ | |
var x = elements[i & mask]; | |
if (x.Key != key) i++; | |
else return x.Value; | |
} | |
return null; | |
} | |
} | |
#endregion | |
private const string GetManifest = "A"; | |
private const string GetSuccessManifest = "B"; | |
private const string NotFoundManifest = "C"; | |
private const string GetFailureManifest = "D"; | |
private const string SubscribeManifest = "E"; | |
private const string UnsubscribeManifest = "F"; | |
private const string ChangedManifest = "G"; | |
private const string DataEnvelopeManifest = "H"; | |
private const string WriteManifest = "I"; | |
private const string WriteAckManifest = "J"; | |
private const string ReadManifest = "K"; | |
private const string ReadResultManifest = "L"; | |
private const string StatusManifest = "M"; | |
private const string GossipManifest = "N"; | |
private const string WriteNackManifest = "O"; | |
private const string DurableDataEnvelopeManifest = "P"; | |
private static readonly byte[] WriteAckBytes = Proto.Empty.DefaultInstance.ToByteArray(); | |
private static readonly byte[] WriteNackBytes = Proto.Empty.DefaultInstance.ToByteArray(); | |
private readonly SmallCache<Read, byte[]> _readCache; | |
private readonly SmallCache<Write, byte[]> _writeCache; | |
private readonly Dictionary<string, Func<byte[], object>> _fromBinaryMap; | |
public ReplicatorMessageSerializer(Akka.Actor.ExtendedActorSystem system) : base(system) | |
{ | |
var cacheTtl = system.Settings.Config.GetTimeSpan("akka.cluster.distributed-data.serializer-cache-time-to-live"); | |
_readCache = new SmallCache<Read, byte[]>(4, cacheTtl, m => ReadToProto(m).ToByteArray()); | |
_writeCache = new SmallCache<Write, byte[]>(4, cacheTtl, m => WriteToProto(m).ToByteArray()); | |
system.Scheduler.Advanced.ScheduleRepeatedly(cacheTtl, new TimeSpan(cacheTtl.Ticks / 2), () => | |
{ | |
_readCache.Evict(); | |
_writeCache.Evict(); | |
}); | |
_fromBinaryMap = new Dictionary<string, Func<byte[], object>> | |
{ | |
[GetManifest] = GetFromBinary, | |
[GetSuccessManifest] = GetSuccessFromBinary, | |
[NotFoundManifest] = NotFoundFromBinary, | |
[GetFailureManifest] = GetFailureFromBinary, | |
[SubscribeManifest] = SubscribeFromBinary, | |
[UnsubscribeManifest] = UnsubscribeFromBinary, | |
[ChangedManifest] = ChangedFromBinary, | |
[DataEnvelopeManifest] = DataEnvelopeFromBinary, | |
[WriteManifest] = WriteFromBinary, | |
[WriteAckManifest] = _ => WriteAck.Instance, | |
[ReadManifest] = ReadFromBinary, | |
[ReadResultManifest] = ReadResultFromBinary, | |
[StatusManifest] = StatusFromBinary, | |
[GossipManifest] = GossipFromBinary | |
}; | |
} | |
public override string Manifest(object o) | |
{ | |
if (o is DataEnvelope) return DataEnvelopeManifest; | |
if (o is Write) return WriteManifest; | |
if (o is WriteAck) return WriteAckManifest; | |
if (o is Read) return ReadManifest; | |
if (o is ReadResult) return ReadResultManifest; | |
if (o is Status) return StatusManifest; | |
if (o is Replicator.Get) return GetManifest; | |
if (o is Replicator.GetSuccess) return GetSuccessManifest; | |
if (o is Replicator.Changed) return ChangedManifest; | |
if (o is Replicator.NotFound) return NotFoundManifest; | |
if (o is Replicator.GetFailure) return GetFailureManifest; | |
if (o is Replicator.Subscribe) return SubscribeManifest; | |
if (o is Replicator.Unsubscribe) return UnsubscribeManifest; | |
if (o is Gossip) return GossipManifest; | |
if (o is WriteNack) return WriteNackManifest; | |
throw new ArgumentException($"Can't serialize object of type {o.GetType()} in [{GetType()}]"); | |
} | |
public override byte[] ToBinary(object o) | |
{ | |
if (o is DataEnvelope) return DataEnvelopeToProto((DataEnvelope)o).ToByteArray(); | |
if (o is Write) return WriteToProto((Write)o).ToByteArray(); | |
if (o is WriteAck) return WriteAckBytes; | |
if (o is Read) return ReadToProto((Read)o).ToByteArray(); | |
if (o is ReadResult) return ReadResultToProto((ReadResult)o).ToByteArray(); | |
if (o is Status) return StatusToProto((Status)o).ToByteArray(); | |
if (o is Replicator.Get) return GetToProto((Replicator.Get)o).ToByteArray(); | |
if (o is Replicator.GetSuccess) return GetSuccessToProto((Replicator.GetSuccess)o).ToByteArray(); | |
if (o is Replicator.Changed) return ChangedToProto((Replicator.Changed)o).ToByteArray(); | |
if (o is Replicator.NotFound) return NotFoundToProto((Replicator.NotFound)o).ToByteArray(); | |
if (o is Replicator.GetFailure) return GetFailureToProto((Replicator.GetFailure)o).ToByteArray(); | |
if (o is Replicator.Subscribe) return SubscribeToProto((Replicator.Subscribe)o).ToByteArray(); | |
if (o is Replicator.Unsubscribe) return UnsubscribeToProto((Replicator.Unsubscribe)o).ToByteArray(); | |
if (o is Gossip) return GossipToProto((Gossip)o).ToByteArray(); | |
if (o is WriteNack) return WriteNackBytes; | |
throw new ArgumentException($"Can't serialize object of type {o.GetType()} in [{GetType()}]"); | |
} | |
public override object FromBinary(byte[] binary, string manifest) | |
{ | |
Func<byte[], object> deserializer; | |
if (_fromBinaryMap.TryGetValue(manifest, out deserializer)) | |
{ | |
return deserializer(binary); | |
} | |
throw new NotSupportedException($"Unimplemented deserialization of message with manifest [{manifest}] in [${GetType()}]"); | |
} | |
#region private ToProto methods | |
private Proto.Status StatusToProto(Status status) | |
{ | |
var builder = Proto.Status.CreateBuilder(); | |
builder.SetChunk(status.Chunk).SetTotChunks(status.TotalChunks); | |
foreach (var entry in status.Digests) | |
{ | |
builder.AddEntries(Proto.Status.Types.Entry.CreateBuilder() | |
.SetKey(entry.Key) | |
.SetDigest(entry.Value)); | |
} | |
return builder.Build(); | |
} | |
private Proto.Gossip GossipToProto(Gossip gossip) | |
{ | |
var builder = Proto.Gossip.CreateBuilder().SetSendBack(gossip.SendBack); | |
foreach (var entry in gossip.UpdatedData) | |
{ | |
builder.AddEntries(Proto.Gossip.Types.Entry.CreateBuilder() | |
.SetKey(entry.Key) | |
.SetEnvelope(DataEnvelopeToProto(entry.Value))); | |
} | |
return builder.Build(); | |
} | |
private Proto.Get GetToProto(Replicator.Get get) | |
{ | |
var builder = Proto.Get.CreateBuilder().SetKey(OtherMessageToProto(get.Key)); | |
if (get.Consistency is ReadLocal) builder.SetConsistency(1); | |
else if (get.Consistency is ReadFrom) builder.SetConsistency(((ReadFrom)get.Consistency).N); | |
else if (get.Consistency is ReadMajority) builder.SetConsistency(0); | |
else if (get.Consistency is ReadAll) builder.SetConsistency(-1); | |
builder.SetTimeout((uint)get.Consistency.Timeout.TotalMilliseconds); | |
if (get.Request != null) builder.SetRequest(OtherMessageToProto(get.Request)); | |
return builder.Build(); | |
} | |
private Proto.GetSuccess GetSuccessToProto(Replicator.GetSuccess getSuccess) | |
{ | |
var builder = Proto.GetSuccess.CreateBuilder() | |
.SetKey(OtherMessageToProto(getSuccess.Key)) | |
.SetData(OtherMessageToProto(getSuccess.Data)); | |
if (getSuccess.Request != null) builder.SetRequest(OtherMessageToProto(getSuccess.Request)); | |
return builder.Build(); | |
} | |
private Proto.NotFound NotFoundToProto(Replicator.NotFound notFound) | |
{ | |
var builder = Proto.NotFound.CreateBuilder() | |
.SetKey(OtherMessageToProto(notFound.Key)); | |
if (notFound.Request != null) builder.SetRequest(OtherMessageToProto(notFound.Request)); | |
return builder.Build(); | |
} | |
private Proto.GetFailure GetFailureToProto(Replicator.GetFailure notFound) | |
{ | |
var builder = Proto.GetFailure.CreateBuilder() | |
.SetKey(OtherMessageToProto(notFound.Key)); | |
if (notFound.Request != null) builder.SetRequest(OtherMessageToProto(notFound.Request)); | |
return builder.Build(); | |
} | |
private Proto.Subscribe SubscribeToProto(Replicator.Subscribe subscribe) => | |
Proto.Subscribe.CreateBuilder() | |
.SetKey(OtherMessageToProto(subscribe.Key)) | |
.SetRef(Akka.Serialization.Serialization.SerializedActorPath(subscribe.Subscriber)) | |
.Build(); | |
private Proto.Unsubscribe UnsubscribeToProto(Replicator.Unsubscribe unsubscribe) => | |
Proto.Unsubscribe.CreateBuilder() | |
.SetKey(OtherMessageToProto(unsubscribe.Key)) | |
.SetRef(Akka.Serialization.Serialization.SerializedActorPath(unsubscribe.Subscriber)) | |
.Build(); | |
private Proto.Changed ChangedToProto(Replicator.Changed changed) => | |
Proto.Changed.CreateBuilder() | |
.SetKey(OtherMessageToProto(changed.Key)) | |
.SetData(OtherMessageToProto(changed.Data)) | |
.Build(); | |
private Proto.DataEnvelope DataEnvelopeToProto(DataEnvelope envelope) | |
{ | |
var builder = Proto.DataEnvelope.CreateBuilder() | |
.SetData(OtherMessageToProto(envelope.Data)); | |
foreach (var entry in envelope.Pruning) | |
{ | |
var state = entry.Value; | |
var entryBuilder = Proto.DataEnvelope.Types.PruningEntry.CreateBuilder() | |
.SetRemovedAddress(UniqueAddressToProto(entry.Key)) | |
.SetOwnerAddress(UniqueAddressToProto(state.Owner)); | |
var initialized = state.Phase as PruningInitialized; | |
if (initialized != null) | |
{ | |
foreach (var seen in initialized.Seen) | |
{ | |
entryBuilder.AddSeen(AddressToProto(seen)); | |
} | |
entryBuilder.SetPerformed(false); | |
} | |
else if (state.Phase is PruningPerformed) | |
{ | |
entryBuilder.SetPerformed(true); | |
} | |
builder.AddPruning(entryBuilder); | |
} | |
return builder.Build(); | |
} | |
private Proto.Read ReadToProto(Read read) => Proto.Read.CreateBuilder().SetKey(read.Key).Build(); | |
private Proto.Write WriteToProto(Write write) => | |
Proto.Write.CreateBuilder() | |
.SetKey(write.Key) | |
.SetEnvelope(DataEnvelopeToProto(write.Envelope)) | |
.Build(); | |
private Proto.ReadResult ReadResultToProto(ReadResult readResult) | |
{ | |
var builder = Proto.ReadResult.CreateBuilder(); | |
if (readResult.Envelope != null) builder.SetEnvelope(DataEnvelopeToProto(readResult.Envelope)); | |
return builder.Build(); | |
} | |
private Proto.Address AddressToProto(Address address) | |
{ | |
if (address.Host == null || !address.Port.HasValue) | |
throw new ArgumentException($"Address [{address}] could not be serialized: host or port missing."); | |
return Proto.Address.CreateBuilder() | |
.SetHostname(address.Host) | |
.SetPort((uint) address.Port.Value) | |
.Build(); | |
} | |
private Proto.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress) => | |
Proto.UniqueAddress.CreateBuilder() | |
.SetAddress(AddressToProto(uniqueAddress.Address)) | |
.SetUid(uniqueAddress.Uid) | |
.Build(); | |
private Proto.OtherMessage OtherMessageToProto(object other) | |
{ | |
var serializer = system.Serialization.FindSerializerFor(other); | |
var builder = Proto.OtherMessage.CreateBuilder() | |
.SetEnclosedMessage(ByteString.CopyFrom(serializer.ToBinary(other))) | |
.SetSerializerId(serializer.Identifier); | |
var serializerWithStringManifest = serializer as SerializerWithStringManifest; | |
if (serializerWithStringManifest != null) | |
{ | |
var manifest = serializerWithStringManifest.Manifest(other); | |
if (!string.IsNullOrEmpty(manifest)) | |
{ | |
builder.SetMessageManifest(ByteString.CopyFromUtf8(manifest)); | |
} | |
} | |
else if (serializer.IncludeManifest) | |
{ | |
builder.SetMessageManifest(ByteString.CopyFromUtf8(other.GetType().TypeQualifiedName())); | |
} | |
return builder.Build(); | |
} | |
#endregion | |
#region FromProto methods | |
private Status StatusFromBinary(byte[] bytes) | |
{ | |
var status = Proto.Status.ParseFrom(bytes); | |
var entries = status.EntriesList | |
.Select(e => new KeyValuePair<string, ByteString>(e.Key, e.Digest)) | |
.ToImmutableDictionary(); | |
return new Status(entries, status.Chunk, status.TotChunks); | |
} | |
private Gossip GossipFromBinary(byte[] bytes) | |
{ | |
var gossip = Proto.Gossip.ParseFrom(bytes); | |
var entries = gossip.EntriesList | |
.Select(e => new KeyValuePair<string, DataEnvelope>(e.Key, DataEnvelopeFromProto(e.Envelope))) | |
.ToImmutableDictionary(); | |
return new Gossip(entries, gossip.SendBack); | |
} | |
private Replicator.Get GetFromBinary(byte[] bytes) | |
{ | |
var get = Proto.Get.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(get.Key); | |
var request = get.HasRequest ? OtherMessageFromProto(get.Request) : null; | |
var timeout = TimeSpan.FromMilliseconds(get.Timeout); | |
IReadConsistency consistency; | |
switch (get.Consistency) | |
{ | |
case 0: consistency = new ReadMajority(timeout); break; | |
case -1: consistency = new ReadAll(timeout); break; | |
case 1: consistency = ReadLocal.Instance; break; | |
default: consistency = new ReadFrom(get.Consistency, timeout); break; | |
} | |
return new Replicator.Get(key, consistency, request); | |
} | |
private Replicator.GetSuccess GetSuccessFromBinary(byte[] bytes) | |
{ | |
var getSuccess = Proto.GetSuccess.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(getSuccess.Key); | |
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null; | |
var data = (IReplicatedData)OtherMessageFromProto(getSuccess.Data); | |
return new Replicator.GetSuccess(key, request, data); | |
} | |
private Replicator.NotFound NotFoundFromBinary(byte[] bytes) | |
{ | |
var getSuccess = Proto.NotFound.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(getSuccess.Key); | |
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null; | |
return new Replicator.NotFound(key, request); | |
} | |
private Replicator.GetFailure GetFailureFromBinary(byte[] bytes) | |
{ | |
var getSuccess = Proto.GetFailure.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(getSuccess.Key); | |
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null; | |
return new Replicator.GetFailure(key, request); | |
} | |
private Replicator.Subscribe SubscribeFromBinary(byte[] bytes) | |
{ | |
var subscribe = Proto.Subscribe.ParseFrom(bytes); | |
var key = (IKey) OtherMessageFromProto(subscribe.Key); | |
var subscriber = ResolveActorRef(subscribe.Ref); | |
return new Replicator.Subscribe(key, subscriber); | |
} | |
private Replicator.Unsubscribe UnsubscribeFromBinary(byte[] bytes) | |
{ | |
var unsubscribe = Proto.Unsubscribe.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(unsubscribe.Key); | |
var subscriber = ResolveActorRef(unsubscribe.Ref); | |
return new Replicator.Unsubscribe(key, subscriber); | |
} | |
private Replicator.Changed ChangedFromBinary(byte[] bytes) | |
{ | |
var changed = Proto.Changed.ParseFrom(bytes); | |
var key = (IKey)OtherMessageFromProto(changed.Key); | |
var data = (IReplicatedData)OtherMessageFromProto(changed.Data); | |
return new Replicator.Changed(key, data); | |
} | |
private DataEnvelope DataEnvelopeFromBinary(byte[] bytes) => DataEnvelopeFromProto(Proto.DataEnvelope.ParseFrom(bytes)); | |
private DataEnvelope DataEnvelopeFromProto(Proto.DataEnvelope envelope) | |
{ | |
var prunning = envelope.PruningList.Select(pruningEntry => | |
{ | |
var phase = pruningEntry.Performed | |
? (IPruningPhase) PruningPerformed.Instance | |
: new PruningInitialized(pruningEntry.SeenList.Select(AddressFromProto).ToImmutableHashSet()); | |
var state = new PruningState(UniqueAddressFromProto(pruningEntry.OwnerAddress), phase); | |
var removed = UniqueAddressFromProto(pruningEntry.RemovedAddress); | |
return new KeyValuePair<UniqueAddress, PruningState>(removed, state); | |
}) | |
.ToImmutableDictionary(); | |
var data = (IReplicatedData)OtherMessageFromProto(envelope.Data); | |
return new DataEnvelope(data, prunning); | |
} | |
private Write WriteFromBinary(byte[] bytes) | |
{ | |
var write = Proto.Write.ParseFrom(bytes); | |
return new Write(write.Key, DataEnvelopeFromProto(write.Envelope)); | |
} | |
private Read ReadFromBinary(byte[] bytes) => new Read(Proto.Read.ParseFrom(bytes).Key); | |
private ReadResult ReadResultFromBinary(byte[] bytes) | |
{ | |
var readResult = Proto.ReadResult.ParseFrom(bytes); | |
var envelope = readResult.HasEnvelope ? DataEnvelopeFromProto(readResult.Envelope) : null; | |
return new ReadResult(envelope); | |
} | |
private UniqueAddress UniqueAddressFromProto(Proto.UniqueAddress uniqueAddress) => | |
new UniqueAddress(AddressFromProto(uniqueAddress.Address), uniqueAddress.Uid); | |
private Address AddressFromProto(Proto.Address address) => | |
new Address(system.Provider.DefaultAddress.Protocol, system.Name, address.Hostname, (int)address.Port); | |
private IActorRef ResolveActorRef(string path) | |
{ | |
return system.Provider.ResolveActorRef(path); | |
} | |
private object OtherMessageFromProto(Proto.OtherMessage other) | |
{ | |
var manifest = other.HasMessageManifest ? other.MessageManifest.ToStringUtf8() : string.Empty; | |
return system.Serialization.Deserialize(other.EnclosedMessage.ToByteArray(), other.SerializerId, manifest); | |
} | |
#endregion | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment