Last active
April 11, 2021 18:17
-
-
Save BashkaMen/b30b31c024ea88bc7a39b7b16a5b3eaa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
open System | |
open System.Collections.Concurrent | |
open System.Threading.Tasks | |
let inline ( ^ ) f x = f x | |
let inline safeable f x = | |
try | |
Ok (f x) | |
with e -> Error e | |
type UserStatus = Normal | VIP | SuperVIP | |
type Name = Name of string | |
type FraudInfo = Unknown | Fraud | NotFraud | |
type User = { | |
Id: Guid | |
Name: Name | |
Status: UserStatus | |
FraudInfo: FraudInfo | |
Balance: int | |
} | |
let newUser() = { | |
Id = Guid.NewGuid() | |
Name = Name "" | |
Status = Normal | |
FraudInfo = Unknown | |
Balance = 0 | |
} | |
let withName name x = { x with Name = name } | |
let withStatus status x = { x with Status = status } | |
let bonus status x = | |
match status with | |
| Normal -> 0 | |
| VIP -> 10 | |
| SuperVIP -> 20 | |
|> fun bonus -> x / 100 * bonus | |
let applyDeposit amount x = | |
let amount = bonus x.Status amount | |
{ x with Balance = x.Balance + amount } | |
let withFraudInfo info x = { x with FraudInfo = info } | |
type Command = | |
| CreateUser of Name | |
| CreateVip of Name | |
| ChangeStatus of UserStatus | |
| Deposit of int | |
type CommandMeta = { | |
AggregateId: Guid | |
Command: Command | |
} | |
type Event = | |
| UserCreated | |
| FraudInfoFetched of FraudInfo | |
type EventMeta = { | |
AggregateId: Guid | |
Created: DateTimeOffset | |
Payload: Event | |
} | |
let mkCmd id x = { AggregateId = id; Command = x } | |
let mkEvent id now x = { AggregateId = id; Created = now; Payload = x } | |
let handleCore now (cmd: CommandMeta) state = | |
let mkCmd = mkCmd cmd.AggregateId | |
let mkEvent = mkEvent cmd.AggregateId now | |
match cmd.Command with | |
| CreateUser name -> state |> withName name, [mkEvent UserCreated] | |
| CreateVip name -> state |> withName name |> withStatus VIP, [mkEvent UserCreated] | |
| ChangeStatus status -> state |> withStatus status, [] | |
| Deposit amount -> state |> applyDeposit amount, [] | |
let applyEvent event state = | |
match event.Payload with | |
| UserCreated -> state | |
| FraudInfoFetched info -> state |> withFraudInfo info | |
module Store = | |
let source = ConcurrentDictionary() | |
let save x = | |
source.[x.Id] <- x | |
let getBy id = source.[id] | |
let exist id = source.ContainsKey id | |
let getOrCreate id = | |
if exist id then source.[id] | |
else newUser() | |
let saveIfNotExist x = | |
if exist x.Id then () | |
else save x | |
let trySave = safeable save | |
let trySaveIfNotExist = safeable saveIfNotExist | |
module Kafka = | |
let publish x = () | |
let publishMany (x: seq<_>) = Seq.iter publish x | |
let consume<'a>(): 'a = | |
Task.Delay(-1).Wait() | |
failwith "" | |
let commit() = () | |
let tryPublish = safeable publish | |
let tryPublishMany = safeable publishMany | |
module EventHandlers = | |
type AggregateChanged = { State: User; Events: EventMeta list } | |
let init () = () | |
let fromTuple x = { State = fst x; Events = snd x } | |
let aggregateChangedHandler msg = | |
let ( >>- ) v f = Result.bind (fun _ -> f) v | |
Store.trySaveIfNotExist msg.State | |
>>- Kafka.tryPublishMany msg.Events | |
do | |
let rec consumeAggregateChanged() = | |
let msg = Kafka.consume<AggregateChanged>() | |
match aggregateChangedHandler msg with | |
| Ok _ -> Kafka.commit() | |
| Error e -> printfn "ERROR: %A" e | |
consumeAggregateChanged() | |
//async side effect | |
let rec userCreatedHandler() = | |
let msg = Kafka.consume<EventMeta>() | |
let mkEvent = mkEvent msg.AggregateId DateTimeOffset.Now | |
match msg.Payload with | |
| UserCreated -> | |
printfn "NEW USER CREATED: %A" msg.AggregateId | |
printfn "GETTING FRAUD STATE..." | |
let state = Fraud | |
match Kafka.tryPublish ^ mkEvent ^ FraudInfoFetched state with | |
| Ok _ -> Kafka.commit() | |
| Error e -> printfn "ERROR: %A" e | |
| _ -> Kafka.commit() | |
userCreatedHandler() | |
let rec consumeEvents() = | |
let msg = Kafka.consume<EventMeta>() | |
let state = Store.getBy msg.AggregateId | |
applyEvent msg state |> Store.save | |
printfn "STATE CHANGED: %A" state | |
consumeEvents() | |
Task.Run(consumeEvents) |> ignore | |
Task.Run(consumeAggregateChanged) |> ignore | |
Task.Run(userCreatedHandler) |> ignore | |
printfn "HANDLERS REGISTERED" | |
let executeCommand (cmd: CommandMeta) = | |
Store.getOrCreate cmd.AggregateId | |
|> handleCore DateTimeOffset.Now cmd | |
|> EventHandlers.fromTuple | |
|> Kafka.publish | |
EventHandlers.init() | |
let user = newUser() | |
[ | |
CreateUser ^ Name "Test" | |
Deposit 100 | |
ChangeStatus SuperVIP | |
Deposit 100 | |
] | |
|> List.map ^ mkCmd user.Id | |
|> List.iter executeCommand |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment