Skip to content

Instantly share code, notes, and snippets.

@DarinM223
Last active March 26, 2018 08:16
Show Gist options
  • Save DarinM223/e3b1fb2c11e735372ea19ee6c3cf8e2a to your computer and use it in GitHub Desktop.
Save DarinM223/e3b1fb2c11e735372ea19ee6c3cf8e2a to your computer and use it in GitHub Desktop.
Conduit TCP Server that handles messages one at a time with timeout handling and backpressure
module Main where
import Conduit
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Data.Conduit.Network
import Data.ByteString hiding (putStrLn)
import Data.String
import qualified Data.STM.RollingQueue as RQ
import qualified Timer
-- | Delay handed to 'Timer.reset' each time the timer is (re)armed.
-- 'Timer.reset' feeds the value to 'threadDelay', so the unit is
-- microseconds: this is five seconds.
timeout :: Int
timeout = 5 * 1000 * 1000
-- | Size argument given to 'RQ.newIO' when the shared queue is created.
--
-- NOTE(review): presumably the maximum number of buffered chunks before the
-- rolling queue starts discarding entries -- confirm against
-- "Data.STM.RollingQueue".
maxBackpressure :: Int
maxBackpressure = 1000 * 1000
-- | Program entry point.
--
-- Creates the shared queue and the timer, arms the timer with 'timeout',
-- starts a TCP server on port 4000 (host preference @"*"@) in a background
-- thread, and then runs the 'run' loop on the main thread. Every chunk
-- received on any connection is written to the shared queue.
main :: IO ()
main = do
  queue <- RQ.newIO maxBackpressure :: IO (RQ.RollingQueue ByteString)
  idleTimer <- Timer.newIO
  Timer.reset idleTimer timeout
  -- The server thread's id is not needed afterwards.
  _ <- forkIO (acceptClients queue)
  run queue idleTimer
  where
    -- Serve connections, piping each connection's bytes into the queue.
    acceptClients queue =
      runTCPServer (serverSettings 4000 "*") $ \appData ->
        runConduit (appSource appData .| sinkRollingQueue queue)
-- | Consumer loop; never returns.
--
-- Each iteration blocks until either the timer has expired or the queue has
-- an element, with the timer checked first. It prints the event prefixed
-- with @"Msg: "@ and then re-arms the timer with 'timeout'. A timer expiry
-- is reported as the pair @("Timer", 0)@, the same shape 'RQ.read' returns.
run :: (Data.String.IsString s, Show s)
    => RQ.RollingQueue s
    -> Timer.Timer
    -> IO ()
run rq timer = loop
  where
    loop = do
      event <- atomically nextEvent
      putStrLn ("Msg: " ++ show event)
      Timer.reset timer timeout
      loop

    -- Timer expiry takes priority; otherwise fall back to the queue.
    nextEvent =
      (("Timer", 0 :: Int) <$ Timer.await timer) `orElse` RQ.read rq
-- | Conduit source that yields elements read from the queue.
--
-- It blocks in 'RQ.read' while waiting for an element, drops the second
-- component of the pair 'RQ.read' returns, and loops without ever finishing
-- on its own (hence the free result type @b@).
sourceRollingQueue :: (MonadIO m) => RQ.RollingQueue o -> ConduitT i o m b
sourceRollingQueue q = pump
  where
    pump = liftIO (atomically (RQ.read q)) >>= emit
    emit (item, _) = yield item >> pump
-- | Conduit sink that writes every upstream value to the queue with
-- 'RQ.write', one STM transaction per value, until upstream is done.
sinkRollingQueue :: (MonadIO m) => RQ.RollingQueue a -> ConduitT a o m ()
sinkRollingQueue q =
  awaitForever $ \chunk ->
    liftIO (atomically (RQ.write q chunk))
module Timer where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.IO.Class
-- | A restartable one-shot timer that can be waited on inside STM.
data Timer = Timer
  { -- | Id of the thread forked by the most recent 'reset'; empty when no
    -- thread has been recorded or after 'cancel' took it.
    _thread :: TMVar ThreadId
    -- | Filled with @()@ by the delay thread when the delay elapses;
    -- emptied again by 'await'.
  , _var :: TMVar ()
  }
-- | Create an unarmed timer inside an STM transaction: both the thread slot
-- and the expiry slot start out empty.
new :: STM Timer
new = Timer <$> newEmptyTMVar <*> newEmptyTMVar
-- | 'IO' counterpart of 'new': create an unarmed timer whose thread slot and
-- expiry slot are both empty, without running an STM transaction.
newIO :: IO Timer
newIO = Timer <$> newEmptyTMVarIO <*> newEmptyTMVarIO
-- | (Re)arm the timer so that it expires @i@ microseconds from now.
--
-- Any previously recorded delay thread is cancelled first. A new thread is
-- then forked that sleeps for @i@ and afterwards marks the timer as expired,
-- and its id is stored in the timer so a later 'cancel' can kill it.
reset :: (MonadIO m) => Timer -> Int -> m ()
reset t i = liftIO $ do
  cancel t
  worker <- forkIO expireAfterDelay
  atomically (putTMVar (_thread t) worker)
  where
    -- 'tryPutTMVar' keeps the worker from blocking if an earlier expiry is
    -- still waiting to be consumed; its Bool result is not needed.
    expireAfterDelay =
      threadDelay i >> () <$ atomically (tryPutTMVar (_var t) ())
-- | Stop the timer: kill the delay thread recorded in the timer (if any) and
-- discard an expiry that was signalled but not yet consumed by 'await'.
--
-- Without the second step, an expiry that landed just before 'cancel' (or
-- 'reset', which calls 'cancel') would stay in '_var' and make the next
-- 'await' succeed immediately, as if the restarted timer had already run
-- out.
cancel :: (MonadIO m) => Timer -> m ()
cancel t = liftIO $ do
  tid <- atomically $ tryTakeTMVar (_thread t)
  mapM_ killThread tid
  -- 'killThread' returns only once the exception has been raised in the
  -- target (or right away if the thread already finished), so the old delay
  -- thread can no longer refill '_var' after this point.
  _ <- atomically $ tryTakeTMVar (_var t)
  return ()
-- | STM action that blocks (retries) until the timer has expired, then
-- consumes the expiry so that a second 'await' blocks again until the timer
-- is re-armed and expires once more.
await :: Timer -> STM ()
await t = takeTMVar (_var t)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment