Last active
May 29, 2016 16:20
-
-
Save michaelt/d937472ec758e8d56bac855d9a2ba534 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-#LANGUAGE LambdaCase #-} | |
{-#LANGUAGE BangPatterns #-} | |
module Main where | |
import StoreStream | |
import Data.Store | |
import qualified Data.Store as Store | |
import Data.Store.Streaming | |
import Data.Store.Internal | |
import Streaming | |
import qualified Streaming.Prelude as Str | |
import Streaming.Internal | |
import qualified Data.Conduit as C | |
import qualified Data.Conduit.List as C | |
import qualified Data.Conduit.Binary as CB | |
import qualified Data.ByteString.Streaming as Q | |
import qualified System.IO.Streams as IOS | |
import System.IO.ByteBuffer (ByteBuffer) | |
import qualified System.IO.ByteBuffer as BB | |
import Pipes | |
import Pipes.Safe | |
import qualified Pipes.Prelude as P | |
import qualified Pipes.Safe.Prelude as P | |
import qualified Pipes.ByteString as PB | |
import Control.Exception | |
import Data.ByteString (ByteString) | |
import qualified Foreign.Storable as Storable | |
import qualified System.IO as IO | |
import System.Environment (getArgs) | |
import Data.Char | |
import qualified Data.ByteString as B | |
import Criterion.Main | |
-- | Criterion benchmark comparing four streaming libraries (io-streams,
-- streaming, pipes, conduit) on encoding and decoding a stream of
-- @Message Char@ values through the file @picnic.txt@.
--
-- The "serialize" group times the writers (@*_out@), which encode
-- @picnic_table@ into @picnic.txt@.  The "deserialize" group times the
-- readers, which decode @picnic.txt@ and sum the code points of the decoded
-- characters.  Previously the two group labels were swapped, and the readers
-- ran before anything had created the file.
main :: IO ()
main = do
  let size = 100 :: Int

      -- Readers: decode picnic.txt and sum the character codes. ------------
      conduit = CB.sourceFile "picnic.txt"
            C.=$= decodeChars
            -- C.$$ C.sinkNull
            C.=$= C.map (ord . fromMessage)
            C.$$ C.fold (+) 0
      pipe = P.withFile "picnic.txt" IO.ReadMode $ \h ->
               P.sum $
                 decodeChars''' (PB.fromHandle h) >-> P.map (ord . fromMessage)
      stream = Str.sum_
             $ Str.map (ord . fromMessage)
             -- Str.effects
             $ decodeChars'
             $ Q.readFile "picnic.txt"
      ios = IOS.withFileAsInput "picnic.txt" $ \str -> do
              cs <- decodeChars'' str
              ns <- IOS.map (ord . fromMessage) cs
              IOS.fold (+) 0 ns
              -- IOS.skipToEof

      -- Writers: encode picnic_table into picnic.txt. -----------------------
      conduit_out = C.sourceList picnic_table
                C.=$= conduitEncode
                C.$$ CB.sinkFile "picnic.txt"
      pipe_out = P.withFile "picnic.txt" IO.WriteMode $ \h ->
                   runEffect $ each picnic_table
                           >-> pipeEncode
                           >-> PB.toHandle h
      stream_out = Q.writeFile "picnic.txt"
                 $ streamEncode
                 $ Str.each picnic_table
      ios_out = IO.withFile "picnic.txt" IO.WriteMode $ \h -> do
                  cs <- IOS.fromList picnic_table
                  os <- IOS.handleToOutputStream h
                  bs <- iosEncode cs
                  IOS.supply bs os

      -- Decoders specialised to Char, one per library. ----------------------
      decodeChars :: (MonadIO m, MonadResource m) => C.Conduit ByteString m (Message Char)
      decodeChars = conduitDecode Nothing

      decodeChars' :: (MonadIO m, MonadResource m) =>
                      Q.ByteString m () -> Stream (Of (Message Char)) m ()
      decodeChars' s = streamDecode Nothing s >>= lift . Q.effects

      decodeChars'' :: IOS.InputStream ByteString -> IO (IOS.InputStream (Message Char))
      decodeChars'' = iosDecode Nothing

      decodeChars''' :: Producer ByteString (SafeT IO) () -> Producer (Message Char) (SafeT IO) ()
      decodeChars''' s = void $ pipeDecode Nothing s

      -- The benchmark payload: `size` lines of "Picnic Table", one message
      -- per character.
      picnic_table = map Message $ unlines (replicate size "Picnic Table" )

  -- Create picnic.txt up front so the readers have input even when the
  -- benchmark run is filtered down to the "deserialize" group only.
  ios_out

  defaultMain
    [ bgroup "serialize"
        [ bench "iostreams" $ nfIO ios_out
        , bench "streaming" $ nfIO (runResourceT stream_out)
        , bench "pipes" $ nfIO (runSafeT pipe_out)
        , bench "conduit" $ nfIO (runResourceT conduit_out)
        ]
    , bgroup "deserialize"
        [ bench "iostreams" $ nfIO ios
        , bench "streaming" $ nfIO (runResourceT stream)
        , bench "pipes" $ nfIO (runSafeT pipe)
        , bench "conduit" $ nfIO (runResourceT conduit)
        ]
    ]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Main where | |
import qualified Data.Conduit as C | |
import qualified Data.Conduit.List as C | |
import qualified Data.Conduit.Binary as CB | |
import StoreStream | |
import Streaming | |
import qualified Streaming.Prelude as Str | |
import Streaming.Internal | |
import qualified Data.ByteString.Streaming as Q | |
import qualified System.IO.Streams as IOS | |
import Data.Store | |
import qualified Data.Store as Store | |
import Data.Store.Streaming | |
import Data.Store.Internal | |
import Pipes | |
import qualified Pipes.Prelude as P | |
import qualified Pipes.Safe as P | |
import qualified Pipes.ByteString as PB | |
import qualified Data.ByteString.Char8 as BC | |
import qualified Data.ByteString.Lazy as BL | |
import qualified Data.ByteString as B | |
import Control.Monad.Trans.Resource | |
import Control.Monad.Trans | |
import Data.Functor.Identity | |
-- | Decode the byte-at-a-time input with the @streaming@ decoder and print
-- every decoded message.
--
-- Conduit alternative:
-- main = runResourceT $ drip_c C.$$ C.mapM_ (lift . print)
main :: IO ()
main = runResourceT (Str.print drip_s)
-- | io-streams variant of the demo: decode the dripped input, print each
-- message as it is read, and drain the stream to its end.
ios :: IO ()
ios = do
  decoded <- drip_ios
  printing <- IOS.mapM_ print decoded
  IOS.skipToEof printing
-- | The chunks of 'a' as an io-streams input, passed through 'iosDecode'.
drip_ios :: IO (IOS.InputStream (Message Char))
drip_ios = do
  chunks <- IOS.fromList a
  iosDecode Nothing chunks
-- | The encoding of the messages for @"picnic"@, split into one-byte strict
-- chunks so that the decoders are fed a single byte at a time.
a :: [B.ByteString]
a = map B.singleton (BL.unpack encoded)
  where
    -- Run the conduit encoder purely and collect its output lazily.
    encoded = runIdentity
      ((C.sourceList (map Message "picnic") C.=$= conduitEncode) C.$$ CB.sinkLbs)
-- | Conduit variant: the one-byte chunks of 'a' fused with 'conduitDecode'.
drip_c :: C.Source (ResourceT IO) (Message Char)
drip_c = chunks C.=$= decoder
  where
    chunks  = C.sourceList a
    decoder = conduitDecode Nothing
-- | Streaming variant: decode the one-byte chunks of 'a' with 'streamDecode',
-- discarding the leftover byte stream it returns.
drip_s :: Stream (Of (Message Char)) (ResourceT IO) ()
drip_s = do
  _leftover <- streamDecode Nothing (Q.fromChunks (Str.each a))
  return ()
-- | Pipes variant: decode the one-byte chunks of 'a' with 'pipeDecode',
-- discarding the leftover producer it returns.
drip_p :: Producer (Message Char) (P.SafeT IO) ()
drip_p = do
  _leftover <- pipeDecode Nothing (each a)
  return ()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-#LANGUAGE LambdaCase #-} | |
module StoreStream where | |
import Data.Store | |
import Data.Store.Streaming | |
import Data.Store.Internal | |
import Pipes | |
import qualified Pipes.Prelude as P | |
import qualified Pipes.Safe as P | |
import qualified Pipes.ByteString as PB | |
import qualified Pipes.Parse as P | |
import Streaming | |
import qualified Streaming.Prelude as Str | |
import Streaming.Internal | |
import qualified Data.ByteString.Streaming as Q | |
import qualified System.IO.Streams as IOS | |
import System.IO.ByteBuffer (ByteBuffer) | |
import qualified System.IO.ByteBuffer as BB | |
import Control.Exception | |
import qualified Foreign.Storable as Storable | |
import Data.ByteString (ByteString) | |
import Foreign.Ptr | |
import qualified System.IO as IO | |
import System.Environment (getArgs) | |
import Data.Char | |
import Control.Monad.Trans.State.Strict | |
-- | Turn an io-streams input of messages into an input of their encodings,
-- one 'ByteString' per message (via 'encodeMessage').
iosEncode :: Store a
          => IOS.InputStream (Message a)
          -> IO (IOS.InputStream ByteString)
iosEncode messages = IOS.map encodeMessage messages
{-#INLINE iosEncode #-}
-- | A pipe that replaces every incoming message with its encoding
-- (via 'encodeMessage').
pipeEncode
  :: (Monad m, Store a)
  => Pipe (Message a) ByteString m r
pipeEncode = for cat (\message -> yield (encodeMessage message))
{-#INLINE pipeEncode #-}
-- | Encode a stream of messages into a streaming 'Q.ByteString', each
-- message contributing one chunk (via 'encodeMessage').  The stream's return
-- value is passed through unchanged.
streamEncode
  :: (Monad m, Store a)
  => Stream (Of (Message a)) m r
  -> Q.ByteString m r
streamEncode messages = Q.fromChunks (Str.map encodeMessage messages)
{-#INLINE streamEncode #-}
-- | Decode an io-streams input of raw bytes into an input of messages.
--
-- The first argument is the initial size handed to 'BB.new' (@Nothing@
-- leaves the choice to 'BB.new'); the second is the byte source.
--
-- The decode buffer has to stay alive for as long as the returned stream is
-- being read.  The previous implementation wrapped only the /construction/
-- of the stream in 'bracket', so the buffer was freed as soon as
-- 'IOS.makeInputStream' returned and every later read decoded from freed
-- memory.  The buffer is now released when the decoder reports end of input.
--
-- NOTE(review): if the stream is abandoned before end of input, or
-- 'decodeMessage' throws, the buffer is not released.  It is deliberately
-- not freed on exception, so that a caller who retries a read never touches
-- a freed buffer.
iosDecode :: Store a
          => Maybe Int
          -> IOS.InputStream ByteString
          -> IO (IOS.InputStream (Message a))
iosDecode bufsize str = do
    buffer <- BB.new bufsize
    IOS.makeInputStream $
      decodeMessage buffer (IOS.read str) >>= \case
        -- End of input: nothing will be decoded after this, so release the
        -- buffer.  This assumes makeInputStream stops invoking the action
        -- once it has returned Nothing -- TODO confirm against io-streams.
        Nothing -> BB.free buffer >> return Nothing
        message -> return message
{-#INLINE iosDecode #-}
-- not exported by Store ---------------------------- | |
-- | The length prefix read before each message body by the decoders below.
type SizeTag = Int

-- | Number of bytes occupied by a 'SizeTag', as reported by its 'Storable'
-- instance.
tagLength :: Int
tagLength = Storable.sizeOf (0 :: SizeTag)
{-# INLINE tagLength #-}
-- | Try to decode a value occupying exactly @n@ bytes from the buffer.
--
-- When 'BB.unsafeConsume' hands back a pointer, the value is peeked from the
-- @n@ bytes starting there and returned as 'Done'.  Otherwise the result is
-- 'NeedMoreInput' with a continuation that copies the supplied bytes into
-- the buffer and tries again.
peekSized :: (MonadIO m, Store a) => ByteBuffer -> Int -> m (PeekMessage m a)
peekSized bb n =
    BB.unsafeConsume bb n >>= \case
      Left _ -> return (NeedMoreInput refill)
      Right start -> do
        let end = start `plusPtr` n
        (_, value) <- liftIO (runPeek peek end start)
        return (Done (Message value))
  where
    -- Append the new chunk to the buffer and retry with the same size.
    refill chunk = BB.copyByteString bb chunk >> peekSized bb n
{-#INLINABLE peekSized #-}
-- | Try to decode a 'SizeTag' from the buffer: 'peekSized' specialised to
-- 'tagLength' bytes.
peekSizeTag :: MonadIO m => ByteBuffer -> m (PeekMessage m SizeTag)
peekSizeTag buffer = buffer `peekSized` tagLength
{-# INLINE peekSizeTag #-}
-- ------------------------------------------------------------- | |
-- | Decode a producer of raw bytes into a producer of messages.
--
-- The first argument is the initial size handed to 'BB.new'; the buffer is
-- allocated and freed with 'P.bracket' around the whole decode loop.
--
-- The producer finishes when the input ends with no bytes left in the
-- buffer, returning @return r@ for the input's own result @r@.  If the input
-- ends part-way through a size tag or a message body, 'tooManyBytes' is
-- raised.
--
-- Changes from the previous version: the unreachable @0@ branch in the body
-- loop and the unreachable "no size tag but more input" branch are gone, the
-- error for a truncated body no longer claims to be about the size tag, and
-- the two duplicated nested loops are now the helpers @readTag@ and
-- @readBody@.
pipeDecode
  :: (Store a, P.MonadSafe m) =>
     Maybe Int
  -> Producer ByteString m r
  -> Producer (Message a) m (Producer ByteString m r)
pipeDecode bufsize bs0 = P.bracket (BB.new bufsize) BB.free (pipeDecodeLoop_ bs0)
  where
    pipeDecodeLoop_ :: (Store a, MonadIO m)
                    => Producer ByteString m r
                    -> ByteBuffer
                    -> Producer (Message a) m (Producer ByteString m r)
    pipeDecodeLoop_ str0 bb =
        lift (readTag str0) >>= \case
          -- Input ended cleanly between two messages.
          Left r -> return (return r)
          Right (n, afterTag) -> do
            (message, rest) <- lift (readBody n afterTag)
            yield message
            pipeDecodeLoop_ rest bb
      where
        -- Read the size tag, pulling chunks from the input until enough
        -- bytes are buffered.  Left r: the input ended with an empty buffer.
        readTag str =
          peekSizeTag bb >>= \case
            Done (Message n) -> return (Right (n, str))
            NeedMoreInput _ ->
              next str >>= \case
                Left r ->
                  BB.availableBytes bb >>= \case
                    0 -> return (Left r)
                    leftover ->
                      liftIO (tooManyBytes tagLength leftover "Data.Store.Message.SizeTag")
                Right (bs, rest) -> BB.copyByteString bb bs >> readTag rest

        -- Read an n-byte message body, pulling chunks from the input as
        -- needed.  Running out of input here always means a truncated
        -- message.
        readBody n str =
          peekSized bb n >>= \case
            Done message -> return (message, str)
            NeedMoreInput _ ->
              next str >>= \case
                Left _ -> do
                  available <- BB.availableBytes bb
                  liftIO (tooManyBytes n available "Data.Store.Message.Message")
                Right (bs, rest) -> BB.copyByteString bb bs >> readBody n rest
{-#INLINABLE pipeDecode #-}
-- | Decode a streaming 'Q.ByteString' into a stream of messages.
--
-- The first argument is the initial size handed to 'BB.new'; the buffer is
-- allocated and freed with 'bracketStream' around the whole decode loop.
--
-- The stream finishes when the input ends with no bytes left in the buffer,
-- returning @return r@ for the input's own result @r@.  If the input ends
-- part-way through a size tag or a message body, 'tooManyBytes' is raised.
--
-- Changes from the previous version: the unreachable @0@ branch in the body
-- loop and the unreachable "no size tag but more input" branch are gone, the
-- error for a truncated body no longer claims to be about the size tag, and
-- the two duplicated nested loops are now the helpers @readTag@ and
-- @readBody@.
streamDecode
  :: (Store a, MonadResource m) =>
     Maybe Int
  -> Q.ByteString m r -> Stream (Of (Message a)) m (Q.ByteString m r)
streamDecode bufsize bs0 =
    bracketStream (BB.new bufsize) BB.free (streamDecodeLoop bs0)
  where
    streamDecodeLoop :: (Store a, MonadIO m)
                     => Q.ByteString m r
                     -> ByteBuffer
                     -> Stream (Of (Message a)) m (Q.ByteString m r)
    streamDecodeLoop str0 bb =
        lift (readTag str0) >>= \case
          -- Input ended cleanly between two messages.
          Left r -> return (return r)
          Right (n, afterTag) -> do
            (message, rest) <- lift (readBody n afterTag)
            Str.yield message
            streamDecodeLoop rest bb
      where
        -- Read the size tag, pulling chunks from the input until enough
        -- bytes are buffered.  Left r: the input ended with an empty buffer.
        readTag str =
          peekSizeTag bb >>= \case
            Done (Message n) -> return (Right (n, str))
            NeedMoreInput _ ->
              Q.nextChunk str >>= \case
                Left r ->
                  BB.availableBytes bb >>= \case
                    0 -> return (Left r)
                    leftover ->
                      liftIO (tooManyBytes tagLength leftover "Data.Store.Message.SizeTag")
                Right (bs, rest) -> BB.copyByteString bb bs >> readTag rest

        -- Read an n-byte message body, pulling chunks from the input as
        -- needed.  Running out of input here always means a truncated
        -- message.
        readBody n str =
          peekSized bb n >>= \case
            Done message -> return (message, str)
            NeedMoreInput _ ->
              Q.nextChunk str >>= \case
                Left _ -> do
                  available <- BB.availableBytes bb
                  liftIO (tooManyBytes n available "Data.Store.Message.Message")
                Right (bs, rest) -> BB.copyByteString bb bs >> readBody n rest
{-#INLINABLE streamDecode #-}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment