Last active
June 16, 2022 03:53
-
-
Save DarinM223/0d3263544a919fba714df2ae28ed21cb to your computer and use it in GitHub Desktop.
Select for TBQueue similar to Golang's select
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ExistentialQuantification #-} | |
import Control.Concurrent.STM (STM, TBQueue, atomically, isEmptyTBQueue, isFullTBQueue, newTBQueueIO, readTBQueue, retry, writeTBQueue) | |
import Control.Monad (filterM, replicateM_) | |
import System.Random (StdGen, newStdGen, uniformR) | |
data Handler a | |
= forall b. Recv (TBQueue b) (b -> IO a) | |
| forall b. Send (TBQueue b) b (IO a) | |
handlerBlocked :: Handler a -> STM Bool | |
handlerBlocked (Recv queue _) = isEmptyTBQueue queue | |
handlerBlocked (Send queue _ _) = isFullTBQueue queue | |
select :: StdGen -> [Handler a] -> STM (IO a, StdGen) | |
select gen hs = do | |
ready <- filterM (fmap not . handlerBlocked) hs | |
if (null ready) | |
then retry | |
else do | |
let (i, gen') = uniformR (0, length ready - 1) gen | |
handler = ready !! i | |
case handler of | |
Recv queue fn -> do | |
v <- readTBQueue queue | |
pure (fn v, gen') | |
Send queue v fn -> do | |
writeTBQueue queue v | |
pure (fn, gen') | |
selectIO :: [Handler a] -> IO a | |
selectIO hs = do | |
gen <- newStdGen | |
(action, _) <- atomically $ select gen hs | |
action | |
test :: IO () | |
test = do | |
q1 <- newTBQueueIO 1 | |
q2 <- newTBQueueIO 1 | |
atomically $ writeTBQueue q1 (1 :: Int) | |
let msg = "hello" | |
selectIO | |
[ Recv q1 $ \i -> putStrLn $ "Received: " ++ show i, | |
Send q2 msg $ putStrLn $ "Sent: " ++ show msg, | |
Send q1 2 $ putStrLn "This shouldn't happen", | |
Recv q2 $ \_ -> putStrLn "This shouldn't happen" | |
] | |
main :: IO () | |
main = replicateM_ 1000 test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE ExistentialQuantification #-} | |
import Control.Concurrent (yield) | |
import Control.Concurrent.Chan.Unagi.Bounded (Element (tryRead), InChan, OutChan, newChan, tryReadChan, tryWriteChan, writeChan) | |
import Control.Monad (replicateM_) | |
import System.Random.Shuffle (shuffleM) | |
data Handler a | |
= forall b. Recv (OutChan b) (b -> IO a) | |
| forall b. Send (InChan b) b (IO a) | |
-- Problem with this is that it can spinlock unlike the STM version | |
-- which only wakes up when one of the queues changes. | |
select :: (IO a -> IO a) -> [Handler a] -> IO a | |
select retry hs0 = go hs0 | |
where | |
go ((Recv chan f) : hs) = do | |
(element, _) <- tryReadChan chan | |
tryRead element >>= maybe (go hs) f | |
go ((Send chan v m) : hs) = do | |
succeeded <- tryWriteChan chan v | |
if succeeded then m else go hs | |
go [] = retry (select retry hs0) | |
main :: IO () | |
main = replicateM_ 100 $ do | |
(in1, out1) <- newChan 1 | |
(in2, out2) <- newChan 1 | |
writeChan in1 (1 :: Int) | |
let msg = "hello" | |
select (\retry -> yield >> retry) | |
=<< shuffleM | |
[ Recv out1 $ \i -> putStrLn $ "Received: " ++ show i, | |
Send in2 msg $ putStrLn $ "Sent: " ++ show msg, | |
Send in1 2 $ putStrLn "This shouldn't happen", | |
Recv out2 $ \_ -> putStrLn "This shouldn't happen" | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment