Skip to content

Instantly share code, notes, and snippets.

@BashkaMen
Last active April 11, 2021 18:17
Show Gist options
  • Save BashkaMen/b30b31c024ea88bc7a39b7b16a5b3eaa to your computer and use it in GitHub Desktop.
Save BashkaMen/b30b31c024ea88bc7a39b7b16a5b3eaa to your computer and use it in GitHub Desktop.
open System
open System.Collections.Concurrent
open System.Threading.Tasks
/// Function application written as an operator: `f ^ x` evaluates `f x`.
/// Used in this file to drop parentheses, e.g. `CreateUser ^ Name "Test"`.
let inline ( ^ ) f x = x |> f
/// Runs `f x` and turns any exception it raises into a value:
/// `Ok result` when `f x` returns, `Error e` carrying the raised exception otherwise.
let inline safeable f x =
    try
        f x |> Ok
    with
    | ex -> Error ex
/// Status level of a user; `bonus` maps each level to a deposit bonus percentage.
type UserStatus =
    | Normal
    | VIP
    | SuperVIP
/// Single-case wrapper that keeps a user's name distinct from other strings.
type Name =
    | Name of string
/// Result of a fraud check on a user. `Unknown` is the value a new user starts with.
type FraudInfo =
    | Unknown
    | Fraud
    | NotFraud
/// A user account: the state that commands and events are applied to.
type User =
    { /// Unique identifier; also the key used by `Store`.
      Id: Guid
      /// Display name of the user.
      Name: Name
      /// Current status level.
      Status: UserStatus
      /// Latest known fraud-check result.
      FraudInfo: FraudInfo
      /// Account balance, changed by `applyDeposit`.
      Balance: int }
/// Builds a blank user: a freshly generated id, an empty name, `Normal` status,
/// `Unknown` fraud info and a zero balance.
let newUser () : User =
    { Id = Guid.NewGuid()
      Name = Name ""
      Status = Normal
      FraudInfo = Unknown
      Balance = 0 }
/// Returns a copy of user `x` whose `Name` is replaced by `name`.
let withName name (x: User) : User =
    { x with Name = name }
/// Returns a copy of user `x` whose `Status` is replaced by `status`.
let withStatus status (x: User) : User =
    { x with Status = status }
/// Computes the bonus earned on an amount `x` for the given `status`,
/// as a whole-number percentage of `x`: Normal 0%, VIP 10%, SuperVIP 20%.
/// The result is truncated toward zero.
let bonus status (x: int) =
    let percent =
        match status with
        | Normal -> 0
        | VIP -> 10
        | SuperVIP -> 20
    // Multiply before dividing: `x / 100 * percent` discards everything below
    // a full hundred (99 would earn no bonus at all). The product is computed
    // in int64 so that multiplying first cannot overflow for large amounts.
    int (int64 x * int64 percent / 100L)
/// Credits a deposit of `amount` to user `x`.
/// The balance grows by the deposited amount plus the status bonus that
/// `bonus` computes for the user's current status.
let applyDeposit amount (x: User) =
    let extra = bonus x.Status amount
    // The deposit itself must be credited too; adding only the bonus would
    // leave a Normal user's balance unchanged after any deposit.
    { x with Balance = x.Balance + amount + extra }
/// Returns a copy of user `x` whose `FraudInfo` is replaced by `info`.
let withFraudInfo info (x: User) : User =
    { x with FraudInfo = info }
/// Requests that `handleCore` knows how to apply to a user.
type Command =
    /// Set the user's name and announce the creation.
    | CreateUser of Name
    /// Like `CreateUser`, but the user also gets `VIP` status.
    | CreateVip of Name
    /// Replace the user's status.
    | ChangeStatus of UserStatus
    /// Deposit the given amount.
    | Deposit of int
/// A command together with the id of the aggregate (user) it targets.
type CommandMeta =
    { AggregateId: Guid
      Command: Command }
/// Facts emitted by command handling or by the background handlers.
type Event =
    /// Emitted by `handleCore` for `CreateUser` and `CreateVip`.
    | UserCreated
    /// Carries a fraud-check result to be stored on the user.
    | FraudInfoFetched of FraudInfo
/// An event plus its envelope: the aggregate it belongs to and its creation time.
type EventMeta =
    { AggregateId: Guid
      Created: DateTimeOffset
      Payload: Event }
/// Wraps command `x` into a `CommandMeta` addressed to aggregate `id`.
let mkCmd id x : CommandMeta =
    { AggregateId = id
      Command = x }
/// Wraps event `x` into an `EventMeta` for aggregate `id`, stamped with `now`.
let mkEvent id now x : EventMeta =
    { AggregateId = id
      Created = now
      Payload = x }
/// Pure command handler: applies `cmd` to `state` and returns the updated
/// user paired with the list of events the command produced.
/// `now` is the timestamp put on every emitted event.
/// Only the creation commands emit an event (`UserCreated`);
/// `ChangeStatus` and `Deposit` return an empty event list.
let handleCore now (cmd: CommandMeta) state =
    // Events raised here always belong to the aggregate the command targets.
    let mkEvent = mkEvent cmd.AggregateId now
    match cmd.Command with
    | CreateUser name -> state |> withName name, [ mkEvent UserCreated ]
    | CreateVip name -> state |> withName name |> withStatus VIP, [ mkEvent UserCreated ]
    | ChangeStatus status -> state |> withStatus status, []
    | Deposit amount -> state |> applyDeposit amount, []
/// Folds one event into a user state and returns the resulting state.
/// `FraudInfoFetched` overwrites the user's fraud info; `UserCreated` leaves
/// the state as it is.
let applyEvent (event: EventMeta) state =
    match event.Payload with
    | FraudInfoFetched info -> withFraudInfo info state
    | UserCreated -> state
/// In-memory user store keyed by user id.
module Store =
    /// Backing dictionary: user id -> last saved user.
    let source = ConcurrentDictionary<Guid, User>()

    /// Inserts `x`, overwriting any user already stored under `x.Id`.
    let save (x: User) =
        source.[x.Id] <- x

    /// Returns the user stored under `id`.
    /// Raises `KeyNotFoundException` when no such user exists.
    let getBy id = source.[id]

    /// True when a user is stored under `id`.
    let exist id = source.ContainsKey id

    /// Returns the user stored under `id`, or a blank user carrying that
    /// same `id` when nothing is stored yet. The blank user is NOT saved.
    let getOrCreate id =
        // A single TryGetValue avoids the gap between "does it exist?" and
        // "read it" that a ContainsKey + indexer pair would leave open.
        match source.TryGetValue(id) with
        | true, user -> user
        // The blank user must keep the requested id; a random one would make
        // the aggregate impossible to find again under its own id.
        | _ -> { newUser() with Id = id }

    /// Stores `x` only when no user with `x.Id` is present; otherwise does nothing.
    let saveIfNotExist (x: User) =
        // TryAdd performs the existence check and the insert as one atomic step.
        source.TryAdd(x.Id, x) |> ignore

    /// `save` with exceptions captured as `Error`.
    let trySave = safeable save

    /// `saveIfNotExist` with exceptions captured as `Error`.
    let trySaveIfNotExist = safeable saveIfNotExist
/// Stand-in for a message broker client: none of these functions performs real I/O.
module Kafka =
    /// Accepts a message and discards it.
    let publish x = ()

    /// Calls `publish` on every element of `x`, in order.
    let publishMany (x: seq<_>) =
        for message in x do
            publish message

    /// Never yields a message: waits on a delay of -1 (infinite) and so blocks
    /// the calling thread. The trailing `failwith` is what gives the function
    /// its `'a` result type.
    let consume<'a>(): 'a =
        Task.Delay(-1).Wait()
        failwith ""

    /// Does nothing.
    let commit() = ()

    /// `publish` with exceptions captured as `Error`.
    let tryPublish = safeable publish

    /// `publishMany` with exceptions captured as `Error`.
    let tryPublishMany = safeable publishMany
/// Background consumers that react to messages read from `Kafka`.
module EventHandlers =
    /// Message describing the outcome of a handled command:
    /// the resulting user state and the events produced.
    type AggregateChanged = { State: User; Events: EventMeta list }

    /// Does nothing by itself.
    /// NOTE(review): presumably called only to force this module's `do` block
    /// to run - confirm.
    let init () = ()

    /// Converts a `(state, events)` pair into an `AggregateChanged` message.
    let fromTuple x = { State = fst x; Events = snd x }

    /// Stores the state of `msg` (only if that user id is not stored yet) and,
    /// when that succeeded, publishes the events of `msg`.
    /// Returns the first `Error` encountered, or `Ok ()`.
    /// NOTE(review): because of `trySaveIfNotExist`, states for an id that is
    /// already stored are never written here - confirm that is intended.
    let aggregateChangedHandler (msg: AggregateChanged) =
        Store.trySaveIfNotExist msg.State
        // The publish must sit inside the lambda: as a plain operand it would
        // run before the save result is inspected, i.e. even after a failed save.
        |> Result.bind (fun () -> Kafka.tryPublishMany msg.Events)

    do
        // Consumes AggregateChanged messages forever; commits on success,
        // logs on failure.
        let rec consumeAggregateChanged() =
            let msg = Kafka.consume<AggregateChanged>()
            match aggregateChangedHandler msg with
            | Ok _ -> Kafka.commit()
            | Error e -> printfn "ERROR: %A" e
            consumeAggregateChanged()

        // Async side effect: for every UserCreated event, publish a
        // FraudInfoFetched event for the same aggregate.
        let rec userCreatedHandler() =
            let msg = Kafka.consume<EventMeta>()
            let mkEvent = mkEvent msg.AggregateId DateTimeOffset.Now
            match msg.Payload with
            | UserCreated ->
                printfn "NEW USER CREATED: %A" msg.AggregateId
                printfn "GETTING FRAUD STATE..."
                // NOTE(review): the fraud result is hard-coded to Fraud;
                // looks like a placeholder for a real lookup - confirm.
                let state = Fraud
                match Kafka.tryPublish ^ mkEvent ^ FraudInfoFetched state with
                | Ok _ -> Kafka.commit()
                | Error e -> printfn "ERROR: %A" e
            // Nothing to do for this event; just acknowledge it.
            | FraudInfoFetched _ -> Kafka.commit()
            userCreatedHandler()

        // Applies every consumed event to the stored state of its aggregate.
        let rec consumeEvents() =
            let msg = Kafka.consume<EventMeta>()
            // Load, apply and save in one step so any exception (for example
            // an unknown aggregate id) is captured instead of ending the loop.
            let applyToStore (m: EventMeta) =
                let updated = Store.getBy m.AggregateId |> applyEvent m
                Store.save updated
                updated
            match safeable applyToStore msg with
            | Ok updated ->
                // Log the state after the event was applied, not the one before.
                printfn "STATE CHANGED: %A" updated
                // Acknowledge the message, as the other consumers do on success.
                Kafka.commit()
            | Error e -> printfn "ERROR: %A" e
            consumeEvents()

        Task.Run(consumeEvents) |> ignore
        Task.Run(consumeAggregateChanged) |> ignore
        Task.Run(userCreatedHandler) |> ignore
        printfn "HANDLERS REGISTERED"
/// Handles one command end to end: loads (or creates) the target user,
/// runs `handleCore` with the current time, and publishes the resulting
/// state and events as an `AggregateChanged` message.
let executeCommand (cmd: CommandMeta) =
    let current = Store.getOrCreate cmd.AggregateId
    let outcome = handleCore DateTimeOffset.Now cmd current
    Kafka.publish (EventHandlers.fromTuple outcome)
// Demo run: start the handlers, then send four commands for one new user.
EventHandlers.init()

let user = newUser()

[ CreateUser (Name "Test")
  Deposit 100
  ChangeStatus SuperVIP
  Deposit 100 ]
|> List.iter (mkCmd user.Id >> executeCommand)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment