- 
      
- 
        Save panesofglass/26f86b3b79898dde5b91b7cd3f43a299 to your computer and use it in GitHub Desktop. 
    Experimental high performance actor implementation for F# using channel and task computation expression.
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | // NOTE: import this Nuget package: TaskBuilder.fs (written using 2.1.0) | |
| // | |
| // Tested in LINQPad (hence Dump method usage). | |
| open System | |
| open System.Collections | |
| open System.Collections.Generic | |
| open System.Diagnostics | |
| open System.Linq | |
| open System.Threading | |
| open System.Threading.Tasks | |
| open System.Threading.Channels | |
| open FSharp.Control.Tasks // Nuget package: TaskBuilder.fs 2.1.0 | |
| // Rough sketch of a high-performance actor; potential alternative to | |
| // MailboxProcessor, built-in F# implementation. This actor implementation | |
| // uses System.Threading.Channels.Channel and TaskBuilder to achieve its | |
| // performance. | |
| // TODO testing | |
| // TODO more features | |
| // TODO lost letters | |
| // TODO unhandled exceptions | |
| // TODO exception handling exceptions | |
/// Contract that every actor message type must implement: a general
/// exception handler. For message cases that carry a
/// TaskCompletionSource, the implementation can call SetException /
/// TrySetException on it, so that failures reach the caller without a
/// try-catch block in every message case.
///
/// NOTE: [<RequireQualifiedAccess>] was dropped from this type; that
/// attribute is documented for modules, records and unions, and does
/// nothing useful on an interface.
type IActorMessage =
    /// Invoked with the exception thrown while handling this message.
    /// Returns true when the message could pass the error on (e.g. through
    /// a reply callback); false means the error had nowhere to go and is
    /// lost.
    // TODO "lost letters" outbox for unhandled top-level exceptions.
    abstract member TryHandleException : Exception -> bool
/// Lifecycle states of an actor. Cases must be referenced qualified
/// (e.g. ActorState.Started).
[<RequireQualifiedAccess>]
type ActorState =
    /// Initial state: the actor has been created but not started.
    | New
    /// The actor is running and accepts messages.
    | Started
    /// The actor is shutting down: new messages are rejected, while
    /// messages that are already queued are still processed.
    | StoppingGracefully
    /// Terminal state: the actor has stopped.
    | Stopped
/// Public surface of an actor that receives messages of type 'Msg.
type IActor<'Msg when 'Msg :> IActorMessage> =

    // ---- Queries ----

    /// Current lifecycle state.
    abstract State : ActorState

    // ---- Commands (lifecycle) ----

    /// Starts the actor.
    abstract Start : unit -> unit

    /// Processes all queued-up messages and then stops the actor.
    // TODO Task
    abstract StopGracefully : unit -> unit

    /// Stops the actor immediately.
    abstract Stop : unit -> unit

    // ---- Commands (communication) ----

    /// Fire and forget: posts a message without expecting a reply.
    abstract Tell : 'Msg -> unit

    /// Ask and reply synchronously. The argument builds the message from
    /// the TaskCompletionSource through which the reply is delivered.
    abstract Ask : (TaskCompletionSource<'Reply> -> 'Msg) -> 'Reply

    /// Ask and reply asynchronously. The argument builds the message from
    /// the TaskCompletionSource through which the reply is delivered.
    abstract AskAsync : (TaskCompletionSource<'Reply> -> 'Msg) -> Task<'Reply>
/// Shape of an actor's message handler: it receives the actor instance
/// together with one message and returns a task that finishes when that
/// message has been handled.
and ActorMessageHandler<'Msg when 'Msg :> IActorMessage> = Actor<'Msg> * 'Msg -> Task<unit>
/// Channel-backed actor implementation.
///
/// Messages are written to `inbox` by Tell/Ask/AskAsync and consumed one
/// at a time by a single worker task that awaits `messageHandler` for each
/// message. Instances are created through `Actor.NewUnbounded`.
and Actor<'Msg when 'Msg :> IActorMessage>
    private (inbox : Channel<'Msg>,
             messageHandler : ActorMessageHandler<'Msg>) as self =

    /// Guards the lifecycle transitions performed by Start / StopGracefully / Stop.
    let gate = new Object()
    /// Cancelled by Stop to interrupt the worker.
    let cts = new CancellationTokenSource()
    // NOTE(review): `state` is written by the worker and read by callers
    // without taking `gate`; confirm whether stronger visibility guarantees
    // are required.
    let mutable state = ActorState.New
    /// The worker task; Some once Start has run.
    let mutable worker = None

    /// Raises unless the actor is currently in the Started state.
    let assertStarted () =
        match state with
        | ActorState.Started ->
            ()
        | ActorState.New -> failwith "Actor not started."
        | ActorState.Stopped -> failwith "Actor stopped."
        | ActorState.StoppingGracefully -> failwith "Actor stopping."

    /// Message loop. Marks the actor Started, then processes messages until
    /// the channel is completed and drained (graceful stop) or the token is
    /// cancelled (Stop). The state always ends up as Stopped.
    let work (cancellationToken : CancellationToken) =
        task {
            state <- ActorState.Started
            try
                try
                    // Cleared when the channel reports that no further
                    // message will ever become available.
                    let mutable channelOpen = true
                    while channelOpen && not cancellationToken.IsCancellationRequested do
                        let! canRead = inbox.Reader.WaitToReadAsync(cancellationToken)
                        if not canRead then
                            // The writer was completed and the buffer is empty.
                            // WaitToReadAsync would now return false on every
                            // call, so leave the loop instead of spinning.
                            channelOpen <- false
                        else
                            // Drain everything that is currently buffered.
                            let mutable read = true
                            while read do
                                let (ok, msg) = inbox.Reader.TryRead()
                                read <- ok
                                if ok then
                                    try
                                        do! messageHandler(self, msg)
                                    with messageHandlerEx ->
                                        // Give the message a chance to pass the
                                        // failure on to whoever is waiting.
                                        try
                                            if not (msg.TryHandleException(messageHandlerEx)) then
                                                () // TODO internal error propagation
                                        with exceptionHandlerEx ->
                                            () // TODO internal error propagation
                with :? OperationCanceledException ->
                    // Raised by WaitToReadAsync when Stop cancels the token;
                    // this is the normal way out for an immediate stop.
                    ()
            finally
                state <- ActorState.Stopped
        }

    /// Creates an actor backed by an unbounded channel that dispatches every
    /// message to `handler`. The actor is returned in the New state; call
    /// Start before sending messages.
    static member NewUnbounded (handler : ActorMessageHandler<'Msg>) : IActor<'Msg> =
        let ch = Channel.CreateUnbounded<'Msg>()
        new Actor<'Msg>(ch, handler) :> IActor<'Msg>

    interface IActor<'Msg> with

        /// Current lifecycle state.
        member this.State : ActorState =
            state

        /// Launches the worker. Has no effect if the worker was already created.
        member this.Start () =
            if worker.IsNone then
                lock gate (fun () ->
                    if worker.IsNone then
                        worker <- Some (work(cts.Token))
                )

        /// Stops accepting new messages; the worker finishes the messages
        /// that are already queued and then stops.
        /// Fails with "Actor must be started." when the actor is New or Stopped.
        member this.StopGracefully () =
            lock gate (fun () ->
                if state <> ActorState.Started && state <> ActorState.StoppingGracefully then
                    failwith "Actor must be started."
                state <- ActorState.StoppingGracefully
                // TryComplete rather than Complete: Complete throws when the
                // channel has already been completed, which would make a
                // repeated StopGracefully fail.
                inbox.Writer.TryComplete() |> ignore
            )

        /// Completes the inbox and cancels the worker.
        /// Fails with "Actor must have started first." when the actor is New;
        /// does nothing when it is already Stopped.
        member this.Stop () =
            lock gate (fun () ->
                match state with
                | ActorState.New ->
                    failwith "Actor must have started first."
                | ActorState.Started
                | ActorState.StoppingGracefully ->
                    // A false result only means the writer was completed
                    // earlier (StopGracefully or a previous Stop whose worker
                    // has not finished yet); that is not an error.
                    inbox.Writer.TryComplete() |> ignore
                    cts.Cancel()
                | ActorState.Stopped ->
                    () // Idempotent
            )

        /// Fire and forget: queues `message` for processing.
        /// Fails when the actor is not in the Started state.
        member this.Tell (message : 'Msg) : unit =
            assertStarted()
            if not (inbox.Writer.TryWrite(message)) then
                failwith "Unexpected write failure (unbounded channel write failure)."

        /// Sends the message produced by `messageBuilder` and blocks the
        /// calling thread until the reply is available.
        member this.Ask (messageBuilder : TaskCompletionSource<'Reply> -> 'Msg) : 'Reply =
            (this :> IActor<_>).AskAsync(messageBuilder).GetAwaiter().GetResult()

        /// Sends the message produced by `messageBuilder` and returns the task
        /// of the TaskCompletionSource handed to it. The task finishes only
        /// when the handler (or the message's exception handler) completes
        /// that source.
        member this.AskAsync (messageBuilder : TaskCompletionSource<'Reply> -> 'Msg) : Task<'Reply> =
            assertStarted()
            let completion = new TaskCompletionSource<'Reply>()
            let message = messageBuilder completion
            (this :> IActor<_>).Tell(message)
            completion.Task
/// Messages understood by the sample actor.
type TestMesssage =
    /// Fire-and-forget greeting.
    | Hello of string
    /// Request to negate a boolean; the result is delivered through the
    /// completion source.
    | Not of bool * TaskCompletionSource<bool>
    /// Fire-and-forget request to increment the counter.
    | Incr
    /// Request for the current counter value, delivered through the
    /// completion source.
    | Counter of TaskCompletionSource<int>
    interface IActorMessage with
        /// Forwards a handler failure to the waiting caller for every case
        /// that carries a reply; returns false for cases that have no reply
        /// to fault.
        member this.TryHandleException (error : exn) =
            match this with
            | Not (_, reply) -> reply.TrySetException(error)
            // Without this case a failing Counter request would never
            // complete and the asking caller would wait forever.
            | Counter reply -> reply.TrySetException(error)
            | Hello _
            | Incr -> false
/// Sample actor: dumps greetings, negates booleans on request and keeps a
/// counter that can be incremented and queried.
let testActor =
    let random = new Random()
    let mutable count = 0
    Actor.NewUnbounded(fun (_, message) ->
        task {
            match message with
            | Hello text ->
                text.Dump("Hello")
            | Not (value, reply) ->
                let negated = not value
                // A draw of 0 (out of 0..4) is the slot reserved for a
                // simulated failure; the failure is commented out, so both
                // branches reply with the result.
                if random.Next(0, 5) = 0 then
                    reply.SetResult(negated)
                    //failwith "oops, unexpected error"
                else
                    reply.SetResult(negated)
            | Incr ->
                count <- count + 1
            | Counter reply ->
                reply.SetResult(count)
        }
    )
// Start the actor; it must be started before it accepts messages.
testActor.Start()

// Fire-and-forget post.
testActor.Tell(Hello "world")

// Synchronous ask: post a message and block until the reply is available.
testActor.Ask(fun tcs -> Not (true, tcs)).Dump("Ask(Not true)")
testActor.Ask(fun tcs -> Not (false, tcs)).Dump("Ask(Not false)")

// The same two requests, awaited inside a task computation expression.
let job =
    task {
        let! notTrue = testActor.AskAsync(fun reply -> Not (true, reply))
        let! notFalse = testActor.AskAsync(fun reply -> Not (false, reply))
        notTrue.Dump("AskAsync(Not true)")
        notFalse.Dump("AskAsync(Not false)")
    }

job.GetAwaiter().GetResult() // force synchronous wait
// Performance test: 1,000,000 fire-and-forget messages, followed by one
// synchronous ask that reads the counter back.
let clock = Stopwatch.StartNew()

for _ in 1 .. 1_000_000 do
    testActor.Tell(Incr)

let counter = testActor.Ask(Counter)
counter.Dump("Counter")
clock.Elapsed.TotalSeconds.Dump("Seconds elapsed")
// Performance test: 1,000,000 request/reply round trips, each feeding the
// previous reply into the next request.
let notting =
    task {
        let clock = Stopwatch.StartNew()
        let mutable b = false
        for _ in 1 .. 1_000_000 do
            let! negated = testActor.AskAsync(fun tcs -> Not (b, tcs))
            b <- negated
        b.Dump("b")
        clock.Elapsed.TotalSeconds.Dump("Seconds elapsed")
    }

notting.GetAwaiter().GetResult()

// Now stop it
testActor.Stop()

// ...and see that we cannot accept more messages.
//testActor.Tell(Hello "world again")
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment