Skip to content

Instantly share code, notes, and snippets.

@OnurGumus
Last active September 12, 2024 15:29
Show Gist options
  • Save OnurGumus/8397342ce79244a3832daead04ac885d to your computer and use it in GitHub Desktop.
Save OnurGumus/8397342ce79244a3832daead04ac885d to your computer and use it in GitHub Desktop.
CQRS example
module AlarmsGlobal.Command.Domain.User
open CQRS
open Akkling
open Akkling.Persistence
open AkklingHelpers
open Akka
open Common
open Serilog
open System
open Akka.Cluster.Tools.PublishSubscribe
open Actor
open Microsoft.Extensions.Configuration
open AlarmsGlobal.Shared.Model.Authentication
open Akka.Event
open AlarmsGlobal.Shared.Model
open AlarmsGlobal.Shared.Model.Subscription
open AlarmsGlobal.ServerInterfaces.Command
type Event =
| UserClientIdLinked of UserClientId * UserIdentity
| UserClientIdAlreadyLinked of UserClientId * UserIdentity
| UserClientIdUnlinked of UserClientId * UserIdentity
| UserClientIdAlreadyUnlinked of UserClientId * UserIdentity
| NotificationEnabled of UserClientId * UserIdentity
| NotificationDisabled of UserClientId * UserIdentity
type Command =
| LinkUserClientId of UserClientId * UserIdentity
| UnlinkUserClientId of UserClientId
| SendMessage of GlobalEvent
| EnableNotifications of UserClientId
| DisableNotifications of UserClientId
type UserSettings = { ImpactFilter: int }
type State = {
Version: int64
UserIdentity: UserIdentity option
UserClientIds: UserClientId list
UserSettings: UserSettings
DisabledNotifications: UserClientId list option
} with
interface ISerializable
let actorProp
(env: _)
(toEvent: string option -> string -> int64 -> Event -> _)
(mediator: IActorRef<Publish>)
(mailbox: Eventsourced<obj>)
=
let config = env :> IConfiguration
let messageSender = env :> IMessageSender
let apply (event: Event<_>) (_: State as state) =
match event.EventDetails, state with
| NotificationEnabled(userClientId, _), _ ->
match state.DisabledNotifications with
| None -> state
| Some _ -> {
state with
DisabledNotifications =
state.DisabledNotifications.Value
|> List.filter (fun x -> x <> userClientId)
|> Some
}
| NotificationDisabled(userClientId, _), _ ->
match state.DisabledNotifications with
| None -> { state with DisabledNotifications = Some [ userClientId ] }
| Some _ -> {
state with
DisabledNotifications = userClientId :: state.DisabledNotifications.Value |> Some
}
| UserClientIdUnlinked(userClientId, identity), _ -> {
state with
UserClientIds = state.UserClientIds |> List.filter (fun x -> x <> userClientId)
UserIdentity = Some identity
}
| UserClientIdLinked(userClientId, identity), _ -> {
state with
UserClientIds = userClientId :: state.UserClientIds
UserIdentity = Some identity
}
| UserClientIdAlreadyUnlinked _, _
| UserClientIdAlreadyLinked _, _ -> state
|> fun state -> { state with Version = event.Version }
let rec set (state: State) =
let body (bodyInput: BodyInput<Event>) =
let msg = bodyInput.Message
actor {
match msg, state with
| :? Persistence.RecoveryCompleted, _ -> return! state |> set
| :? (Common.Command<Command>) as msg, _ ->
let toEvent = toEvent (msg.Id) msg.CorrelationId
match msg.CommandDetails, state with
| EnableNotifications(userClientId), _ ->
if
state.DisabledNotifications.IsSome
|| state.DisabledNotifications.Value |> List.exists (fun x -> x = userClientId)
then
let event = toEvent (state.Version + 1L) (NotificationEnabled(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
else
let event = toEvent state.Version (NotificationEnabled(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
| DisableNotifications(userClientId), _ ->
if
state.DisabledNotifications.IsNone
|| state.DisabledNotifications.Value
|> List.exists (fun x -> x = userClientId)
|> not
then
let event = toEvent (state.Version + 1L) (NotificationDisabled(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
else
let event = toEvent state.Version (NotificationDisabled(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
| SendMessage(event), _ ->
let effects =
state.UserClientIds
|> List.filter (fun x ->state.DisabledNotifications.IsNone || not (state.DisabledNotifications.IsSome && (state.DisabledNotifications.Value |> List.contains x)))
|> List.map (fun userClientId ->
match userClientId with
| Email email ->
messageSender.SendMail(event, email) |> Async.Ignore |> Async.RunSynchronously
None
| PushSubscription push ->
let res = messageSender.SendPushNotification(push, event) |> Async.RunSynchronously
match res with
| Ok _ -> None
| Error _ ->
let event =
toEvent
(state.Version + 1L)
(UserClientIdUnlinked(userClientId, state.UserIdentity.Value))
event |> bodyInput.SendToSagaStarter |> Persist |> Some)
let filtered = effects |> List.filter Option.isSome |> List.map Option.get
match filtered with
| [] -> return! set state
| _ ->
return!
filtered
|> List.map (fun x -> x :> Effect<obj>)
|> List.reduce (fun a b -> a <@> b)
| UnlinkUserClientId(userClientId), _ ->
if state.UserClientIds |> List.exists (fun x -> x = userClientId) then
let event =
toEvent
(state.Version + 1L)
(UserClientIdUnlinked(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
else
let event =
toEvent
(state.Version)
(UserClientIdAlreadyUnlinked(userClientId, state.UserIdentity.Value))
return! event |> bodyInput.SendToSagaStarter |> Persist
| LinkUserClientId(userClientId, identity), _ ->
if state.UserClientIds |> List.exists (fun x -> x = userClientId) then
let event =
toEvent state.Version (UserClientIdAlreadyLinked(userClientId, identity))
return! event |> bodyInput.SendToSagaStarter |> Persist
else
let event =
toEvent (state.Version + 1L) (UserClientIdLinked(userClientId, identity))
match userClientId with
| Email e ->
let bodyMsg =
" Welcome to Alarms.Global. Your identity has been linked. Please add us to safe senders list to ensure you receive all our emails. "
let globlaEvent = {
Title = "Welcome to Alarms.Global" |> ShortString.TryCreate |> forceValidate
Body = bodyMsg |> LongString.TryCreate |> forceValidate
GlobalEventId = GlobalEventId.CreateNew()
Media = None
TargetRegion = []
Categories = []
EventDateInUTC = None
Tags = []
Source = None
Impact = None
}
do! messageSender.SendMail(globlaEvent, e) |> Async.Ignore
| _ -> ()
return! event |> bodyInput.SendToSagaStarter |> Persist
| _ ->
bodyInput.Log.Warning("Unhandled message: {msg}", msg)
return Unhandled
}
common mailbox mediator set state apply body
set {
Version = 0L
UserClientIds = []
UserIdentity = None
DisabledNotifications = None
UserSettings = { ImpactFilter = 5 }
}
let init (env: _) toEvent (actorApi: IActor) =
AkklingHelpers.entityFactoryFor actorApi.System shardResolver "User"
<| propsPersist (actorProp env toEvent (typed actorApi.Mediator))
<| true
let factory (env: #_) toEvent actorApi entityId =
(init env toEvent actorApi).RefFor DEFAULT_SHARD entityId
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment