Created
December 27, 2024 17:26
-
-
Save paulvictor/4de6f2e255d6105dd3c34be1d1525e58 to your computer and use it in GitHub Desktop.
Merge bit vectors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env magix | |
#!magix haskell | |
#!haskellPackages bytestring conduit relude unliftio vector | |
#!ghcFlags -threaded -O2 -rtsopts | |
-- #! /usr/bin/env nix-shell | |
-- #! nix-shell -p "haskell.packages.ghc96.ghcWithPackages (pkgs: with pkgs; [ bytestring conduit relude unliftio vector ])" ghcid | |
-- #! nix-shell --pure | |
-- #! nix-shell -i "runghc --ghc-arg=-threaded --ghc-arg=-O2" | |
-- #! nix-shell -i ghcid | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE NoImplicitPrelude #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
import Data.ByteString qualified as BS | |
import qualified Data.ByteString.Internal as BS | |
import Data.Conduit | |
import Data.Conduit.Combinators (repeatM) | |
import qualified Data.Conduit.List as C | |
import qualified Conduit as C | |
import Foreign.ForeignPtr (castForeignPtr) | |
import Foreign.Ptr (ptrToIntPtr, IntPtr(..)) | |
import Conduit | |
import System.IO (openFile) | |
import Data.Functor | |
import qualified Data.Vector as V | |
import Data.Vector (Vector, (!)) | |
import Relude | |
import Data.Bits ((.|.)) | |
import Data.Traversable | |
import Control.Monad ((>>=)) | |
import Control.Monad.IO.Unlift | |
import Control.Concurrent (forkIO) | |
import UnliftIO.Async | |
import qualified Control.Concurrent.STM.TBQueue as Q | |
import qualified Data.Vector.Storable as DVS | |
import Foreign.Storable (sizeOf) | |
-- | OR together the bit vectors stored in several files and write the merged
-- bit vector to stdout.
--
-- Each input file is streamed in 4 KiB chunks. Every chunk is reinterpreted
-- as a vector of 'Word64's ('decodeToVW64'), and the chunks at the same
-- position in all files are combined word-by-word with bitwise OR. Words are
-- decoded and re-encoded in the same (host) byte order and OR acts on every
-- bit independently, so byte order does not affect the result.
--
-- The output ends when the first input ends ('wideMerge' stops as soon as
-- any source is exhausted). Within a chunk, 'DVS.zipWith' truncates to the
-- shortest of the merged vectors.
main :: IO ()
main = do
  handles <- traverse (\f -> openFile f ReadMode) files
  -- NOTE(review): the handles are never closed explicitly and are reclaimed
  -- when the process exits. Closing them here is only safe once wideMerge
  -- guarantees that none of its producer threads is still reading from them.
  let sources =
        handles <&> \h ->
          sourceHandle h .| chunksOfCE chunkSize .| mapC decodeToVW64
  runConduit $
    wideMerge sources -- ConduitT () (Vector (DVS.Vector Word64)) IO ()
      -- V.foldl1' needs at least one input; 'files' is non-empty.
      .| mapC (V.foldl1' (DVS.zipWith (.|.)))
      .| mapC word64sToBytes
      .| stdoutC
  where
    -- The same four files are listed twice, giving eight sources in total.
    files :: [FilePath]
    files = ["./bf1", "./bf2", "./bf3", "./bf4", "./bf1", "./bf2", "./bf3", "./bf4"]

    -- Number of bytes per chunk read from each file.
    chunkSize :: Int
    chunkSize = 4 * 1024

    -- View a Word64 vector's buffer as raw bytes without copying. The
    -- element count is scaled by the element size (unsafeToForeignPtr0 has
    -- no separate element offset to mis-scale), so the ByteString covers
    -- exactly the vector's memory.
    word64sToBytes :: DVS.Vector Word64 -> BS.ByteString
    word64sToBytes v =
      let (fp, n) = DVS.unsafeToForeignPtr0 v
       in BS.fromForeignPtr0 (castForeignPtr fp) (n * sizeOf (undefined :: Word64))
{-# INLINE decodeToVW64 #-}
-- | Reinterpret the bytes of a 'ByteString' as a storable vector of
-- 'Word64's, sharing the underlying buffer rather than copying it.
--
-- The vector holds @byteLength `quot` 8@ words. Trailing bytes that do not
-- fill a whole 'Word64' are left out. Words are read in host byte order.
--
-- NOTE(review): the buffer is used as-is, so this assumes its start address
-- is suitably aligned for 'Word64'. Confirm this on platforms that trap on
-- unaligned loads.
decodeToVW64 :: ByteString -> DVS.Vector Word64
decodeToVW64 bs = DVS.unsafeFromForeignPtr0 (castForeignPtr buf) wordCount
  where
    (buf, byteCount) = BS.toForeignPtr0 bs
    wordCount = byteCount `quot` sizeOf (0 :: Word64)
-- | Zip a list of sources into a single stream of vectors.
--
-- Each source runs concurrently and pushes its elements into its own bounded
-- queue (capacity 32). The resulting conduit then repeatedly takes one
-- element from every queue, in the order the sources were given, and yields
-- them together. The @i@-th entry of each vector comes from the @i@-th
-- source. The merged stream ends as soon as any source is exhausted, so it
-- is as long as the shortest source. With no sources the result is an empty
-- stream.
--
-- The producers run in a background 'async' that is 'link'ed to the
-- consuming thread, so an exception raised by any source is re-thrown
-- downstream. When the merged stream ends, producers still running for
-- longer sources are cancelled.
-- NOTE(review): if a downstream component stops pulling early, the 'cancel'
-- below is never reached and those producer threads keep running.
wideMerge :: forall m a i. MonadUnliftIO m => [ConduitT () a m ()] -> ConduitT () (Vector a) m ()
wideMerge sources
  | null sources = pure ()
  | otherwise = do
      -- One bounded queue per source; 'Nothing' marks the end of that
      -- source's stream.
      queues <- liftIO $ V.replicateM (length sources) (Q.newTBQueueIO 32)
      let
        -- Run one source to completion, pushing each element into its
        -- queue, then push the end-of-stream marker.
        feed :: (ConduitT () a m (), Q.TBQueue (Maybe a)) -> m ()
        feed (src, q) = do
          runConduit $ src .| mapM_C (atomically . Q.writeTBQueue q . Just)
          atomically $ Q.writeTBQueue q Nothing

        -- Each round takes one item from every queue (in source order) and
        -- yields them together. The first round in which some source has
        -- ended terminates the stream.
        drain :: ConduitT () (Vector a) m ()
        drain = do
          row <- lift $ for queues (atomically . Q.readTBQueue)
          case sequence row of
            Nothing -> pure ()
            Just items -> yield items >> drain

      -- Linking makes a failing source surface here instead of being lost
      -- while this thread waits forever on an empty queue.
      producer <- lift $ async $ forConcurrently_ (zip sources (V.toList queues)) feed
      link producer
      drain
      -- Stop producers that are still running for longer sources.
      cancel producer
-- | Debugging pass-through: prints every element that flows through the
-- stream (with 'print') and forwards it unchanged.
traceC :: (MonadIO m, Show i) => ConduitT i i m ()
traceC = iterMC print
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment