Created
December 18, 2019 01:20
-
-
Save lgastako/2ae76c7706f056609881ba14f3340680 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module STMRace where | |
import Control.Concurrent ( threadDelay ) | |
import Control.Concurrent.Async ( Async | |
, async | |
, cancel | |
, wait | |
) | |
import Control.Concurrent.MVar ( MVar | |
, newMVar | |
, withMVar | |
) | |
import Control.Concurrent.STM.TMVar ( TMVar | |
, newEmptyTMVarIO | |
, putTMVar | |
, takeTMVar | |
) | |
import Control.Concurrent.STM.TVar ( TVar | |
, modifyTVar' | |
, newTVarIO | |
, readTVar | |
) | |
import Control.Monad ( void ) | |
import Control.Monad.STM | |
import Data.Sequence ( (|>) | |
, Seq( (:<|) | |
, Empty | |
) | |
) | |
import qualified Data.Sequence as Seq | |
import System.IO.Unsafe ( unsafePerformIO ) | |
-- | Pause, in whole seconds, that 'produce' takes (via 'sleep') before each
-- enqueue.
productonDelay :: Int
productonDelay = 2
-- | Pause, in whole seconds, that 'consume' takes (via 'sleep') before each
-- attempt to dequeue.
consumptionDelay :: Int
consumptionDelay = 1
-- | Interval, in whole seconds, that 'wakeup' sleeps between the signals it
-- sends to the worker.
workTimeout :: Int
workTimeout = 5
-- | Wire the demo together: create the shared wake-up signal and the work
-- queue, then start the timer, worker, producer and consumer threads (in that
-- order) and block on the producer.
--
-- The worker and consumer handles are discarded; only the producer is waited
-- on, and since 'produce' loops forever this call does not return normally.
main :: IO ()
main = do
  wakeSignal   <- newEmptyTMVarIO
  pending      <- newTVarIO Empty
  initialTimer <- async (wakeup wakeSignal)
  _            <- async (work wakeSignal)
  producerTask <- async (produce pending 0)
  _            <- async (consume wakeSignal initialTimer pending)
  wait producerTask
-- | Suspend the calling thread for the given number of whole seconds.
--
-- 'threadDelay' takes microseconds, hence the conversion factor.
sleep :: Int -> IO ()
sleep n = threadDelay (n * microsPerSecond)
  where
    microsPerSecond = 1000 * 1000
-- | Endless producer loop.
--
-- Each round sleeps for 'productonDelay' seconds, appends the current counter
-- to the back of the shared queue, reports it via 'display', and continues
-- with the counter incremented by one. The second argument is the first value
-- to enqueue.
produce :: TVar (Seq Int) -> Int -> IO ()
produce qVar = loop
  where
    loop next = do
      sleep productonDelay
      atomically (modifyTVar' qVar (|> next))
      display ("Enqueued: " ++ show next)
      loop (next + 1)
-- | Endless consumer loop.
--
-- Each round sleeps for 'consumptionDelay' seconds and then atomically tries
-- to pop the front element of the queue.
--
-- * If an element was popped it is reported via 'display' and the loop
--   continues with the same timer.
-- * If the queue was empty, the current timer thread is cancelled, the worker
--   is signalled directly, a fresh 'wakeup' timer is started, and the loop
--   continues with that new timer.
--
-- Arguments: the worker's wake-up signal, the handle of the currently running
-- timer thread, and the shared queue.
consume :: TMVar () -> Async () -> TVar (Seq Int) -> IO ()
consume signal timer qVar = do
  sleep consumptionDelay
  result <- atomically $ do
    q <- readTVar qVar
    -- Total match on both shapes of the sequence, instead of a null check
    -- followed by a partial @let x :<| xs = q@ binding.
    case q of
      Empty -> pure Nothing
      x :<| _ -> do
        -- Same transaction as the read above, so dropping one element
        -- removes exactly the @x@ we just looked at.
        modifyTVar' qVar (Seq.drop 1)
        pure (Just x)
  case result of
    Nothing -> do
      cancel timer
      -- NOTE: 'putTMVar' blocks while the signal is still full (e.g. the old
      -- timer fired just before being cancelled and the worker has not taken
      -- it yet); the consumer then waits here until the worker drains it.
      atomically $ putTMVar signal ()
      timer' <- async $ wakeup signal
      display "Nothing to consume so I kicked the worker and reset the timer."
      consume signal timer' qVar
    Just n -> do
      display $ "Consumed: " ++ show n
      consume signal timer qVar
-- | Endless timer loop: sleep for 'workTimeout' seconds, put @()@ into the
-- signal, repeat.
--
-- The put blocks while the signal is still full, i.e. until it has been taken.
wakeup :: TMVar () -> IO ()
wakeup signal =
  sleep workTimeout
    >> atomically (putTMVar signal ())
    >> wakeup signal
-- | Endless worker loop: block until the signal is filled, take it (leaving
-- it empty again), announce the work via 'display', repeat.
work :: TMVar () -> IO ()
work signal =
  void (atomically (takeTMVar signal))
    >> display "I'm doing the thing."
    >> work signal
-- | Print one line to stdout while holding 'displayLock', so that lines
-- written through this function by different threads are not interleaved
-- with each other.
display :: String -> IO ()
display s = withMVar displayLock (\_ -> putStrLn s)
-- | Global lock used by 'display' to serialise output.
--
-- This is the top-level mutable variable idiom: the 'MVar' is created with
-- 'unsafePerformIO' the first time the binding is evaluated. The NOINLINE
-- pragma is required so that the binding is not inlined at its use sites,
-- which would create a separate lock per use.
displayLock :: MVar ()
displayLock = unsafePerformIO (newMVar ())
{-# NOINLINE displayLock #-}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment