Created
January 22, 2025 12:21
-
-
Save paulvictor/91035c97afacc574d4b24d51d16dcf14 to your computer and use it in GitHub Desktop.
json-to-csv-middleware.hs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env magix | |
#!magix haskell | |
#!haskellPackages bytestring aeson conduit lens lens-aeson wai warp http-types streaming-commons text vault | |
#!ghcFlags -threaded | |
-- #!/usr/bin/env nix-shell | |
-- #! nix-shell -p "haskell.packages.ghc910.ghcWithPackages (pkgs: with pkgs; [ conduit bytestring wai lens lens-aeson text streaming-commons vault warp http-types ]) " ghcid | |
-- #! nix-shell -I nixpkgs=/etc/nix/inputs/nixpkgs | |
-- #! nix-shell -i "ghcid -c 'ghci -Wall' -T main" | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE TypeApplications #-} | |
{-# LANGUAGE DeriveAnyClass #-} | |
{-# LANGUAGE LambdaCase #-} | |
module Main where | |
import Control.Concurrent (threadDelay)
import Data.Foldable
import Data.IORef
import Data.List (sort)
import Data.Maybe
import GHC.Generics (Generic)

import Conduit
import Control.Lens.Combinators
import Control.Lens.Operators
import Data.Aeson
import qualified Data.Aeson.Key as Key
import Data.Aeson.KeyMap (keys)
import Data.Aeson.Lens
import qualified Data.Binary.Builder as B (fromByteString)
import qualified Data.ByteString.Builder as B
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy as LBS
import Data.Conduit.List (sourceList)
import qualified Data.Streaming.ByteString.Builder as B
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import Data.Vault.Strict (Vault)
import qualified Data.Vault.Strict as Vault
import Network.HTTP.Types
import Network.Wai
import Network.Wai.Handler.Warp (run)
-- | Split a buffer into its complete (newline-terminated) lines and the
-- trailing fragment that has not been terminated yet.
--
-- The returned lines do not contain their terminating newline. The fragment
-- is empty when the buffer ends in a newline or is empty, and is the whole
-- buffer when it contains no newline at all.
--
-- >>> linesAndLastIncomplete "a\nb\nc"
-- (["a","b"],"c")
linesAndLastIncomplete :: ByteString -> ([ByteString], ByteString)
linesAndLastIncomplete bs =
  -- Split just after the last newline. Unlike 'BS.last', this is total:
  -- an empty buffer simply has no newline.
  case BS.elemIndexEnd '\n' bs of
    Nothing -> ([], bs)
    Just i ->
      let (complete, rest) = BS.splitAt (i + 1) bs
       in (BS.lines complete, rest)
-- | Wrap an application so that its newline-delimited response body is
-- rewritten line by line through a 'JSONXForm'.
--
-- Upstream bytes are buffered only up to the last newline seen. Every
-- complete line is passed to 'xform', and its output is streamed to the
-- client. The transformer state is created by 'onFirst' from the first
-- complete line and threaded through every 'xform' call. Once the upstream
-- body is exhausted, an unterminated trailing line is still processed and
-- the output of 'done' on the final state is sent.
--
-- Status and headers are passed through unchanged, except @Content-Length@,
-- which no longer describes the transformed body and is therefore dropped.
xformMiddleware :: forall s. JSONXForm s -> Middleware
xformMiddleware (JSONXForm{onFirst, xform, done}) app request sendResponse =
  app request $ \response ->
    let (status, headers, withBody) = responseToStream response
        headers' = filter ((/= hContentLength) . fst) headers
     in withBody $ \body -> sendResponse $
          responseStream status headers' $ \send flush -> do
            -- Bytes received after the last newline seen so far.
            pendingRef <- newIORef mempty
            -- Transformer state; Nothing until the first complete line.
            stateRef <- newIORef Nothing
            let
              -- Run one complete line through the transformer and emit
              -- its output.
              processLine :: ByteString -> IO ()
              processLine line = do
                prevState <- readIORef stateRef
                let st = fromMaybe (onFirst line) prevState
                    (out, st') = xform st line
                send (B.fromByteString out)
                -- Force the new state so a chain of thunks does not build
                -- up across lines.
                st' `seq` writeIORef stateRef (Just st')

              -- Accept one raw upstream chunk and process every line it
              -- completes. An empty chunk cannot complete a line.
              receiveChunk :: ByteString -> IO ()
              receiveChunk chunk
                | BS.null chunk = pure ()
                | otherwise = do
                    pending <- readIORef pendingRef
                    let (complete, rest) = linesAndLastIncomplete (pending <> chunk)
                    traverse_ processLine complete
                    writeIORef pendingRef rest

            body (B.toByteStringIO receiveChunk) flush

            -- Upstream is finished: process an unterminated final line,
            -- then emit whatever the transformer appends at the end.
            leftover <- readIORef pendingRef
            if BS.null leftover then pure () else processLine leftover
            readIORef stateRef >>= traverse_ (send . B.fromByteString . done)
            flush
-- | A stateful, line-oriented transformer for a streamed response body,
-- consumed by 'xformMiddleware'. The type parameter @s@ is the state carried
-- from one line to the next.
data JSONXForm s = JSONXForm
  { onFirst :: ByteString -> s
    -- ^ Build the initial state from the first input the transformer sees.
  , xform :: s -> ByteString -> (ByteString, s)
    -- ^ Transform one input line under the current state, returning the
    -- bytes to emit together with the next state.
  , done :: s -> ByteString
    -- ^ Trailing bytes computed from the final state, intended to be sent
    -- after the last line.
  }
-- | A 'JSONXForm' that converts newline-delimited JSON objects into CSV.
--
-- The columns are the keys of the object handed to 'onFirst'. The header row
-- is emitted together with the first data row. The state records whether the
-- header has been sent, together with the column keys.
--
-- * Fields (including header names) are quoted per RFC 4180 when they
--   contain a comma, double quote, CR or LF; embedded quotes are doubled.
-- * A key missing from a row yields an empty cell, not an empty row.
-- * A line that is not a JSON object produces no output and leaves the
--   state unchanged.
jsonToCSV :: JSONXForm (Bool, [Key])
jsonToCSV = JSONXForm _onFirst _xform _done
  where
    _onFirst bs = (False, fromMaybe [] (bs ^? _Object . to keys))

    _xform st@(headersSent, ks) bs =
      case bs ^? _Object of
        Nothing -> ("", st)
        Just obj ->
          let cell k = maybe "" valueToString (obj ^? ix k)
              row = csvRow (cell <$> ks)
              headerRow = csvRow (Key.toText <$> ks)
           in if headersSent
                then (row, (True, ks))
                else (headerRow <> row, (True, ks))

    _done _ = ""

    -- Render one CSV record: quoted-as-needed fields joined by commas and
    -- terminated by a newline.
    csvRow :: [Text] -> ByteString
    csvRow fields =
      Text.encodeUtf8 (Text.intercalate "," (csvField <$> fields)) <> "\n"

    -- Quote a field only when it contains a delimiter, quote or line break.
    csvField :: Text -> Text
    csvField t
      | Text.any (`elem` [',', '"', '\n', '\r']) t =
          "\"" <> Text.replace "\"" "\"\"" t <> "\""
      | otherwise = t
-- | Render a JSON value as the text of a single CSV cell.
--
-- Strings are taken verbatim, booleans become @true@/@false@ and @null@
-- becomes the empty cell. Numbers, arrays and objects are rendered as
-- compact JSON text. This makes the function total (it previously crashed
-- on arrays and objects) and keeps integral numbers integral (@1@ rather
-- than the @1.0@ produced by 'show' on the underlying 'Scientific').
valueToString :: Value -> Text
valueToString = \case
  String s -> s
  Bool True -> "true"
  Bool False -> "false"
  Null -> ""
  -- aeson's encoder emits ASCII/UTF-8, so decoding cannot fail.
  other -> Text.decodeUtf8 (LBS.toStrict (encode other))
-- | Demo record streamed by 'mainApp'. The JSON instances are the
-- 'Generic'-based defaults (enabled by DeriveAnyClass). Field order matters
-- for the generated encoding, so keep it stable.
data FoosAndBars = FoosAndBars
  { foo :: Maybe Int
  , bar :: Text
  , baz :: Bool
  }
  deriving (Generic, FromJSON, ToJSON)
-- | Demo payload: records 1 through 1000, each derived from its index.
samples :: [FoosAndBars]
samples =
  [ FoosAndBars
      { foo = Just n
      , bar = "a reasonably long string with a number" <> Text.pack (show n)
      , baz = even n
      }
  | n <- [1 .. 1000]
  ]
-- | Demo upstream application: streams 'samples' as JSON values, one per
-- line, with status 200 and no headers. It flushes after every chunk so the
-- output reaches the middleware incrementally.
mainApp :: Application
mainApp _request respond =
  respond . responseStream status200 [] $ \send flush -> do
    runConduit $
      sourceList samples
        .| mapC encode
        .| unlinesAsciiC
        -- Insert `.| traceC` or `.| delayC 3000` here when debugging.
        .| mapC B.lazyByteString
        .| mapM_C (\chunk -> send chunk *> flush)
    flush
-- | The served application: 'mainApp' wrapped by 'xformMiddleware' using the
-- 'jsonToCSV' transformer.
app :: Application
app = xformMiddleware jsonToCSV mainApp
-- | Debugging stage: 'print' every element to stdout, then pass it
-- downstream unchanged.
traceC :: (Show a, MonadIO m) => ConduitT a a m ()
traceC = awaitForever $ \x -> liftIO (print x) >> yield x
-- | Pass every element through unchanged, sleeping @d@ microseconds (via
-- 'threadDelay') before yielding each one, e.g. to simulate a slow upstream.
--
-- The former @Show a@ constraint was never used and only restricted which
-- streams this stage could be inserted into, so it has been dropped.
delayC :: MonadIO m => Int -> ConduitT a a m ()
delayC d = awaitForever $ \x -> do
  liftIO (threadDelay d)
  yield x
-- | Serve 'app' with Warp on port 8081.
main :: IO ()
main = run 8081 app
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment