Skip to content

Instantly share code, notes, and snippets.

@leroix
Last active December 21, 2015 14:59
Show Gist options
  • Select an option

  • Save leroix/6323274 to your computer and use it in GitHub Desktop.

Select an option

Save leroix/6323274 to your computer and use it in GitHub Desktop.
a multithreaded udp receiver with graceful exit
-- Forked from ThreadPool in Control-Engine and made to use TChan
-- instead of Chan
-- http://new-hackage.haskell.org/package/Control-Engine-1.1.0.1/docs/src/Control-ThreadPool.html
module ThreadPool
( threadPool
, threadPoolIO
) where
import Control.Monad (forever, forM_)
import Control.Monad.STM (STM, atomically)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TChan
-- |A trival thread pool for pure functions (mappings). Simply specify the number of threads desired and a mutator function.
threadPool :: Int -> (a -> b) -> IO (TChan a, TChan b)
threadPool nr mutator = do
input <- newTChanIO
output <- newTChanIO
forM_ [1..nr] $
\_ -> forkIO (forever $ do
i <- atomically $ readTChan input
o <- return $! mutator i
atomically $ writeTChan output o)
return (input, output)
-- |A trivial thread pool that allows IO mutator functions. Evaluation of output is not strict
-- - force evaluation if desired!
threadPoolIO :: Int -> (a -> IO b) -> IO (TChan a, TChan b)
threadPoolIO nr mutator = do
input <- newTChanIO
output <- newTChanIO
forM_ [1..nr] $
\_ -> forkIO (forever $ do
i <- atomically $ readTChan input
o <- mutator i
atomically $ writeTChan output o)
return (input, output)
module UDPReceiver (listen) where
import Network.Socket hiding (listen)
import System.Posix.Signals
import Control.Exception
import Control.Monad (forever, when)
import Control.Monad.STM (atomically)
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM.TChan (writeTChan, isEmptyTChan)
import ThreadPool (threadPoolIO)
listen :: Int -> Int -> Int -> String -> (String -> IO a) -> IO ()
listen nThreads threadTimeout maxLen port handler = withSocketsDo $ do
-- construct server address
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing
(Just port)
let serveraddr = head addrinfos
-- construct socket and bind to server address
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
bindSocket sock (addrAddress serveraddr)
-- initialize worker thread pool
(input, _) <- threadPoolIO nThreads handler
-- Specify handler for graceful exit on SIGINT and SIGTERM
installHandler sigINT
(Catch (handleExit sock))
(Just (addSignal sigTERM emptySignalSet))
procMessage threadTimeout maxLen input sock
where
handleExit sock = do
sockIsConnected <- sIsBound sock
when sockIsConnected $
(sClose sock >> putStrLn "\nClosing socket...")
procMessage threadTimeout maxLen input sock = do
packet <- try (recvFrom sock maxLen) :: IO (Either SomeException (String, Int, SockAddr))
case packet of
Left exception -> do
putStrLn "Waiting for queue to empty..."
exitWithDelayWhenQueueEmpty threadTimeout input
Right (msg, _, _) -> do
atomically $ writeTChan input msg
procMessage threadTimeout maxLen input sock
where
exitWithDelayWhenQueueEmpty threadTimeout input = do
queueEmpty <- atomically $ isEmptyTChan input
if queueEmpty
then do
putStrLn "Queue is empty."
putStrLn "Giving threads time to complete..."
threadDelay threadTimeout
putStrLn "Exiting..."
else exitWithDelayWhenQueueEmpty threadTimeout input
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment