Created
January 7, 2020 07:59
-
-
Save kazu-yamamoto/370d3ff3e0bc28bbfe6eb2ecc67feb95 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE NumericUnderscores #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE TypeApplications #-} | |
-- |
-- To reproduce with curl, run 'main' (from ghci) and use
--
-- @
-- time curl -v --http2-prior-knowledge -H "Accept:text/event-stream" "http://127.0.0.1:8080"
-- @
--
module Events where | |
import Control.Concurrent | |
import Control.Concurrent.Async | |
import Control.Concurrent.STM | |
import Control.Monad | |
import Control.Monad.Catch | |
import Data.Binary.Builder (fromByteString) | |
import qualified Data.ByteString.Char8 as B8 | |
import Data.IORef | |
import Network.HTTP.Types.Status | |
import Network.Wai (responseLBS, Application) | |
import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO) | |
import Network.Wai.Handler.Warp (run) | |
import System.Random | |
-- -------------------------------------------------------------------------- -- | |
-- Main | |
-- | Entry point: starts the integer producer and serves 'updatesHandler'
-- with Warp on port 8080.
--
-- The stream counter starts at 10 and the producer draws values from
-- @[0, 4]@ (upper bound argument 5).
main :: IO ()
main = do
    slots <- newIORef 10
    withIntProducer 5 (run 8080 . updatesHandler slots)
-- -------------------------------------------------------------------------- -- | |
-- Application | |
-- | WAI application that streams the integers published by an 'IntProducer'
-- as server-sent events.
--
-- The 'IORef' is a shared counter of free stream slots: it is decremented
-- when a request arrives and incremented again when the request finishes
-- (normally or by exception). If the decremented value is not positive the
-- request is answered with a 503 instead of a stream.
--
-- NOTE(review): because the check is made on the post-decrement value with
-- @<= 0@, a counter initialised to N admits N-1 concurrent streams. Confirm
-- that this is intended.
updatesHandler :: IORef Int -> IntProducer -> Application
updatesHandler count prod req respond = limited $ do
    -- Unsynchronised read; the value is only used for the log lines below.
    remaining <- readIORef count
    print ("start: " <> show remaining)
    lastSeen <- newIORef 0

    -- A stream is closed after roughly @lifetime@ seconds. The delay is
    -- scaled by a random factor in [0.9, 1.1] so that the availability of
    -- streams is spread out over time and not predictable.
    factor <- randomRIO @Double (0.9, 1.1)
    expired <- registerDelay . round $ factor * realToFrac lifetime * 1_000_000

    -- Log any exception that escapes the stream, then re-throw it unchanged.
    serve remaining expired lastSeen `catch` \(e :: SomeException) -> do
        print e
        throwM e
  where
    -- Nominal lifetime of a single stream, in seconds.
    lifetime = 240 :: Int

    -- Run the event-source application and log once it has returned.
    serve remaining expired lastSeen = do
        result <- eventSourceAppIO (nextEvent expired lastSeen) req respond
        print ("exit: " <> show remaining)
        pure result

    -- Render an integer as an event named "Event" whose data is the decimal
    -- representation of the integer.
    toEvent :: Int -> ServerEvent
    toEvent n =
        ServerEvent
            (Just (fromByteString "Event"))
            Nothing
            [fromByteString (B8.pack (show n))]

    -- Produce the next event for the stream: 'CloseEvent' once the timer has
    -- fired, otherwise block until the producer holds a value different from
    -- the one sent last, remember it, and emit it.
    nextEvent :: TVar Bool -> IORef Int -> IO ServerEvent
    nextEvent expired lastSeen = do
        previous <- readIORef lastSeen
        -- Wait for either the timeout or a new integer, whichever is
        -- observable first.
        outcome <- atomically $
            readTVar expired >>= \case
                True -> pure Nothing
                False -> Just <$> awaitNewInt prod previous
        case outcome of
            Nothing -> pure CloseEvent
            Just fresh -> do
                writeIORef lastSeen $! fresh
                pure (toEvent fresh)

    -- Take a slot for the duration of @action@; the slot is always given
    -- back, even when the request was rejected.
    limited action = bracket acquire release $ \left ->
        if left > 0 then action else unavailable

    -- Decrement the counter and return the new value.
    acquire = atomicModifyIORef' count $ \n -> let n' = n - 1 in (n', n')

    -- Increment the counter again.
    release _ = atomicModifyIORef' count $ \n -> (n + 1, ())

    -- Response sent when no slot is left.
    unavailable = respond $
        responseLBS status503 [] "No more update streams available currently. Retry later."
-- -------------------------------------------------------------------------- -- | |
-- Producer of Integers | |
-- | Handle to the most recently produced integer. The wrapped 'TVar' is
-- written by the producer loop in 'withIntProducer' and read by
-- 'awaitNewInt'.
newtype IntProducer = IntProducer (TVar Int)
-- | Run @inner@ with an 'IntProducer' that is updated in the background.
--
-- The producer loop runs concurrently with @inner@ via 'race', so it is
-- cancelled as soon as @inner@ returns. About once per second (scaled by a
-- random factor in [0.9, 1.1]) it stores a random integer drawn from
-- @(0, s - 1)@. The initial value is 0.
--
-- The loop is a 'forever', so the @Left@ branch is only reached if the
-- producer somehow returns; that case is reported with 'error'.
withIntProducer :: Int -> (IntProducer -> IO r) -> IO r
withIntProducer s inner = do
    cell <- newTVarIO 0
    outcome <- race (publish cell) (inner (IntProducer cell))
    case outcome of
        Left () -> error "producer failed"
        Right result -> pure result
  where
    -- Endless loop: sleep for a jittered second, then publish a new value.
    publish cell = forever $ do
        factor <- randomRIO @Double (0.9, 1.1)
        threadDelay (round (factor * 1_000_000))
        next <- randomRIO (0, s - 1)
        atomically (writeTVar cell next)
-- | Block (via 'retry') until the producer's current value differs from the
-- given one, then return that current value.
--
-- Only the current value is compared: if the producer writes the same
-- integer again, the transaction keeps waiting.
awaitNewInt :: IntProducer -> Int -> STM Int
awaitNewInt (IntProducer cell) c = do
    current <- readTVar cell
    if current == c
        then retry
        else pure current
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment