Created
January 26, 2017 09:08
-
-
Save Horusiath/85f9593bd086dfe439bef476e658e07f to your computer and use it in GitHub Desktop.
Akka.Persistence example with proto-buf.net as a custom domain event serializer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Akka.Actor; | |
using Akka.Configuration; | |
using Akka.Persistence.SqlServer; | |
namespace TestApp | |
{ | |
/// <summary>
/// Console entry point of the sample: configures an actor system that uses the
/// SQL Server journal and snapshot store, binds every
/// <c>TestApp.IProtoBufSerializable</c> message to the <c>proto-buf</c> serializer,
/// sends 100 <see cref="AddItem"/> commands to a <see cref="ShoppingCart"/> actor and
/// prints the resulting cart content.
/// </summary>
class Program
{
    // Upper bound for the final GetState round-trip. Without an explicit timeout the
    // blocking wait below never returns when the actor is stopped before replying
    // (for example because the journal could not be reached).
    private static readonly TimeSpan AskTimeout = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Runs the sample. Blocks until the cart state has been printed and the user
    /// presses Enter; the actor system is disposed when the <c>using</c> scope ends.
    /// </summary>
    public static void Main()
    {
        // HOCON ignores leading whitespace, so the literal is kept flush-left.
        var config = ConfigurationFactory.ParseString(@"
akka {
actor {
serializers {
proto-buf = ""TestApp.ProtoBufSerializer, TestApp""
}
serialization-bindings {
""TestApp.IProtoBufSerializable, TestApp"" = proto-buf
}
serialization-identifiers {
""TestApp.ProtoBufSerializer, TestApp"" = 120
}
}
persistence {
journal {
plugin = ""akka.persistence.journal.sql-server""
sql-server {
auto-initialize = on
connection-string = ""Server=.;Database=akka_persistence_tests;Trusted_Connection=True;""
}
}
snapshot-store {
plugin = ""akka.persistence.snapshot-store.sql-server""
sql-server {
auto-initialize = on
connection-string = ""Server=.;Database=akka_persistence_tests;Trusted_Connection=True;""
}
}
}
}
").WithFallback(SqlServerPersistence.DefaultConfiguration());

        using (var system = ActorSystem.Create("system", config))
        {
            // The actor name doubles as its persistence id.
            var cart = system.ActorOf(Props.Create(() => new ShoppingCart("@user")), "@user");

            for (int i = 0; i < 100; i++)
            {
                cart.Tell(new AddItem("item-" + i));
            }

            // GetState is queued behind the AddItem commands sent above, so the reply
            // is produced after they have been taken from the mailbox. The wait is
            // bounded: on timeout .Result throws instead of hanging the process.
            var state = cart.Ask<ShoppingCartState>(new GetState(), AskTimeout).Result;

            Console.WriteLine($"{cart} state is: {string.Join(", ", state.Items)}");
            Console.ReadLine();
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
using Akka.Actor; | |
namespace TestApp | |
{ | |
/// <summary>
/// Akka serializer that delegates to protobuf-net. It is registered in the actor
/// system configuration under the <c>proto-buf</c> key and bound to
/// <c>TestApp.IProtoBufSerializable</c>.
/// </summary>
public class ProtoBufSerializer : Akka.Serialization.Serializer
{
    /// <summary>
    /// Must be <c>true</c>: <see cref="FromBinary"/> needs the target type to
    /// deserialize a payload. With the manifest disabled Akka may call
    /// <see cref="FromBinary"/> with a <c>null</c> type, which cannot be decoded.
    /// </summary>
    public override bool IncludeManifest => true;

    /// <summary>Creates the serializer for the given actor system.</summary>
    /// <param name="system">Actor system passed through to the base serializer.</param>
    public ProtoBufSerializer(ExtendedActorSystem system) : base(system)
    {
    }

    /// <summary>Serializes <paramref name="obj"/> with protobuf-net.</summary>
    /// <param name="obj">Object to serialize.</param>
    /// <returns>The protobuf-encoded bytes.</returns>
    public override byte[] ToBinary(object obj)
    {
        // good idea is to use Microsoft.IO.RecyclableMemoryStream here
        using (var stream = new MemoryStream())
        {
            ProtoBuf.Serializer.Serialize(stream, obj);

            // ToArray copies the whole buffer regardless of Position, so no rewind is needed.
            return stream.ToArray();
        }
    }

    /// <summary>Deserializes <paramref name="bytes"/> into an instance of <paramref name="type"/>.</summary>
    /// <param name="bytes">Protobuf-encoded payload.</param>
    /// <param name="type">Target type, taken from the manifest.</param>
    /// <returns>The deserialized object.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="type"/> is <c>null</c>, e.g. for a payload that was written without a manifest.
    /// </exception>
    public override object FromBinary(byte[] bytes, Type type)
    {
        if (type == null)
        {
            throw new ArgumentNullException(
                nameof(type),
                "ProtoBufSerializer needs the target type to deserialize; the payload was stored or sent without a manifest.");
        }

        // good idea is to use Microsoft.IO.RecyclableMemoryStream here
        using (var stream = new MemoryStream(bytes))
        {
            return ProtoBuf.Serializer.Deserialize(type, stream);
        }
    }
}
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using Akka.Persistence; | |
using ProtoBuf; | |
namespace TestApp | |
{ | |
#region messages | |
/// <summary>
/// Marker interface with no members. The sample's configuration binds
/// <c>TestApp.IProtoBufSerializable</c> to the <c>proto-buf</c> serializer.
/// </summary>
public interface IProtoBufSerializable
{
}
/// <summary>
/// Query message carrying no data; the shopping cart actor answers it with its
/// current state.
/// </summary>
public sealed class GetState
{
}
/// <summary>
/// Command asking the shopping cart to add one item.
/// </summary>
public sealed class AddItem
{
    /// <summary>Name of the item to add.</summary>
    public readonly string Name;

    /// <summary>Creates the command.</summary>
    /// <param name="name">Name of the item to add; stored as given.</param>
    public AddItem(string name)
    {
        this.Name = name;
    }
}
/// <summary>
/// Domain event persisted when an item has been added to the cart. Implements
/// <see cref="IProtoBufSerializable"/>, which the sample's configuration binds to
/// the proto-buf serializer.
/// </summary>
[ProtoContract]
public sealed class ItemAdded : IProtoBufSerializable
{
    /// <summary>Name of the added item (protobuf field 1).</summary>
    [ProtoMember(1)] public readonly string Name;

    /// <summary>Moment the item was added (protobuf field 2).</summary>
    [ProtoMember(2)] public readonly DateTime Timestamp;

    // protobuf-net instantiates contract types through a parameterless constructor
    // (non-public is accepted) before populating the members; without one the event
    // cannot be materialized again during recovery. Kept private so application code
    // still has to go through the public constructor.
    private ItemAdded()
    {
    }

    /// <summary>Creates the event.</summary>
    /// <param name="name">Name of the added item.</param>
    /// <param name="timestamp">Moment the item was added.</param>
    public ItemAdded(string name, DateTime timestamp)
    {
        Name = name;
        Timestamp = timestamp;
    }
}
#endregion | |
/// <summary>
/// State of a shopping cart: the names of the items added so far. Marked as a
/// protobuf contract and as <see cref="IProtoBufSerializable"/>.
/// </summary>
[ProtoContract]
public sealed class ShoppingCartState : IProtoBufSerializable
{
    /// <summary>Item names in insertion order (protobuf field 1); starts empty.</summary>
    [ProtoMember(1)] public readonly List<string> Items = new List<string>();
}
/// <summary>
/// Persistent actor holding a shopping cart. Every <see cref="AddItem"/> command is
/// persisted as an <see cref="ItemAdded"/> event; a snapshot of the state is saved
/// after every <see cref="SnapshotAfter"/> persisted events, and
/// <see cref="GetState"/> is answered with the current state.
/// </summary>
public sealed class ShoppingCart : ReceivePersistentActor
{
    // Number of persisted events between two snapshots.
    private const int SnapshotAfter = 5;

    /// <summary>Identifier under which this actor's events and snapshots are stored.</summary>
    public override string PersistenceId { get; }

    private ShoppingCartState state;

    // Counts events persisted by this incarnation only; it is not restored on recovery.
    private int snapshotCounter = 0;

    /// <summary>Creates the actor and registers its recovery and command handlers.</summary>
    /// <param name="persistenceId">Value exposed as <see cref="PersistenceId"/>.</param>
    public ShoppingCart(string persistenceId)
    {
        PersistenceId = persistenceId;
        state = new ShoppingCartState();

        Recover<ItemAdded>(e => UpdateState(e));
        Recover<SnapshotOffer>(offer =>
        {
            // A snapshot of an unexpected type is replaced by an empty state.
            state = offer.Snapshot as ShoppingCartState ?? new ShoppingCartState();
        });

        Command<AddItem>(cmd =>
        {
            Persist(new ItemAdded(cmd.Name, DateTime.UtcNow), e =>
            {
                UpdateState(e);

                // snapshot every 5 events
                if ((++snapshotCounter) % SnapshotAfter == 0)
                {
                    // Hand over a detached copy: the snapshot leaves this actor and is
                    // presumably serialized outside its message processing, while the
                    // live list keeps being mutated by subsequent events.
                    SaveSnapshot(CopyState());
                }
            });
        });

        // Reply with a copy so the receiver never observes later mutations of the
        // actor's internal list.
        Command<GetState>(_ => Sender.Tell(CopyState(), Self));
    }

    // Applies a persisted event to the in-memory state.
    private void UpdateState(ItemAdded e)
    {
        state.Items.Add(e.Name);
    }

    // Builds a state object whose item list is independent of the actor's own list.
    private ShoppingCartState CopyState()
    {
        var copy = new ShoppingCartState();
        copy.Items.AddRange(state.Items);
        return copy;
    }
}
} |
I am trying to mimic your solution but got type null in FromBinary.
@Havret I think you can fix that by enabling the manifest.
https://getakka.net/articles/networking/serialization.html#serializer-with-string-manifest
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This works great for our custom domain events being persisted, but I noticed all the internal akka messages, like the ones persisted by the Sharding, are still serialized with the default serializer (wire) even though I can see there's a ClusterShardingMessageSerializer with support for Google's Protobuf in place. But for some reason it doesn't kick in. I thought the idea would be to have everything that goes into persistence serialized in Google Protobuf, including akka's own messages.