Skip to content

Instantly share code, notes, and snippets.

@roman
Last active August 29, 2015 14:03
Show Gist options
  • Save roman/70b003eb148502c5c42b to your computer and use it in GitHub Desktop.
Save roman/70b003eb148502c5c42b to your computer and use it in GitHub Desktop.
Example of an "Actor-Like" Reactive Extension management in Haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Example of an actor-like system: two actor definitions, a supervisor
-- definition that registers both of them, and a @main@ that feeds them events.
--
-- NOTE(review): the names in the export list are presumably re-exported from
-- the @Rx.Actor.*@ modules imported below; the example definitions in this
-- file (@numberPrinter@, @numberAccumulator@, @mySystem@, @main@) are not
-- exported -- confirm that this is intended.
module Rx.Actor
( GenericEvent, EventBus
, ActorBuilder, ActorM, ActorDef, Actor, RestartDirective(..), InitResult(..)
, SupervisorBuilder, SupervisorStrategy, SupervisorDef, Supervisor
-- * Actor Builder API
, defActor, actorKey, preStart, postStop, preRestart, postRestart
, onError, desc, receive
-- * Actor message handlers API
, getState, setState, modifyState, emit
-- * SupervisorBuilder API
, defSupervisor, addChild, buildChild
-- * Supervisor API
, startSupervisor, startSupervisorWithEventBus, stopSupervisor
, joinSupervisorThread
) where
import Control.Exception (AssertionFailed(..), ErrorCall(..), assert, finally, fromException, throwIO)
import Control.Monad (void)
import Control.Monad.Trans (liftIO)
import Data.Typeable (Typeable)
import System.IO (BufferMode(LineBuffering), hSetBuffering, stdout)

import Control.Concurrent.Async (async)
import Rx.Observable (onNext)
import Rx.Subject (newPublishSubject)
import Tiempo (seconds)
import Tiempo.Concurrent (threadDelay)

import Rx.Actor.ActorBuilder
import Rx.Actor.Monad
import Rx.Actor.Supervisor
import Rx.Actor.Supervisor.SupervisorBuilder
import Rx.Actor.Types
-- | Event handled by the accumulator actor's @printNumber@ handler, which
-- prints the actor's current state.
newtype PrintNumber = PrintNumber ()
  deriving (Typeable, Show)

-- | Event handled by the accumulator actor's @callError@ handler, which
-- fails the actor on purpose.
newtype CallFail = CallFail ()
  deriving (Typeable, Show)

-- | Event handled by the accumulator actor's @assertError@ handler, which
-- fails the actor on purpose.
newtype AssertFail = AssertFail ()
  deriving (Typeable, Show)
-- | Stateless actor (state type @()@) registered under the key @"printer"@.
--
-- It logs each lifecycle hook to stdout and reacts to 'Int' events by
-- printing them prefixed with @"=> "@.
numberPrinter :: ActorDef ()
numberPrinter = defActor $ do
  actorKey "printer"
  -- NOTE(review): presumably postpones the start of this actor by five
  -- seconds -- confirm against the ActorBuilder documentation.
  startDelay (seconds 5)

  -- Lifecycle hooks: each one only announces itself on stdout; the two
  -- initialisers report success with the unit state.
  preStart $
    putStrLn "preStart printer" >> return (InitOk ())
  postStop (putStrLn "postStop printer")
  preRestart $ \() failure _ev ->
    putStrLn ("preRestart => " ++ show failure)
  postRestart $ \() _failure _ev ->
    putStrLn "postRestart: recovering from failure" >> return (InitOk ())

  -- The description is declared right before the handler it belongs to.
  desc "Print integers on terminal"
  receive showInt
  where
    -- Handler for 'Int' events: writes the number to stdout.
    showInt :: Int -> ActorM () ()
    showInt = liftIO . putStrLn . ("=> " ++) . show
-- | Actor registered under the key @"accum"@ whose state is a running 'Int'
-- total.
--
-- It reacts to four kinds of events:
--
-- * 'Int'         - added to the running total
-- * 'PrintNumber' - emits a 'CallFail' event and prints the total
-- * 'CallFail'    - throws an 'ErrorCall' (handled with @Restart@)
-- * 'AssertFail'  - throws an 'AssertionFailed' (handled with @Resume@)
numberAccumulator :: ActorDef Int
numberAccumulator = defActor $ do
  -- This is the identifier that the Supervisor will
  -- use internally to refer to this actor
  actorKey "accum"

  -- Every time an error of the given type happens, you have
  -- to return a directive of what to do; the options are:
  --   * Restart - restarts the current actor
  --   * Stop    - stops the current actor
  --   * Resume  - ignores the error and resumes
  --   * Raise   - raises the error on the supervisor level
  onError $ \(_err :: ErrorCall) _st -> return Restart
  onError $ \(_err :: AssertionFailed) _st -> do
    putStrLn "Resuming assertion failed error"
    return Resume

  -- This function is the "constructor" of the actor;
  -- here you can return a value that can be InitOk or
  -- InitFailure
  preStart $ do
    putStrLn "preStart accum"
    return $ InitOk 0

  -- This function is the "finalizer" of the actor;
  -- here you should do cleanup of external resources
  postStop $
    putStrLn "postStop accum"

  -- This gets executed when a restart is about to happen.
  -- You will receive:
  --   - the current state of the actor
  --   - the exception that happened on the actor
  --   - the event that caused it
  preRestart $ \_ err _ev ->
    putStrLn $ "preRestart accum => " ++ show err

  -- This gets executed after a restart has happened.
  -- You will receive:
  --   - the state of the actor (bound here as the count it held
  --     before the restart)
  --   - the exception that happened on the actor
  --   - the event that caused it
  -- The returned InitOk value resets the count to 0.
  postRestart $ \prevCount _err _ev -> do
    putStrLn "postRestart accum: recovering from failure"
    putStrLn $ "count was: " ++ show prevCount
    return $ InitOk 0

  -- General description for the event handler
  desc "sums to a total the given integer"
  -- actual definition of the event handler
  receive accumulateNumber

  desc "Prints number on terminal"
  receive printNumber

  desc "Fails the actor via ErrorCall"
  receive callError

  desc "Fails the actor via AssertionFailed"
  receive assertError
  where
    -- All the receive actions work inside the ActorM monad;
    -- they allow you to get the internal state of the actor
    -- and also to emit events to the main eventBus
    accumulateNumber :: Int -> ActorM Int ()
    accumulateNumber n = modifyState (+n)

    -- This particular action emits the event "CallFail" (which this
    -- same actor handles through 'callError') and then prints the
    -- current state
    printNumber :: PrintNumber -> ActorM Int ()
    printNumber _ = do
      n <- getState
      emit (CallFail ())
      liftIO $ putStrLn $ "acc => " ++ show n

    -- This raises an ErrorCall exception. It is thrown with 'throwIO'
    -- when the handler runs, so the handler function itself is a proper
    -- value rather than a bottom.
    callError :: CallFail -> ActorM Int ()
    callError _ = liftIO . throwIO $ ErrorCall "I want to fail"

    -- This raises an AssertionFailed exception. It is thrown explicitly
    -- instead of through 'assert': GHC ignores 'assert' when compiling
    -- with -O, in which case the old @assert False undefined@ raised
    -- the ErrorCall of 'undefined' and the actor was restarted instead
    -- of resumed.
    assertError :: AssertFail -> ActorM Int ()
    assertError _ =
      liftIO . throwIO $ AssertionFailed "Assertion failed (AssertFail event received)"
-- | Supervisor definition for the example: it registers 'numberPrinter' and
-- 'numberAccumulator' as children.
mySystem :: SupervisorDef
mySystem = defSupervisor $ do
  -- Restart policy: decides whether a failure restarts only the child that
  -- failed or all of the children.
  -- NOTE(review): OneForOne presumably selects "only the failed child" --
  -- confirm against the SupervisorBuilder documentation.
  strategy OneForOne

  -- How long to wait before each restart attempt: 2^attempt seconds.
  backoff (seconds . (2 ^))

  addChild numberPrinter
  addChild numberAccumulator
-- | Runs the example: starts 'mySystem' on a fresh event bus, emits a few
-- demo events from a background thread, and then blocks on the supervisor
-- thread, stopping the supervisor when that returns or throws.
main :: IO ()
main = do
  hSetBuffering stdout LineBuffering

  -- Every event (request) of the system gets "published" to this event
  -- bus; the events may be of different types.
  eventBus <- newPublishSubject
  print . length $ _supervisorDefChildren mySystem
  sup <- startSupervisorWithEventBus eventBus mySystem

  let -- Emitting an event to the supervisor broadcasts it to all of its
      -- children (the ones registered with `addChild` in `mySystem`).
      -- A child that does not know how to react to the type of the event
      -- ignores it; a child that understands it reacts by doing
      -- side-effects or by emitting new events to the main event bus.
      emitDemoEvents = do
        threadDelay $ seconds 3
        emitEvent sup (1 :: Int)
        emitEvent sup (2 :: Int)
        emitEvent sup (AssertFail ())
        -- Events can also be published straight onto the event bus.
        onNext eventBus $ toGenericEvent (3 :: Int)
        emitEvent sup (PrintNumber ())
        -- Uncomment this to see the backoff strategy
        -- emitEvent sup (PrintNumber ())
        -- emitEvent sup (PrintNumber ())
        -- emitEvent sup (PrintNumber ())
        -- emitEvent sup (PrintNumber ())

  -- Fire and forget: the handle of the emitter thread is not kept.
  _ <- async emitDemoEvents

  -- Make the supervisor thread the `main` thread, and
  -- clean up all the actors once it is finished.
  joinSupervisorThread sup `finally` stopSupervisor sup
[ThreadId 113] Supervisor: Starting new actor accum
[ThreadId 113] Supervisor: Starting new actor printer
preStart accum
preStart printer
[ThreadId 117][actorKey:accum][type: Int] Handling event
[ThreadId 117][actorKey:accum][type: Int] Handling event
[ThreadId 117][actorKey:accum][type: AssertFail] Handling event
Received error on accum: src/Rx/Actor.hs:157:19-24: Assertion failed
Resuming assertion failed error
[ThreadId 117][actorKey:accum][error: src/Rx/Actor.hs:157:19-24: Assertion failed
] Resume actor
[ThreadId 117][actorKey:accum][type: Int] Handling event
[ThreadId 117][actorKey:accum][type: PrintNumber] Handling event
acc => 6
[ThreadId 117][actorKey:accum][type: CallFail] Handling event
Received error on accum: I want to fail
[ThreadId 117][actorKey:accum][error: I want to fail] Send message to supervisor actor
preRestart accum => I want to fail
[ThreadId 117][actorKey:accum] Notify supervisor to restart actor
[ThreadId 113] Supervisor: Removing actor accum
[ThreadId 113][actorKey:accum] Disposable called
[ThreadId 113] Supervisor: Restarting actor accum with delay TimeInterval Seconds 1
[ThreadId 113] Supervisor: loop
postRestart accum: recovering from failure
count was: 6
[ThreadId 118][actorKey:printer][type: Int] Handling event
=> 1
[ThreadId 118][actorKey:printer][type: Int] Handling event
=> 2
[ThreadId 118][actorKey:printer][type: Int] Handling event
=> 3
[ThreadId 112] Supervisor: Disposing
[ThreadId 112][actorKey:accum] Disposable called
[ThreadId 112][actorKey:printer] Disposable called
Actor.hs: thread blocked indefinitely in an STM transaction
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment