Skip to content

Instantly share code, notes, and snippets.

@bradparker
Last active April 29, 2022 05:54
Show Gist options
  • Save bradparker/48254f30a8fb64fce7c0bbdadc83524c to your computer and use it in GitHub Desktop.
Save bradparker/48254f30a8fb64fce7c0bbdadc83524c to your computer and use it in GitHub Desktop.
Streaming server
cabal.project.local
dist-newstyle
*.prof*
*.pem
*.html*
{ haskellPackages }:
haskellPackages.callCabal2nix "server" ./. {}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE InstanceSigs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE LiberalTypeSynonyms #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE NoFieldSelectors #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
module Main (main) where
import Control.Applicative (Alternative (empty, (<|>)), many)
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
( TMVar,
atomically,
isEmptyTMVar,
modifyTVar,
newEmptyTMVarIO,
newTVarIO,
putTMVar,
readTVar,
retry,
takeTMVar,
)
import Control.Exception (SomeException, bracket, bracket_, catch)
import Control.Monad (MonadPlus (mplus, mzero), ap, guard, join, when, (<=<))
import Control.Monad.Error.Class (MonadError (catchError, throwError))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.State (StateT (StateT, runStateT), get)
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import Data.Bifunctor (bimap, first)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC
import Data.Char (chr, digitToInt, isDigit, ord, toLower)
import Data.Coerce (coerce)
import Data.Functor (void, ($>))
import qualified Data.List as List
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe (listToMaybe)
import Data.String (IsString (fromString))
import Foreign.C (CInt)
import GHC.Exts (IsList (Item, fromList, toList))
import GHC.IO.Exception (IOErrorType (NoSuchThing), IOException (..))
import GHC.Word (Word8)
import Network.Simple.TCP (HostPreference (Host), ServiceName, SockAddr, Socket)
import qualified Network.Simple.TCP as Network
import Network.Simple.TCP.TLS (Context)
import qualified Network.Simple.TCP.TLS as NetworkTLS
import qualified Network.Socket as Socket
import Numeric (readHex, showHex)
import qualified System.Environment.Blank as System
import System.FilePath ((</>))
import qualified System.FilePath as FilePath
import System.IO (Handle, IOMode (ReadMode))
import qualified System.IO as System
import System.Posix.Files (FileStatus)
import qualified System.Posix.Files as Files
import qualified System.Posix.Internals as System
import System.Posix.Process (getProcessID)
import System.Posix.Signals (installHandler, sigTERM)
import qualified System.Posix.Signals as System
import Text.Read (readMaybe)
import Prelude hiding (drop, mapM, readFile, splitAt, take)
-- | An effectful stream. Each step runs an action in @m@ that either
-- finishes with a result @a@ ('Left') or produces an output @o@ together
-- with the rest of the stream ('Right').
newtype Stream o m a
  = Stream (m (Either a (o, Stream o m a)))

-- | Unwrap a stream, exposing the action that computes its next step.
next :: Stream o m a -> m (Either a (o, Stream o m a))
next (Stream step) = step

-- | A stream that emits one pure value and then finishes with @()@.
yield :: forall m a. Monad m => a -> Stream a m ()
yield value = yieldM (pure value)

-- | A stream that runs an action, emits its result and then finishes with @()@.
yieldM :: forall m a. Monad m => m a -> Stream a m ()
yieldM action = Stream (fmap (\output -> Right (output, pure ())) action)
-- | Base functor of 'Stream': the shape of a single step, with the recursive
-- position abstracted out as @r@.
newtype StreamF o m a r
  = StreamF (m (Either a (o, r)))

instance Functor m => Functor (StreamF o m r) where
  fmap :: (a -> b) -> StreamF o m r a -> StreamF o m r b
  fmap f (StreamF step) = StreamF (fmap mapLayer step)
    where
      -- Only the recursive slot, inside 'Right' and the pair, is mapped.
      mapLayer = fmap (fmap f)

-- | View the outermost layer of a 'Stream' as a 'StreamF'.
project :: Stream o m a -> StreamF o m a (Stream o m a)
project (Stream step) = StreamF step

-- | Wrap a 'StreamF' layer of streams back up into a 'Stream'.
embed :: StreamF o m a (Stream o m a) -> Stream o m a
embed = coerce
-- | Catamorphism: tear a stream down layer by layer with the given algebra.
cata :: forall o m a r. Functor m => (StreamF o m a r -> r) -> Stream o m a -> r
cata algebra = go
  where
    go stream = algebra (fmap go (project stream))

-- | 'cata' with the 'StreamF' wrapper removed from the algebra's argument.
fold :: forall o m a r. Functor m => (m (Either a (o, r)) -> r) -> Stream o m a -> r
fold f = cata (f . unwrap)
  where
    unwrap (StreamF action) = action

-- | Anamorphism: grow a stream layer by layer from a seed.
ana :: forall a o m r. Functor m => (a -> StreamF o m r a) -> a -> Stream o m r
ana coalgebra = go
  where
    go seed = embed (fmap go (coalgebra seed))

-- | 'ana' with the 'StreamF' wrapper added to the coalgebra's result.
unfold :: forall a o m r. Functor m => (a -> m (Either r (o, a))) -> a -> Stream o m r
unfold step = ana (\seed -> StreamF (step seed))
-- | Build a stream by calling the reader on the same source over and over,
-- emitting every 'Just' it returns and stopping at the first 'Nothing'.
reread :: forall m a b. Monad m => (a -> m (Maybe b)) -> a -> Stream b m ()
reread reader = unfold \source -> fmap (toStep source) (reader source)
  where
    toStep source = maybe (Left ()) (\output -> Right (output, source))
-- | Run a stream, evaluating its effects and producing its result
-- >>> :{
-- run do
-- lift (putStrLn "Hello,")
-- lift (putStrLn "Streams")
-- pure "Done!"
-- :}
-- Hello,
-- Streams
-- "Done!"
run :: forall o m a. (Monad m) => Stream o m a -> m a
-- At every layer: return the result if the stream is finished, otherwise
-- ignore the output and continue with the (already folded) remainder.
run = fold (>>= either pure snd)
-- | Discard every output of a stream while keeping its effects and result.
-- The resulting stream never yields.
flush :: forall m a r. (Monad m) => Stream a m r -> Stream a m r
flush = fold \action -> Stream (action >>= either (pure . Left) (next . snd))
-- | Effectfully transform the values yielded by a stream
-- >>> stream = (yield "Hello," *> yield "Streams") :: Stream String IO ()
-- >>> :t mapM (\i -> putStrLn i *> pure (length i)) stream
-- mapM (\i -> putStrLn i *> pure (length i)) stream
-- :: Stream Int IO ()
mapM :: forall a b m r. Monad m => (a -> m b) -> Stream a m r -> Stream b m r
mapM f = fold \action -> Stream (action >>= transform)
  where
    -- The final result passes through untouched; each output is run through 'f'.
    transform (Left result) = pure (Left result)
    transform (Right (a, rest)) = fmap (\b -> Right (b, rest)) (f a)
-- | Maps over the final result of the stream; outputs are left as they are.
instance forall o m. (Functor m) => Functor (Stream o m) where
  fmap :: forall a b. (a -> b) -> Stream o m a -> Stream o m b
  fmap f (Stream action) = Stream (fmap step action)
    where
      step (Left result) = Left (f result)
      step (Right layer) = Right (fmap (fmap f) layer)
-- | Sequencing streams concatenates their outputs.
instance forall o m. (Monad m) => Applicative (Stream o m) where
  pure :: forall a. a -> Stream o m a
  pure result = Stream (pure (Left result))
  (<*>) :: forall a b. Stream o m (a -> b) -> Stream o m a -> Stream o m b
  (<*>) = ap
  (*>) :: forall a b. Stream o m a -> Stream o m b -> Stream o m b
  before *> after = Stream (next before >>= continue)
    where
      -- Once the first stream is finished, hand over to the second one.
      continue (Left _) = next after
      continue (Right layer) = pure (Right (fmap (*> after) layer))
-- | Binding runs the first stream to completion (re-emitting its outputs)
-- and then continues with the stream computed from its result.
instance forall o m. (Monad m) => Monad (Stream o m) where
  (>>=) :: Stream o m a -> (a -> Stream o m b) -> Stream o m b
  stream >>= k =
    Stream (next stream >>= either (next . k) (pure . Right . fmap (>>= k)))
-- | A lifted action becomes a stream that yields nothing and finishes with
-- the action's result.
instance forall o. MonadTrans (Stream o) where
  lift :: forall m a. Functor m => m a -> Stream o m a
  lift action = Stream (Left <$> action)

instance forall o m. (MonadIO m) => MonadIO (Stream o m) where
  liftIO :: forall a. IO a -> Stream o m a
  liftIO io = lift (liftIO io)
-- | Errors are thrown and caught in the underlying monad.
--
-- 'catchError' protects the whole stream, not just its first step: every
-- remaining layer is wrapped with the same handler, so an error raised after
-- some outputs have already been emitted is still handled. Once the handler's
-- stream has taken over it is no longer guarded by this handler.
instance forall o e m. (MonadError e m) => MonadError e (Stream o m) where
  throwError :: e -> Stream f m a
  throwError = lift . throwError
  catchError :: Stream f m a -> (e -> Stream f m a) -> Stream f m a
  catchError stream catcher =
    Stream (fmap guardRest (next stream) `catchError` (next . catcher))
    where
      -- Re-install the handler on the remainder of the stream (the second
      -- component of a 'Right' layer); a 'Left' result is left untouched.
      guardRest = fmap (fmap (`catchError` catcher))
-- | A stream transformer: a state computation whose state is the remaining
-- input stream (of @i@) and whose underlying monad is the output stream
-- (of @o@). It is polymorphic in the input stream's final result.
newtype Pipe i o f a
  = Pipe (forall x. StateT (Stream i f x) (Stream o f) a)

instance forall i o f. Functor f => Functor (Pipe i o f) where
  fmap :: forall a b. (a -> b) -> Pipe i o f a -> Pipe i o f b
  fmap f (Pipe inner) = Pipe (f <$> inner)

instance forall i o f. Monad f => Applicative (Pipe i o f) where
  pure :: a -> Pipe i o f a
  pure value = Pipe (pure value)
  (*>) :: Pipe i o f a -> Pipe i o f b -> Pipe i o f b
  Pipe left *> Pipe right = Pipe (left *> right)
  (<*>) :: Pipe i o f (a -> b) -> Pipe i o f a -> Pipe i o f b
  Pipe functions <*> Pipe arguments = Pipe (functions <*> arguments)

instance forall i o f. Monad f => Monad (Pipe i o f) where
  (>>=) :: forall a b. Pipe i o f a -> (a -> Pipe i o f b) -> Pipe i o f b
  Pipe inner >>= continue = Pipe do
    a <- inner
    case continue a of
      Pipe following -> following
-- | Failure is delegated to the base monad.
instance forall i o f. MonadFail f => MonadFail (Pipe i o f) where
  fail :: String -> Pipe i o f a
  fail message = lift (fail message)

-- | A lifted action neither consumes input nor produces output.
instance forall i o. MonadTrans (Pipe i o) where
  lift :: forall f a. Monad f => f a -> Pipe i o f a
  lift action = Pipe ((lift . lift) action)

instance forall i o f. MonadIO f => MonadIO (Pipe i o f) where
  liftIO :: forall a. IO a -> Pipe i o f a
  liftIO io = lift (liftIO io)
-- | Feed an input stream to a pipe. The resulting stream emits the pipe's
-- outputs and finishes with the pipe's result paired with the input the pipe
-- did not consume.
runPipe ::
  forall i o f a b.
  Pipe i o f b ->
  Stream i f a ->
  Stream o f (b, Stream i f a)
runPipe (Pipe transition) input = runStateT transition input

-- | Like 'runPipe', but keeps only the unconsumed input.
execPipe ::
  forall i o f a b.
  Functor f =>
  Pipe i o f b ->
  Stream i f a ->
  Stream o f (Stream i f a)
execPipe pipe input = snd <$> runPipe pipe input
-- | Pull the next value from the input stream, or 'Nothing' if the input is
-- finished. A finished input is replaced by a stream that just returns the
-- same result.
receive :: forall i o f. Monad f => Pipe i o f (Maybe i)
receive = Pipe $ StateT \input -> Stream (toStep <$> next input)
  where
    -- Both cases finish the output step ('Left'): receiving emits nothing.
    toStep (Left end) = Left (Nothing, pure end)
    toStep (Right (i, rest)) = Left (Just i, rest)

-- | Emit one value on the output stream.
send :: forall i o f. Monad f => o -> Pipe i o f ()
send output = Pipe (lift (yield output))
-- | Compose
--
-- >>> import Control.Monad (replicateM_, forever)
-- >>> :{
-- pipe :: forall m a b. Monad m => (a -> b) -> Pipe a b m ()
-- pipe f = do
-- input <- receive
-- case input of
-- Nothing -> pure ()
-- Just a -> send (f a)
-- :}
--
-- >>> a = replicateM_ 5 (pipe (\n -> replicate n 'a'))
-- >>> b = replicateM_ 6 (pipe (* 2))
-- >>> c = replicateM_ 7 (pipe (+ 1))
-- >>> d = a `compose` (b `compose` c)
-- >>> run (mapM print (runPipe d (forever (yield 3))))
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
-- >>> e = (a `compose` b) `compose` c
-- >>> run (mapM print (runPipe e (forever (yield 3))))
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
-- "aaaaaaaa"
compose :: forall m a b c r. Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
compose pipeA pipeB = Pipe $ StateT \input -> do
  -- 'pipeA' consumes what 'pipeB' produces from the original input.
  (result, upstream) <- runPipe pipeA (runPipe pipeB input)
  -- Drain whatever 'pipeA' left of 'pipeB', discarding its outputs, to recover
  -- the unconsumed original input; 'pipeA''s result is the one that is kept.
  (result,) . snd <$> lift (run upstream)
-- | A 'Pipe' that is polymorphic in its output type.
type Consumer i f a = forall x. Pipe i x f a
-- | Run a stateful computation over state @t@ inside one over state @s@,
-- given a conversion into @t@ before the step and back to @s@ after it.
subState ::
  forall f s t a.
  Monad f =>
  (s -> t) ->
  (t -> s) ->
  StateT t f a ->
  StateT s f a
subState into outOf inner =
  StateT \outer -> fmap (fmap outOf) (runStateT inner (into outer))

-- | Run a pipe against a transformed view of its input stream: the first
-- function rewrites the input before the pipe sees it, the second restores
-- the leftover input afterwards.
subInput ::
  forall i o m n a.
  Monad m =>
  (forall x. Stream i m x -> Stream i m (n x)) ->
  (forall x. Stream i m (n x) -> Stream i m x) ->
  Pipe i o m a ->
  Pipe i o m a
subInput narrow widen (Pipe inner) = Pipe (subState narrow widen inner)
-- | Outcome of running an incremental parser on the input seen so far.
data ParserResult a
  = Done !a
  -- ^ The parser succeeded with this value.
  | Error !String
  -- ^ The parser failed with this message.
  | Continue (ByteString -> ParserResult a)
  -- ^ The parser needs more input; apply the function to the next chunk.
-- $setup
-- >>> :{
-- resume :: ParserResult a -> ByteString -> ParserResult a
-- resume result input =
-- case result of
-- Error e -> Error e
-- Done a -> Done a
-- Continue cont -> cont input
-- :}
-- | Interpret a result at end of input: a parser still waiting for more
-- input has failed.
finalize :: ParserResult a -> Either String a
finalize = \case
  Done a -> Right a
  Error e -> Left e
  Continue _ -> Left "Unexpected EOF"
-- | Maps over the eventual success value, also through pending continuations.
instance Functor ParserResult where
  fmap :: forall a b. (a -> b) -> ParserResult a -> ParserResult b
  fmap f = \case
    Error e -> Error e
    Done a -> Done (f a)
    Continue cont -> Continue (\input -> fmap f (cont input))

instance Applicative ParserResult where
  pure :: a -> ParserResult a
  pure a = Done a
  (<*>) :: ParserResult (a -> b) -> ParserResult a -> ParserResult b
  (<*>) = ap
-- | Left-biased choice. When the left side needs more input, the same chunk
-- is also fed to the right side if that one is waiting too, so both
-- alternatives advance together until one of them settles.
instance Alternative ParserResult where
  empty :: ParserResult a
  empty = Error "empty"
  (<|>) :: ParserResult a -> ParserResult a -> ParserResult a
  Error _ <|> other = other
  done@(Done _) <|> _ = done
  Continue contA <|> other =
    Continue \input ->
      let resumed = contA input
       in case other of
            Continue contB -> resumed <|> contB input
            _ -> resumed <|> other

instance MonadPlus ParserResult where
  mzero = empty
  mplus = (<|>)
-- | Binding a pending result defers the continuation until more input arrives.
instance Monad ParserResult where
  (>>=) :: forall a b. ParserResult a -> (a -> ParserResult b) -> ParserResult b
  Error e >>= _ = Error e
  Done a >>= f = f a
  Continue cont >>= f = Continue (\input -> cont input >>= f)

instance MonadFail ParserResult where
  fail :: forall a. String -> ParserResult a
  fail message = Error message
-- | An incremental parser. Its state is the current offset together with the
-- chunk that offset points into.
newtype Parser a
  = Parser (StateT (Int, ByteString) ParserResult a)
  deriving (Functor, Applicative, Monad, Alternative, MonadFail)

-- | Start a parser at offset 0 of the given chunk. A successful result carries
-- the parsed value and the final @(offset, chunk)@ state.
parse :: Parser a -> ByteString -> ParserResult (a, (Int, ByteString))
parse (Parser parser) = runStateT parser . (0,)
-- | Fetch next byte without consuming input
-- >>> finalize (parse peek "abc")
-- Right (Just 97,(0,"abc"))
peek :: Parser (Maybe Word8)
peek = Parser (lookahead <$> get)
  where
    -- 'Nothing' when the offset is at (or past) the end of the current chunk.
    lookahead (i, input)
      | i >= BS.length input = Nothing
      | otherwise = Just (BS.index input i)
-- |
-- >>> import Data.Char (ord)
-- >>> finalize (parse (satisfy (== 97)) "a")
-- Right (97,(1,"a"))
--
-- >>> finalize (parse (satisfy (== 97)) "")
-- Left "Unexpected EOF"
--
-- >>> finalize (resume (parse (satisfy (== 97)) "") "a")
-- Right (97,(1,"a"))
--
-- >>> finalize (parse (satisfy (== 97)) "b")
-- Left "Unexpected byte: 98"
--
-- >>> finalize (parse ((,) <$> satisfy (== 97) <*> satisfy (== 98)) "abc")
-- Right ((97,98),(2,"abc"))
--
-- >>> finalize (resume (parse ((,) <$> satisfy (== 97) <*> satisfy (== 98)) "a") "bc")
-- Right ((97,98),(1,"bc"))
--
-- >>> finalize (parse (satisfy (== 97) <|> satisfy (== 98)) "b")
-- Right (98,(1,"b"))
--
-- >>> finalize (resume (parse (satisfy (== 97) <|> satisfy (== 98)) "") "b")
-- Right (98,(1,"b"))
satisfy :: (Word8 -> Bool) -> Parser Word8
satisfy p = Parser (StateT step)
  where
    step (i, input)
      -- Out of input: ask for another chunk and retry from its start.
      | i >= BS.length input = Continue (parse (satisfy p))
      | p currentByte = Done (currentByte, (i + 1, input))
      | otherwise = Error ("Unexpected byte: " <> show currentByte)
      where
        currentByte = BS.index input i
-- | Match exactly the given byte.
byte :: Word8 -> Parser Word8
byte expected = satisfy (expected ==)

-- | Match the byte obtained by converting the character's code point to 'Word8'.
char :: Char -> Parser Word8
char c = byte (fromIntegral (ord c))
-- | Matches a string of bytes
--
-- >>> finalize (parse (string "true") "true")
-- Right ("true",(4,"true"))
--
-- >>> finalize (parse (string "true" <|> string "false") "true")
-- Right ("true",(4,"true"))
--
-- >>> finalize (parse (string "true" <|> string "false") "false")
-- Right ("false",(5,"false"))
--
-- >>> finalize (resume (parse (string "true") "tr") "ue")
-- Right ("true",(2,"ue"))
--
-- >>> finalize (resume (parse (string "true" <|> string "false") "tr") "ue")
-- Right ("true",(2,"ue"))
--
-- >>> finalize (resume (parse (string "true" <|> string "false") "fal") "se")
-- Right ("false",(2,"se"))
--
-- >>> finalize (resume (parse (string "foobar" <|> string "foobaz") "fooba") "r")
-- Right ("foobar",(1,"r"))
--
-- >>> finalize (resume (parse (string "foobar" <|> string "foobaz") "fooba") "z")
-- Right ("foobaz",(1,"z"))
--
-- >>> finalize (resume (parse (string "foobaar" <|> string "foobaz") "fooba") "ar")
-- Right ("foobaar",(2,"ar"))
--
-- >>> finalize (resume (parse (string "foobaar" <|> string "foobaz") "fooba") "z")
-- Right ("foobaz",(1,"z"))
--
-- >>> finalize (resume (parse (string "foobar" <|> string "foobaaz") "fooba") "az")
-- Right ("foobaaz",(2,"az"))
--
-- >>> finalize (resume (resume (parse (string "HTTP" *> string "/" *> string "1.1") "HT") "TP/1") ".1")
-- Right ("1.1",(2,".1"))
string :: ByteString -> Parser ByteString
string expected = BS.pack <$> traverse byte (BS.unpack expected)
-- | Run a parser against a stream of chunks, pulling a new chunk only when
-- the parser asks for more input.
--
-- Returns the parse outcome paired with the remainder of the stream. On
-- success, any unparsed tail of the last chunk is re-yielded ahead of the
-- rest. If the stream ends while the parser is still waiting, the outcome is
-- the error from 'finalize' and the returned stream simply carries the
-- original stream's result, so the effects of the exhausted stream are not
-- run a second time. On a parse error the chunks consumed so far are not
-- pushed back.
--
-- >>> import Data.Functor.Identity (Identity(runIdentity))
-- >>> input = yield "fooba" *> yield "z" :: Stream ByteString Identity ()
-- >>> fst (runIdentity (parseStream (string "foobar" <|> string "foobaz") input))
-- Right "foobaz"
parseStream ::
  forall a m r.
  Monad m =>
  Parser a ->
  Stream ByteString m r ->
  m (Either String a, Stream ByteString m r)
parseStream parser stream0 = do
  step <- next stream0
  case step of
    -- Empty stream: the parser only succeeds if it needs no input at all.
    Left r -> pure (fst <$> finalize (parse parser ""), pure r)
    Right (chunk, rest) -> loop (parse parser chunk) rest
  where
    loop ::
      ParserResult (a, (Int, ByteString)) ->
      Stream ByteString m r ->
      m (Either String a, Stream ByteString m r)
    loop result stream =
      case result of
        Error e -> pure (Left e, stream)
        Done (a, (i, remaining)) -> pure (Right a, leftovers i remaining stream)
        Continue cont -> do
          step <- next stream
          case step of
            -- 'finalize' of a 'Continue' is always a 'Left', so there are
            -- never leftovers to restore here.
            Left r -> pure (fst <$> finalize result, pure r)
            Right (chunk, rest) -> loop (cont chunk) rest
    -- Put the unparsed tail of the current chunk back in front of the stream.
    leftovers :: Int -> ByteString -> Stream ByteString m r -> Stream ByteString m r
    leftovers i remaining stream = do
      when (i < BS.length remaining) do
        yield (BS.drop i remaining)
      stream
-- | Apply @p@ repeatedly until @end@ succeeds, collecting the results of @p@.
-- @end@ is tried first at every position and its result is discarded.
manyTill :: Alternative m => m a -> m end -> m [a]
manyTill p end = collect
  where
    collect = finished <|> more
    finished = end $> []
    more = fmap (:) p <*> collect
-- |
-- >>> :{
-- endOfInput :: Parser ()
-- endOfInput =
-- Parser
-- ( StateT
-- ( \(i, input) ->
-- if i == BS.length input
-- then Done ((), (i, input))
-- else Error "Expected EOF"
-- )
-- )
-- :}
--
-- >>> finalize (parse anyByte "abc")
-- Right (97,(1,"abc"))
-- >>> finalize (parse (manyTill anyByte endOfInput) "abc")
-- Right ([97,98,99],(3,"abc"))
anyByte :: Parser Word8
anyByte = satisfy (\_ -> True)
-- | A single space character.
spaceParser :: Parser Word8
spaceParser = char ' '

-- | A single horizontal tab character.
tabParser :: Parser Word8
tabParser = char '\t'

-- | Carriage return, line feed
crlfParser :: Parser ByteString
crlfParser = string "\r\n"
-- | Request method, kept as the raw bytes from the request line.
newtype Method = Method ByteString
  deriving (Show)

-- | Request target, kept as the raw bytes from the request line.
newtype RequestTarget = RequestTarget ByteString
  deriving (Show)

-- | HTTP version as two integer components (see 'serializeVersion' for how
-- they are rendered).
data HTTPVersion = HTTPVersion Int Int
  deriving (Show)

-- | An ordered list of header fields.
newtype Headers = Headers [HeaderField]
  deriving (Show, Semigroup)
-- | Lets 'Headers' be written as a list literal of 'HeaderField's.
instance IsList Headers where
  type Item Headers = HeaderField
  fromList :: [HeaderField] -> Headers
  fromList fields = Headers fields
  toList :: Headers -> [HeaderField]
  toList = coerce
-- | Remove every field with the given name.
deleteHeader :: HeaderFieldName -> Headers -> Headers
deleteHeader name (Headers fields) =
  Headers [field | field <- fields, name /= field.name]

-- | Put a field at the front, replacing any existing fields of the same name.
insertHeader :: HeaderField -> Headers -> Headers
insertHeader field headers =
  Headers (field : toList (deleteHeader field.name headers))

-- | Update a header through a function of its current value (if any):
-- 'Nothing' deletes the header, 'Just' sets it.
alterHeader :: HeaderFieldName -> (Maybe ByteString -> Maybe ByteString) -> Headers -> Headers
alterHeader name f headers =
  maybe
    (deleteHeader name headers)
    (\value -> insertHeader (HeaderField name value) headers)
    (f (findHeader name headers))

-- | Value of the first field with the given name.
findHeader :: HeaderFieldName -> Headers -> Maybe ByteString
findHeader name headers =
  (.value) <$> List.find (\field -> field.name == name) (toList headers)
-- | The @Content-Length@ header read as an 'Int'; 'Nothing' if it is absent
-- or does not parse.
contentLength :: Headers -> Maybe Int
contentLength headers =
  findHeader "Content-Length" headers >>= readMaybe . BSC.unpack

-- | The comma-separated entries of the @Transfer-Encoding@ header, each
-- stripped of surrounding whitespace.
transferEncoding :: Headers -> Maybe [ByteString]
transferEncoding headers =
  map BSC.strip . BSC.split ',' <$> findHeader "Transfer-Encoding" headers
-- | Whether the last transfer coding listed in @Transfer-Encoding@ is
-- @chunked@. Transfer-coding names are case-insensitive, so the comparison
-- ignores case (for example @Chunked@ is accepted).
isChunked :: Headers -> Bool
isChunked headers =
  case reverse <$> transferEncoding headers of
    Just (final : _) -> BSC.map toLower final == "chunked"
    _ -> False
-- | A header field name. Use 'headerFieldName' (or a string literal) to build
-- one so that it is stored lower-cased.
newtype HeaderFieldName = HeaderFieldName ByteString
  deriving (Show, Eq)

-- | Build a field name, lower-casing it so that comparisons ignore case.
headerFieldName :: ByteString -> HeaderFieldName
headerFieldName raw = HeaderFieldName (BSC.map toLower raw)

instance IsString HeaderFieldName where
  fromString literal = headerFieldName (BSC.pack literal)

-- | A single header field: name and raw value.
data HeaderField = HeaderField {name :: HeaderFieldName, value :: ByteString}
  deriving (Show)

-- | Infix constructor for a header field.
(=:) :: HeaderFieldName -> ByteString -> HeaderField
fieldName =: fieldValue = HeaderField fieldName fieldValue
-- | A parsed request head: the request line plus the header fields.
data Request = Request
  { method :: Method,
    target :: RequestTarget,
    httpVersion :: HTTPVersion,
    headers :: Headers
  }
  deriving (Show)

-- | A response status: numeric code and reason phrase.
data Status = Status Int ByteString
  deriving stock (Eq)

status200 :: Status
status200 = Status 200 "OK"

status400 :: Status
status400 = Status 400 "Bad request"

status404 :: Status
status404 = Status 404 "Not found"

-- | A response whose body is a 'Pipe' from input @i@ to output @o@.
data Response i o m r = Response
  { httpVersion :: HTTPVersion,
    status :: Status,
    headers :: Headers,
    body :: Pipe i o m r
  }

-- | Build a response with the version fixed to @HTTPVersion 1 1@.
response :: Status -> Headers -> Pipe i o m r -> Response i o m r
response = Response (HTTPVersion 1 1)

-- | A request handler: given the request head it may consume input and
-- returns the response to send.
type Handler i o m r = Request -> Consumer i m (Response i o m r)
-- | Request line
--
-- >>> finalize (parse requestParser "GET /posts HTTP/1.1\r\n\r\n")
-- Right (Request {method = Method "GET", target = RequestTarget "/posts", httpVersion = HTTPVersion 1 1, headers = Headers []},(23,"GET /posts HTTP/1.1\r\n\r\n"))
-- >>> finalize (parse requestParser "GET /posts/getting-close-to-the-conceptual-metal HTTP/1.0\r\nAccept: text/html\r\n\r\n")
-- Right (Request {method = Method "GET", target = RequestTarget "/posts/getting-close-to-the-conceptual-metal", httpVersion = HTTPVersion 1 0, headers = Headers [HeaderField {name = HeaderFieldName "accept", value = "text/html"}]},(80,"GET /posts/getting-close-to-the-conceptual-metal HTTP/1.0\r\nAccept: text/html\r\n\r\n"))
-- >>> finalize (parse requestParser "GET /hello.txt HTTP/1.1\r\nUser-Agent: curl/7.16.3 libcurl/7.16.3 OpenSSL/0.9.7l zlib/1.2.3\r\nHost: www.example.com\r\nAccept-Language: en, mi\r\n\r\n")
-- Right (Request {method = Method "GET", target = RequestTarget "/hello.txt", httpVersion = HTTPVersion 1 1, headers = Headers [HeaderField {name = HeaderFieldName "user-agent", value = "curl/7.16.3 libcurl/7.16.3 OpenSSL/0.9.7l zlib/1.2.3"},HeaderField {name = HeaderFieldName "host", value = "www.example.com"},HeaderField {name = HeaderFieldName "accept-language", value = "en, mi"}]},(141,"GET /hello.txt HTTP/1.1\r\nUser-Agent: curl/7.16.3 libcurl/7.16.3 OpenSSL/0.9.7l zlib/1.2.3\r\nHost: www.example.com\r\nAccept-Language: en, mi\r\n\r\n"))
-- >>> finalize (resume (parse requestParser "POST /foobar HTTP/1.1\r\nHost: localhost:5000\r\nUser-Agent: curl/7.74.0\r\nAccept: */*\r\nContent-Length: 2\r") "\nContent-Type: application/x-www-form-urlencoded\r\n\r\n")
-- Right (Request {method = Method "POST", target = RequestTarget "/foobar", httpVersion = HTTPVersion 1 1, headers = Headers [HeaderField {name = HeaderFieldName "host", value = "localhost:5000"},HeaderField {name = HeaderFieldName "user-agent", value = "curl/7.74.0"},HeaderField {name = HeaderFieldName "accept", value = "*/*"},HeaderField {name = HeaderFieldName "content-length", value = "2"},HeaderField {name = HeaderFieldName "content-type", value = "application/x-www-form-urlencoded"}]},(52,"\nContent-Type: application/x-www-form-urlencoded\r\n\r\n"))
-- >>> finalize (parse requestParser "GET /hello HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.74.0\r\nAccept: */*\r\n\r\n")
-- Right (Request {method = Method "GET", target = RequestTarget "/hello", httpVersion = HTTPVersion 1 1, headers = Headers [HeaderField {name = HeaderFieldName "host", value = "localhost:8080"},HeaderField {name = HeaderFieldName "user-agent", value = "curl/7.74.0"},HeaderField {name = HeaderFieldName "accept", value = "*/*"}]},(83,"GET /hello HTTP/1.1\r\nHost: localhost:8080\r\nUser-Agent: curl/7.74.0\r\nAccept: */*\r\n\r\n"))
requestParser :: Parser Request
requestParser = do
  method <- Method . BS.pack <$> manyTill anyByte spaceParser
  target <- RequestTarget . BS.pack <$> manyTill anyByte spaceParser
  major <- string "HTTP/" *> digitParser
  minor <- char '.' *> digitParser
  _ <- crlfParser
  Request method target (HTTPVersion major minor) <$> headersParser
  where
    -- A single decimal digit, as its numeric value.
    digitParser :: Parser Int
    digitParser = digitToInt . chr . fromIntegral <$> satisfy (isDigit . chr . fromIntegral)
-- | Header field name
--
-- >>> finalize (parse headerFieldNameParser "Content-type:")
-- Right (HeaderFieldName "content-type",(13,"Content-type:"))
headerFieldNameParser :: Parser HeaderFieldName
headerFieldNameParser = do
  -- Everything up to, but not including, the colon.
  rawName <- manyTill anyByte (char ':')
  pure (headerFieldName (BS.pack rawName))
-- | Header field content
--
-- >>> finalize (parse headerFieldContentParser "Foo\r\n\tBar\r\n Baz\r\n")
-- Right ("Foo\r\n\tBar\r\n Baz",(18,"Foo\r\n\tBar\r\n Baz\r\n"))
headerFieldContentParser :: Parser ByteString
headerFieldContentParser = BS.pack <$> manyTill anyByte lineEnd
  where
    -- A CRLF ends the value unless the next byte is a space (32) or tab (9),
    -- in which case the value continues on the next line.
    lineEnd :: Parser ()
    lineEnd = crlfParser *> (peek >>= rejectContinuation)
    rejectContinuation :: Maybe Word8 -> Parser ()
    rejectContinuation (Just b) | b == 32 || b == 9 = fail "whitespace"
    rejectContinuation _ = pure ()
-- | Header field
--
-- >>> finalize (parse headerFieldParser "Content-type: application/json\r\n")
-- Right (HeaderField {name = HeaderFieldName "content-type", value = "application/json"},(32,"Content-type: application/json\r\n"))
headerFieldParser :: Parser HeaderField
headerFieldParser = do
  fieldName <- headerFieldNameParser
  -- Skip any spaces or tabs between the colon and the value.
  _ <- many (spaceParser <|> tabParser)
  HeaderField fieldName <$> headerFieldContentParser
-- |
-- >>> finalize (parse headersParser "Host: example.com\r\nContent-type: application/json\r\n\r\n")
-- Right (Headers [HeaderField {name = HeaderFieldName "host", value = "example.com"},HeaderField {name = HeaderFieldName "content-type", value = "application/json"}],(53,"Host: example.com\r\nContent-type: application/json\r\n\r\n"))
headersParser :: Parser Headers
headersParser = fmap Headers (headerFieldParser `manyTill` crlfParser)
-- | Parse the size line of a chunk: hexadecimal digits terminated by CRLF.
--
-- Both lowercase and uppercase hexadecimal digits are accepted. Fails if the
-- digits cannot be read as a hexadecimal number (for example when there are
-- none).
chunkSizeParser :: Parser Int
chunkSizeParser = do
  digits <- fmap (chr . fromIntegral) <$> manyTill (satisfy (`elem` hexadecimalDigitBytes)) crlfParser
  case fst <$> listToMaybe (readHex digits) of
    Nothing -> fail ("Expected hexadecimal number, got: " <> digits)
    Just result -> pure result
  where
    -- 'readHex' understands both cases, so both are allowed through here.
    hexadecimalDigitBytes :: [Word8]
    hexadecimalDigitBytes = BS.unpack "0123456789abcdefABCDEF"
-- | A 400 response with no headers and an empty body.
response400 :: Monad m => Response i o m ()
response400 = response status400 (Headers []) (pure ())
-- | Emit the first @n@ bytes of a byte stream (fewer if it ends early) and
-- finish with the stream of everything after them.
splitAt :: forall m a. Monad m => Int -> Stream ByteString m a -> Stream ByteString m (Stream ByteString m a)
splitAt n stream
  | n <= 0 = pure stream
  | otherwise =
      lift (next stream) >>= \case
        Left a -> pure (pure a)
        Right (bytes, rest)
          -- The boundary falls inside this chunk: cut it in two.
          | BS.length bytes > n ->
              let (wanted, surplus) = BS.splitAt n bytes
               in yield wanted $> (yield surplus *> rest)
          | otherwise -> yield bytes *> splitAt (n - BS.length bytes) rest
-- | Skip the first @n@ bytes of a byte stream (all of it if it is shorter).
drop :: forall m a. Monad m => Int -> Stream ByteString m a -> Stream ByteString m a
drop n stream
  | n <= 0 = stream
  | otherwise =
      lift (next stream) >>= \case
        Left a -> pure a
        Right (bytes, rest)
          -- Only part of this chunk is skipped; its tail is kept.
          | n < BS.length bytes -> yield (BS.drop n bytes) *> rest
          | otherwise -> drop (n - BS.length bytes) rest
-- | Delimit the request body inside the input stream: the result emits the
-- body bytes and finishes with whatever follows the body. Chunked encoding
-- takes precedence over @Content-Length@; with neither, the body is empty.
requestBody ::
  forall m a.
  Monad m =>
  Headers ->
  Stream ByteString m a ->
  Stream ByteString m (Stream ByteString m a)
requestBody headers stream
  | isChunked headers = chunkedBody stream
  | otherwise = maybe (pure stream) (`knownBody` stream) (contentLength headers)

-- | A body of known length: exactly the first @n@ bytes of the stream.
knownBody ::
  forall m a.
  Monad m =>
  Int ->
  Stream ByteString m a ->
  Stream ByteString m (Stream ByteString m a)
knownBody size stream = splitAt size stream
-- | Decode a chunked body: emits the data of each chunk and finishes with
-- the stream after the terminating zero-size chunk. Decoding stops at the
-- first chunk-size line that does not parse.
chunkedBody ::
  forall m a.
  Monad m =>
  Stream ByteString m a ->
  Stream ByteString m (Stream ByteString m a)
chunkedBody stream =
  lift (parseStream chunkSizeParser stream) >>= \case
    (Left _, rest) -> pure rest
    -- Zero-size chunk: skip the two bytes that follow it and stop.
    (Right 0, rest) -> pure (drop 2 rest)
    -- Emit the chunk data, skip the two bytes after it, and continue.
    (Right size, rest) -> splitAt size rest >>= chunkedBody . drop 2
-- | Serialize a response onto the output stream.
--
-- The response body pipe is run against the input, and its output is
-- re-encoded: if the body produces nothing, the preamble is sent with
-- @Content-Length: 0@; otherwise the preamble is sent with @chunked@ added to
-- @Transfer-Encoding@ and every body output is sent as one chunk, followed by
-- the terminating chunk.
--
-- Empty body outputs are never encoded as chunks, because a zero-length chunk
-- is the terminator and would cut the response short. This applies to the
-- first output as well as to all later ones.
sendResponse :: forall m. Monad m => Response ByteString ByteString m () -> Pipe ByteString ByteString m ()
sendResponse response = (`compose` response.body) do
  input <- receive
  case input of
    Nothing -> do
      send
        ( serializeResponsePreamble
            response.httpVersion
            response.status
            (response.headers <> ["Content-Length" =: "0"])
        )
    Just bytes -> do
      send
        ( serializeResponsePreamble
            response.httpVersion
            response.status
            (addChunkedEncoding response.headers)
        )
      sendChunk bytes
      encodeChunks
  where
    -- Append @chunked@ to an existing Transfer-Encoding, or set it.
    addChunkedEncoding :: Headers -> Headers
    addChunkedEncoding = alterHeader "Transfer-Encoding" \case
      Nothing -> Just "chunked"
      Just existing -> Just (existing <> ", chunked")
    -- Status line, header fields and the blank line that ends the head.
    serializeResponsePreamble :: HTTPVersion -> Status -> Headers -> ByteString
    serializeResponsePreamble httpVersion status headers =
      serializeVersion httpVersion
        <> " "
        <> serializeStatus status
        <> "\r\n"
        <> serializeHeaders headers
        <> "\r\n"
    serializeVersion :: HTTPVersion -> ByteString
    serializeVersion (HTTPVersion major minor) = BSC.pack ("HTTP/" <> show major <> "." <> show minor)
    serializeStatus :: Status -> ByteString
    serializeStatus (Status code message) = BSC.pack (show code) <> " " <> message
    serializeHeaders :: Headers -> ByteString
    serializeHeaders = foldMap serializeHeader . toList
    serializeHeader :: HeaderField -> ByteString
    serializeHeader (HeaderField (HeaderFieldName name) content) = name <> ": " <> content <> "\r\n"
    -- Encode every remaining body output, then send the terminating chunk.
    encodeChunks :: Pipe ByteString ByteString m ()
    encodeChunks = do
      input <- receive
      case input of
        Nothing -> send finalChunk
        Just bytes -> do
          sendChunk bytes
          encodeChunks
    -- Send one body output as a chunk, skipping empty outputs so that they
    -- are not mistaken for the terminating chunk.
    sendChunk :: ByteString -> Pipe ByteString ByteString m ()
    sendChunk bytes =
      when (not (BS.null bytes)) (send (encodeChunk bytes))
    finalChunk :: ByteString
    finalChunk = "0\r\n\r\n"
    -- Hexadecimal size, CRLF, data, CRLF.
    encodeChunk :: ByteString -> ByteString
    encodeChunk bytes = BSC.pack (showHex (BS.length bytes) "\r\n") <> bytes <> "\r\n"
-- | Run a handler for one request. The handler only sees the request body as
-- its input; when it is done, any unread part of the body is discarded so
-- that the input is positioned after this request.
handleRequest ::
  Monad m =>
  Handler ByteString ByteString m () ->
  Request ->
  Pipe ByteString ByteString m ()
handleRequest handler request =
  subInput (requestBody request.headers) (join . flush) do
    when expectsContinue (send "HTTP/1.1 100 Continue\r\n\r\n")
    handler request >>= sendResponse
  where
    expectsContinue = findHeader "Expect" request.headers == Just "100-continue"
-- | A value that is only available inside a callback, in continuation-passing
-- style. Wrapping functions such as 'System.IO.withFile' lets the scoped
-- value be used monadically.
newtype Managed a = Managed (forall r. (a -> IO r) -> IO r)

-- | Unwrap the underlying continuation-passing function.
unManaged :: Managed a -> (forall r. (a -> IO r) -> IO r)
unManaged (Managed cont) = cont

instance Functor Managed where
  fmap :: (a -> b) -> Managed a -> Managed b
  fmap f (Managed use) = Managed (\k -> use (\a -> k (f a)))

instance Applicative Managed where
  pure :: a -> Managed a
  pure a = Managed (\k -> k a)
  (<*>) :: Managed (a -> b) -> Managed a -> Managed b
  (<*>) = ap

-- | Choice is delegated to 'IO': both sides are given the same continuation.
instance Alternative Managed where
  empty :: Managed e
  empty = liftIO empty
  (<|>) :: Managed a -> Managed a -> Managed a
  Managed useA <|> Managed useB = Managed (\k -> useA k <|> useB k)

instance Monad Managed where
  (>>=) :: Managed a -> (a -> Managed b) -> Managed b
  Managed use >>= f =
    Managed (\k -> use (\a -> case f a of Managed following -> following k))

instance MonadIO Managed where
  liftIO :: IO a -> Managed a
  liftIO io = Managed (io >>=)

-- | Wrap a @with...@-style function.
managed :: (forall r. (a -> IO r) -> IO r) -> Managed a
managed cont = Managed cont

-- | Run a managed computation, returning its final value.
runManaged :: Managed a -> IO a
runManaged (Managed use) = use pure
-- | 'System.IO.withFile' as a 'Managed' handle.
withFile :: FilePath -> IOMode -> Managed Handle
withFile path mode = Managed (System.withFile path mode)

-- | Stream a file's contents in chunks of at most 16376 bytes. The handle is
-- closed as soon as an empty read is seen.
readFile :: FilePath -> Stream ByteString Managed ()
readFile path = lift (withFile path ReadMode) >>= unfold readChunk
  where
    readChunk :: Handle -> Managed (Either () (ByteString, Handle))
    readChunk handle = do
      bytes <- liftIO (BSC.hGet handle 16_376)
      if BS.null bytes
        then liftIO (System.hClose handle) $> Left ()
        else pure (Right (bytes, handle))

-- | A pipe that ignores its input and sends the contents of a file.
sendFile :: forall a. FilePath -> Pipe a ByteString Managed ()
sendFile path = Pipe (StateT \input -> ((), input) <$ readFile path)
-- | A handler that may decline a request ('Nothing'). When it accepts, it
-- returns a file path together with the response; it is polymorphic in the
-- input type.
type StaticSiteHandler m a =
  forall input.
  Request ->
  Consumer input m (Maybe (FilePath, Response input ByteString m a))
-- | Add ETag support to a static handler. If a sibling @<path>.etag@ file
-- exists, its stripped contents are set as the @Etag@ header; if the request's
-- @If-None-Match@ equals that value, an empty 304 response is returned.
cachedStaticDirectory ::
  StaticSiteHandler Managed () ->
  StaticSiteHandler Managed ()
cachedStaticDirectory baseHandler request = runMaybeT do
  (path, currentResponse) <- MaybeT (baseHandler request)
  etagResult <- fetchEtag path
  let withEtag etag =
        -- 'body' is restated so the update mentions a field only 'Response' has.
        let tagged =
              currentResponse
                { headers = addEtag etag currentResponse.headers,
                  body = currentResponse.body
                }
         in if findHeader "If-None-Match" request.headers == Just etag
              then response (Status 304 "Not modified") [] (pure ())
              else tagged
  pure (path, maybe currentResponse withEtag etagResult)
  where
    fetchEtag :: forall m. MonadIO m => FilePath -> m (Maybe ByteString)
    fetchEtag path = liftIO do
      let etagFilePath = path <> ".etag"
      exists <- Files.fileExist etagFilePath
      if exists
        then Just . BSC.strip <$> BSC.readFile etagFilePath
        else pure Nothing
    addEtag :: ByteString -> Headers -> Headers
    addEtag etag = alterHeader "Etag" (\_ -> Just etag)
-- | Serve pre-compressed files from a static handler. If the request's
-- @Accept-Encoding@ mentions @gzip@ and a sibling @<path>.gz@ file exists,
-- the body is replaced by that file and @Content-Encoding: gzip@ is set;
-- otherwise the base handler's response is returned unchanged.
gzippedStaticDirectory ::
  StaticSiteHandler Managed () ->
  StaticSiteHandler Managed ()
gzippedStaticDirectory baseHandler request = runMaybeT do
  (path, currentResponse) <- MaybeT (baseHandler request)
  case findHeader "Accept-Encoding" request.headers of
    Just acceptEncoding | BSC.isInfixOf "gzip" acceptEncoding -> do
      staticGzippedFilePathResult <- staticGzippedFilePath path
      let newResponse = case staticGzippedFilePathResult of
            Nothing -> currentResponse
            Just gzPath ->
              let headers = addGzipContentEncoding currentResponse.headers
               in currentResponse {headers = headers, body = sendFile gzPath}
      pure (path, newResponse)
    _ -> pure (path, currentResponse)
  where
    -- Path of the compressed sibling, but only if that file itself exists.
    staticGzippedFilePath :: forall m. MonadIO m => FilePath -> m (Maybe FilePath)
    staticGzippedFilePath path = do
      let gzippedFilePath = path <> ".gz"
      exists <- liftIO (Files.fileExist gzippedFilePath)
      pure (if exists then Just gzippedFilePath else Nothing)
    addGzipContentEncoding :: Headers -> Headers
    addGzipContentEncoding = alterHeader "Content-Encoding" (const (Just "gzip"))
-- | Serve files found beneath @directory@.
--
-- The request target is interpreted as a path relative to @directory@.
-- Targets containing @..@ are rejected. A target that names a directory is
-- served from that directory's @index.html@. The result is 'Nothing' when no
-- file can be served, otherwise the resolved path paired with a 200 response
-- that streams the file and carries a @Content-Type@ header when the
-- extension is known.
staticDirectory ::
  FilePath ->
  StaticSiteHandler Managed ()
staticDirectory directory request = do
  staticPathResult <- staticFilePath directory request
  case staticPathResult of
    Nothing -> pure Nothing
    Just path ->
      let headers = addContentType path []
       in pure (Just (path, response status200 headers (sendFile path)))
  where
    -- Resolve the request target to an existing path on disk.
    staticFilePath :: forall m. MonadIO m => FilePath -> Request -> m (Maybe FilePath)
    staticFilePath directory request = runMaybeT do
      target <- (directory </>) <$> safeTarget
      status <- fileStatus target
      if Files.isDirectory status
        then do
          let indexPath = target </> "index.html"
          -- A directory without an index file has nothing to serve; looking
          -- the index up here yields Nothing instead of handing a
          -- non-existent path to sendFile.
          indexPath <$ fileStatus indexPath
        else pure target
      where
        -- The target with leading slashes removed, or Nothing when it
        -- contains ".." anywhere (a conservative path-traversal guard).
        safeTarget :: MaybeT m FilePath
        safeTarget = (MaybeT . pure) case request.target of
          RequestTarget bytes ->
            if BSC.isInfixOf ".." bytes
              then Nothing
              else Just (BSC.unpack (BSC.dropWhile (== '/') bytes))

        -- File status of @path@; a missing file becomes Nothing, any other
        -- IO error is re-thrown.
        fileStatus :: FilePath -> MaybeT m FileStatus
        fileStatus path = (MaybeT . liftIO) do
          (Just <$> Files.getFileStatus path) `catchError` \(e :: IOException) ->
            case e.ioe_type of
              NoSuchThing -> pure Nothing
              _ -> throwError e
-- | Set the @Content-Type@ header to whatever 'mimeType' reports for @path@.
-- The previous value of the header, if any, is ignored.
addContentType :: FilePath -> Headers -> Headers
addContentType path headers =
  alterHeader "Content-Type" (\_ -> mimeType path) headers
-- | Look up the media type for a path by its file extension.
-- Yields 'Nothing' for extensions absent from 'mimeTypes'.
mimeType :: FilePath -> Maybe ByteString
mimeType = (`Map.lookup` mimeTypes) . FilePath.takeExtension
-- | Media types keyed by file extension.
--
-- Keys include the leading dot, matching what @takeExtension@ returns, and
-- are compared case-sensitively.
mimeTypes :: Map String ByteString
mimeTypes =
  Map.fromList
    [ (".css", "text/css"),
      (".eot", "application/vnd.ms-fontobject"),
      (".gif", "image/gif"),
      (".html", "text/html"),
      (".ico", "image/x-icon"),
      (".jpeg", "image/jpeg"),
      (".jpg", "image/jpeg"),
      (".js", "text/javascript"),
      (".json", "application/json"),
      (".pdf", "application/pdf"),
      (".png", "image/png"),
      (".svg", "image/svg+xml"),
      (".ttf", "font/ttf"),
      (".txt", "text/plain"),
      (".webp", "image/webp"),
      (".woff", "font/woff"),
      (".woff2", "font/woff2"),
      (".xml", "application/xml")
    ]
-- | Handler that serves the files beneath @directory@.
--
-- Requests pass through the static-file, gzip and caching layers in that
-- order (innermost first). When the layers produce no result the reply is
-- an empty 404; otherwise the layers' response is returned and the resolved
-- path is dropped.
staticSite :: forall input. FilePath -> Handler input ByteString Managed ()
staticSite directory = \request ->
  maybe (response status404 [] (pure ())) snd <$> layers request
  where
    layers = cachedStaticDirectory (gzippedStaticDirectory (staticDirectory directory))
-- # Continuous streaming in and out
-- $ yes "Whoa" | curl -i -T - -H "Content-Type: application/octet-stream" -H "Transfer-encoding: chunked" localhost:8080 > /dev/null
-- % Total % Received % Xferd Average Speed Time Time Time Current
-- Dload Upload Total Spent Left Speed
-- 100 84.4G 0 42.2G 0 42.2G 710M 710M --:--:-- 0:01:00 --:--:-- 1440M^C
--
-- # Sending a ~3GB File
-- $ curl 127.0.0.1:8080 127.0.0.1:8080 > /dev/null
-- % Total % Received % Xferd Average Speed Time Time Time Current
-- Dload Upload Total Spent Left Speed
-- 100 3030M 0 3030M 0 0 880M 0 --:--:-- 0:00:03 --:--:-- 880M
-- % Total % Received % Xferd Average Speed Time Time Time Current
-- Dload Upload Total Spent Left Speed
-- 100 3030M 0 3030M 0 0 1047M 0 --:--:-- 0:00:02 --:--:-- 1047M
-- | A bidirectional byte channel to one client, independent of whether the
-- underlying transport is a plain socket or a TLS context.
data Connection = Connection
  { -- | Write a chunk of bytes to the peer.
    send :: ByteString -> Managed (),
    -- | The bytes received from the peer, as a stream of chunks.
    inputStream :: Stream ByteString Managed ()
  }
-- | Build a 'Connection' over a plain (unencrypted) socket.
-- Input is read in chunks of at most 16 KiB per receive call.
connectionFromSocket :: Socket -> Connection
connectionFromSocket socket = Connection {send = transmit, inputStream = incoming}
  where
    transmit = Network.send socket
    incoming = reread (\source -> Network.recv source 16_384) socket
-- | Build a 'Connection' over an established TLS context.
connectionFromContext :: Context -> Connection
connectionFromContext context = Connection {send = transmit, inputStream = incoming}
  where
    transmit = NetworkTLS.send context
    incoming = reread NetworkTLS.recv context
-- | Turn a connection's input byte stream into its output byte stream by
-- answering requests one after another with @handler@.
--
-- The loop stops when the input stream ends. A request that fails to parse
-- is answered with a 400 response, and whatever input followed it is
-- dropped, so the loop stops afterwards.
handleConnection ::
  Handler ByteString ByteString Managed () ->
  Stream ByteString Managed () ->
  Stream ByteString Managed ()
handleConnection handler = go
  where
    go :: Stream ByteString Managed () -> Stream ByteString Managed ()
    go input =
      lift (next input) >>= \case
        Left _ -> pure ()
        -- Push the peeked chunk back in front of the remainder before parsing.
        Right (chunk, remainder) -> serveOne (yield chunk *> remainder) >>= go

    -- Parse and answer a single request, returning the input left over.
    serveOne ::
      Stream ByteString Managed () ->
      Stream ByteString Managed (Stream ByteString Managed ())
    serveOne input = do
      (parsed, leftover) <- lift (parseStream requestParser input)
      case parsed of
        Left _ -> execPipe (sendResponse response400) (pure ())
        Right request -> execPipe (handleRequest handler request) leftover
-- | Locations of the X.509 credential files used when serving over TLS.
data TLSConfig = TLSConfig
  { -- | Path to the certificate file.
    publicCertFile :: FilePath,
    -- | Path to the private key file.
    privateKeyFile :: FilePath
  }
  deriving (Show)
-- | Server configuration.
data Config = Config
  { -- | TLS credentials; 'Nothing' serves plain, unencrypted connections.
    tls :: Maybe TLSConfig,
    -- | Where the listening socket comes from.
    listen :: ListenSocketConfig
  }
  deriving (Show)
-- | Settings for a listening socket that the server opens itself.
data OwnListenSocketConfig = OwnListenSocketConfig
  { -- | Host or interface to bind to.
    hostPreference :: HostPreference,
    -- | Port (service name) to bind to.
    port :: ServiceName
  }
  deriving (Show)
-- | Settings for a listening socket inherited from the parent process.
newtype ProvidedSocketConfig = ProvidedSocketConfig
  { -- | File descriptor of the already-opened socket.
    fd :: CInt
  }
  deriving (Show)
-- | How the server obtains its listening socket.
data ListenSocketConfig
  = -- | Open and bind a new socket.
    OwnListenSocket OwnListenSocketConfig
  | -- | Wrap a file descriptor supplied by the environment.
    ProvidedSocket ProvidedSocketConfig
  deriving (Show)
-- | Default configuration: plain HTTP on a self-opened socket bound to
-- 127.0.0.1, port 8080.
defaultConfig :: Config
defaultConfig = Config {tls = Nothing, listen = OwnListenSocket loopback}
  where
    loopback =
      OwnListenSocketConfig
        { hostPreference = Host "127.0.0.1",
          port = "8080"
        }
-- | Run @action@ repeatedly for as long as @predicate@ yields 'True'.
-- The predicate is evaluated before every iteration, including the first.
whileM_ :: (Monad m) => m Bool -> m () -> m ()
whileM_ predicate action = loop
  where
    loop =
      predicate >>= \continue ->
        if continue
          then action >> loop
          else pure ()
-- | Run the server described by @config@, answering every request with
-- @handler@.
--
-- The accept loop runs on a forked thread. The calling thread blocks until
-- SIGTERM has been received and the count of in-flight connections has
-- dropped to zero, then returns.
--
-- Fails with a user error when TLS is configured and the credentials cannot
-- be loaded.
serve :: Config -> Handler ByteString ByteString Managed () -> IO ()
serve config handler = do
  -- Filled by the SIGTERM handler; an empty TMVar means "keep accepting".
  shutdown <- newEmptyTMVarIO @()
  -- Number of connections currently being handled.
  connectionCount <- newTVarIO @Int 0
  void (installHandler sigTERM (System.CatchOnce (onSigTERM shutdown)) Nothing)
  accept <- getAccept
  withListeningSocket \socket -> do
    whileM_
      (atomically (isEmptyTMVar shutdown))
      ( catch
          ( accept
              socket
              \(connection, _) ->
                -- Keep the in-flight count accurate even when the
                -- connection handler throws.
                bracket_
                  (atomically (modifyTVar connectionCount (+ 1)))
                  (atomically (modifyTVar connectionCount (subtract 1)))
                  (runManaged (run (mapM connection.send (handleConnection handler connection.inputStream))))
          )
          -- Log and carry on: a failed accept must not end the accept loop.
          (\exception -> System.hPrint System.stderr (exception :: SomeException))
      )
  -- withListeningSocket returns immediately (it forks), so this is what
  -- keeps the calling thread alive. The transaction only commits once a
  -- shutdown was requested AND no connections remain; while it retries, the
  -- take is rolled back and `shutdown` stays full for the accept loop to see.
  atomically do
    takeTMVar shutdown
    activeConnections <- readTVar connectionCount
    when (activeConnections /= 0) retry
  where
    -- Signal handler: request shutdown.
    onSigTERM :: TMVar () -> IO ()
    onSigTERM shutdown = atomically (putTMVar shutdown ())

    -- Accept one plain connection and hand it to @connectionHandler@.
    -- NOTE(review): presumably acceptFork runs the handler on its own thread
    -- — confirm against network-simple.
    acceptPlain :: Socket -> ((Connection, SockAddr) -> IO ()) -> IO ()
    acceptPlain socket connectionHandler =
      void (Network.acceptFork socket (connectionHandler . first connectionFromSocket))

    -- Accept one TLS connection and hand it to @connectionHandler@.
    acceptSecure :: NetworkTLS.ServerParams -> Socket -> ((Connection, SockAddr) -> IO ()) -> IO ()
    acceptSecure params socket connectionHandler =
      void (NetworkTLS.acceptFork params socket (connectionHandler . first connectionFromContext))

    -- Obtain the listening socket according to @config.listen@ and run
    -- @acceptLoop@ with it on a forked thread.
    withListeningSocket :: (Socket -> IO ()) -> IO ()
    withListeningSocket acceptLoop =
      (void . forkIO) case config.listen of
        OwnListenSocket ownListenSocketConfig ->
          Network.listen
            ownListenSocketConfig.hostPreference
            ownListenSocketConfig.port
            (acceptLoop . fst)
        ProvidedSocket providedListenSocketConfig ->
          -- Wrap the inherited descriptor, configure it, and close it once
          -- the accept loop finishes.
          bracket
            ( do
                socket <- Socket.mkSocket providedListenSocketConfig.fd
                Socket.setSocketOption socket Socket.NoDelay 1
                Socket.setSocketOption socket Socket.ReuseAddr 1
                Socket.setSocketOption socket Socket.KeepAlive 1
                Socket.withFdSocket socket (`System.setNonBlockingFD` True)
                Network.listenSock socket 2_048
                pure socket
            )
            Network.closeSock
            acceptLoop

    -- Choose the accept function: plain when no TLS is configured, otherwise
    -- TLS using credentials loaded from the configured files.
    getAccept :: IO (Socket -> ((Connection, SockAddr) -> IO ()) -> IO ())
    getAccept = do
      case config.tls of
        Nothing -> pure acceptPlain
        Just tlsConfig -> do
          credentialResult <-
            NetworkTLS.credentialLoadX509
              tlsConfig.publicCertFile
              tlsConfig.privateKeyFile
          case credentialResult of
            Left e -> throwError (userError e)
            Right creds -> acceptSecure <$> NetworkTLS.newDefaultServerParams creds
-- | File descriptor returned by 'getActivatedSocketFd' for the passed-in
-- listening socket.
-- NOTE(review): 3 matches systemd's SD_LISTEN_FDS_START (the first
-- descriptor after stdin/stdout/stderr) — confirm against the sd_listen_fds
-- documentation.
fdStart :: CInt
fdStart = 3
-- | Detect a listening socket handed over through the @LISTEN_PID@ /
-- @LISTEN_FDS@ environment variables.
--
-- Returns @Just fdStart@ only when both variables are set and parse as
-- numbers, @LISTEN_PID@ equals this process's id, and exactly one descriptor
-- was passed. In every other case, including malformed variable contents,
-- the result is 'Nothing'.
getActivatedSocketFd :: IO (Maybe CInt)
getActivatedSocketFd = runMaybeT do
  listenPid <- MaybeT (readEnv "LISTEN_PID")
  listenFDs <- MaybeT (readEnv "LISTEN_FDS")
  myPid <- lift getProcessID
  guard (listenPid == myPid && listenFDs == (1 :: CInt))
  pure fdStart
  where
    -- Read and parse an environment variable; unset or unparsable gives
    -- Nothing.
    readEnv :: forall a. Read a => String -> IO (Maybe a)
    readEnv name = (parseWhole =<<) <$> System.getEnv name

    -- Total replacement for 'read': succeeds only on a single unambiguous
    -- parse that consumes the whole input, trailing whitespace aside.
    parseWhole :: forall a. Read a => String -> Maybe a
    parseWhole text = case reads text of
      [(value, rest)] | all (`elem` (" \t\n\r" :: String)) rest -> Just value
      _ -> Nothing
-- | Entry point: serve the current directory as a static site.
--
-- The listening socket is the inherited one when 'getActivatedSocketFd'
-- finds it, otherwise the default. TLS is enabled only when the arguments
-- are exactly @https \<cert-file\> \<key-file\>@.
main :: IO ()
main = do
  args <- System.getArgs
  activatedSocketFd <- getActivatedSocketFd
  let listenSetting =
        maybe
          defaultConfig.listen
          (\fd -> ProvidedSocket (ProvidedSocketConfig {fd = fd}))
          activatedSocketFd
      tlsSetting = case args of
        ["https", certFile, keyFile] ->
          Just (TLSConfig {publicCertFile = certFile, privateKeyFile = keyFile})
        _ -> defaultConfig.tls
  serve (defaultConfig {tls = tlsSetting, listen = listenSetting}) (staticSite ".")
cabal-version: 2.4
-- Package metadata shared by the two executables defined below.
name: server
version: 0.1.0.0
author: Brad Parker
maintainer: [email protected]
license: BSD-3-Clause
-- The hand-written streaming HTTP server (Main.hs).
executable Server
  main-is: Main.hs
  build-depends: base ^>=4.16.0.0,
                 bytestring,
                 containers,
                 filepath,
                 mtl,
                 network,
                 network-simple,
                 network-simple-tls,
                 stm,
                 transformers,
                 unix,
  default-language: Haskell2010
  ghc-options: -fspecialise-aggressively -fexpose-all-unfoldings
-- A Warp-based echo server (Warp.hs), built with the same GHC options as
-- the Server executable.
executable Warp
  main-is: Warp.hs
  build-depends: base ^>=4.16.0.0,
                 bytestring,
                 http-types,
                 wai,
                 warp,
  default-language: Haskell2010
  ghc-options: -fspecialise-aggressively -fexpose-all-unfoldings
# Development shell for the package in this directory.
let
  nixpkgs = import <nixpkgs> {};
  # Package set for the GHC version this project is built with.
  haskellPackages = nixpkgs.haskell.packages.ghc921;
  # The package itself, as defined by ./default.nix.
  package = import ./. { haskellPackages = haskellPackages; };
in
  haskellPackages.shellFor {
    packages = _: [ package ];
    # Extra tools made available inside the shell.
    nativeBuildInputs = [
      haskellPackages.doctest
      haskellPackages.haskell-language-server
      haskellPackages.hlint
      nixpkgs.mkcert
      nixpkgs.nssTools
    ];
  }
{-# LANGUAGE BlockArguments #-}
module Main (main) where
import Control.Monad (forever)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BSBuilder
import qualified Data.ByteString.Char8 as BSC
import qualified Network.HTTP.Types as HTTPTypes
import qualified Network.Wai as Wai
import qualified Network.Wai.Handler.Warp as Warp
-- $ yes "Whoa" | curl -i -T - -H "Content-Type: application/octet-stream" -H "Transfer-encoding: chunked" localhost:8080 > /dev/null
-- % Total % Received % Xferd Average Speed Time Time Time Current
-- Dload Upload Total Spent Left Speed
-- 100 82.4G 0 41.2G 0 41.2G 692M 693M --:--:-- 0:01:00 --:--:-- 1383M^C
-- | Echo server on port 8080: every request body is streamed straight back
-- as the body of a 200 response, flushing after each chunk.
main :: IO ()
main =
  Warp.run 8080 \request respond ->
    respond (Wai.responseStream HTTPTypes.status200 [] (echo request))
  where
    -- Copy request body chunks to the response until an empty chunk
    -- signals the end of the body.
    echo request write flush = pump
      where
        pump = do
          chunk <- Wai.getRequestBodyChunk request
          if BS.null chunk
            then pure ()
            else write (BSBuilder.byteString chunk) *> flush *> pump
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment