Created
December 6, 2019 06:58
-
-
Save OnurGumus/1b77c32db4e8bb07443656157e5c8c5d to your computer and use it in GitHub Desktop.
Akkling Sharding hacks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open Akka.Actor | |
open Akka.Configuration | |
open Akka.Cluster | |
open Akka.Cluster.Tools.Singleton | |
open Akka.Cluster.Sharding | |
open Akka.Persistence | |
open Akka.Persistence.Sqlite | |
open Akkling | |
open Akkling.Persistence | |
open Akkling.Cluster | |
open Akkling.Cluster.Sharding | |
open Hyperion | |
open Akka.Serialization | |
printf "%A" <| typeof<Hyperion.Serializer>.Assembly.FullName | |
let configWithPort port = | |
let config = Configuration.parse (""" | |
akka { | |
actor { | |
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster" | |
serializers { | |
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion" | |
} | |
serialization-bindings { | |
"System.Object" = hyperion | |
} | |
} | |
remote { | |
helios.tcp { | |
public-hostname = "localhost" | |
hostname = "localhost" | |
port = """ + port.ToString() + """ | |
} | |
} | |
cluster { | |
auto-down-unreachable-after = 5s | |
seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ] | |
sharding.remember-entities = true | |
} | |
persistence{ | |
journal { | |
plugin = "akka.persistence.journal.sqlite" | |
sqlite | |
{ | |
connection-string = "Data Source=mydb.db;" | |
auto-initialize = on | |
} | |
} | |
snapshot-store{ | |
plugin = "akka.persistence.snapshot-store.sqlite" | |
sqlite { | |
auto-initialize = on | |
connection-string = "Data Source=mydb.db" | |
} | |
} | |
} | |
} | |
""") | |
config.WithFallback(ClusterSingletonManager.DefaultConfig()) | |
let behavior (ctx : Actor<_>) msg = printfn "%A received %s" (ctx.Self.Path.ToStringWithAddress()) msg |> ignored | |
type Command = Vote of string | |
type VoteData = { Vote : string; Date : DateTime} | |
type Event = Voted of string | |
type internal TypedMessageExtractor<'Envelope, 'Message>(extractor: 'Envelope -> string*string*'Message) = | |
interface IMessageExtractor with | |
member this.ShardId message = | |
match message with | |
| :? 'Envelope as env -> | |
let shardId, _, _ = (extractor(env)) | |
shardId | |
| :? Akka.Cluster.Sharding.ShardRegion.StartEntity as se -> printfn "%A" se.EntityId; "shard-1" | |
member this.EntityId message = | |
match message with | |
| :? 'Envelope as env -> | |
let _, entityId, _ = (extractor(env)) | |
entityId | |
| _ ->printfn "kkj"; "entity-1" | |
member this.EntityMessage message = | |
match message with | |
| :? 'Envelope as env -> | |
let _, _, msg = (extractor(env)) | |
box msg | |
| _ -> null | |
type Message = | |
| Command of Command | |
| Event of Event | |
open Akkling.Persistence | |
open Akka.Cluster.Sharding | |
open System.Threading | |
// HACK over persistent actors | |
type FunPersistentShardingActor<'Message>(actor : Eventsourced<'Message> -> Effect<'Message>) as this = | |
inherit FunPersistentActor<'Message>(actor) | |
// sharded actors are produced in path like /user/{name}/{shardId}/{entityId}, therefore "{name}/{shardId}/{entityId}" is peristenceId of an actor | |
let pid = this.Self.Path.Parent.Parent.Name + "/" + this.Self.Path.Parent.Name + "/" + this.Self.Path.Name | |
override this.PersistenceId = pid | |
// this function hacks persistent functional actors props by replacing them with dedicated sharded version using different PeristenceId strategy | |
let internal adjustPersistentProps (props: Props<'Message>) : Props<'Message> = | |
if props.ActorType = typeof<FunPersistentActor<'Message>> | |
then { props with ActorType = typeof<FunPersistentShardingActor<'Message>> } | |
else props | |
let entityFactoryFor2 (system: ActorSystem) (name: string) (props: Props<'Message>) : EntityFac<'Message> = | |
let clusterSharding = ClusterSharding.Get(system) | |
let adjustedProps = adjustPersistentProps props | |
let shardRegion = clusterSharding.Start(name, adjustedProps.ToProps(), ClusterShardingSettings.Create(system), new TypedMessageExtractor<_,_>(EntityRefs.entityRefExtractor)) | |
{ ShardRegion = shardRegion; TypeName = name } | |
let actorProp (mailbox : Eventsourced<Message>)= | |
let rec set () = | |
actor { | |
let! msg = mailbox.Receive() | |
match msg with | |
| Command(Vote s) -> | |
printfn "%A %s" msg "oooooos" | |
// mailbox.Parent() <! Passivate(PoisonPill.Instance) | |
return s |> Voted |> Event |> Persist | |
| _ -> | |
printf "%A" msg | |
return! set() | |
// printfn "ooooooost" | |
// mailbox.System.Stop(downcast mailbox.Self.Underlying) | |
//return Stop | |
} | |
set() | |
// spawn two separate systems with shard regions on each of them | |
let system1 = System.create "cluster-system" (configWithPort 5000) | |
SqlitePersistence.Get(system1) |> ignore | |
let fac1 = entityFactoryFor2 system1 "printer" <| propsPersist ( actorProp) | |
// wait a while before starting a second system | |
System.Threading.Thread.Sleep 5000 | |
//let system2 = System.create "cluster-system" (configWithPort 5001) | |
//let fac2 = entityFactoryFor system2 "printer" <| propsPersist (actorProp) | |
System.Threading.Thread.Sleep 5000 | |
let entity1 = fac1.RefFor "shard-1" "entity-1" | |
//let john = fac1.RefFor "shard-2" "john" | |
//let alice = fac1.RefFor "shard-3" "alice" | |
//let frank = fac1.RefFor "shard-4" "frank" | |
let m : Message = ("hello" |> Vote |> Command) | |
entity1 <! m | |
Thread.Sleep(5000) | |
//System.Console.ReadKey() | |
//entity1.Underlying.Tell((GracefulShutdown.Instance), null) | |
// entity1 <! " world" | |
// john <! "hello John" | |
// alice <! "hello Alice" | |
// frank <! "hello Frank" | |
// check which shards have been build on the second shard region | |
System.Threading.Thread.Sleep(5000) | |
open Akka.Cluster.Sharding | |
let printShards shardReg = | |
async { | |
let! (stats: ShardRegionStats) = (typed shardReg) <? GetShardRegionStats.Instance | |
for kv in stats.Stats do | |
printfn "\tShard '%s' has %d entities on it" kv.Key kv.Value | |
} |> Async.RunSynchronously | |
printfn "Shards active on node 'localhost:5000':" | |
//printShards fac1.ShardRegion | |
printfn "Shards active on node 'localhost:5001':" | |
System.Console.ReadKey() | |
fac1.ShardRegion.Tell(Passivate(PoisonPill.Instance), downcast entity1.Underlying) | |
//entity1.Underlying.Tell((GracefulShutdown.Instance), null) | |
System.Console.ReadKey() | |
//printShards fac2.ShardRegion |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment