Created
April 25, 2013 17:41
-
-
Save gregorycollins/5461584 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- | Internal implementation of the @io-streams@ library, intended for library | |
-- writers | |
-- | |
-- Library users should use the interface provided by "System.IO.Streams" | |
{-# LANGUAGE BangPatterns #-} | |
{-# LANGUAGE DeriveDataTypeable #-} | |
{-# LANGUAGE FlexibleInstances #-} | |
{-# LANGUAGE GeneralizedNewtypeDeriving #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE TypeSynonymInstances #-} | |
module System.IO.Streams.Internal2 where | |
------------------------------------------------------------------------------ | |
import Control.Applicative (Applicative (..)) | |
import Control.Concurrent (newMVar, withMVar) | |
import Control.Exception (throwIO) | |
import Control.Monad (when) | |
import Control.Monad.IO.Class (MonadIO (..)) | |
import Data.ByteString.Char8 (ByteString) | |
import qualified Data.ByteString.Char8 as S | |
import qualified Data.ByteString.Internal as S | |
import qualified Data.ByteString.Unsafe as S | |
import Data.IORef (newIORef, readIORef, writeIORef) | |
import Data.Maybe (isNothing) | |
import Data.Typeable (Typeable) | |
import Data.Word (Word8) | |
import Foreign.Marshal.Utils (copyBytes) | |
import Foreign.Ptr (castPtr) | |
import qualified GHC.IO.Buffer as H | |
import qualified GHC.IO.BufferedIO as H | |
import qualified GHC.IO.Device as H | |
import GHC.IO.Exception (unsupportedOperation) | |
import Prelude hiding (read) | |
------------------------------------------------------------------------------ | |
-- | A strict pair type. | |
data SP a b = SP !a !b | |
deriving (Typeable) | |
------------------------------------------------------------------------------ | |
data InputStream a = InputStream { | |
_read :: IO (Maybe a) | |
, _unRead :: a -> IO () | |
} deriving (Typeable) | |
------------------------------------------------------------------------------ | |
data OutputStream a = OutputStream { | |
_write :: Maybe a -> IO () | |
} deriving (Typeable) | |
------------------------------------------------------------------------------ | |
read :: InputStream a -> IO (Maybe a) | |
read = _read | |
{-# INLINE read #-} | |
------------------------------------------------------------------------------ | |
write :: Maybe a -> OutputStream a -> IO () | |
write = flip _write | |
{-# INLINE write #-} | |
------------------------------------------------------------------------------ | |
peek :: InputStream a -> IO (Maybe a) | |
peek s = do | |
x <- read s | |
maybe (return $! ()) (_unRead s) x | |
return x | |
------------------------------------------------------------------------------ | |
unRead :: a -> InputStream a -> IO () | |
unRead = flip _unRead | |
------------------------------------------------------------------------------ | |
-- | Connects an 'InputStream' and 'OutputStream', supplying values from the | |
-- 'InputStream' to the 'OutputStream', and propagating the end-of-stream | |
-- message from the 'InputStream' through to the 'OutputStream'. | |
-- | |
-- The connection ends when the 'InputStream' yields a 'Nothing'. | |
connect :: InputStream a -> OutputStream a -> IO () | |
connect p q = loop | |
where | |
loop = do | |
m <- read p | |
maybe (write Nothing q) | |
(const $ write m q >> loop) | |
m | |
{-# INLINE connect #-} | |
------------------------------------------------------------------------------ | |
-- | The 'connectTo' function is just @'flip' 'connect'@. | |
-- | |
-- Useful for writing expressions like @fromList [1,2,3] >>= connectTo foo@. | |
-- | |
connectTo :: OutputStream a -> InputStream a -> IO () | |
connectTo = flip connect | |
{-# INLINE connectTo #-} | |
------------------------------------------------------------------------------ | |
-- | Connects an 'InputStream' to an 'OutputStream' without passing the | |
-- end-of-stream notification through to the 'OutputStream'. | |
-- | |
-- Use this to supply an 'OutputStream' with multiple 'InputStream's and use | |
-- 'connect' for the final 'InputStream' to finalize the 'OutputStream', like | |
-- so: | |
-- | |
-- @ | |
-- do Streams.'supply' input1 output | |
-- Streams.'supply' input2 output | |
-- Streams.'connect' input3 output | |
-- @ | |
-- | |
supply :: InputStream a -> OutputStream a -> IO () | |
supply p q = loop | |
where | |
loop = do | |
m <- read p | |
maybe (return $! ()) | |
(const $ write m q >> loop) | |
m | |
{-# INLINE supply #-} | |
------------------------------------------------------------------------------ | |
-- | 'supply' with the arguments flipped. | |
supplyTo :: OutputStream a -> InputStream a -> IO () | |
supplyTo = flip supply | |
{-# INLINE supplyTo #-} | |
------------------------------------------------------------------------------ | |
-- | Creates an 'InputStream' from a value-producing action. | |
-- | |
-- (@makeInputStream m@) calls the action @m@ each time you request a value | |
-- from the 'InputStream'. The given action is extended with the default | |
-- pushback mechanism (see "System.IO.Streams.Internal#pushback"). | |
makeInputStream :: IO (Maybe a) -> IO (InputStream a) | |
makeInputStream m = do | |
doneRef <- newIORef False | |
pbRef <- newIORef [] | |
return $! InputStream (grab doneRef pbRef) (pb pbRef) | |
where | |
grab doneRef pbRef = do | |
l <- readIORef pbRef | |
case l of | |
[] -> do done <- readIORef doneRef | |
if done | |
then return Nothing | |
else do | |
x <- m | |
when (isNothing x) $ writeIORef doneRef True | |
return x | |
(x:xs) -> writeIORef pbRef xs >> (return $! Just x) | |
pb ref x = readIORef ref >>= \xs -> writeIORef ref (x:xs) | |
{-# INLINE makeInputStream #-} | |
------------------------------------------------------------------------------ | |
-- | Creates an 'OutputStream' from a value-consuming action. | |
-- | |
-- (@makeOutputStream f@) runs the computation @f@ on each value fed to it. | |
makeOutputStream :: (Maybe a -> IO ()) -> IO (OutputStream a) | |
makeOutputStream = return . OutputStream | |
------------------------------------------------------------------------------ | |
-- | Converts an 'InputStream' into a thread-safe 'InputStream', at a slight | |
-- performance penalty. | |
-- | |
-- For performance reasons, this library provides non-thread-safe streams by | |
-- default. Use the @locking@ functions to convert these streams into slightly | |
-- slower, but thread-safe, equivalents. | |
lockingInputStream :: InputStream a -> IO (InputStream a) | |
lockingInputStream s = do | |
mv <- newMVar $! () | |
return $! InputStream (grab mv) (pb mv) | |
where | |
grab mv = withMVar mv $ const $ read s | |
pb mv x = withMVar mv $ const $ unRead x s | |
{-# INLINE lockingInputStream #-} | |
------------------------------------------------------------------------------ | |
-- | Converts an 'OutputStream' into a thread-safe 'OutputStream', at a slight | |
-- performance penalty. | |
-- | |
-- For performance reasons, this library provides non-thread-safe streams by | |
-- default. Use the @locking@ functions to convert these streams into slightly | |
-- slower, but thread-safe, equivalents. | |
lockingOutputStream :: OutputStream a -> IO (OutputStream a) | |
lockingOutputStream s = do | |
mv <- newMVar $! () | |
makeOutputStream $ f mv | |
where | |
f mv x = withMVar mv $ const $ write x s | |
{-# INLINE lockingOutputStream #-} | |
------------------------------------------------------------------------------ | |
-- | An empty 'InputStream' that yields 'Nothing' immediately. | |
nullInput :: IO (InputStream a) | |
nullInput = makeInputStream $ return Nothing | |
------------------------------------------------------------------------------ | |
-- | An empty 'OutputStream' that discards any input fed to it. | |
nullOutput :: IO (OutputStream a) | |
nullOutput = makeOutputStream $ const $ return $! () | |
------------------------------------------------------------------------------ | |
-- | 'appendInputStream' concatenates two 'InputStream's, analogous to ('++') | |
-- for lists. | |
-- | |
-- The second 'InputStream' continues where the first 'InputStream' ends. | |
-- | |
-- Note: values pushed back to 'appendInputStream' are not propagated to either | |
-- wrapped 'InputStream'. | |
appendInputStream :: InputStream a -> InputStream a -> IO (InputStream a) | |
appendInputStream s1 s2 = concatInputStreams [s1, s2] | |
------------------------------------------------------------------------------ | |
-- | 'concatInputStreams' concatenates a list of 'InputStream's, analogous to | |
-- ('++') for lists. | |
-- | |
-- Subsequent 'InputStream's continue where the previous one 'InputStream' | |
-- ends. | |
-- | |
-- Note: values pushed back to the 'InputStream' returned by | |
-- 'concatInputStreams' are not propagated to any of the source | |
-- 'InputStream's. | |
concatInputStreams :: [InputStream a] -> IO (InputStream a) | |
concatInputStreams inputStreams = do | |
ref <- newIORef inputStreams | |
makeInputStream $! run ref | |
where | |
run ref = go | |
where | |
go = do | |
streams <- readIORef ref | |
case streams of | |
[] -> return Nothing | |
(s:rest) -> do | |
next <- read s | |
case next of | |
Nothing -> writeIORef ref rest >> go | |
Just _ -> return next | |
------------------------------------------------------------------------------ | |
-- | Checks if an 'InputStream' is at end-of-stream. | |
atEOF :: InputStream a -> IO Bool | |
atEOF s = read s >>= maybe (return True) (\k -> unRead k s >> return False) | |
------------------------------------------------------------------------------ | |
-- $pushback | |
-- #pushback# | |
-- | |
-- Users can push a value back into an input stream using the 'unRead' | |
-- function. Usually this will use the default pushback mechanism which | |
-- provides a buffer for the stream. Some stream transformers, like | |
-- 'takeBytes', produce streams that send pushed-back values back to the | |
-- streams that they wrap. A function like 'System.IO.Streams.Combinators.map' | |
-- cannot do this because the types don't match up: | |
-- | |
-- @ | |
-- 'System.IO.Streams.Combinators.map' :: (a -> b) -> 'InputStream' a -> 'IO' ('InputStream' b) | |
-- @ | |
-- | |
-- A function will usually document if its pushback behaviour differs from the | |
-- default. No matter what the case, input streams should obey the following | |
-- law: | |
-- | |
-- @ | |
-- Streams.'unRead' c stream >> Streams.'read' stream === 'return' ('Just' c) | |
-- @ | |
-------------------------------------------- | |
-- Typeclass instances for Handle support -- | |
-------------------------------------------- | |
------------------------------------------------------------------------------ | |
bUFSIZ :: Int | |
bUFSIZ = 32752 | |
------------------------------------------------------------------------------ | |
unsupported :: IO a | |
unsupported = throwIO unsupportedOperation | |
------------------------------------------------------------------------------ | |
bufferToBS :: H.Buffer Word8 -> ByteString | |
bufferToBS buf = S.copy $! S.fromForeignPtr raw l sz | |
where | |
raw = H.bufRaw buf | |
l = H.bufL buf | |
r = H.bufR buf | |
sz = r - l | |
------------------------------------------------------------------------------ | |
instance H.RawIO (InputStream ByteString) where | |
read is ptr n = read is >>= maybe (return 0) f | |
where | |
f s = S.unsafeUseAsCStringLen s $ \(cstr, l) -> do | |
let c = min n l | |
copyBytes ptr (castPtr cstr) c | |
return $! c | |
readNonBlocking _ _ _ = unsupported | |
write _ _ _ = unsupported | |
writeNonBlocking _ _ _ = unsupported | |
------------------------------------------------------------------------------ | |
instance H.RawIO (OutputStream ByteString) where | |
read _ _ _ = unsupported | |
readNonBlocking _ _ _ = unsupported | |
write os ptr n = S.packCStringLen (castPtr ptr, n) >>= | |
flip write os . Just | |
writeNonBlocking _ _ _ = unsupported | |
------------------------------------------------------------------------------ | |
-- | Internal convenience synonym for a pair of input\/output streams. | |
type StreamPair a = SP (InputStream a) (OutputStream a) | |
instance H.RawIO (StreamPair ByteString) where | |
read (SP is _) ptr n = H.read is ptr n | |
readNonBlocking _ _ _ = unsupported | |
write (SP _ os) ptr n = H.write os ptr n | |
writeNonBlocking _ _ _ = unsupported | |
------------------------------------------------------------------------------ | |
instance H.BufferedIO (OutputStream ByteString) where | |
newBuffer !_ bs = H.newByteBuffer bUFSIZ bs | |
fillReadBuffer !_ _ = unsupported | |
fillReadBuffer0 !_ _ = unsupported | |
flushWriteBuffer !os !buf = do | |
write (Just $! bufferToBS buf) os | |
emptyWriteBuffer buf | |
flushWriteBuffer0 !os !buf = do | |
let s = bufferToBS buf | |
let l = S.length s | |
write (Just s) os | |
buf' <- emptyWriteBuffer buf | |
return $! (l, buf') | |
------------------------------------------------------------------------------ | |
instance H.BufferedIO (InputStream ByteString) where | |
newBuffer !_ !bs = H.newByteBuffer bUFSIZ bs | |
fillReadBuffer !is !buf = H.readBuf is buf | |
fillReadBuffer0 _ _ = unsupported | |
flushWriteBuffer _ _ = unsupported | |
flushWriteBuffer0 _ _ = unsupported | |
------------------------------------------------------------------------------ | |
instance H.BufferedIO (StreamPair ByteString) where | |
newBuffer !_ bs = H.newByteBuffer bUFSIZ bs | |
fillReadBuffer (SP is _) = H.fillReadBuffer is | |
fillReadBuffer0 _ _ = unsupported | |
flushWriteBuffer (SP _ !os) = H.flushWriteBuffer os | |
flushWriteBuffer0 (SP _ !os) = H.flushWriteBuffer0 os | |
------------------------------------------------------------------------------ | |
instance H.IODevice (OutputStream ByteString) where | |
ready _ _ _ = return True | |
close = write Nothing | |
devType _ = return H.Stream | |
------------------------------------------------------------------------------ | |
instance H.IODevice (InputStream ByteString) where | |
ready _ _ _ = return True | |
close _ = return $! () | |
devType _ = return H.Stream | |
------------------------------------------------------------------------------ | |
instance H.IODevice (StreamPair ByteString) where | |
ready _ _ _ = return True | |
close (SP _ os) = write Nothing os | |
devType _ = return H.Stream | |
------------------------------------------------------------------------------ | |
emptyWriteBuffer :: H.Buffer Word8 | |
-> IO (H.Buffer Word8) | |
emptyWriteBuffer buf | |
= return buf { H.bufL=0, H.bufR=0, H.bufState = H.WriteBuffer } | |
------------------------------------------------------------------------------ | |
-- | A 'Generator' is a coroutine monad that can be used to define complex | |
-- 'InputStream's. You can cause a value of type @Just r@ to appear when the | |
-- 'InputStream' is read by calling 'yield': | |
-- | |
-- @ | |
-- g :: 'Generator' Int () | |
-- g = do | |
-- Streams.'yield' 1 | |
-- Streams.'yield' 2 | |
-- Streams.'yield' 3 | |
-- @ | |
-- | |
-- A 'Generator' can be turned into an 'InputStream' by calling | |
-- 'fromGenerator': | |
-- | |
-- @ | |
-- m :: 'IO' ['Int'] | |
-- m = Streams.'fromGenerator' g >>= Streams.'System.IO.Streams.toList' \-\- value returned is [1,2,3] | |
-- @ | |
-- | |
-- You can perform IO by calling 'liftIO', and turn a 'Generator' into an | |
-- 'InputStream' with 'fromGenerator'. | |
-- | |
-- As a general rule, you should not acquire resources that need to be freed | |
-- from a 'Generator', because there is no guarantee the coroutine continuation | |
-- will ever be called, nor can you catch an exception from within a | |
-- 'Generator'. | |
newtype Generator r a = Generator { | |
unG :: IO (Either (SP r (Generator r a)) a) | |
} deriving (Typeable) | |
------------------------------------------------------------------------------ | |
generatorBind :: Generator r a -> (a -> Generator r b) -> Generator r b | |
generatorBind (Generator m) f = Generator (m >>= either step value) | |
where | |
step (SP v r) = return $! Left $! SP v (generatorBind r f) | |
value = unG . f | |
{-# INLINE generatorBind #-} | |
------------------------------------------------------------------------------ | |
instance Monad (Generator r) where | |
return = Generator . return . Right | |
(>>=) = generatorBind | |
------------------------------------------------------------------------------ | |
instance MonadIO (Generator r) where | |
liftIO = Generator . (Right `fmap`) | |
------------------------------------------------------------------------------ | |
instance Functor (Generator r) where | |
fmap f (Generator m) = Generator $ m >>= either step value | |
where | |
step (SP v m') = return $! Left $! SP v (fmap f m') | |
value v = return $! Right $! f v | |
------------------------------------------------------------------------------ | |
instance Applicative (Generator r) where | |
pure = Generator . return . Right | |
m <*> n = do | |
f <- m | |
v <- n | |
return $! f v | |
------------------------------------------------------------------------------ | |
-- | Calling @'yield' x@ causes the value @'Just' x@ to appear on the input | |
-- when this generator is converted to an 'InputStream'. The rest of the | |
-- computation after the call to 'yield' is resumed later when the | |
-- 'InputStream' is 'read' again. | |
yield :: r -> Generator r () | |
yield x = Generator $! return $! Left $! SP x (return $! ()) | |
------------------------------------------------------------------------------ | |
-- | Turns a 'Generator' into an 'InputStream'. | |
fromGenerator :: Generator r a -> IO (InputStream r) | |
fromGenerator (Generator m) = do | |
ref <- newIORef m | |
makeInputStream $! go ref | |
where | |
go ref = readIORef ref >>= (\n -> n >>= either step finish) | |
where | |
step (SP v gen) = do | |
writeIORef ref $! unG gen | |
return $! Just v | |
finish _ = return Nothing | |
------------------------------------------------------------------------------ | |
newtype Consumer c a = Consumer { | |
unC :: IO (Either (Maybe c -> Consumer c a) a) | |
} deriving (Typeable) | |
------------------------------------------------------------------------------ | |
instance Monad (Consumer c) where | |
return = Consumer . return . Right | |
(Consumer m) >>= f = Consumer $ m >>= either step value | |
where | |
step g = return $! Left $! (>>= f) . g | |
value v = unC $ f v | |
------------------------------------------------------------------------------ | |
instance MonadIO (Consumer c) where | |
liftIO = Consumer . fmap Right | |
------------------------------------------------------------------------------ | |
instance Functor (Consumer r) where | |
fmap f (Consumer m) = Consumer (m >>= either step value) | |
where | |
step g = return $! Left $! (fmap f) . g | |
value v = return $! Right $! f v | |
------------------------------------------------------------------------------ | |
instance Applicative (Consumer r) where | |
pure = return | |
m <*> n = do | |
f <- m | |
v <- n | |
return $! f v | |
------------------------------------------------------------------------------ | |
await :: Consumer r (Maybe r) | |
await = Consumer $ return (Left return) | |
------------------------------------------------------------------------------ | |
fromConsumer :: Consumer r a -> IO (OutputStream r) | |
fromConsumer c0 = newIORef c0 >>= makeOutputStream . go | |
where | |
go ref mb = do | |
c <- readIORef ref | |
c' <- unC c >>= either step (const $! return c) | |
writeIORef ref c' | |
where | |
force c = do e <- unC c | |
return $! Consumer $! return e | |
step g = force $! g mb |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment