Last active
September 12, 2024 15:29
-
-
Save OnurGumus/8397342ce79244a3832daead04ac885d to your computer and use it in GitHub Desktop.
CQRS example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module AlarmsGlobal.Command.Domain.User | |
open CQRS | |
open Akkling | |
open Akkling.Persistence | |
open AkklingHelpers | |
open Akka | |
open Common | |
open Serilog | |
open System | |
open Akka.Cluster.Tools.PublishSubscribe | |
open Actor | |
open Microsoft.Extensions.Configuration | |
open AlarmsGlobal.Shared.Model.Authentication | |
open Akka.Event | |
open AlarmsGlobal.Shared.Model | |
open AlarmsGlobal.Shared.Model.Subscription | |
open AlarmsGlobal.ServerInterfaces.Command | |
/// Events emitted by the user actor in response to commands.
/// Every case carries the affected client id together with the user identity.
type Event =
    /// A client id was added to the user's linked clients.
    | UserClientIdLinked of UserClientId * UserIdentity
    /// A link was requested for a client id that is already linked.
    | UserClientIdAlreadyLinked of UserClientId * UserIdentity
    /// A client id was removed from the user's linked clients.
    | UserClientIdUnlinked of UserClientId * UserIdentity
    /// An unlink was requested for a client id that is not linked.
    | UserClientIdAlreadyUnlinked of UserClientId * UserIdentity
    /// Notifications were (re-)enabled for the given client id.
    | NotificationEnabled of UserClientId * UserIdentity
    /// Notifications were disabled for the given client id.
    | NotificationDisabled of UserClientId * UserIdentity
/// Commands handled by the user actor.
type Command =
    /// Link a client id to the user and record the supplied identity.
    | LinkUserClientId of UserClientId * UserIdentity
    /// Remove a client id from the user's linked clients.
    | UnlinkUserClientId of UserClientId
    /// Deliver a global event to every linked client whose notifications are not disabled.
    | SendMessage of GlobalEvent
    /// Re-enable notifications for a client id.
    | EnableNotifications of UserClientId
    /// Disable notifications for a client id.
    | DisableNotifications of UserClientId
/// Per-user settings stored in the actor state.
/// NOTE(review): nothing in this file reads `ImpactFilter`; presumably a threshold
/// applied elsewhere - confirm before relying on it.
type UserSettings =
    { ImpactFilter: int }
/// State of the user actor, rebuilt by applying persisted events.
type State =
    { /// Version of the last applied event.
      Version: int64
      /// Identity recorded by the most recent link/unlink event; `None` until the first one.
      UserIdentity: UserIdentity option
      /// Client ids currently linked to this user.
      UserClientIds: UserClientId list
      /// Per-user settings.
      UserSettings: UserSettings
      /// Client ids with notifications switched off; `None` until a client is first disabled.
      DisabledNotifications: UserClientId list option }

    interface ISerializable
/// Builds the behaviour of the event-sourced user actor.
///
/// Parameters:
///   env      - environment object; upcast to `IConfiguration` and `IMessageSender`.
///   toEvent  - builds an event envelope from the command id, the correlation id,
///              the event version and the event details.
///   mediator - publish/subscribe mediator, handed through to `common`.
///   mailbox  - the actor's event-sourced context, handed through to `common`.
///
/// Returns the initial effect produced by `set` for an empty state (version 0).
///
/// NOTE(review): several handlers read `state.UserIdentity.Value`; this throws if a
/// command arrives before any link/unlink event has set the identity - confirm that
/// callers always link first.
let actorProp
    (env: _)
    (toEvent: string option -> string -> int64 -> Event -> _)
    (mediator: IActorRef<Publish>)
    (mailbox: Eventsourced<obj>)
    =
    // `config` is not used below; the upcast is kept so the constraint on `env` is unchanged.
    let config = env :> IConfiguration
    let messageSender = env :> IMessageSender

    /// True when notifications are currently disabled for `userClientId`.
    let isNotificationDisabled (state: State) userClientId =
        match state.DisabledNotifications with
        | Some disabled -> List.contains userClientId disabled
        | None -> false

    /// Folds one persisted event into the state and stamps the event's version on the result.
    let apply (event: Event<_>) (state: State) =
        let updated =
            match event.EventDetails with
            | NotificationEnabled(userClientId, _) ->
                match state.DisabledNotifications with
                | None -> state
                | Some disabled ->
                    { state with
                        DisabledNotifications = disabled |> List.filter (fun x -> x <> userClientId) |> Some }
            | NotificationDisabled(userClientId, _) ->
                match state.DisabledNotifications with
                // The command handler also emits this event when the client is already
                // disabled, so keep the list free of duplicates.
                | Some disabled when List.contains userClientId disabled -> state
                | Some disabled -> { state with DisabledNotifications = Some(userClientId :: disabled) }
                | None -> { state with DisabledNotifications = Some [ userClientId ] }
            | UserClientIdUnlinked(userClientId, identity) ->
                { state with
                    UserClientIds = state.UserClientIds |> List.filter (fun x -> x <> userClientId)
                    UserIdentity = Some identity }
            | UserClientIdLinked(userClientId, identity) ->
                { state with
                    UserClientIds = userClientId :: state.UserClientIds
                    UserIdentity = Some identity }
            | UserClientIdAlreadyUnlinked _
            | UserClientIdAlreadyLinked _ -> state

        { updated with Version = event.Version }

    let rec set (state: State) =
        let body (bodyInput: BodyInput<Event>) =
            let msg = bodyInput.Message

            actor {
                match msg, state with
                | :? Persistence.RecoveryCompleted, _ -> return! state |> set
                | :? (Common.Command<Command>) as msg, _ ->
                    let toEvent = toEvent (msg.Id) msg.CorrelationId

                    match msg.CommandDetails with
                    | EnableNotifications userClientId ->
                        // Bump the version only when the client is actually disabled right now.
                        // (The previous `IsSome || ...` test dereferenced `None` and also
                        // bumped the version for clients that were never disabled.)
                        let version =
                            if isNotificationDisabled state userClientId then
                                state.Version + 1L
                            else
                                state.Version

                        let event =
                            toEvent version (NotificationEnabled(userClientId, state.UserIdentity.Value))

                        return! event |> bodyInput.SendToSagaStarter |> Persist

                    | DisableNotifications userClientId ->
                        // Bump the version only when this is a real state change.
                        let version =
                            if isNotificationDisabled state userClientId then
                                state.Version
                            else
                                state.Version + 1L

                        let event =
                            toEvent version (NotificationDisabled(userClientId, state.UserIdentity.Value))

                        return! event |> bodyInput.SendToSagaStarter |> Persist

                    | SendMessage globalEvent ->
                        // Deliver to every linked client that has not disabled notifications.
                        // A failed push yields an unlink event for that subscription.
                        // NOTE(review): sends block the actor via RunSynchronously, and every
                        // failed push in one batch gets the same `state.Version + 1L` - confirm
                        // this is intended.
                        let unlinkEffects =
                            state.UserClientIds
                            |> List.filter (fun clientId -> not (isNotificationDisabled state clientId))
                            |> List.choose (fun userClientId ->
                                match userClientId with
                                | Email email ->
                                    messageSender.SendMail(globalEvent, email)
                                    |> Async.Ignore
                                    |> Async.RunSynchronously

                                    None
                                | PushSubscription push ->
                                    match
                                        messageSender.SendPushNotification(push, globalEvent)
                                        |> Async.RunSynchronously
                                    with
                                    | Ok _ -> None
                                    | Error _ ->
                                        toEvent
                                            (state.Version + 1L)
                                            (UserClientIdUnlinked(userClientId, state.UserIdentity.Value))
                                        |> bodyInput.SendToSagaStarter
                                        |> Persist
                                        |> Some)

                        match unlinkEffects with
                        | [] -> return! set state
                        | _ ->
                            return!
                                unlinkEffects
                                |> List.map (fun x -> x :> Effect<obj>)
                                |> List.reduce (fun a b -> a <@> b)

                    | UnlinkUserClientId userClientId ->
                        let identity = state.UserIdentity.Value

                        let event =
                            if List.contains userClientId state.UserClientIds then
                                toEvent (state.Version + 1L) (UserClientIdUnlinked(userClientId, identity))
                            else
                                // Nothing to unlink: report it without bumping the version.
                                toEvent state.Version (UserClientIdAlreadyUnlinked(userClientId, identity))

                        return! event |> bodyInput.SendToSagaStarter |> Persist

                    | LinkUserClientId(userClientId, identity) ->
                        if state.UserClientIds |> List.exists (fun x -> x = userClientId) then
                            // Already linked: report it without bumping the version.
                            let event =
                                toEvent state.Version (UserClientIdAlreadyLinked(userClientId, identity))

                            return! event |> bodyInput.SendToSagaStarter |> Persist
                        else
                            let event =
                                toEvent (state.Version + 1L) (UserClientIdLinked(userClientId, identity))

                            // Newly linked e-mail addresses get a welcome mail before the event is persisted.
                            match userClientId with
                            | Email e ->
                                let bodyMsg =
                                    " Welcome to Alarms.Global. Your identity has been linked. Please add us to safe senders list to ensure you receive all our emails. "

                                let welcomeEvent = {
                                    Title = "Welcome to Alarms.Global" |> ShortString.TryCreate |> forceValidate
                                    Body = bodyMsg |> LongString.TryCreate |> forceValidate
                                    GlobalEventId = GlobalEventId.CreateNew()
                                    Media = None
                                    TargetRegion = []
                                    Categories = []
                                    EventDateInUTC = None
                                    Tags = []
                                    Source = None
                                    Impact = None
                                }

                                do! messageSender.SendMail(welcomeEvent, e) |> Async.Ignore
                            | _ -> ()

                            return! event |> bodyInput.SendToSagaStarter |> Persist
                | _ ->
                    bodyInput.Log.Warning("Unhandled message: {msg}", msg)
                    return Unhandled
            }

        common mailbox mediator set state apply body

    set {
        Version = 0L
        UserClientIds = []
        UserIdentity = None
        DisabledNotifications = None
        UserSettings = { ImpactFilter = 5 }
    }
/// Creates the entity factory for the "User" entity type on the given actor system.
///
/// `env` and `toEvent` are forwarded to `actorProp`; the mediator is taken from `actorApi`.
/// NOTE(review): the trailing `true` is the last argument of
/// `AkklingHelpers.entityFactoryFor`; its meaning is not visible here - confirm in the helper.
let init (env: _) toEvent (actorApi: IActor) =
    let userProps =
        propsPersist (actorProp env toEvent (typed actorApi.Mediator))

    AkklingHelpers.entityFactoryFor actorApi.System shardResolver "User" userProps true
/// Resolves the reference for the user entity `entityId` on `DEFAULT_SHARD`,
/// using the entity factory returned by `init`.
let factory (env: #_) toEvent actorApi entityId =
    let userEntities = init env toEvent actorApi
    userEntities.RefFor DEFAULT_SHARD entityId
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment