module MultipleConsumerCoroutine where |
import Prelude |
import Data.List as List |
import Data.List.NonEmpty as NEList |
import Control.Coroutine (Consumer, Producer, Transformer, await, connect, emit, joinConsumers, runProcess, transform, transformConsumer) |
import Control.Monad.Aff (Aff, launchAff) |
import Control.Monad.Aff.Console (log, CONSOLE) |
import Control.Monad.Eff (Eff) |
import Control.Monad.Eff.Exception (EXCEPTION) |
import Control.Monad.Rec.Class (class MonadRec, Step(Done, Loop), forever, tailRecM2) |
import Control.Monad.Trans.Class (lift) |
import Control.Parallel (class Parallel) |
import Data.Array (range) |
import Data.List.Types (NonEmptyList) |
import Data.Maybe (Maybe(Just, Nothing)) |
import Data.Tuple (Tuple(..)) |
main :: Eff (err :: EXCEPTION, console :: CONSOLE) Unit |
main = void $ launchAff $ do |
-- -- Note, if you increase the number of consumers to be joined, you will get a |
-- -- RangeError: Maximum call stack size exceeded error. I have yet to find the |
-- -- cause of this or the way to avoid this. |
-- let consumers' = ((map >>> map) (const logger) (NEList.fromFoldable (range 0 1000))) |
let consumerList = ((map >>> map) (\i -> consumeLT i) (NEList.fromFoldable (range 0 10))) |
case consumerList of |
Nothing -> pure unit |
Just consumers' -> do |
consumers <- (joinMany consumers') |
runProcess (connect |
numberProducer |
consumers |
) |
tupleTransform :: forall m a. (Monad m) => Transformer a (Tuple a a) m Unit |
tupleTransform = transform (\i -> Tuple i i) |
joinMany :: forall a m par. (Monad m, MonadRec m, Parallel par m) => NonEmptyList (Consumer a m Unit) -> m (Consumer a m Unit) |
joinMany ac = tailRecM2 go (NEList.head ac) (NEList.tail ac) |
where |
go h t = case (List.head t) of |
(Just thead) -> do |
let hh = h |
let newConsumer = (transformConsumer (forever $ tupleTransform) (joinConsumers hh thead)) |
case (List.tail t) of |
(Just ttail) -> do |
pure (Loop { a : newConsumer, b : ttail }) |
_ -> pure $ Done newConsumer |
_ -> pure $ Done h |
consumeLT :: forall e a. (Ord a, Show a) => a -> Consumer a (Aff (console :: CONSOLE | e)) Unit |
consumeLT max = forever $ do |
s <- await |
case s < max of |
true -> do |
lift $ log $ (show s) <> " LT " <> (show max) |
false -> pure unit |
numberProducer :: forall m. (Monad m) => Producer Int m Unit |
numberProducer = go 0 |
where |
go i = do |
emit i |
go (i + 1) |