Last active
March 26, 2018 08:16
-
-
Save DarinM223/e3b1fb2c11e735372ea19ee6c3cf8e2a to your computer and use it in GitHub Desktop.
Conduit TCP Server that handles messages one at a time with timeout handling and backpressure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Main where | |
import Conduit | |
import Control.Concurrent (forkIO) | |
import Control.Concurrent.STM | |
import Data.Conduit.Network | |
import Data.ByteString hiding (putStrLn) | |
import Data.String | |
import qualified Data.STM.RollingQueue as RQ | |
import qualified Timer | |
-- | Delay handed to 'Timer.reset', in microseconds (the countdown is
-- implemented with 'threadDelay'): five seconds.
timeout :: Int
timeout = 5 * 1000 * 1000
-- | Size limit passed to 'RQ.newIO' when the rolling queue is created.
maxBackpressure :: Int
maxBackpressure = 1000 * 1000
-- | Program entry point.
--
-- Creates the shared rolling queue and the timer, starts the TCP server on
-- port 4000 in a background thread, and then runs the consumer loop ('run')
-- on the main thread. Every chunk produced by a connection's 'appSource' is
-- written to the queue by 'sinkRollingQueue'.
main :: IO ()
main = do
  rq <- RQ.newIO maxBackpressure :: IO (RQ.RollingQueue ByteString)
  timer <- Timer.newIO
  Timer.reset timer timeout
  -- The ThreadId is intentionally discarded: the server thread is never
  -- cancelled or waited on.
  -- NOTE(review): an exception that ends the server thread (e.g. the port is
  -- already in use) is not propagated to the main loop below -- confirm that
  -- this is acceptable.
  _ <- forkIO $ runTCPServer settings $ \appData ->
    runConduit $ appSource appData .| sinkRollingQueue rq
  run rq timer
  where
    -- The host preference is built with an explicit 'fromString' so this
    -- block does not depend on the OverloadedStrings extension being enabled.
    settings = serverSettings 4000 (fromString "*")
-- | Consumer loop: handles one event at a time, forever.
--
-- Each iteration waits, in a single STM transaction, for either the timer to
-- expire or an entry to become readable from the queue. The timer alternative
-- is tried first, so it wins when both are ready. The event is printed, the
-- timer is restarted with 'timeout', and the loop repeats.
--
-- A timer expiry is reported as the pair @("Timer", 0)@ so that it has the
-- same shape as the result of 'RQ.read'.
run :: (Data.String.IsString s, Show s)
    => RQ.RollingQueue s
    -> Timer.Timer
    -> IO ()
run rq timer = do
  msg <- atomically $
    -- 'fromString' is applied explicitly: the literal has to be of the
    -- abstract type @s@, and spelling the conversion out keeps this
    -- definition well-typed whether or not OverloadedStrings is enabled.
    ((fromString "Timer", 0 :: Int) <$ Timer.await timer)
      `orElse` RQ.read rq
  putStrLn $ "Msg: " ++ show msg
  Timer.reset timer timeout
  run rq timer
-- | Conduit source that repeatedly reads from a rolling queue and yields each
-- element downstream. The second component of every 'RQ.read' result is
-- ignored. The loop has no exit of its own, hence the free result type @b@.
sourceRollingQueue :: (MonadIO m) => RQ.RollingQueue o -> ConduitT i o m b
sourceRollingQueue q = loop
  where
    -- One read of the queue, lifted into the conduit's monad.
    next = liftIO . atomically $ RQ.read q
    loop = next >>= \(item, _) -> yield item >> loop
-- | Conduit sink that writes every upstream value to the given rolling queue,
-- one STM transaction per value, until upstream is exhausted.
sinkRollingQueue :: (MonadIO m) => RQ.RollingQueue a -> ConduitT a o m ()
sinkRollingQueue q = awaitForever enqueue
  where
    enqueue x = liftIO (atomically (RQ.write q x))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Timer where | |
import Control.Concurrent | |
import Control.Concurrent.STM | |
import Control.Monad.IO.Class | |
-- | A resettable one-shot timer built from two 'TMVar's.
data Timer = Timer
  { -- | Id of the thread most recently started by 'reset'; empty before the
    -- first 'reset' and after 'cancel'.
    _thread :: TMVar ThreadId
    -- | Filled with @()@ by the countdown thread when its delay elapses;
    -- emptied by 'await'.
  , _var :: TMVar ()
  }
-- | Create an idle timer inside an STM transaction: no countdown thread is
-- recorded and no expiry is pending.
new :: STM Timer
new = Timer <$> newEmptyTMVar <*> newEmptyTMVar
-- | 'IO' counterpart of 'new': create an idle timer with both of its
-- 'TMVar's empty.
newIO :: IO Timer
newIO = Timer <$> newEmptyTMVarIO <*> newEmptyTMVarIO
-- | Restart the timer so that it expires @i@ microseconds from now.
--
-- Any countdown already in progress is cancelled, and an expiry that has been
-- signalled but not yet consumed by 'await' is discarded. Without that second
-- step, a timer that fired just before the reset would make the next 'await'
-- return immediately instead of after the new delay.
reset :: (MonadIO m) => Timer -> Int -> m ()
reset t i = liftIO $ do
  cancel t
  -- Drain a stale expiry. This runs after 'cancel', so the previous countdown
  -- thread can no longer fill '_var' behind our back.
  _ <- atomically $ tryTakeTMVar (_var t)
  n <- forkIO $ do
    threadDelay i
    -- 'tryPutTMVar' never blocks; if an expiry is somehow already pending,
    -- the extra signal is simply dropped.
    () <$ atomically (tryPutTMVar (_var t) ())
  -- Record the new countdown thread so a later 'cancel' or 'reset' can kill it.
  atomically $ putTMVar (_thread t) n
-- | Stop the current countdown, if there is one: take the recorded thread id
-- out of the timer and kill that thread. Does nothing when no thread is
-- recorded.
cancel :: (MonadIO m) => Timer -> m ()
cancel t =
  liftIO $ atomically (tryTakeTMVar (_thread t)) >>= mapM_ killThread
-- | Block (retry) the enclosing transaction until the timer has expired, then
-- consume the expiry signal.
await :: Timer -> STM ()
await t = takeTMVar (_var t)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment