|
module MultipleConsumerCoroutine where |
|
|
|
import Prelude |
|
import Data.List as List |
|
import Data.List.NonEmpty as NEList |
|
import Control.Coroutine (Consumer, Producer, Transformer, await, connect, emit, joinConsumers, runProcess, transform, transformConsumer) |
|
import Control.Monad.Aff (Aff, launchAff) |
|
import Control.Monad.Aff.Console (log, CONSOLE) |
|
import Control.Monad.Eff (Eff) |
|
import Control.Monad.Eff.Exception (EXCEPTION) |
|
import Control.Monad.Rec.Class (class MonadRec, Step(Done, Loop), forever, tailRecM2) |
|
import Control.Monad.Trans.Class (lift) |
|
import Control.Parallel (class Parallel) |
|
import Data.Array (range) |
|
import Data.List.Types (NonEmptyList) |
|
import Data.Maybe (Maybe(Just, Nothing)) |
|
import Data.Tuple (Tuple(..)) |
|
|
|
|
|
-- | Entry point. Builds one `consumeLT` consumer for every threshold in
-- | `range 0 10`, merges them into a single consumer with `joinMany`, and
-- | drives that consumer from `numberProducer`.
-- |
-- | Known issue: joining a much larger number of consumers, e.g.
-- |
-- |     (map >>> map) (const logger) (NEList.fromFoldable (range 0 1000))
-- |
-- | has been observed to fail with
-- | "RangeError: Maximum call stack size exceeded". Neither the cause nor a
-- | workaround has been found yet.
main :: Eff (err :: EXCEPTION, console :: CONSOLE) Unit
main = void $ launchAff $
  -- `NEList.fromFoldable` yields `Nothing` for an empty input, in which case
  -- there is nothing to run.
  case map (map consumeLT) (NEList.fromFoldable (range 0 10)) of
    Nothing -> pure unit
    Just consumers' -> do
      consumers <- joinMany consumers'
      runProcess (connect numberProducer consumers)
|
|
|
|
|
-- | A single-step transformer that turns an incoming value `x` into the pair
-- | `Tuple x x`, i.e. it duplicates its input.
tupleTransform :: forall m a. (Monad m) => Transformer a (Tuple a a) m Unit
tupleTransform = transform duplicate
  where
  duplicate x = Tuple x x
|
|
|
-- | Collapses a non-empty list of consumers into one consumer of the same
-- | input type.
-- |
-- | The list is walked left to right with `tailRecM2`, carrying the consumer
-- | built so far and the consumers still to be merged. Each step pairs the
-- | accumulated consumer with the next one via `joinConsumers` and puts a
-- | repeated `tupleTransform` in front of the pair, so the combined consumer
-- | takes plain `a` values again instead of `Tuple a a`.
-- |
-- | For a single-element list the sole consumer is returned unchanged.
joinMany :: forall a m par. (Monad m, MonadRec m, Parallel par m) => NonEmptyList (Consumer a m Unit) -> m (Consumer a m Unit)
joinMany ac = tailRecM2 go (NEList.head ac) (NEList.tail ac)
  where
  -- `acc` is the consumer merged so far; `remaining` holds those not yet merged.
  go acc remaining = pure $ case List.uncons remaining of
    -- Nothing left to merge: the accumulated consumer is the result.
    Nothing -> Done acc
    Just { head: next, tail: rest } ->
      let
        merged = transformConsumer (forever tupleTransform) (joinConsumers acc next)
      in
        Loop { a: merged, b: rest }
|
|
|
|
|
-- | A consumer that never finishes on its own: for every value it receives
-- | that is strictly less than the given bound, it logs
-- | "<value> LT <bound>" to the console. Values that are not less than the
-- | bound are silently dropped.
consumeLT :: forall e a. (Ord a, Show a) => a -> Consumer a (Aff (console :: CONSOLE | e)) Unit
consumeLT limit = forever $ do
  value <- await
  when (value < limit) $
    lift $ log $ show value <> " LT " <> show limit
|
|
|
-- | An unbounded producer that emits 0, 1, 2, ... in increasing order.
numberProducer :: forall m. (Monad m) => Producer Int m Unit
numberProducer = countFrom 0
  where
  -- The recursive call sits behind a lambda on purpose: PureScript evaluates
  -- strictly, so the next step must only be constructed after `emit` has run.
  countFrom n = emit n >>= \_ -> countFrom (n + 1)