Last active
January 7, 2020 06:04
-
-
Save larskuhtz/f8eb5616d5eb9a0c7afa228171c45e76 to your computer and use it in GitHub Desktop.
Reproduce EventSource + HTTP/2 + TLS issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE NumericUnderscores #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE TypeApplications #-} | |
-- |
-- To reproduce with curl, run mainTls (from ghci) and use
--
-- @
--   time curl -kv -H "Accept:text/event-stream" "https://localhost:8080" --tlsv1.3
-- @
--
module Events | |
( mainTls | |
, main | |
) where | |
import Control.Concurrent | |
import Control.Concurrent.Async | |
import Control.Concurrent.STM | |
import Control.Monad | |
import Control.Monad.Catch | |
import Control.StopWatch | |
import Data.Binary.Builder (fromByteString) | |
import qualified Data.ByteString.Char8 as B8 | |
import qualified Data.ByteString.Lazy as BL | |
import Data.IORef | |
import qualified Network.Connection as HTTP | |
import qualified Network.HTTP.Client as HTTP | |
import qualified Network.HTTP.Client.TLS as HTTP (getGlobalManager, setGlobalManager, newTlsManagerWith, mkManagerSettings) | |
import Network.HTTP.Types.Status | |
import Network.Wai (responseLBS, Application, pathInfo) | |
import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO) | |
import Network.Wai.Handler.Warp (run, defaultSettings, setPort) | |
import Network.Wai.Handler.WarpTLS | |
import Network.X509.SelfSigned | |
import Network.TLS hiding (HashSHA256, HashSHA512, SHA512) | |
import Network.TLS.Extra (ciphersuite_default) | |
import System.Random | |
-- -------------------------------------------------------------------------- -- | |
-- Main | |
-- | Plain-HTTP entry point.
--
-- Creates the shared stream counter (initialised to 10) that
-- 'updatesHandler' uses to limit concurrent streams, starts an integer
-- producer drawing values from @[0, 4]@, and serves the update stream with
-- Warp on port 8080.
main :: IO ()
main = newIORef 10 >>= \slots ->
    withIntProducer 5 (run 8080 . updatesHandler slots)
-- | TLS entry point.
--
-- Generates a fresh certificate/key pair for localhost via
-- 'generateLocalhostCertificate' (the @365@ is presumably the validity in
-- days -- confirm against "Network.X509.SelfSigned"), then serves on port
-- 8080 over TLS:
--
-- * @/client@ returns the browser test page ('clientApp');
-- * every other path returns the event stream ('updatesHandler').
--
-- TLS 1.0 through 1.3 are all allowed on purpose so that the issue can be
-- reproduced with different protocol versions.
mainTls :: IO ()
mainTls = do
    (_, certPem, keyPem) <- generateLocalhostCertificate @RsaCert 365
    let tlsSettings = (tlsServerSettings certPem keyPem)
            { tlsCiphers = ciphersuite_default
            , tlsAllowedVersions = [TLS13, TLS12, TLS11, TLS10]
            }

    -- Shared counter handed to 'updatesHandler' to limit concurrent streams.
    slots <- newIORef 10

    -- The annotation pins the literal to 'String'; with OverloadedStrings a
    -- bare literal passed to 'print' would leave its type to defaulting.
    print ("ready" :: String)

    withIntProducer 5 $ \producer ->
        runTLS tlsSettings (setPort 8080 defaultSettings)
            $ route clientApp (updatesHandler slots producer)
-- | Run the TLS server and the Haskell test client side by side.
--
-- Installs the global HTTP manager, then races 'client' against 'mainTls';
-- whichever finishes first ends the run. The second component of the
-- 'stopWatch' result (presumably the elapsed time) is printed afterwards.
mainWithClient :: IO ()
mainWithClient = do
    setMgr
    elapsed <- snd <$> stopWatch (race_ client mainTls)
    print elapsed
-- | Install a TLS-capable HTTP manager as the process-wide global manager.
--
-- NOTE(review): the three 'True' flags of 'HTTP.TLSSettingsSimple'
-- presumably include disabling certificate validation, which the self-signed
-- server certificate would require -- confirm against the @connection@
-- package documentation.
setMgr :: IO ()
setMgr = HTTP.setGlobalManager =<< HTTP.newTlsManagerWith settings
  where
    settings = HTTP.mkManagerSettings
        (HTTP.TLSSettingsSimple True True True)
        Nothing
-- | Minimal streaming client.
--
-- Requests @https://localhost:8080@ through the global manager and prints
-- every body chunk as it arrives. Returns once 'HTTP.brRead' yields an empty
-- chunk, i.e. when the response body is exhausted.
client :: IO ()
client = do
    manager <- HTTP.getGlobalManager
    request <- HTTP.parseUrlThrow "https://localhost:8080"
    HTTP.withResponse request manager (drain . HTTP.responseBody)
  where
    -- Read and print chunks until the empty chunk signals end of body.
    drain body = do
        chunk <- HTTP.brRead body
        case chunk of
            "" -> return ()
            _ -> print chunk >> drain body
-- -------------------------------------------------------------------------- -- | |
-- Route | |
-- | Dispatch between two applications on the request path.
--
-- A request whose path is exactly @/client@ goes to the first application;
-- every other request goes to the second one.
route :: Application -> Application -> Application
route clientHandler streamHandler req resp
    | pathInfo req == ["client"] = clientHandler req resp
    | otherwise = streamHandler req resp
-- -------------------------------------------------------------------------- -- | |
-- Client | |
-- | Serves a small HTML page that opens an @EventSource@ against
-- @https://localhost:8080/@ and appends one line per stream callback
-- (@open@, @error@, @message@ and the named @Event@ event) together with the
-- seconds elapsed since the page's @start@ timestamp.
--
-- The request is ignored; the same page is returned for every request.
--
-- The response carries an explicit @Content-Type@ so that browsers do not
-- have to sniff the body, and the log container lives inside @<body>@ so
-- that the document is well-formed.
clientApp :: Application
clientApp _req respond = respond $ responseLBS status200 headers body
  where
    headers = [("Content-Type", "text/html; charset=utf-8")]

    body = BL.intercalate "\n"
        [ "<!DOCTYPE html>"
        , "<html>"
        , "<head>"
        , "</head>"
        , "<body>"
        , "  <div id='list'></div>"
        , "  <script>"
        , "    var start = Date.now();"
        , "    function logg(label) {"
        , "      const e = document.createElement('div');"
        , "      e.innerHTML = `${label}: ${(Date.now() - start) / 1000}`;"
        , "      list.appendChild(e);"
        , "    }"
        , "    const stream = new EventSource('https://localhost:8080/', { withCredentials: true });"
        , "    stream.onopen = function() {"
        , "      logg('start');"
        , "    };"
        , "    stream.onerror = function() {"
        , "      start = Date.now();"
        , "      logg('error');"
        , "    }"
        , "    stream.onmessage = function(event) {"
        , "      logg('message');"
        , "    };"
        , "    stream.addEventListener('Event', function(event) {"
        , "      logg('Event');"
        , "    });"
        , "  </script>"
        , "</body>"
        , "</html>"
        ]
-- -------------------------------------------------------------------------- -- | |
-- Application | |
-- | WAI application that serves a server-sent-event stream of integers.
--
-- * @count@ is the shared budget of stream slots. A slot is taken for the
--   duration of the request and always given back afterwards; when no slot
--   is left the request is answered with status 503.
-- * @prod@ is the integer source. An event named @Event@ is sent every time
--   the producer's value differs from the value this stream sent last
--   (starting from 0).
--
-- Each stream is closed with 'CloseEvent' once its timer has fired, roughly
-- 240 seconds (plus or minus 10% jitter) after it was opened. The timer is
-- only inspected while waiting for the next event.
--
-- Progress ("start: n" / "exit: n") and any exception are printed to stdout;
-- exceptions are rethrown after being printed.
updatesHandler :: IORef Int -> IntProducer -> Application
updatesHandler count prod req respond = withLimit $ do
    c <- readIORef count -- can race but we don't care
    print $ "start: " <> show c

    -- Last value sent on this stream.
    cv <- newIORef 0

    -- An update stream is closed after @timeout@ seconds. We add some jitter
    -- so that the availability of streams is uniformly distributed over time
    -- and not predictable.
    --
    jitter <- randomRIO @Double (0.9, 1.1)
    timer <- registerDelay (round $ jitter * realToFrac timeout * 1_000_000)

    let serve = do
            r <- eventSourceAppIO (go timer cv) req respond
            print $ "exit: " <> show c
            return r

    -- Log whatever terminates the stream, then let it propagate unchanged.
    serve `catch` \(e :: SomeException) -> do
        print e
        throwM e
  where
    -- Nominal lifetime of a stream, in seconds.
    timeout :: Int
    timeout = 240

    -- Wrap an integer into a server-sent event named "Event".
    toEvent :: Int -> ServerEvent
    toEvent n = ServerEvent
        (Just $ fromByteString "Event")
        Nothing
        [fromByteString $ B8.pack (show n)]

    -- Produce the next event for the stream: either the next fresh integer
    -- or 'CloseEvent' once the timer has fired.
    go :: TVar Bool -> IORef Int -> IO ServerEvent
    go timer cv = do
        lastSent <- readIORef cv

        -- await either a timeout or a new event
        next <- atomically $ do
            expired <- readTVar timer
            if expired
                then return Nothing
                else Just <$> awaitNewInt prod lastSent

        case next of
            Nothing -> return CloseEvent
            Just fresh -> do
                writeIORef cv $! fresh
                return (toEvent fresh)

    -- Take a slot for the duration of @inner@ and always give it back.
    --
    -- The acquire step returns the number of slots remaining AFTER this
    -- request took one. A remaining count of 0 means this request took the
    -- last slot and must still be served; only a negative count means the
    -- budget was already exhausted.
    withLimit inner = bracket
        (atomicModifyIORef' count $ \x -> (x - 1, x - 1))
        (const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
        (\remaining -> if remaining < 0 then ret503 else inner)

    ret503 = respond $ responseLBS status503 []
        "No more update streams available currently. Retry later."
-- -------------------------------------------------------------------------- -- | |
-- Producer of Integers | |
-- | Handle on the most recently produced integer.
newtype IntProducer = IntProducer (TVar Int)

-- | Run @inner@ with a background producer of random integers.
--
-- The producer starts at 0 and then, about once per second (plus or minus
-- 10% jitter), overwrites the shared value with a random integer from
-- @[0, s - 1]@. It is raced against @inner@, so it is cancelled as soon as
-- @inner@ returns, and the result of @inner@ is returned.
--
-- The producer loops forever; should it ever return, this is reported with
-- @error "producer failed"@.
withIntProducer :: Int -> (IntProducer -> IO r) -> IO r
withIntProducer s inner = do
    var <- newTVarIO 0
    outcome <- race (producer var) (inner (IntProducer var))
    case outcome of
        Left () -> error "producer failed"
        Right a -> return a
  where
    producer var = forever (tick var)

    -- Sleep for roughly one second, then publish a new random value.
    tick var = do
        jitter <- randomRIO @Double (0.9, 1.1)
        threadDelay (round (jitter * 1_000_000))
        randomRIO (0, s - 1) >>= atomically . writeTVar var
-- | Wait, inside an STM transaction, until the producer's current value
-- differs from @c@, then return that value.
--
-- While the value still equals @c@ the transaction calls 'retry'.
awaitNewInt :: IntProducer -> Int -> STM Int
awaitNewInt (IntProducer var) c = do
    current <- readTVar var
    if current == c
        then retry
        else return current
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment