Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Created January 13, 2017 15:59
Show Gist options
  • Save Horusiath/262eb5ee0fde5df24b8d4031de2ff595 to your computer and use it in GitHub Desktop.
Save Horusiath/262eb5ee0fde5df24b8d4031de2ff595 to your computer and use it in GitHub Desktop.
DData serializer on proto-buf
internal sealed class ReplicatorMessageSerializer : SerializerWithStringManifest
{
#region internal classes
/// <summary>
/// A cache that is designed for a small number (&lt;= 32) of
/// entries. It is using instance equality.
/// Adding new entry overwrites oldest. It is
/// thread safe but duplicates of same entry may occur.
///
/// <see cref="Evict"/> must be called from the outside, i.e. the
/// cache will not cleanup itself.
/// </summary>
private sealed class SmallCache<TKey, TVal>
where TKey : class
where TVal : class
{
private readonly TimeSpan ttl;
private readonly Func<TKey, TVal> getOrAddFactory;
private readonly AtomicCounter n = new AtomicCounter(0);
private readonly int mask;
private readonly KeyValuePair<TKey, TVal>[] elements;
private DateTime lastUsed;
public SmallCache(int capacity, TimeSpan ttl, Func<TKey, TVal> getOrAddFactory)
{
mask = capacity - 1;
if ((capacity & mask) != 0) throw new ArgumentException("Capacity must be power of 2 and less than or equal 32", nameof(capacity));
if (capacity > 32) throw new ArgumentException("Capacity must be less than or equal 32", nameof(capacity));
this.ttl = ttl;
this.getOrAddFactory = getOrAddFactory;
this.elements = new KeyValuePair<TKey, TVal>[capacity];
this.lastUsed = DateTime.UtcNow;
}
public TVal this[TKey key]
{
get { return Get(key, n.Current); }
set { Add(key, value); }
}
/// <summary>
/// Add value under specified key. Overrides existing entry.
/// </summary>
public void Add(TKey key, TVal value) => Add(new KeyValuePair<TKey, TVal>(key, value));
/// <summary>
/// Add an entry to the cache. Overrides existing entry.
/// </summary>
public void Add(KeyValuePair<TKey, TVal> entry)
{
var i = n.IncrementAndGet();
elements[i & mask] = entry;
lastUsed = DateTime.UtcNow;
}
public TVal GetOrAdd(TKey key)
{
var position = n.Current;
var c = Get(key, position);
if (!ReferenceEquals(c, null)) return c;
var b2 = getOrAddFactory(key);
if (position == n.Current)
{
// no change, add the new value
Add(key, b2);
return b2;
}
else
{
// some other thread added, try one more time
// to reduce duplicates
var c2 = Get(key, n.Current);
if (!ReferenceEquals(c2, null)) return c2;
else
{
Add(key, b2);
return b2;
}
}
}
/// <summary>
/// Remove all elements if the if cache has not been used within <see cref="ttl"/>.
/// </summary>
public void Evict()
{
if (DateTime.UtcNow - lastUsed > ttl)
{
elements.Initialize();
}
}
private TVal Get(TKey key, int startIndex)
{
var end = startIndex + elements.Length;
lastUsed = DateTime.UtcNow;
var i = startIndex;
while (end - i == 0)
{
var x = elements[i & mask];
if (x.Key != key) i++;
else return x.Value;
}
return null;
}
}
#endregion
private const string GetManifest = "A";
private const string GetSuccessManifest = "B";
private const string NotFoundManifest = "C";
private const string GetFailureManifest = "D";
private const string SubscribeManifest = "E";
private const string UnsubscribeManifest = "F";
private const string ChangedManifest = "G";
private const string DataEnvelopeManifest = "H";
private const string WriteManifest = "I";
private const string WriteAckManifest = "J";
private const string ReadManifest = "K";
private const string ReadResultManifest = "L";
private const string StatusManifest = "M";
private const string GossipManifest = "N";
private const string WriteNackManifest = "O";
private const string DurableDataEnvelopeManifest = "P";
private static readonly byte[] WriteAckBytes = Proto.Empty.DefaultInstance.ToByteArray();
private static readonly byte[] WriteNackBytes = Proto.Empty.DefaultInstance.ToByteArray();
private readonly SmallCache<Read, byte[]> _readCache;
private readonly SmallCache<Write, byte[]> _writeCache;
private readonly Dictionary<string, Func<byte[], object>> _fromBinaryMap;
public ReplicatorMessageSerializer(Akka.Actor.ExtendedActorSystem system) : base(system)
{
var cacheTtl = system.Settings.Config.GetTimeSpan("akka.cluster.distributed-data.serializer-cache-time-to-live");
_readCache = new SmallCache<Read, byte[]>(4, cacheTtl, m => ReadToProto(m).ToByteArray());
_writeCache = new SmallCache<Write, byte[]>(4, cacheTtl, m => WriteToProto(m).ToByteArray());
system.Scheduler.Advanced.ScheduleRepeatedly(cacheTtl, new TimeSpan(cacheTtl.Ticks / 2), () =>
{
_readCache.Evict();
_writeCache.Evict();
});
_fromBinaryMap = new Dictionary<string, Func<byte[], object>>
{
[GetManifest] = GetFromBinary,
[GetSuccessManifest] = GetSuccessFromBinary,
[NotFoundManifest] = NotFoundFromBinary,
[GetFailureManifest] = GetFailureFromBinary,
[SubscribeManifest] = SubscribeFromBinary,
[UnsubscribeManifest] = UnsubscribeFromBinary,
[ChangedManifest] = ChangedFromBinary,
[DataEnvelopeManifest] = DataEnvelopeFromBinary,
[WriteManifest] = WriteFromBinary,
[WriteAckManifest] = _ => WriteAck.Instance,
[ReadManifest] = ReadFromBinary,
[ReadResultManifest] = ReadResultFromBinary,
[StatusManifest] = StatusFromBinary,
[GossipManifest] = GossipFromBinary
};
}
public override string Manifest(object o)
{
if (o is DataEnvelope) return DataEnvelopeManifest;
if (o is Write) return WriteManifest;
if (o is WriteAck) return WriteAckManifest;
if (o is Read) return ReadManifest;
if (o is ReadResult) return ReadResultManifest;
if (o is Status) return StatusManifest;
if (o is Replicator.Get) return GetManifest;
if (o is Replicator.GetSuccess) return GetSuccessManifest;
if (o is Replicator.Changed) return ChangedManifest;
if (o is Replicator.NotFound) return NotFoundManifest;
if (o is Replicator.GetFailure) return GetFailureManifest;
if (o is Replicator.Subscribe) return SubscribeManifest;
if (o is Replicator.Unsubscribe) return UnsubscribeManifest;
if (o is Gossip) return GossipManifest;
if (o is WriteNack) return WriteNackManifest;
throw new ArgumentException($"Can't serialize object of type {o.GetType()} in [{GetType()}]");
}
public override byte[] ToBinary(object o)
{
if (o is DataEnvelope) return DataEnvelopeToProto((DataEnvelope)o).ToByteArray();
if (o is Write) return WriteToProto((Write)o).ToByteArray();
if (o is WriteAck) return WriteAckBytes;
if (o is Read) return ReadToProto((Read)o).ToByteArray();
if (o is ReadResult) return ReadResultToProto((ReadResult)o).ToByteArray();
if (o is Status) return StatusToProto((Status)o).ToByteArray();
if (o is Replicator.Get) return GetToProto((Replicator.Get)o).ToByteArray();
if (o is Replicator.GetSuccess) return GetSuccessToProto((Replicator.GetSuccess)o).ToByteArray();
if (o is Replicator.Changed) return ChangedToProto((Replicator.Changed)o).ToByteArray();
if (o is Replicator.NotFound) return NotFoundToProto((Replicator.NotFound)o).ToByteArray();
if (o is Replicator.GetFailure) return GetFailureToProto((Replicator.GetFailure)o).ToByteArray();
if (o is Replicator.Subscribe) return SubscribeToProto((Replicator.Subscribe)o).ToByteArray();
if (o is Replicator.Unsubscribe) return UnsubscribeToProto((Replicator.Unsubscribe)o).ToByteArray();
if (o is Gossip) return GossipToProto((Gossip)o).ToByteArray();
if (o is WriteNack) return WriteNackBytes;
throw new ArgumentException($"Can't serialize object of type {o.GetType()} in [{GetType()}]");
}
public override object FromBinary(byte[] binary, string manifest)
{
Func<byte[], object> deserializer;
if (_fromBinaryMap.TryGetValue(manifest, out deserializer))
{
return deserializer(binary);
}
throw new NotSupportedException($"Unimplemented deserialization of message with manifest [{manifest}] in [${GetType()}]");
}
#region private ToProto methods
private Proto.Status StatusToProto(Status status)
{
var builder = Proto.Status.CreateBuilder();
builder.SetChunk(status.Chunk).SetTotChunks(status.TotalChunks);
foreach (var entry in status.Digests)
{
builder.AddEntries(Proto.Status.Types.Entry.CreateBuilder()
.SetKey(entry.Key)
.SetDigest(entry.Value));
}
return builder.Build();
}
private Proto.Gossip GossipToProto(Gossip gossip)
{
var builder = Proto.Gossip.CreateBuilder().SetSendBack(gossip.SendBack);
foreach (var entry in gossip.UpdatedData)
{
builder.AddEntries(Proto.Gossip.Types.Entry.CreateBuilder()
.SetKey(entry.Key)
.SetEnvelope(DataEnvelopeToProto(entry.Value)));
}
return builder.Build();
}
private Proto.Get GetToProto(Replicator.Get get)
{
var builder = Proto.Get.CreateBuilder().SetKey(OtherMessageToProto(get.Key));
if (get.Consistency is ReadLocal) builder.SetConsistency(1);
else if (get.Consistency is ReadFrom) builder.SetConsistency(((ReadFrom)get.Consistency).N);
else if (get.Consistency is ReadMajority) builder.SetConsistency(0);
else if (get.Consistency is ReadAll) builder.SetConsistency(-1);
builder.SetTimeout((uint)get.Consistency.Timeout.TotalMilliseconds);
if (get.Request != null) builder.SetRequest(OtherMessageToProto(get.Request));
return builder.Build();
}
private Proto.GetSuccess GetSuccessToProto(Replicator.GetSuccess getSuccess)
{
var builder = Proto.GetSuccess.CreateBuilder()
.SetKey(OtherMessageToProto(getSuccess.Key))
.SetData(OtherMessageToProto(getSuccess.Data));
if (getSuccess.Request != null) builder.SetRequest(OtherMessageToProto(getSuccess.Request));
return builder.Build();
}
private Proto.NotFound NotFoundToProto(Replicator.NotFound notFound)
{
var builder = Proto.NotFound.CreateBuilder()
.SetKey(OtherMessageToProto(notFound.Key));
if (notFound.Request != null) builder.SetRequest(OtherMessageToProto(notFound.Request));
return builder.Build();
}
private Proto.GetFailure GetFailureToProto(Replicator.GetFailure notFound)
{
var builder = Proto.GetFailure.CreateBuilder()
.SetKey(OtherMessageToProto(notFound.Key));
if (notFound.Request != null) builder.SetRequest(OtherMessageToProto(notFound.Request));
return builder.Build();
}
private Proto.Subscribe SubscribeToProto(Replicator.Subscribe subscribe) =>
Proto.Subscribe.CreateBuilder()
.SetKey(OtherMessageToProto(subscribe.Key))
.SetRef(Akka.Serialization.Serialization.SerializedActorPath(subscribe.Subscriber))
.Build();
private Proto.Unsubscribe UnsubscribeToProto(Replicator.Unsubscribe unsubscribe) =>
Proto.Unsubscribe.CreateBuilder()
.SetKey(OtherMessageToProto(unsubscribe.Key))
.SetRef(Akka.Serialization.Serialization.SerializedActorPath(unsubscribe.Subscriber))
.Build();
private Proto.Changed ChangedToProto(Replicator.Changed changed) =>
Proto.Changed.CreateBuilder()
.SetKey(OtherMessageToProto(changed.Key))
.SetData(OtherMessageToProto(changed.Data))
.Build();
private Proto.DataEnvelope DataEnvelopeToProto(DataEnvelope envelope)
{
var builder = Proto.DataEnvelope.CreateBuilder()
.SetData(OtherMessageToProto(envelope.Data));
foreach (var entry in envelope.Pruning)
{
var state = entry.Value;
var entryBuilder = Proto.DataEnvelope.Types.PruningEntry.CreateBuilder()
.SetRemovedAddress(UniqueAddressToProto(entry.Key))
.SetOwnerAddress(UniqueAddressToProto(state.Owner));
var initialized = state.Phase as PruningInitialized;
if (initialized != null)
{
foreach (var seen in initialized.Seen)
{
entryBuilder.AddSeen(AddressToProto(seen));
}
entryBuilder.SetPerformed(false);
}
else if (state.Phase is PruningPerformed)
{
entryBuilder.SetPerformed(true);
}
builder.AddPruning(entryBuilder);
}
return builder.Build();
}
private Proto.Read ReadToProto(Read read) => Proto.Read.CreateBuilder().SetKey(read.Key).Build();
private Proto.Write WriteToProto(Write write) =>
Proto.Write.CreateBuilder()
.SetKey(write.Key)
.SetEnvelope(DataEnvelopeToProto(write.Envelope))
.Build();
private Proto.ReadResult ReadResultToProto(ReadResult readResult)
{
var builder = Proto.ReadResult.CreateBuilder();
if (readResult.Envelope != null) builder.SetEnvelope(DataEnvelopeToProto(readResult.Envelope));
return builder.Build();
}
private Proto.Address AddressToProto(Address address)
{
if (address.Host == null || !address.Port.HasValue)
throw new ArgumentException($"Address [{address}] could not be serialized: host or port missing.");
return Proto.Address.CreateBuilder()
.SetHostname(address.Host)
.SetPort((uint) address.Port.Value)
.Build();
}
private Proto.UniqueAddress UniqueAddressToProto(UniqueAddress uniqueAddress) =>
Proto.UniqueAddress.CreateBuilder()
.SetAddress(AddressToProto(uniqueAddress.Address))
.SetUid(uniqueAddress.Uid)
.Build();
private Proto.OtherMessage OtherMessageToProto(object other)
{
var serializer = system.Serialization.FindSerializerFor(other);
var builder = Proto.OtherMessage.CreateBuilder()
.SetEnclosedMessage(ByteString.CopyFrom(serializer.ToBinary(other)))
.SetSerializerId(serializer.Identifier);
var serializerWithStringManifest = serializer as SerializerWithStringManifest;
if (serializerWithStringManifest != null)
{
var manifest = serializerWithStringManifest.Manifest(other);
if (!string.IsNullOrEmpty(manifest))
{
builder.SetMessageManifest(ByteString.CopyFromUtf8(manifest));
}
}
else if (serializer.IncludeManifest)
{
builder.SetMessageManifest(ByteString.CopyFromUtf8(other.GetType().TypeQualifiedName()));
}
return builder.Build();
}
#endregion
#region FromProto methods
private Status StatusFromBinary(byte[] bytes)
{
var status = Proto.Status.ParseFrom(bytes);
var entries = status.EntriesList
.Select(e => new KeyValuePair<string, ByteString>(e.Key, e.Digest))
.ToImmutableDictionary();
return new Status(entries, status.Chunk, status.TotChunks);
}
private Gossip GossipFromBinary(byte[] bytes)
{
var gossip = Proto.Gossip.ParseFrom(bytes);
var entries = gossip.EntriesList
.Select(e => new KeyValuePair<string, DataEnvelope>(e.Key, DataEnvelopeFromProto(e.Envelope)))
.ToImmutableDictionary();
return new Gossip(entries, gossip.SendBack);
}
private Replicator.Get GetFromBinary(byte[] bytes)
{
var get = Proto.Get.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(get.Key);
var request = get.HasRequest ? OtherMessageFromProto(get.Request) : null;
var timeout = TimeSpan.FromMilliseconds(get.Timeout);
IReadConsistency consistency;
switch (get.Consistency)
{
case 0: consistency = new ReadMajority(timeout); break;
case -1: consistency = new ReadAll(timeout); break;
case 1: consistency = ReadLocal.Instance; break;
default: consistency = new ReadFrom(get.Consistency, timeout); break;
}
return new Replicator.Get(key, consistency, request);
}
private Replicator.GetSuccess GetSuccessFromBinary(byte[] bytes)
{
var getSuccess = Proto.GetSuccess.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(getSuccess.Key);
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null;
var data = (IReplicatedData)OtherMessageFromProto(getSuccess.Data);
return new Replicator.GetSuccess(key, request, data);
}
private Replicator.NotFound NotFoundFromBinary(byte[] bytes)
{
var getSuccess = Proto.NotFound.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(getSuccess.Key);
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null;
return new Replicator.NotFound(key, request);
}
private Replicator.GetFailure GetFailureFromBinary(byte[] bytes)
{
var getSuccess = Proto.GetFailure.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(getSuccess.Key);
var request = getSuccess.HasRequest ? OtherMessageFromProto(getSuccess.Request) : null;
return new Replicator.GetFailure(key, request);
}
private Replicator.Subscribe SubscribeFromBinary(byte[] bytes)
{
var subscribe = Proto.Subscribe.ParseFrom(bytes);
var key = (IKey) OtherMessageFromProto(subscribe.Key);
var subscriber = ResolveActorRef(subscribe.Ref);
return new Replicator.Subscribe(key, subscriber);
}
private Replicator.Unsubscribe UnsubscribeFromBinary(byte[] bytes)
{
var unsubscribe = Proto.Unsubscribe.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(unsubscribe.Key);
var subscriber = ResolveActorRef(unsubscribe.Ref);
return new Replicator.Unsubscribe(key, subscriber);
}
private Replicator.Changed ChangedFromBinary(byte[] bytes)
{
var changed = Proto.Changed.ParseFrom(bytes);
var key = (IKey)OtherMessageFromProto(changed.Key);
var data = (IReplicatedData)OtherMessageFromProto(changed.Data);
return new Replicator.Changed(key, data);
}
private DataEnvelope DataEnvelopeFromBinary(byte[] bytes) => DataEnvelopeFromProto(Proto.DataEnvelope.ParseFrom(bytes));
private DataEnvelope DataEnvelopeFromProto(Proto.DataEnvelope envelope)
{
var prunning = envelope.PruningList.Select(pruningEntry =>
{
var phase = pruningEntry.Performed
? (IPruningPhase) PruningPerformed.Instance
: new PruningInitialized(pruningEntry.SeenList.Select(AddressFromProto).ToImmutableHashSet());
var state = new PruningState(UniqueAddressFromProto(pruningEntry.OwnerAddress), phase);
var removed = UniqueAddressFromProto(pruningEntry.RemovedAddress);
return new KeyValuePair<UniqueAddress, PruningState>(removed, state);
})
.ToImmutableDictionary();
var data = (IReplicatedData)OtherMessageFromProto(envelope.Data);
return new DataEnvelope(data, prunning);
}
private Write WriteFromBinary(byte[] bytes)
{
var write = Proto.Write.ParseFrom(bytes);
return new Write(write.Key, DataEnvelopeFromProto(write.Envelope));
}
private Read ReadFromBinary(byte[] bytes) => new Read(Proto.Read.ParseFrom(bytes).Key);
private ReadResult ReadResultFromBinary(byte[] bytes)
{
var readResult = Proto.ReadResult.ParseFrom(bytes);
var envelope = readResult.HasEnvelope ? DataEnvelopeFromProto(readResult.Envelope) : null;
return new ReadResult(envelope);
}
private UniqueAddress UniqueAddressFromProto(Proto.UniqueAddress uniqueAddress) =>
new UniqueAddress(AddressFromProto(uniqueAddress.Address), uniqueAddress.Uid);
private Address AddressFromProto(Proto.Address address) =>
new Address(system.Provider.DefaultAddress.Protocol, system.Name, address.Hostname, (int)address.Port);
private IActorRef ResolveActorRef(string path)
{
return system.Provider.ResolveActorRef(path);
}
private object OtherMessageFromProto(Proto.OtherMessage other)
{
var manifest = other.HasMessageManifest ? other.MessageManifest.ToStringUtf8() : string.Empty;
return system.Serialization.Deserialize(other.EnclosedMessage.ToByteArray(), other.SerializerId, manifest);
}
#endregion
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment