Last active
January 7, 2016 22:48
-
-
Save philcleveland/4fbfae0e3fc91939d6a2 to your computer and use it in GitHub Desktop.
Akka Producer/Consumer with infinite polling loop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Learn more about F# at http://fsharp.org
// See the 'F# Tutorial' project for more help.
open System | |
open System.IO | |
open Akka.FSharp | |
/// Commands understood by a producer actor.
type ProducerCommand =
    /// Launch the producer's background polling loop.
    | Start
    /// Cancel the producer's background polling loop.
    | Stop
/// Program entry point.
///
/// Creates an actor system with one consumer and three producers, then reads
/// keys from the console: Space starts every producer, S stops them, Escape
/// leaves the input loop, and any other key prints a usage hint.
/// Returns 0 as the process exit code.
[<EntryPoint>]
let main argv =
    printfn "%A" argv
    let system = System.create "system" <| Configuration.defaultConfig ()

    /// Behaviour of a producer actor.
    ///
    /// consumer   - actor that receives a greeting on every poll.
    /// producerId - label used in log output and in the greeting.
    /// interval   - polling period in milliseconds.
    /// mailbox    - the actor's mailbox; handles ProducerCommand messages.
    let ProducerEndpoint consumer producerId interval (mailbox: Actor<_>) =
        printfn "Created Producer %s" producerId

        /// Endless polling loop: waits `interval` milliseconds, sends a
        /// greeting to the consumer, then repeats. It only ends when the
        /// cancellation token it was started with is cancelled.
        let rec timedPollingLoop interval =
            async {
                let timer = System.Diagnostics.Stopwatch.StartNew()
                while timer.ElapsedMilliseconds < interval do
                    // Clamp at zero: the stopwatch may pass `interval` between
                    // the loop test and this subtraction.
                    let remaining = max 0L (interval - timer.ElapsedMilliseconds)
                    do! Async.Sleep (int remaining)
                let threadId = System.Threading.Thread.CurrentThread.ManagedThreadId
                // sprintf, not printfn: printfn returns unit, so the consumer
                // would be sent a unit value instead of the text.
                consumer <! sprintf "Hello from Producer %s Thread %d" producerId threadId
                // return! keeps the recursion a tail call, so the endless loop
                // does not accumulate continuations.
                return! timedPollingLoop interval
            }

        /// Cancels the polling loop tied to `running`, if there is one.
        let stopPolling (running: System.Threading.CancellationTokenSource option) =
            running |> Option.iter (fun source -> source.Cancel())

        /// Message loop. `running` holds the cancellation source of the active
        /// polling loop, or None when no loop is active.
        let rec loop (running: System.Threading.CancellationTokenSource option) = actor {
            let! message = mailbox.Receive ()
            match message with
            | Start ->
                // Cancel any loop that is already running so a repeated Start
                // cannot leave an orphaned loop that Stop can no longer reach.
                stopPolling running
                let source = new System.Threading.CancellationTokenSource()
                printfn "Producer %s starting on Thread %d" producerId System.Threading.Thread.CurrentThread.ManagedThreadId
                Async.Start(timedPollingLoop interval, source.Token)
                return! loop (Some source)
            | Stop ->
                printfn "Producer stopping on Thread %d" System.Threading.Thread.CurrentThread.ManagedThreadId
                stopPolling running
                return! loop None
        }
        loop None

    /// Behaviour of the consumer actor: prints every message it receives.
    let ConsumerEndpoint (mailbox: Actor<_>) =
        let rec loop () = actor {
            let! message = mailbox.Receive ()
            printfn "Consumer received %A" message
            // Recurse so the consumer handles more than the first message.
            return! loop ()
        }
        loop ()

    let consumer1 = spawn system "ConsumerEndpoint" ConsumerEndpoint
    let producer = spawn system "ProducerA" <| ProducerEndpoint consumer1 "A" 15L
    let producer2 = spawn system "ProducerB" <| ProducerEndpoint consumer1 "B" 15L
    let producer3 = spawn system "ProducerC" <| ProducerEndpoint consumer1 "C" 15L
    let producers = [ producer; producer2; producer3 ]

    /// Sends the same command to every producer, in spawn order.
    let broadcast (command: ProducerCommand) =
        producers |> List.iter (fun target -> target <! command)

    /// Reads console keys until Escape is pressed, dispatching commands.
    let rec processKeyInput () =
        let key = Console.ReadKey()
        match key.Key with
        | ConsoleKey.Spacebar ->
            broadcast Start
            processKeyInput ()
        | ConsoleKey.S ->
            broadcast Stop
            processKeyInput ()
        | ConsoleKey.Escape -> printfn "Leaving"
        | _ ->
            printfn "Escape to leave, Space to start, S to stop"
            processKeyInput ()

    processKeyInput ()
    0 // return an integer exit code
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment