Skip to content

Instantly share code, notes, and snippets.

@larskuhtz
Last active January 7, 2020 06:04
Show Gist options
  • Save larskuhtz/f8eb5616d5eb9a0c7afa228171c45e76 to your computer and use it in GitHub Desktop.
Save larskuhtz/f8eb5616d5eb9a0c7afa228171c45e76 to your computer and use it in GitHub Desktop.
Reproduce EventSource + HTTP/2 + TLS issue
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
-- |
-- To reproduce with curl, run mainTls (from ghci) and use
--
-- @
-- time curl -kv -H "Accept:text/event-stream" "https://localhost:8080" --tlsv1.3
-- @
--
module Events
( mainTls
, main
) where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Catch
import Control.StopWatch
import Data.Binary.Builder (fromByteString)
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import qualified Network.Connection as HTTP
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP (getGlobalManager, setGlobalManager, newTlsManagerWith, mkManagerSettings)
import Network.HTTP.Types.Status
import Network.Wai (responseLBS, Application, pathInfo)
import Network.Wai.EventSource (ServerEvent(..), eventSourceAppIO)
import Network.Wai.Handler.Warp (run, defaultSettings, setPort)
import Network.Wai.Handler.WarpTLS
import Network.X509.SelfSigned
import Network.TLS hiding (HashSHA256, HashSHA512, SHA512)
import Network.TLS.Extra (ciphersuite_default)
import System.Random
-- -------------------------------------------------------------------------- --
-- Main
-- | Serve the update stream over plain HTTP on port 8080.
--
-- The stream-limit counter handed to 'updatesHandler' is initialised to 10 and
-- the integer producer is started with an upper bound of 5.
main :: IO ()
main = do
    slots <- newIORef 10
    withIntProducer 5 (run 8080 . updatesHandler slots)
-- | Serve the client page and the update stream over TLS on port 8080.
--
-- A localhost certificate is generated on every start (the argument 365 is
-- presumably its validity in days -- confirm against
-- 'generateLocalhostCertificate'). Requests are dispatched by 'route':
-- @/client@ goes to 'clientApp', everything else to 'updatesHandler'.
--
-- @"ready"@ is printed before the server is started, so it does not mean
-- that the port is already accepting connections.
mainTls :: IO ()
mainTls = do
    (_, certPem, keyPem) <- generateLocalhostCertificate @RsaCert 365
    slots <- newIORef 10
    print "ready"
    withIntProducer 5 $ \producer ->
        runTLS (tlsConfig certPem keyPem) warpSettings
            $ route clientApp (updatesHandler slots producer)
  where
    warpSettings = setPort 8080 defaultSettings

    -- Server settings for the generated certificate, with the default cipher
    -- suite and all listed protocol versions enabled.
    tlsConfig certPem keyPem = (tlsServerSettings certPem keyPem)
        { tlsCiphers = ciphersuite_default
        , tlsAllowedVersions = [TLS13, TLS12, TLS11, TLS10]
        }
-- | Race the streaming 'client' against the TLS server ('mainTls') and print
-- the measurement reported by 'stopWatch' once either of them finishes.
--
-- NOTE(review): nothing here waits for the server to be listening before the
-- client connects; both are simply started together by 'race_'.
mainWithClient :: IO ()
mainWithClient = do
    setMgr
    elapsed <- snd <$> stopWatch (race_ client mainTls)
    print elapsed
-- | Create a TLS-enabled HTTP client manager and install it as the global
-- manager that 'client' later retrieves.
--
-- The manager is built from @TLSSettingsSimple True True True@; the first
-- flag presumably disables certificate validation, which a self-signed server
-- certificate would need -- confirm against the @connection@ package.
setMgr :: IO ()
setMgr = HTTP.newTlsManagerWith managerSettings >>= HTTP.setGlobalManager
  where
    tlsSimple = HTTP.TLSSettingsSimple True True True
    managerSettings = HTTP.mkManagerSettings tlsSimple Nothing
-- | Request @https://localhost:8080@ through the global manager and print
-- every body chunk as it arrives, stopping at the first empty chunk.
client :: IO ()
client = do
    manager <- HTTP.getGlobalManager
    request <- HTTP.parseUrlThrow "https://localhost:8080"
    HTTP.withResponse request manager (drain . HTTP.responseBody)
  where
    -- Read the next chunk from the body reader and hand it to 'consume'.
    drain body = HTTP.brRead body >>= consume body

    -- An empty chunk ends the loop; any other chunk is printed and the loop
    -- continues.
    consume _ "" = return ()
    consume body chunk = print chunk >> drain body
-- -------------------------------------------------------------------------- --
-- Route
-- | Dispatch on the request path: exactly @/client@ is answered by the first
-- application, every other path by the second one.
route :: Application -> Application -> Application
route clientPage stream req resp
    | pathInfo req == ["client"] = clientPage req resp
    | otherwise = stream req resp
-- -------------------------------------------------------------------------- --
-- Client
-- | Serve a static HTML test page.
--
-- The page's script opens an @EventSource@ on @https://localhost:8080/@ and
-- appends one line to the @list@ element for every @open@, @error@,
-- @message@ and @Event@ event, showing the seconds elapsed since @start@
-- (which is reset whenever an error is reported).
--
-- The request itself is ignored; the response is always status 200.
clientApp :: Application
clientApp _req respond = respond $ responseLBS
    status200
    -- Declare the media type explicitly instead of relying on the browser
    -- sniffing the body.
    [("Content-Type", "text/html; charset=utf-8")]
    body
  where
    -- The @list@ container is placed inside @<body>@ and before the script,
    -- so that it exists by the time the script appends to it.
    body = BL.intercalate "\n"
        [ "<head>"
        , "</head>"
        , "<body>"
        , "<div id='list'></div>"
        , "  <script>"
        , "    var start = Date.now();"
        , "    function logg(label) {"
        , "        const e = document.createElement('div');"
        , "        e.innerHTML = `${label}: ${(Date.now() - start) / 1000}`;"
        , "        list.appendChild(e);"
        , "    }"
        , "    const stream = new EventSource('https://localhost:8080/', { withCredentials: true });"
        , "    stream.onopen = function() {"
        , "        logg('start');"
        , "    };"
        , "    stream.onerror = function() {"
        , "        start = Date.now();"
        , "        logg('error');"
        , "    }"
        , "    stream.onmessage = function(event) {"
        , "        logg('message');"
        , "    };"
        , "    stream.addEventListener('Event', function(event) {"
        , "        logg('Event');"
        , "    });"
        , "  </script>"
        , "</body>"
        ]
-- -------------------------------------------------------------------------- --
-- Application
-- | WAI application that serves a stream of server-sent events.
--
-- Every request takes one slot from @count@ for as long as it is being
-- served and gives it back afterwards. When no slot is left, the request is
-- answered with status 503 instead.
--
-- While the stream is open, an event named @Event@ is emitted each time the
-- value held by @prod@ differs from the value sent last (initially compared
-- against 0). Once the stream's timer fires, the stream is closed with
-- 'CloseEvent'.
--
-- Any exception raised while streaming is printed and then rethrown.
updatesHandler :: IORef Int -> IntProducer -> Application
updatesHandler count prod req respond = withLimit $ do
    c <- readIORef count -- can race but we don't care
    print $ "start: " <> show c
    cv <- newIORef 0
    -- An update stream is closed after @timeout@ seconds. We add some jitter
    -- so that the availability of streams is uniformly distributed over time
    -- and not predictable.
    --
    jitter <- randomRIO @Double (0.9, 1.1)
    timer <- registerDelay (round $ jitter * realToFrac timeout * 1_000_000)
    serve timer cv c `catch` \(e :: SomeException) -> do
        print e
        throwM e
  where
    -- Lifetime of a stream in seconds, before jitter is applied.
    timeout :: Int
    timeout = 240

    -- Run the event source until it finishes, then log the exit together
    -- with the counter value that was observed at the start.
    serve timer cv c = do
        r <- eventSourceAppIO (go timer cv) req respond
        print $ "exit: " <> show c
        return r

    -- Wrap an integer into an event named @Event@ whose data is the decimal
    -- rendering of the integer.
    f :: Int -> ServerEvent
    f c = ServerEvent (Just $ fromByteString "Event") Nothing [fromByteString $ B8.pack (show c)]

    -- Produce the next event of the stream: block until either the timer has
    -- fired (close the stream) or the producer holds a value different from
    -- the one stored in @cv@ (remember it and emit it).
    go :: TVar Bool -> IORef Int -> IO ServerEvent
    go timer cv = do
        c <- readIORef cv
        -- await either a timeout or a new event
        maybeInt <- atomically $ do
            timedOut <- readTVar timer
            if timedOut
                then return Nothing
                else Just <$> awaitNewInt prod c
        case maybeInt of
            Nothing -> return CloseEvent
            Just c' -> do
                writeIORef cv $! c'
                return (f c')

    -- Take a slot for the duration of @inner@ and give it back afterwards,
    -- also when @inner@ throws. The acquire step returns the number of slots
    -- left /after/ taking one, so only a negative value means that no slot
    -- was available; a result of 0 means the last free slot was just taken.
    withLimit inner = bracket
        (atomicModifyIORef' count $ \x -> (x - 1, x - 1))
        (const $ atomicModifyIORef' count $ \x -> (x + 1, ()))
        (\remaining -> if remaining < 0 then ret503 else inner)

    -- Response sent when all slots are in use.
    ret503 = respond $ responseLBS status503 [] "No more update streams available currently. Retry later."
-- -------------------------------------------------------------------------- --
-- Producer of Integers
-- | Handle to the most recently produced integer. The wrapped 'TVar' is
-- written by the producer thread of 'withIntProducer' and read by
-- 'awaitNewInt'.
newtype IntProducer = IntProducer (TVar Int)
-- | Run @inner@ with an 'IntProducer' that is fed by a background loop.
--
-- The shared variable starts at 0. The loop then repeatedly sleeps for
-- 0.9 to 1.1 seconds and overwrites the variable with a random integer drawn
-- from the range @(0, s - 1)@. The loop and @inner@ are run with 'race'; the
-- result of @inner@ is returned. The loop never returns normally, so the
-- @"producer failed"@ branch is only there to make the match total.
withIntProducer :: Int -> (IntProducer -> IO r) -> IO r
withIntProducer s inner = do
    var <- newTVarIO 0
    outcome <- race (producer var) (inner (IntProducer var))
    case outcome of
        Right result -> return result
        Left () -> error "producer failed"
  where
    producer var = forever $ do
        jitter <- randomRIO @Double (0.9, 1.1)
        threadDelay (round (jitter * 1_000_000))
        randomRIO (0, s - 1) >>= atomically . writeTVar var
-- | Wait, within an STM transaction, until the producer's current value
-- differs from @c@ and return that value. While the value equals @c@ the
-- transaction calls 'retry'.
awaitNewInt :: IntProducer -> Int -> STM Int
awaitNewInt (IntProducer var) c = do
    x <- readTVar var
    if x == c then retry else return x
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment