Last active
July 28, 2018 08:54
-
-
Save borkdude/fa1dc0862c010dac98b7bc97a178c5ba to your computer and use it in GitHub Desktop.
Concurrent IO actions in Haskell with bounded parallelism
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -- based on https://stackoverflow.com/a/29155440/6264 | |
| import Data.Traversable | |
| import Control.Concurrent | |
| import Control.Concurrent.Async | |
| import Control.Exception | |
| import Control.Monad | |
| import System.Random | |
| import Data.IORef | |
| import Control.Concurrent.Chan | |
| import System.IO | |
| import Control.Concurrent.STM | |
| import Control.Concurrent.STM.TChan | |
| import Control.Applicative ((<|>)) | |
| incHead (a : xs) = (a + 1) : a : xs | |
| decHead (a : xs) = (a - 1) : a : xs | |
| -- https://www.snoyman.com/blog/2016/11/haskells-missing-concurrency-basics | |
| data TCChan a = TCChan (TChan a) (TVar Bool) | |
| newTCChan :: IO (TCChan a) | |
| newTCChan = atomically $ TCChan <$> newTChan <*> newTVar False | |
| closeTCChan :: TCChan a -> IO () | |
| closeTCChan (TCChan _ var) = atomically $ writeTVar var True | |
| writeTCChan :: TCChan a -> a -> IO () | |
| writeTCChan (TCChan chan var) val = atomically $ do | |
| closed <- readTVar var | |
| if closed | |
| -- Could use nicer exception types, or return a Bool to | |
| -- indicate if writing failed | |
| then error "Wrote to a closed TCChan" | |
| else writeTChan chan val | |
| readTCChan :: TCChan a -> IO (Maybe a) | |
| readTCChan (TCChan chan var) = atomically $ | |
| (Just <$> readTChan chan) <|> (do | |
| closed <- readTVar var | |
| check closed | |
| return Nothing) | |
| dupTCChan :: TCChan a -> IO (TCChan a) | |
| dupTCChan (TCChan chan var) = atomically $ do | |
| dup <- dupTChan chan | |
| return $ TCChan dup var | |
| action :: IORef [Int] -> TCChan String -> String -> IO () | |
| action ref chan s = do | |
| modifyIORef ref incHead -- this thread is doing an action | |
| r <- randomRIO (1, 10) | |
| threadDelay $ r * 100000 | |
| modifyIORef ref decHead -- we're done here | |
| writeTCChan chan (reverse s) | |
| inputHandler chan input = | |
| let go input = case input of | |
| [] -> closeTCChan chan | |
| (x : xs) -> do | |
| writeTCChan chan x | |
| go (tail input) | |
| in forkIO $ go input | |
| handleResult s = do | |
| putStrLn $ "Result: " ++ (show s) | |
| resultsHandler chan = do | |
| s <- readTCChan chan | |
| case s of | |
| Nothing -> return () | |
| Just s -> handleResult s >> resultsHandler chan | |
| flow :: (TCChan a) -> (TCChan a) -> IO ThreadId | |
| flow inc outc = do | |
| incDup <- dupTCChan inc | |
| outcDup <- dupTCChan outc | |
| let go' inCount outCount = do | |
| -- putStrLn $ "outcount" ++ (show outCount) | |
| if inCount == outCount | |
| then closeTCChan outcDup | |
| else do | |
| v <- readTCChan outcDup | |
| go' inCount (outCount + 1) | |
| let go inCount = do | |
| -- putStrLn $ "incount" ++ (show inCount) | |
| v <- readTCChan incDup | |
| case v of | |
| Nothing -> go' inCount 0 | |
| Just x -> go (inCount + 1) | |
| forkIO $ go 0 | |
| testFlow = do | |
| hSetBuffering stdout LineBuffering | |
| inc <- newTCChan | |
| outc <- newTCChan | |
| flow inc outc | |
| writeTCChan inc 1 | |
| writeTCChan inc 2 | |
| closeTCChan inc | |
| let goOut = do | |
| v <- readTCChan outc | |
| case v of | |
| Nothing -> print "out channel closed" | |
| Just x -> putStrLn ("value x" ++ (show x)) >> goOut | |
| forkIO $ goOut | |
| let goIn = do | |
| v <- readTCChan inc | |
| case v of | |
| Nothing -> print "in channel closed" | |
| Just x -> writeTCChan outc (x * 100) >> goIn | |
| forkIO $ goIn | |
| runThrottled :: Show a => Int -> (a -> IO ()) -> TCChan a -> IO () | |
| runThrottled concLevel action inChan = do | |
| sem <- newQSem concLevel | |
| let throttledAction = bracket_ (waitQSem sem) (signalQSem sem) . action | |
| go = do | |
| inv <- readTCChan inChan | |
| case inv of | |
| Nothing -> return () | |
| Just n -> do | |
| forkIO (throttledAction n) | |
| go | |
| go | |
| main = do | |
| hSetBuffering stdout LineBuffering | |
| ref <- newIORef [0] | |
| inChan <- newTCChan | |
| outChan <- newTCChan | |
| let act = action ref outChan | |
| input = take 10 $ cycle ["foo", "bar", "baz"] | |
| flow inChan outChan | |
| inputHandler inChan input | |
| runThrottled 3 act inChan | |
| resultsHandler outChan | |
| threads <- readIORef ref | |
| putStrLn $ "Threads active: " ++ show threads | |
| -- Result: "rab" | |
| -- Result: "oof" | |
| -- Result: "zab" | |
| -- Result: "oof" | |
| -- Result: "rab" | |
| -- Result: "zab" | |
| -- Result: "oof" | |
| -- Result: "zab" | |
| -- Result: "oof" | |
| -- Result: "rab" | |
| -- Threads active: [0,1,2,3,2,3,2,3,2,3,2,3,2,3,2,3,2,3,2,1,0] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment