Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Created December 6, 2019 06:58
Show Gist options
  • Save OnurGumus/1b77c32db4e8bb07443656157e5c8c5d to your computer and use it in GitHub Desktop.
Save OnurGumus/1b77c32db4e8bb07443656157e5c8c5d to your computer and use it in GitHub Desktop.
Akkling Sharding hacks
open System
open Akka.Actor
open Akka.Configuration
open Akka.Cluster
open Akka.Cluster.Tools.Singleton
open Akka.Cluster.Sharding
open Akka.Persistence
open Akka.Persistence.Sqlite
open Akkling
open Akkling.Persistence
open Akkling.Cluster
open Akkling.Cluster.Sharding
open Hyperion
open Akka.Serialization
// Print the full name of the assembly that Hyperion.Serializer was loaded from (no trailing newline).
typeof<Hyperion.Serializer>.Assembly.FullName |> printf "%A"
// Builds the Akka configuration for a single cluster node.
//
// port: the value spliced (via ToString()) into the remote transport's `port` setting.
//
// The HOCON text selects the cluster actor-ref provider, binds System.Object to the Hyperion
// serializer, lists localhost:5000 as the only seed node, turns on sharding remember-entities,
// and points both the journal and the snapshot store at the SQLite file mydb.db.
// The parsed config falls back to ClusterSingletonManager.DefaultConfig().
let configWithPort port =
    // HOCON up to (and including) the `port = ` key; the port value is appended right after it.
    let beforePort = """
akka {
actor {
provider = "Akka.Cluster.ClusterActorRefProvider, Akka.Cluster"
serializers {
hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
}
serialization-bindings {
"System.Object" = hyperion
}
}
remote {
helios.tcp {
public-hostname = "localhost"
hostname = "localhost"
port = """
    // Remainder of the HOCON document following the port value.
    let afterPort = """
}
}
cluster {
auto-down-unreachable-after = 5s
seed-nodes = [ "akka.tcp://cluster-system@localhost:5000/" ]
sharding.remember-entities = true
}
persistence{
journal {
plugin = "akka.persistence.journal.sqlite"
sqlite
{
connection-string = "Data Source=mydb.db;"
auto-initialize = on
}
}
snapshot-store{
plugin = "akka.persistence.snapshot-store.sqlite"
sqlite {
auto-initialize = on
connection-string = "Data Source=mydb.db"
}
}
}
}
"""
    let hocon = beforePort + port.ToString() + afterPort
    (Configuration.parse hocon).WithFallback(ClusterSingletonManager.DefaultConfig())
// Simple handler: prints the receiving actor's full path (including address) together with the
// string message, then yields the `ignored` effect. Not referenced elsewhere in the visible script.
let behavior (ctx : Actor<_>) msg =
    let selfAddress = ctx.Self.Path.ToStringWithAddress()
    printfn "%A received %s" selfAddress msg
    ignored ()
// Command accepted by the persistent entity; `Vote` carries the vote text.
type Command = Vote of string
// Record pairing a vote text with a timestamp. Not referenced elsewhere in the visible script.
type VoteData = { Vote : string; Date : DateTime}
// Event persisted by the entity; `Voted` carries the text taken from a `Vote` command.
type Event = Voted of string
// Adapts a function that splits an envelope into (shardId, entityId, message) to Akka's
// IMessageExtractor, so that a shard region can route typed envelopes.
//
// extractor: decomposes an 'Envelope into its shard id, entity id and payload.
//
// Besides envelopes, ShardId also answers for ShardRegion.StartEntity, always mapping it to the
// hard-coded shard "shard-1" (this script only ever uses that shard).
type internal TypedMessageExtractor<'Envelope, 'Message>(extractor: 'Envelope -> string*string*'Message) =
    interface IMessageExtractor with
        // Shard id for a message; null when the message is not recognised.
        member this.ShardId message =
            match message with
            | :? 'Envelope as env ->
                let shardId, _, _ = extractor env
                shardId
            | :? Akka.Cluster.Sharding.ShardRegion.StartEntity as se ->
                printfn "%A" se.EntityId
                "shard-1"
            // Without this case any other message raised MatchFailureException inside the
            // shard region. Returning null mirrors the fallback used by EntityMessage.
            | _ -> null
        // Entity id for a message; anything that is not an envelope falls back to "entity-1".
        // NOTE(review): the fallback (and its debug print) looks like a deliberate experiment;
        // confirm whether unknown messages should rather yield null here as well.
        member this.EntityId message =
            match message with
            | :? 'Envelope as env ->
                let _, entityId, _ = extractor env
                entityId
            | _ ->
                printfn "kkj"
                "entity-1"
        // Payload to deliver to the entity (boxed); null when the message is not an envelope.
        member this.EntityMessage message =
            match message with
            | :? 'Envelope as env ->
                let _, _, msg = extractor env
                box msg
            | _ -> null
// Message protocol of the persistent entity: either an incoming Command or an Event
// (actorProp persists an Event in response to a Vote command).
type Message =
| Command of Command
| Event of Event
open Akkling.Persistence
open Akka.Cluster.Sharding
open System.Threading
// HACK: variant of the functional persistent actor for use under cluster sharding.
// Sharded actors are created at paths like /user/{name}/{shardId}/{entityId}, so the
// persistence id is assembled from the last three path segments: "{name}/{shardId}/{entityId}".
type FunPersistentShardingActor<'Message>(actor : Eventsourced<'Message> -> Effect<'Message>) as this =
    inherit FunPersistentActor<'Message>(actor)
    // Computed once at construction time from the actor's own path.
    let pid =
        let entityPath = this.Self.Path
        let shardPath = entityPath.Parent
        String.concat "/" [ shardPath.Parent.Name; shardPath.Name; entityPath.Name ]
    override this.PersistenceId = pid
// Swaps the actor type of functional persistent props for the sharding-aware version, which
// uses a different PersistenceId strategy. Props for any other actor type are returned untouched.
let internal adjustPersistentProps (props: Props<'Message>) : Props<'Message> =
    let plainPersistentType = typeof<FunPersistentActor<'Message>>
    match props.ActorType with
    | actorType when actorType = plainPersistentType ->
        { props with ActorType = typeof<FunPersistentShardingActor<'Message>> }
    | _ -> props
// Starts a shard region for entities of the given type name and wraps it in an entity factory.
//
// system: actor system on which cluster sharding is started.
// name:   sharding type name; also stored as the factory's TypeName.
// props:  entity props; persistent functional props are swapped for the sharding-aware actor.
//
// Messages are routed with a TypedMessageExtractor built on EntityRefs.entityRefExtractor.
let entityFactoryFor2 (system: ActorSystem) (name: string) (props: Props<'Message>) : EntityFac<'Message> =
    let sharding = ClusterSharding.Get(system)
    let entityProps = (adjustPersistentProps props).ToProps()
    let settings = ClusterShardingSettings.Create(system)
    let extractor = new TypedMessageExtractor<_,_>(EntityRefs.entityRefExtractor)
    let region = sharding.Start(name, entityProps, settings, extractor)
    { ShardRegion = region; TypeName = name }
// Behavior of the persistent entity.
//
// mailbox: event-sourced context from which messages are received.
//
// A `Command (Vote s)` is logged and answered with a Persist effect for `Event (Voted s)`.
// Every other message is printed (without a newline) and the loop continues.
let actorProp (mailbox : Eventsourced<Message>) =
    let rec loop () =
        actor {
            let! incoming = mailbox.Receive()
            match incoming with
            | Command (Vote ballot) ->
                printfn "%A %s" incoming "oooooos"
                // Disabled experiment: mailbox.Parent() <! Passivate(PoisonPill.Instance)
                return Persist (Event (Voted ballot))
            | _ ->
                printf "%A" incoming
                return! loop ()
                // Disabled experiments: stopping the actor via mailbox.System.Stop / `return Stop`.
        }
    loop ()
// Start the first cluster node on port 5000, which is the seed-node address in the config.
// (The second node further down is currently commented out, so only this system runs.)
let system1 = System.create "cluster-system" (configWithPort 5000)
// Obtain the SQLite persistence extension for this system; the returned value is discarded.
SqlitePersistence.Get(system1) |> ignore
// Start the "printer" shard region hosting persistent entities that run actorProp.
let fac1 = entityFactoryFor2 system1 "printer" <| propsPersist ( actorProp)
// wait a while before starting a second system
System.Threading.Thread.Sleep 5000
// Second node (disabled):
//let system2 = System.create "cluster-system" (configWithPort 5001)
//let fac2 = entityFactoryFor system2 "printer" <| propsPersist (actorProp)
System.Threading.Thread.Sleep 5000
// Reference to entity "entity-1" on shard "shard-1".
let entity1 = fac1.RefFor "shard-1" "entity-1"
//let john = fac1.RefFor "shard-2" "john"
//let alice = fac1.RefFor "shard-3" "alice"
//let frank = fac1.RefFor "shard-4" "frank"
// Send a single Vote command to the entity.
let m : Message = ("hello" |> Vote |> Command)
entity1 <! m
// Pause for five seconds after the send.
Thread.Sleep(5000)
//System.Console.ReadKey()
//entity1.Underlying.Tell((GracefulShutdown.Instance), null)
// entity1 <! " world"
// john <! "hello John"
// alice <! "hello Alice"
// frank <! "hello Frank"
// check which shards have been built on the second shard region
System.Threading.Thread.Sleep(5000)
open Akka.Cluster.Sharding
// Asks the given shard region for its statistics and prints one line per shard with the
// number of entities it hosts. Blocks the calling thread until the reply has been printed.
//
// shardReg: untyped reference to the shard region to query.
let printShards shardReg =
    let report =
        async {
            let! (regionStats: ShardRegionStats) = typed shardReg <? GetShardRegionStats.Instance
            regionStats.Stats
            |> Seq.iter (fun (KeyValue (shardId, entityCount)) ->
                printfn "\tShard '%s' has %d entities on it" shardId entityCount)
        }
    Async.RunSynchronously report
// Header for the first node's shard report; the actual report call is disabled below.
printfn "Shards active on node 'localhost:5000':"
//printShards fac1.ShardRegion
// Header for the second node's shard report; that node is not started in this script and
// the corresponding report call at the end is disabled too.
printfn "Shards active on node 'localhost:5001':"
// Wait for a key press before attempting to passivate the entity.
System.Console.ReadKey()
// Sends Passivate(PoisonPill) to the shard region, passing entity1's underlying reference as sender.
// NOTE(review): Akka documents Passivate as something an entity sends to its parent shard;
// confirm that sending it to the region from the script has the intended effect.
fac1.ShardRegion.Tell(Passivate(PoisonPill.Instance), downcast entity1.Underlying)
//entity1.Underlying.Tell((GracefulShutdown.Instance), null)
// Keep the process alive until another key press.
System.Console.ReadKey()
//printShards fac2.ShardRegion
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment