Skip to content

Instantly share code, notes, and snippets.

@paulvictor
Created December 27, 2024 17:26
Show Gist options
  • Save paulvictor/4de6f2e255d6105dd3c34be1d1525e58 to your computer and use it in GitHub Desktop.
Save paulvictor/4de6f2e255d6105dd3c34be1d1525e58 to your computer and use it in GitHub Desktop.
Merge bit vectors
#!/usr/bin/env magix
#!magix haskell
#!haskellPackages bytestring conduit relude unliftio vector
#!ghcFlags -threaded -O2 -rtsopts
-- #! /usr/bin/env nix-shell
-- #! nix-shell -p "haskell.packages.ghc96.ghcWithPackages (pkgs: with pkgs; [ bytestring conduit relude unliftio vector ])" ghcid
-- #! nix-shell --pure
-- #! nix-shell -i "runghc --ghc-arg=-threaded --ghc-arg=-O2"
-- #! nix-shell -i ghcid
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE ScopedTypeVariables #-}
import Data.ByteString qualified as BS
import qualified Data.ByteString.Internal as BS
import Data.Conduit
import Data.Conduit.Combinators (repeatM)
import qualified Data.Conduit.List as C
import qualified Conduit as C
import Foreign.ForeignPtr (castForeignPtr)
import Foreign.Ptr (ptrToIntPtr, IntPtr(..))
import Conduit
import System.IO (openFile)
import Data.Functor
import qualified Data.Vector as V
import Data.Vector (Vector, (!))
import Relude
import Data.Bits ((.|.))
import Data.Traversable
import Control.Monad ((>>=))
import Control.Monad.IO.Unlift
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.STM as STM
import qualified Control.Exception as E
import UnliftIO.Async
import qualified Control.Concurrent.STM.TBQueue as Q
import qualified Data.Vector.Storable as DVS
import Foreign.Storable (sizeOf)
-- | Bitwise-OR a fixed list of files together and stream the result to stdout.
--
-- Each file is read in 4 KiB chunks, and every chunk is reinterpreted as a
-- vector of 'Word64' (see 'decodeToVW64'). 'wideMerge' lines up the k-th
-- chunk of every file. Those chunks are OR-ed word by word, and the resulting
-- words are written back out as raw bytes. Output stops at the end of the
-- shortest file.
--
-- NOTE(review): the handles are never closed explicitly. They are only
-- released when the process exits.
main :: IO ()
main = do
  handles <- traverse (\f -> openFile f ReadMode) files
  let sources = handles <&> \h ->
        sourceHandle h .| chunksOfCE chunkSize .| mapC decodeToVW64
  runConduit $
    wideMerge sources -- ConduitT () (Vector (DVS.Vector Word64)) IO ()
      -- 'foldl1'' is safe here: every merged row holds one vector per file,
      -- and 'files' is non-empty.
      .| mapC (V.foldl1' (DVS.zipWith (.|.)))
      .| mapC wordsToBytes
      .| stdoutC
  where
    -- The same four files are listed twice (eight inputs in total).
    files = [ "./bf1", "./bf2", "./bf3", "./bf4", "./bf1", "./bf2", "./bf3", "./bf4"]

    -- A multiple of 8 bytes, so every full chunk decodes to whole 'Word64's.
    chunkSize :: Int
    chunkSize = 4 * 1024

    -- View a vector's words as a 'ByteString' without copying.
    -- 'unsafeToForeignPtr0' already points at the first element, so only the
    -- element count has to be converted to a byte count.
    wordsToBytes :: DVS.Vector Word64 -> ByteString
    wordsToBytes v =
      let (fp, len) = DVS.unsafeToForeignPtr0 v
       in BS.fromForeignPtr0 (castForeignPtr fp) (len * sizeOf (undefined :: Word64))
-- | Reinterpret the bytes of a strict 'ByteString' as a storable vector of
-- 'Word64' without copying. The vector shares the ByteString's buffer.
--
-- The result holds only whole words: a trailing remainder of fewer than
-- @sizeOf (0 :: Word64)@ bytes is ignored. Words are read in host byte order.
--
-- NOTE(review): the buffer start is used as-is, with no alignment check.
-- Confirm that unaligned 'Word64' reads are acceptable on the target
-- architecture.
decodeToVW64 :: ByteString -> DVS.Vector Word64
decodeToVW64 bs = DVS.unsafeFromForeignPtr0 (castForeignPtr buffer) wordCount
  where
    (buffer, byteCount) = BS.toForeignPtr0 bs
    wordCount = byteCount `quot` sizeOf (0 :: Word64)
{-# INLINE decodeToVW64 #-}
-- | Run every source concurrently and zip their outputs element-wise.
--
-- Each source is drained on its own thread into a bounded queue (capacity
-- 32). The consumer takes one element from every queue, in source order, and
-- yields them as a 'Vector': index @k@ holds the element from source @k@.
--
-- * The merged stream ends as soon as any one source is exhausted, so its
--   length is that of the shortest source.
-- * An empty list of sources yields an empty stream.
-- * If a source throws, the consumer rethrows that exception the next time
--   it finds a queue empty, instead of blocking forever.
--
-- NOTE(review): producer threads are still fire-and-forget. When the merged
-- stream stops early (shortest source, or downstream stops pulling), the
-- remaining producers are not cancelled. Tying them to the conduit's
-- lifetime with async/cancel is still a TODO.
wideMerge :: forall m a i. MonadUnliftIO m => [ConduitT () a m ()] -> ConduitT () (Vector a) m ()
wideMerge [] = pure ()
wideMerge sources = do
  let !width = length sources
  queues <- liftIO $ V.replicateM width (Q.newTBQueueIO 32)
  -- The exception that stopped the producers, if any.
  failure <- liftIO $ STM.newTVarIO Nothing
  st <- lift askUnliftIO
  let
    -- Copy a source's elements into its queue. 'Nothing' marks end of input.
    feed :: Q.TBQueue (Maybe a) -> ConduitT a o m ()
    feed q =
      await >>= maybe
        (atomically (Q.writeTBQueue q Nothing))
        (\x -> atomically (Q.writeTBQueue q (Just x)) >> feed q)

    -- Drain every source into its own queue concurrently.
    produce :: IO ()
    produce =
      forConcurrently_ (zip sources (V.toList queues)) $ \(src, q) ->
        unliftIO st (runConduit (src .| feed q))

    -- Record a producer failure so the consumer can rethrow it.
    recordFailure :: E.SomeException -> IO ()
    recordFailure e = atomically (STM.writeTVar failure (Just e))

    -- Take the next element from one queue. If the queue is empty and a
    -- producer has failed, rethrow that failure instead of blocking.
    takeFrom :: Q.TBQueue (Maybe a) -> IO (Maybe a)
    takeFrom q =
      atomically $
        Q.readTBQueue q
          `STM.orElse` (STM.readTVar failure >>= maybe STM.retry STM.throwSTM)
  _ <- liftIO $ forkIO $ E.catch produce recordFailure
  repeatM (liftIO (for queues takeFrom))
    -- .| traceC
    .| C.mapC sequence
    .| C.takeWhileC isJust
    .| C.mapMaybe id
-- | Debugging pass-through: 'print' each element as it arrives, then pass it
-- downstream unchanged.
traceC :: (MonadIO m, Show i) => ConduitT i i m ()
traceC = C.iterM print
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment