Skip to content

Instantly share code, notes, and snippets.

@paulvictor
Created March 13, 2025 14:45
Show Gist options
  • Save paulvictor/6a7b365e917dac26e75458c72c4fad22 to your computer and use it in GitHub Desktop.
Save paulvictor/6a7b365e917dac26e75458c72c4fad22 to your computer and use it in GitHub Desktop.
Merge conduits
#!/usr/bin/env magix
#!magix haskell
#!haskellPackages bytestring conduit conduit-merge
#!ghcFlags -threaded
-- #!/usr/bin/env nix-shell
-- #! nix-shell -p "haskell.packages.ghc96.ghcWithPackages (pkgs: with pkgs; [ conduit bytestring conduit-extra conduit-merge ]) " ghcid
-- #! nix-shell -I nixpkgs=/etc/nix/inputs/nixpkgs
-- #! nix-shell -i ghcid
{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString qualified as BS
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit as C
import Data.Conduit (ConduitT, await, sealConduitT, yield, ($$++))
import qualified Conduit as C
import Data.Conduit (ConduitT, (.|), runConduit)
-- import Data.Conduit.Merge
import Control.Monad.IO.Class
import Control.Concurrent (threadDelay)
import Data.Foldable
import Control.Monad.Trans (lift)
import Data.List (sortOn)
import Data.Maybe
-- | Forward every upstream element unchanged, pausing for @d@ microseconds
-- before each element is passed downstream. Finishes when upstream does.
delayC :: MonadIO m => Int -> ConduitT a a m ()
delayC d = C.awaitForever $ \x -> do
  -- Sleep first, then emit, so the pause precedes every element.
  liftIO (threadDelay d)
  C.yield x
-- | Demo source: the even integers counting up from 1, each held back by
-- 100000 microseconds, cut off after the first 10 elements.
stream1 :: MonadIO m => ConduitT a Int m ()
stream1 = numbers .| evensOnly .| paced .| firstTen
  where
    numbers = C.iterate (+ 1) 1
    evensOnly = C.filter even
    paced = delayC 100000
    firstTen = C.take 10
-- | Demo source: the odd integers counting up from 1, each held back by
-- 10000 microseconds, cut off after the first 100 elements.
stream2 :: MonadIO m => ConduitT a Int m ()
stream2 = numbers .| oddsOnly .| paced .| firstHundred
  where
    numbers = C.iterate (+ 1) 1
    oddsOnly = C.filter odd
    paced = delayC 10000
    firstHundred = C.take 100
-- | Merge the two demo streams by element value and print every merged
-- element to stdout.
main :: IO ()
main = runConduit (merged .| C.printC)
  where
    -- `id` as the key: the elements themselves are compared.
    merged = mergeSourcesOn id [stream1, stream2]
-- | Combine several sources, each already sorted by the given key, into one
-- producer that is sorted by that same key.
--
-- One element is kept buffered per live source. Every step emits the
-- buffered element with the smallest key, then asks the source it came from
-- for a replacement; a source that has nothing left is dropped.
mergeSourcesOn
  :: (Ord b, Foldable f, Monad m)
  => (a -> b) -> f (ConduitT () a m ()) -> ConduitT i a m ()
mergeSourcesOn key inputs = do
  -- Seal each source so it can be resumed later, and prefetch one element.
  primed <- lift $ traverse pull (sealConduitT <$> toList inputs)
  -- Sources that were empty from the start are discarded here.
  loop [(x, rest) | (rest, Just x) <- primed]
  where
    -- Run a sealed source just far enough to get its next element (if any),
    -- returning the resumable remainder alongside it.
    pull sealed = sealed $$++ await

    loop pending =
      case sortOn (key . fst) pending of
        [] -> pure ()
        (smallest, origin) : others -> do
          yield smallest
          (origin', next) <- lift $ pull origin
          -- Put the refilled source back at the front; the next sort places it.
          loop $ maybe others (\x -> (x, origin') : others) next
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment