Skip to content

Instantly share code, notes, and snippets.

@borkdude
Last active July 28, 2018 08:54
Show Gist options
  • Select an option

  • Save borkdude/fa1dc0862c010dac98b7bc97a178c5ba to your computer and use it in GitHub Desktop.

Select an option

Save borkdude/fa1dc0862c010dac98b7bc97a178c5ba to your computer and use it in GitHub Desktop.
Concurrent IO actions in Haskell with bounded parallelism
-- based on https://stackoverflow.com/a/29155440/6264
import Data.Traversable
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import System.Random
import Data.IORef
import Control.Concurrent.Chan
import System.IO
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Applicative ((<|>))
-- | Push a new head onto a counter history that is one greater than the
-- current head, keeping every previous entry behind it.
--
-- An empty history has no current value to increment, so it is returned
-- unchanged instead of failing with a pattern-match error.
incHead :: Num a => [a] -> [a]
incHead [] = []
incHead history@(current : _) = (current + 1) : history
-- | Push a new head onto a counter history that is one less than the
-- current head, keeping every previous entry behind it.
--
-- An empty history has no current value to decrement, so it is returned
-- unchanged instead of failing with a pattern-match error.
decHead :: Num a => [a] -> [a]
decHead [] = []
decHead history@(current : _) = (current - 1) : history
-- https://www.snoyman.com/blog/2016/11/haskells-missing-concurrency-basics
-- | A closable channel: a 'TChan' carrying the items, paired with a 'TVar'
-- flag that records whether the channel has been closed. 'newTCChan' creates
-- the flag as 'False' and 'closeTCChan' sets it to 'True'.
data TCChan a = TCChan (TChan a) (TVar Bool)
-- | Create an empty, open 'TCChan': a fresh 'TChan' together with a closed
-- flag initialised to 'False', both allocated in a single transaction.
newTCChan :: IO (TCChan a)
newTCChan = atomically $ do
  queue <- newTChan
  closedFlag <- newTVar False
  return (TCChan queue closedFlag)
-- | Mark the channel as closed by setting its flag to 'True'.
-- The underlying 'TChan' itself is not touched.
closeTCChan :: TCChan a -> IO ()
closeTCChan (TCChan _ closedFlag) = atomically markClosed
  where
    markClosed = writeTVar closedFlag True
-- | Append a value to the channel.
--
-- The closed flag is checked and the value is written in the same
-- transaction. If the channel is already closed, an 'ErrorCall' carrying the
-- message @"Wrote to a closed TCChan"@ is thrown.
writeTCChan :: TCChan a -> a -> IO ()
writeTCChan (TCChan chan var) val = atomically $ do
  closed <- readTVar var
  -- Could use nicer exception types, or return a Bool to
  -- indicate if writing failed.
  -- 'throwSTM' raises the failure as an explicit STM exception instead of
  -- using a bottom value as the action; the exception type and message are
  -- the ones 'error' produced, so existing handlers keep working.
  if closed
    then throwSTM (ErrorCall "Wrote to a closed TCChan")
    else writeTChan chan val
-- | Take the next item from the channel.
--
-- Returns @Just item@ when an item is available. When none is available the
-- closed flag is consulted: a closed channel yields 'Nothing', an open one
-- makes the transaction retry until an item arrives or the flag changes.
readTCChan :: TCChan a -> IO (Maybe a)
readTCChan (TCChan chan var) = atomically (takeItem `orElse` reportClosed)
  where
    takeItem = fmap Just (readTChan chan)
    reportClosed = do
      isClosed <- readTVar var
      if isClosed
        then return Nothing
        else retry
-- | Duplicate the channel: the underlying 'TChan' is duplicated with
-- 'dupTChan', while the closed flag is shared with the original, so closing
-- either one closes both.
dupTCChan :: TCChan a -> IO (TCChan a)
dupTCChan (TCChan chan var) = atomically (fmap withSharedFlag (dupTChan chan))
  where
    withSharedFlag copy = TCChan copy var
-- | Simulated unit of work: records itself as active in the shared counter
-- history, sleeps for a random 0.1-1.0 seconds, records itself as finished,
-- and finally writes the reversed input string to the output channel.
--
-- The history is shared by every worker thread, so it is updated with
-- 'atomicModifyIORef'' rather than 'modifyIORef': the plain version is a
-- separate read and write, which can lose updates when two threads race, and
-- it also leaves unevaluated thunks in the reference.
action :: IORef [Int] -> TCChan String -> String -> IO ()
action ref chan s = do
  record incHead -- this thread is doing an action
  r <- randomRIO (1, 10)
  threadDelay $ r * 100000
  record decHead -- we're done here
  writeTCChan chan (reverse s)
  where
    -- Apply one step to the shared history atomically and strictly.
    record step = atomicModifyIORef' ref (\history -> (step history, ()))
-- | Fork a thread that writes every element of @input@ to the channel, in
-- order, and then closes the channel. Returns the forked thread's id.
inputHandler :: TCChan a -> [a] -> IO ThreadId
inputHandler chan input = forkIO feedThenClose
  where
    feedThenClose = do
      mapM_ (writeTCChan chan) input
      closeTCChan chan
-- | Print a single result on its own line, prefixed with @"Result: "@ and
-- rendered with 'show'.
handleResult :: Show a => a -> IO ()
handleResult = putStrLn . ("Result: " ++) . show
-- | Read results from the channel and pass each one to 'handleResult',
-- stopping as soon as 'readTCChan' reports 'Nothing'.
resultsHandler :: Show a => TCChan a -> IO ()
resultsHandler chan = readTCChan chan >>= maybe (return ()) report
  where
    report result = handleResult result >> resultsHandler chan
-- | Fork a bookkeeping thread that closes the output channel once it has
-- seen as many outputs as there were inputs.
--
-- Both channels are duplicated with 'dupTCChan' before the thread is forked,
-- and the thread reads only from the duplicates. It first counts the items
-- read from the input duplicate until 'readTCChan' returns 'Nothing', then
-- performs that many reads on the output duplicate, then closes it. Because
-- a duplicate shares its closed flag with the original, this also closes
-- @outc@.
--
-- The element types of the two channels are independent: only the number of
-- items is inspected, never their values.
--
-- Returns the id of the forked thread.
flow :: TCChan a -> TCChan b -> IO ThreadId
flow inc outc = do
  incDup <- dupTCChan inc
  outcDup <- dupTCChan outc
  let -- Perform one read per counted input, then close the output side.
      awaitOutputs expected = do
        replicateM_ expected (readTCChan outcDup)
        closeTCChan outcDup
      -- Count inputs until the input side reports that it is closed.
      countInputs seen = do
        next <- readTCChan incDup
        case next of
          Nothing -> awaitOutputs seen
          -- Force the counter so no chain of (+ 1) thunks builds up.
          Just _ -> countInputs $! seen + 1
  forkIO (countInputs 0)
-- | Small manual exercise of 'flow'.
--
-- Starts 'flow' on a fresh pair of channels, writes 1 and 2 to the input
-- channel and closes it, then forks two threads: one prints every value read
-- from the output channel, the other forwards each input multiplied by 100
-- to the output channel. Each thread prints a message when its channel
-- reports closed. Returns the id of the forwarding thread without waiting
-- for either thread to finish.
testFlow :: IO ThreadId
testFlow = do
  hSetBuffering stdout LineBuffering
  inc <- newTCChan
  outc <- newTCChan
  _ <- flow inc outc
  mapM_ (writeTCChan inc) [1, 2]
  closeTCChan inc
  let printOutputs =
        readTCChan outc
          >>= maybe
            (print "out channel closed")
            (\x -> putStrLn ("value x" ++ show x) >> printOutputs)
  _ <- forkIO printOutputs
  let forwardInputs =
        readTCChan inc
          >>= maybe
            (print "in channel closed")
            (\x -> writeTCChan outc (x * 100) >> forwardInputs)
  forkIO forwardInputs
-- | Run @action@ on every item read from @inChan@, each in its own forked
-- thread, with at most @concLevel@ actions holding the semaphore at once.
--
-- A thread is forked per item as soon as it is read; the forked thread then
-- waits on the semaphore before running the action and signals it afterwards
-- via 'bracket_'. Returns once 'readTCChan' yields 'Nothing'; it does not
-- wait for the forked threads to finish.
runThrottled :: Show a => Int -> (a -> IO ()) -> TCChan a -> IO ()
runThrottled concLevel action inChan = newQSem concLevel >>= dispatchAll
  where
    dispatchAll slots = loop
      where
        -- Run the action for one item while holding a semaphore slot.
        guarded item = bracket_ (waitQSem slots) (signalQSem slots) (action item)
        loop = readTCChan inChan >>= maybe (return ()) spawn
        spawn item = forkIO (guarded item) >> loop
-- | Demo entry point: feeds ten strings through 'action' with at most three
-- running at a time, prints each result as it arrives, and finally prints
-- the recorded history of the active-thread counter.
main :: IO ()
main = do
  hSetBuffering stdout LineBuffering
  ref <- newIORef [0]
  inChan <- newTCChan
  outChan <- newTCChan
  let input = take 10 (cycle ["foo", "bar", "baz"])
  -- 'flow' must be started before any input is written.
  _ <- flow inChan outChan
  _ <- inputHandler inChan input
  runThrottled 3 (action ref outChan) inChan
  resultsHandler outChan
  readIORef ref >>= putStrLn . ("Threads active: " ++) . show
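-- Example output of main (the order of results varies between runs because
-- each action sleeps for a random delay):
-- Result: "rab"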
-- Result: "oof"
-- Result: "zab"
-- Result: "oof"
-- Result: "rab"
-- Result: "zab"
-- Result: "oof"
-- Result: "zab"
-- Result: "oof"
-- Result: "rab"
-- Threads active: [0,1,2,3,2,3,2,3,2,3,2,3,2,3,2,3,2,3,2,1,0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment