Last active
August 29, 2015 14:03
-
-
Save roman/70b003eb148502c5c42b to your computer and use it in GitHub Desktop.
Example of an "Actor-Like" Reactive Extension management in Haskell
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE DeriveDataTypeable #-} | |
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
-- | Example of an \"actor-like\" management layer on top of Reactive
-- Extensions.
--
-- None of the exported names are defined in this file; they are expected to
-- come from the imported @Rx.Actor.*@ modules and are re-exported here.
module Rx.Actor
  ( GenericEvent, EventBus
  , ActorBuilder, ActorM, ActorDef, Actor, RestartDirective(..), InitResult(..)
  , SupervisorBuilder, SupervisorStrategy, SupervisorDef, Supervisor
    -- * Actor Builder API
  , defActor, actorKey, preStart, postStop, preRestart, postRestart
  , onError, desc, receive
    -- * Actor message handlers API
  , getState, setState, modifyState, emit
    -- * SupervisorBuilder API
  , defSupervisor, addChild, buildChild
    -- * Supervisor API
  , startSupervisor, startSupervisorWithEventBus, stopSupervisor
  , joinSupervisorThread
  ) where
import Control.Exception (ErrorCall(..), AssertionFailed, finally, fromException, assert) | |
import Data.Typeable (Typeable) | |
import Control.Monad (void) | |
import Control.Monad.Trans (liftIO) | |
import Control.Concurrent.Async (async) | |
import Rx.Observable (onNext) | |
import Rx.Subject (newPublishSubject) | |
import Tiempo (seconds) | |
import Tiempo.Concurrent (threadDelay) | |
import Rx.Actor.ActorBuilder | |
import Rx.Actor.Monad | |
import System.IO (BufferMode(LineBuffering), hSetBuffering, stdout) | |
import Rx.Actor.Supervisor | |
import Rx.Actor.Supervisor.SupervisorBuilder | |
import Rx.Actor.Types | |
-- | Event asking the accumulator actor to print its current total.
newtype PrintNumber = PrintNumber ()
  deriving (Typeable, Show)

-- | Event whose handler fails by calling 'error'.
newtype CallFail = CallFail ()
  deriving (Typeable, Show)

-- | Event whose handler fails through a failed 'assert'.
newtype AssertFail = AssertFail ()
  deriving (Typeable, Show)
-- | Actor that prints every 'Int' event it receives on the terminal.
--
-- Its state type is @()@, so the lifecycle hooks below only log that they
-- ran and hand back @InitOk ()@ where a result is required.
numberPrinter :: ActorDef ()
numberPrinter = defActor $ do
    actorKey "printer"
    startDelay (seconds 5)

    -- Lifecycle hooks: each one just announces itself on stdout.
    preStart $ do
      putStrLn "preStart printer"
      return (InitOk ())

    postStop (putStrLn "postStop printer")

    preRestart $ \() cause _event ->
      putStrLn ("preRestart => " ++ show cause)

    postRestart $ \() _cause _event -> do
      putStrLn "postRestart: recovering from failure"
      return (InitOk ())

    -- The description is declared right before the handler it describes.
    desc "Print integers on terminal"
    receive showReceived
  where
    -- Writes the received integer to stdout, prefixed with "=> ".
    showReceived :: Int -> ActorM () ()
    showReceived value = liftIO (putStrLn ("=> " ++ show value))
-- | Actor that keeps a running 'Int' total of the integers it receives.
--
-- Besides accumulating, it reacts to 'PrintNumber' (emit a 'CallFail' and
-- print the total), 'CallFail' (fail with an 'ErrorCall') and 'AssertFail'
-- (fail with an 'AssertionFailed'), so that both 'onError' handlers below
-- get exercised.
numberAccumulator :: ActorDef Int
numberAccumulator = defActor $ do
    -- This is the identifier that the Supervisor will
    -- use internally to refer to this actor
    actorKey "accum"

    -- Every time an error of the given type happens, you have
    -- to return a directive of what to do; the options are:
    --   * Restart - restarts the current actor
    --   * Stop    - stops the current actor
    --   * Resume  - ignores the error and resumes
    --   * Raise   - raises the error on the supervisor level
    onError $ \(_ :: ErrorCall) _st -> return Restart
    onError $ \(_ :: AssertionFailed) _st -> do
      putStrLn "Resuming assertion failed error"
      return Resume

    -- This function is the "constructor" of the actor;
    -- here you can return a value that can be InitOk or
    -- InitFailure
    preStart $ do
      putStrLn "preStart accum"
      return $ InitOk 0

    -- This function is the "finalizer" of the actor;
    -- here you should do cleanup of external resources
    postStop $
      putStrLn "postStop accum"

    -- This gets executed before a restart is about to happen.
    -- You will receive:
    --   - current state of the actor
    --   - the exception that happened on the actor
    --   - the event that caused it
    preRestart $ \_ err _ev ->
      putStrLn $ "preRestart accum => " ++ show err

    -- This gets executed after a restart has happened.
    -- You will receive:
    --   - current state of the actor
    --   - the exception that happened on the actor
    --   - the event that caused it
    postRestart $ \prevCount _err _ev -> do
      putStrLn "postRestart accum: recovering from failure"
      putStrLn $ "count was: " ++ show prevCount
      return $ InitOk 0

    -- General description for the event handler
    desc "sums to a total the given integer"
    -- actual definition of the event handler
    receive accumulateNumber

    desc "Prints number on terminal"
    receive printNumber

    desc "Fails the actor via ErrorCall"
    receive callError

    desc "Fails the actor via AssertionFailed"
    receive assertError
  where
    -- All the receive actions work inside the ActorM monad;
    -- they allow you to get the internal state of the actor
    -- and also to emit events to the main eventBus
    accumulateNumber :: Int -> ActorM Int ()
    accumulateNumber n = modifyState (+n)

    -- This action reads the current state, emits the event "CallFail"
    -- (which this same actor handles) and then prints the state it read.
    printNumber :: PrintNumber -> ActorM Int ()
    printNumber _ = do
      n <- getState
      emit (CallFail ())
      liftIO $ putStrLn $ "acc => " ++ show n

    -- This raises an ErrorCall exception. It is written with an explicit
    -- argument so that the handler itself is a proper function and only
    -- applying it to an event fails.
    callError :: CallFail -> ActorM Int ()
    callError _ = error "I want to fail"

    -- This raises an AssertionFailed exception.
    -- NOTE: GHC ignores `assert` when compiling with -O or
    -- -fignore-asserts; in that case `undefined` is hit instead, which is
    -- an ErrorCall and therefore takes the Restart path above.
    assertError :: AssertFail -> ActorM Int ()
    assertError _ = assert False undefined
-- | Supervisor definition for the demo: it supervises 'numberPrinter' and
-- 'numberAccumulator'.
mySystem :: SupervisorDef
mySystem = defSupervisor $ do
    -- Restart policy: only the child that failed, or all of them?
    -- (OneForOne presumably restarts just the failed child; confirm against
    -- the Supervisor implementation.)
    strategy OneForOne

    -- Time to wait before each restart attempt: 2 ^ attempt seconds.
    backoff (seconds . (2 ^))

    addChild numberPrinter
    addChild numberAccumulator
-- | Demo entry point: starts 'mySystem' on a fresh event bus, publishes a
-- handful of events from a background thread, then blocks on the supervisor
-- thread and stops the supervisor when that finishes.
main :: IO ()
main = do
    hSetBuffering stdout LineBuffering

    -- Every event (request) of the system gets "published" to this
    -- event bus; events may be of different types.
    eventBus <- newPublishSubject

    print (length (_supervisorDefChildren mySystem))
    sup <- startSupervisorWithEventBus eventBus mySystem

    _ <- async $ do
      threadDelay (seconds 3)
      -- Emitting an event to the supervisor broadcasts it to all of its
      -- children (the ones registered with addChild in mySystem).
      -- A child actor that doesn't know how to react to the type of the
      -- event ignores it; one that understands it reacts by doing
      -- side effects or by emitting new events to the main eventBus.
      emitEvent sup (1 :: Int)
      emitEvent sup (2 :: Int)
      emitEvent sup (AssertFail ())
      -- Events can also be pushed straight onto the bus.
      onNext eventBus (toGenericEvent (3 :: Int))
      emitEvent sup (PrintNumber ())
      -- Uncomment this to see the backoff strategy
      -- emitEvent sup (PrintNumber ())
      -- emitEvent sup (PrintNumber ())
      -- emitEvent sup (PrintNumber ())
      -- emitEvent sup (PrintNumber ())

    -- Make the supervisor thread the `main` thread, and
    -- clean up all the actors once it has finished.
    joinSupervisorThread sup `finally` stopSupervisor sup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ThreadId 113] Supervisor: Starting new actor accum | |
[ThreadId 113] Supervisor: Starting new actor printer | |
preStart accum | |
preStart printer | |
[ThreadId 117][actorKey:accum][type: Int] Handling event | |
[ThreadId 117][actorKey:accum][type: Int] Handling event | |
[ThreadId 117][actorKey:accum][type: AssertFail] Handling event | |
Received error on accum: src/Rx/Actor.hs:157:19-24: Assertion failed | |
Resuming assertion failed error | |
[ThreadId 117][actorKey:accum][error: src/Rx/Actor.hs:157:19-24: Assertion failed | |
] Resume actor | |
[ThreadId 117][actorKey:accum][type: Int] Handling event | |
[ThreadId 117][actorKey:accum][type: PrintNumber] Handling event | |
acc => 6 | |
[ThreadId 117][actorKey:accum][type: CallFail] Handling event | |
Received error on accum: I want to fail | |
[ThreadId 117][actorKey:accum][error: I want to fail] Send message to supervisor actor | |
preRestart accum => I want to fail | |
[ThreadId 117][actorKey:accum] Notify supervisor to restart actor | |
[ThreadId 113] Supervisor: Removing actor accum | |
[ThreadId 113][actorKey:accum] Disposable called | |
[ThreadId 113] Supervisor: Restarting actor accum with delay TimeInterval Seconds 1 | |
[ThreadId 113] Supervisor: loop | |
postRestart accum: recovering from failure | |
count was: 6 | |
[ThreadId 118][actorKey:printer][type: Int] Handling event | |
=> 1 | |
[ThreadId 118][actorKey:printer][type: Int] Handling event | |
=> 2 | |
[ThreadId 118][actorKey:printer][type: Int] Handling event | |
=> 3 | |
[ThreadId 112] Supervisor: Disposing | |
[ThreadId 112][actorKey:accum] Disposable called | |
[ThreadId 112][actorKey:printer] Disposable called | |
Actor.hs: thread blocked indefinitely in an STM transaction |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment