Skip to content

Instantly share code, notes, and snippets.

@NicolasT
Created June 9, 2013 21:40
Show Gist options
  • Save NicolasT/5745367 to your computer and use it in GitHub Desktop.
Save NicolasT/5745367 to your computer and use it in GitHub Desktop.
Auto-reconnecting Conduit sink with user-defined backoff
{-# LANGUAGE RankNTypes, OverloadedStrings #-}
module Main (main) where
import Control.Monad (guard)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (register, release)
import Control.Exception (tryJust)
import GHC.IO.Exception
import Foreign.C.Error
import Control.Concurrent (threadDelay)
import Data.ByteString (ByteString)
import Network.Socket (Socket, sClose)
import Network.Socket.ByteString (sendAll)
import System.IO (hPutStrLn, stdin, stderr)
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Network as CN
-- | Write a diagnostic line to standard error from any 'MonadIO' context.
debug :: MonadIO m => String -> m ()
debug = liftIO . hPutStrLn stderr
-- | A sink that writes every incoming chunk to a socket, transparently
-- reconnecting (with a caller-defined backoff) whenever connecting or
-- sending fails with a network-level error.
--
-- The backoff function maps a state to a delay (passed to 'threadDelay')
-- and the next state. The state restarts from the initial value each time a
-- previously established connection breaks during a send.
--
-- A chunk whose send fails is retried on the new connection instead of being
-- dropped, so no input is silently lost across a reconnect.
-- NOTE(review): if 'sendAll' failed after transmitting part of the chunk,
-- the peer may receive that prefix twice — confirm this at-least-once
-- behaviour is acceptable for the protocol in use.
reconnectingSink :: (MonadResource m, MonadIO m)
                 => IO Socket          -- ^ Socket creation / connection action
                 -> (s -> (Int, s))    -- ^ Backoff calculation given input state
                 -> s                  -- ^ Initial backoff calculation state
                 -> Consumer ByteString m ()
reconnectingSink connect backoff s0 = do
    (key, sock) <- getConnection (backoff s0)
    loop key sock
  where
    -- Wait for the given delay, then try to connect; on a network error,
    -- advance the backoff state and try again. The socket's close action is
    -- registered with the resource monad and its release key is returned.
    getConnection (t, s) = do
        debug $ "Delay: " ++ show t
        liftIO $ threadDelay t
        debug "Connecting"
        r <- liftIO $ tryJust (guard . isNetworkError) connect
        case r of
            Left () -> do
                debug "Connection failed"
                getConnection (backoff s)
            Right sock -> do
                debug "Connection succeeded"
                k <- register (sClose sock)
                return (k, sock)

    -- Pull chunks from upstream until the input ends, delivering each one.
    loop key sock = do
        v <- await
        case v of
            Nothing -> do
                debug "End of input, closing socket"
                release key
            Just chunk -> do
                (key', sock') <- deliver key sock chunk
                loop key' sock'

    -- Send one chunk, reconnecting and retrying the SAME chunk until a send
    -- succeeds. Returns the (possibly new) release key and socket.
    deliver key sock chunk = do
        r <- liftIO $ tryJust (guard . isNetworkError) (sendAll sock chunk)
        case r of
            Right () -> return (key, sock)
            Left () -> do
                debug "Send failed, releasing socket & reconnecting"
                release key
                (key', sock') <- getConnection (backoff s0)
                deliver key' sock' chunk

    -- Only IO errors carrying one of these errno values trigger a reconnect;
    -- every other exception propagates to the caller.
    isNetworkError :: IOException -> Bool
    isNetworkError e = case ioe_errno e of
        Just n -> Errno n `elem` [ eBADF
                                 , eCONNABORTED
                                 , eCONNREFUSED
                                 , eCONNRESET
                                 , eHOSTDOWN
                                 , eHOSTUNREACH
                                 , eNETDOWN
                                 , eNETRESET
                                 , eNETUNREACH
                                 , eNONET
                                 , eNOTCONN
                                 , ePIPE
                                 ]
        Nothing -> False
-- | Stream standard input to a TCP peer at 127.0.0.1:4321 through
-- 'reconnectingSink', using a Fibonacci backoff between connection attempts.
main :: IO ()
main = runResourceT $
    CB.sourceHandle stdin $$ reconnectingSink connect fib s0
  where
    host = "127.0.0.1"
    port = 4321

    -- Open the socket and report the peer address on stderr.
    connect = do
        (sock, addr) <- CN.getSocket host port
        hPutStrLn stderr $ "Connected to: " ++ show addr
        return sock

    -- Backoff state: the current and next delay (fed to 'threadDelay', which
    -- takes microseconds).
    s0 = (0, 1000)

    -- Upper bound on any delay.
    max' = 60000000

    -- Fibonacci backoff capped at max'. The state itself is clamped, not
    -- just the returned delay: otherwise a + b keeps growing across failed
    -- attempts and eventually overflows Int, producing a negative delay.
    -- Before the cap is reached the delays match the unclamped sequence.
    fib (a, b) = (min a max', (b, min max' (a + b)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment