Created
March 13, 2025 14:45
-
-
Save paulvictor/6a7b365e917dac26e75458c72c4fad22 to your computer and use it in GitHub Desktop.
Merge conduits
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env magix | |
#!magix haskell | |
#!haskellPackages bytestring conduit conduit-merge | |
#!ghcFlags -threaded | |
-- #!/usr/bin/env nix-shell | |
-- #! nix-shell -p "haskell.packages.ghc96.ghcWithPackages (pkgs: with pkgs; [ conduit bytestring conduit-extra conduit-merge ]) " ghcid | |
-- #! nix-shell -I nixpkgs=/etc/nix/inputs/nixpkgs | |
-- #! nix-shell -i ghcid | |
{-# LANGUAGE OverloadedStrings #-} | |
import Data.ByteString qualified as BS | |
import qualified Data.Conduit.Combinators as C | |
import qualified Data.Conduit as C | |
import Data.Conduit (ConduitT, await, sealConduitT, yield, ($$++)) | |
import qualified Conduit as C | |
import Data.Conduit (ConduitT, (.|), runConduit) | |
-- import Data.Conduit.Merge | |
import Control.Monad.IO.Class | |
import Control.Concurrent (threadDelay) | |
import Data.Foldable | |
import Control.Monad.Trans (lift) | |
import Data.List (sortOn) | |
import Data.Maybe | |
-- | Forward every upstream element unchanged, sleeping for @d@ microseconds
-- (via 'threadDelay') before each element is yielded downstream.
--
-- Terminates as soon as upstream is exhausted.
delayC :: MonadIO m => Int -> ConduitT a a m ()
delayC d = C.awaitForever $ \item -> do
  -- The pause happens after the element was received but before it is
  -- passed on, so every emitted element costs one delay.
  liftIO (threadDelay d)
  C.yield item
-- | Demo source: the first 10 even numbers (2, 4, ..., 20), each one held
-- back by 100000 microseconds through 'delayC'.
stream1 :: MonadIO m => ConduitT a Int m ()
stream1 = evens .| delayC 100000 .| C.take 10
  where
    -- Counts upwards from 1 and keeps only the even values.
    evens = C.iterate (+1) 1 .| C.filter even
-- | Demo source: the first 100 odd numbers (1, 3, ..., 199), each one held
-- back by 10000 microseconds through 'delayC'.
stream2 :: MonadIO m => ConduitT a Int m ()
stream2 = odds .| delayC 10000 .| C.take 100
  where
    -- Counts upwards from 1 and keeps only the odd values.
    odds = C.iterate (+1) 1 .| C.filter odd
-- | Merge the two demo streams by their numeric value and print every
-- element of the merged stream to stdout.
main :: IO ()
main = runConduit (merged .| C.printC)
  where
    -- 'id' is the sort key: the elements themselves are compared.
    merged :: ConduitT () Int IO ()
    merged = mergeSourcesOn id [stream1, stream2]
-- | Merge several sources, each expected to be sorted by @key@ already, into
-- a single producer whose output is ordered by @key@.
--
-- One element is pulled from every source before anything is emitted;
-- sources that turn out to be empty are dropped. After that the pending
-- element with the smallest key is yielded and the source it came from is
-- asked for a replacement, until no source has anything left.
--
-- All sources are run in the base monad @m@, one pull at a time.
mergeSourcesOn
  :: (Ord b, Foldable f, Monad m)
  => (a -> b) -> f (ConduitT () a m ()) -> ConduitT i a m ()
mergeSourcesOn key inputs = do
  primed <- lift (traverse pull (sealConduitT <$> toList inputs))
  -- Keep only the sources that produced a first element, paired with it.
  loop [(x, rest) | (rest, Just x) <- primed]
  where
    -- Advance a sealed source by one step, returning the remaining source
    -- together with the element it produced (if any).
    pull sealed = sealed $$++ await

    -- Each entry pairs the next unemitted element of a source with the
    -- remainder of that source.
    loop pending =
      case sortOn (key . fst) pending of
        [] -> pure ()
        (smallest, owner) : others -> do
          yield smallest
          (owner', next) <- lift (pull owner)
          -- An exhausted source simply disappears from the pending list.
          loop (maybe others (\x -> (x, owner') : others) next)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment