Skip to content

Instantly share code, notes, and snippets.

@baronfel
Last active April 6, 2023 02:15
Show Gist options
  • Save baronfel/d6200f92d44c6b41718571b5b1c6c266 to your computer and use it in GitHub Desktop.
Save baronfel/d6200f92d44c6b41718571b5b1c6c266 to your computer and use it in GitHub Desktop.
#r "System.Threading.Channels"
open System.Threading.Channels
open System.Threading
open System
open System.Threading.Tasks
open System.Threading
/// A small mailbox processor built on System.Threading.Channels.
///
/// Messages are queued on an unbounded channel and handled one at a time, in
/// arrival order, by a background task that threads a piece of state through
/// the handler.
///
/// initialState: the state passed to the handler for the first message.
/// f: the handler; receives the current state, the message and a cancellation
///    token, and returns a task producing the next state. If it throws (or the
///    returned task faults), the exception is published on Error and the
///    previous state is kept.
/// ctok: external cancellation token; cancelling it stops the processing loop.
type Mailbox<'state, 'msg>
    (
        initialState: 'state,
        f: 'state -> 'msg -> CancellationToken -> Task<'state>,
        ctok: CancellationToken
    ) =
    // Only the processing loop below reads from the channel, so SingleReader
    // holds. Post/PostAndAsyncReply are public and may be called from any
    // thread, so the single-writer guarantee must NOT be promised.
    let c: Channel<'msg> =
        Channel.CreateUnbounded(UnboundedChannelOptions(SingleReader = true, SingleWriter = false))

    let linkedTok = CancellationTokenSource.CreateLinkedTokenSource(ctok)

    // Capture the token once: CancellationTokenSource.Token throws after the
    // source is disposed, whereas an already-obtained token stays usable.
    let token = linkedTok.Token

    // Created before the loop starts so the loop never touches a
    // not-yet-initialised member of the object under construction.
    let errorEvent = Event<Exception>()

    let mutable state = initialState

    do
        task {
            try
                while not token.IsCancellationRequested do
                    let! message = c.Reader.ReadAsync(token)

                    try
                        let! newState = f state message token
                        state <- newState
                    with e ->
                        // A failing handler must not kill the loop; report it
                        // and keep the previous state.
                        errorEvent.Trigger e
            with
            | :? OperationCanceledException
            | :? ChannelClosedException ->
                // Normal shutdown: the token was cancelled or the writer was
                // completed by Dispose.
                ()
        }
        |> ignore

    /// Enqueues a message. TryWrite is used instead of a retry loop: if it
    /// returns false the writer has been completed (by Dispose) and retrying
    /// would spin forever.
    let doWrite (msg: 'msg) =
        if not (c.Writer.TryWrite msg) then
            raise (ObjectDisposedException("Mailbox", "Cannot post to a mailbox that has been disposed."))

    /// Number of messages currently waiting, evaluated on every access.
    /// Reports Int32.MaxValue when the underlying reader does not support
    /// counting (CanCount is false).
    member _.CurrentQueueLength =
        if c.Reader.CanCount then c.Reader.Count else Int32.MaxValue

    /// Event carrying exceptions raised by the message handler.
    /// Subscribe through Error.Publish.
    member _.Error: Event<Exception> = errorEvent

    /// Queues a message for processing.
    /// Raises ObjectDisposedException if the mailbox has been disposed.
    member _.Post(msg: 'msg) = doWrite msg

    /// Builds a message around a fresh TaskCompletionSource, queues it, and
    /// returns the task that the handler is expected to complete with a reply.
    /// Raises ObjectDisposedException if the mailbox has been disposed.
    member _.PostAndAsyncReply(builder: TaskCompletionSource<'t> -> 'msg) : Task<'t> =
        // Run continuations asynchronously so that code awaiting the reply
        // does not execute inline on (and stall) the processing loop.
        let completion =
            new TaskCompletionSource<'t>(TaskCreationOptions.RunContinuationsAsynchronously)

        doWrite (builder completion)
        completion.Task

    interface IDisposable with
        /// Stops accepting messages and cancels the processing loop.
        /// Safe to call more than once.
        member _.Dispose() =
            // TryComplete returns true only for the call that actually
            // completes the channel, which makes Dispose idempotent.
            if c.Writer.TryComplete() then
                try
                    linkedTok.Cancel()
                finally
                    linkedTok.Dispose()
/// Commands understood by the demo counter mailbox.
type Message =
    /// Increase the counter by one.
    | Inc
    /// Decrease the counter by one.
    | Dec
/// Mailbox handler for the demo counter: applies the message to the counter,
/// prints the message together with the resulting value, and returns the new
/// counter as a task. The cancellation token argument is accepted but not used.
let handleMessage counter i ctok =
    task {
        // Translate the message into a signed step, then apply it.
        let delta =
            match i with
            | Inc -> 1
            | Dec -> -1

        let updated = counter + delta
        printfn "Message %A -> %d" i updated
        return updated
    }
// Cancellation source whose token is handed to the demo mailbox.
let ctok = new CancellationTokenSource()

// Counter mailbox starting at zero and driven by handleMessage.
let mbp =
    let initialCount = 0
    new Mailbox<int, Message>(initialCount, handleMessage, ctok.Token)
/// Picks Inc or Dec according to the parity of a random non-negative integer:
/// even yields Inc, odd yields Dec.
let getMessage () =
    match System.Random.Shared.Next() % 2 with
    | 0 -> Inc
    | _ -> Dec
// Post eleven randomly chosen messages (0 through 10 inclusive) to the mailbox.
for _ in 0..10 do
    getMessage () |> mbp.Post
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment