Last active
December 21, 2015 14:59
-
-
Save leroix/6323274 to your computer and use it in GitHub Desktop.
A multithreaded UDP receiver with graceful exit
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Forked from ThreadPool in Control-Engine and made to use TChan
-- instead of Chan
-- http://new-hackage.haskell.org/package/Control-Engine-1.1.0.1/docs/src/Control-ThreadPool.html
module ThreadPool
  ( threadPool
  , threadPoolIO
  ) where

import Control.Monad (forever, forM_)
import Control.Monad.STM (STM, atomically)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM.TChan

-- | A trivial thread pool for pure functions (mappings).
--
-- Forks @nr@ worker threads. Each worker repeatedly takes a value from the
-- returned input channel, applies @mutator@, forces the result to weak head
-- normal form, and writes it to the returned output channel.
threadPool :: Int -> (a -> b) -> IO (TChan a, TChan b)
threadPool nr mutator =
  -- '$!' keeps the evaluation inside the worker thread rather than
  -- handing an unevaluated thunk to whoever reads the output channel.
  threadPoolIO nr (\item -> return $! mutator item)

-- | A trivial thread pool whose mutator runs in 'IO'.
--
-- Forks @nr@ worker threads. Each worker repeatedly takes a value from the
-- returned input channel, runs @mutator@ on it, and writes the result to the
-- returned output channel. The result is written as-is (not forced), so
-- force evaluation inside @mutator@ if that is desired.
threadPoolIO :: Int -> (a -> IO b) -> IO (TChan a, TChan b)
threadPoolIO nr mutator = do
  input  <- newTChanIO
  output <- newTChanIO
  let worker = forever $
        atomically (readTChan input)
          >>= mutator
          >>= atomically . writeTChan output
  forM_ [1 .. nr] (const (forkIO worker))
  return (input, output)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module UDPReceiver (listen) where

import Network.Socket hiding (listen)
import System.Posix.Signals
import Control.Exception
import Control.Monad (forever, when)
import Control.Monad.STM (atomically, check)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM.TChan (readTChan, writeTChan, isEmptyTChan)
import ThreadPool (threadPoolIO)

-- | Receive UDP datagrams on @port@ and hand each payload to @handler@ on a
-- pool of worker threads, until SIGINT or SIGTERM triggers a graceful exit.
--
-- Arguments, in order:
--
-- * @nThreads@      - number of worker threads in the pool
-- * @threadTimeout@ - delay (microseconds, as taken by 'threadDelay') granted
--                     to workers after the queue has drained
-- * @maxLen@        - maximum length passed to 'recvFrom' for each datagram
-- * @port@          - service name or port number to bind to
-- * @handler@       - action run for every received payload; its result is
--                     discarded
--
-- On shutdown the socket is closed, the receive loop stops, the function
-- waits until the input queue is empty, sleeps for @threadTimeout@ and then
-- returns.
listen :: Int -> Int -> Int -> String -> (String -> IO a) -> IO ()
listen nThreads threadTimeout maxLen port handler = withSocketsDo $ do
  -- construct server address
  addrinfos <- getAddrInfo
                 (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
                 Nothing
                 (Just port)
  serveraddr <- case addrinfos of
    (addr : _) -> return addr
    []         -> ioError . userError $
                    "UDPReceiver.listen: no address info for port " ++ port

  -- construct socket and bind to server address
  sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
  bindSocket sock (addrAddress serveraddr)

  -- initialize worker thread pool
  (input, output) <- threadPoolIO nThreads handler

  -- Nobody consumes the handler results; drain them so the output channel
  -- does not grow by one element per received datagram.
  _ <- forkIO . forever . atomically $ readTChan output

  -- Graceful exit on SIGINT and SIGTERM. The third argument of
  -- 'installHandler' is only the set of signals blocked while the handler
  -- runs, so each signal needs its own installation.
  mapM_ (\sig -> installHandler sig (Catch (handleExit sock)) Nothing)
        [sigINT, sigTERM]

  procMessage input sock
  where
    -- Close the socket (if still bound) so that the blocked 'recvFrom' in
    -- the receive loop fails and the loop can wind down.
    handleExit sock = do
      sockIsBound <- sIsBound sock
      when sockIsBound $
        (sClose sock >> putStrLn "\nClosing socket...")

    -- Receive loop: enqueue every payload for the workers until 'recvFrom'
    -- throws, then start the shutdown sequence.
    procMessage input sock = do
      packet <- try (recvFrom sock maxLen)
                  :: IO (Either SomeException (String, Int, SockAddr))
      case packet of
        Left err -> do
          -- A signal-triggered shutdown closes the socket before we get
          -- here. If it is still bound, 'recvFrom' failed for some other
          -- reason: report it and release the socket ourselves.
          stillBound <- sIsBound sock
          when stillBound $ do
            putStrLn ("recvFrom failed: " ++ show err)
            sClose sock
          putStrLn "Waiting for queue to empty..."
          exitWithDelayWhenQueueEmpty input
        Right (msg, _, _) -> do
          atomically $ writeTChan input msg
          procMessage input sock

    -- Block (via STM retry, not a busy loop) until the input queue is
    -- empty, then give in-flight handlers 'threadTimeout' to finish.
    exitWithDelayWhenQueueEmpty input = do
      atomically $ isEmptyTChan input >>= check
      putStrLn "Queue is empty."
      putStrLn "Giving threads time to complete..."
      threadDelay threadTimeout
      putStrLn "Exiting..."
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment