In response to this reddit post about failing to understand the Pipes library after a couple of hours. I wanted to show how an experienced haskeller does it, but I'm afraid it also took me quite a few hours, which is why the following list of steps goes on and on.
After all those hours, my opinion is that Pipes
is not at all an easy library. I don't know if Conduit is any easier, but otherwise I side with your friend in advising to use something else (perhaps ordinary lazy IO?) instead of Pipes.
Anyway, here is the full brain dump of my steps as I tried to get your three snippets to fit together.
-
Since you say that you have a hard time combining the snippets, I assume that they must have complicated types. So my first goal is to figure out the type of your first snippet.
-
hoogle for
parseUrl
,withManager
, etc. No results. -
Google for
haskell runEffect
, find that it's a method fromPipes.Core
, a module of thepipes
package -
cabal install pipes
-
Write a small program importing
Pipes.Core
giving the namegetHttpStream
to your first snippet.parseUrl
is not in scope, among others. -
At this point I guess that
withManager
,withHTTP
, etc. are probably members of other packages in the Pipes ecosystem. -
Google for
withHTTP pipes
, discover that I needPipes.HTTP
from thepipes-http
package. -
cabal install pipes-http
-
Now
(>->)
is not in scope. I thought it was monad composition, so I importedControl.Monad
, and when it didn't work I looked upControl.Monad
on hoogle and discovered that I was confusing(>->)
with(>=>)
. -
hoogle for
(>->)
. Probably another piece of the Pipes ecosystem. -
Google for ">-> pipes", but of course google is no good for identifiers which consists only of symbols.
-
I remember that hayoo is supposed to support more packages than hoogle, so I search it for "(>->)", but the results don't seem related to my query.
-
I guess that
(>->)
is probably pipe composition, so it's probably in the corepipes
package, just not inPipes.Core
. I quickly find it inPipes
. -
Next,
PB.stdout
is not in scope. At that point I notice that theparseUrl
snippet is actually at the top of thePipes.HTTP
documentation, from which I discover thatPB
isPipes.ByteString
. -
cabal install pipes-bytestring
-
The first snippet finally compiles! I can finally ask ghci about its type.
$ cabal repl > :load Main.hs > :t getHttpStream getHttpStream :: IO ()
-
Okay, so the type isn't as complicated at all. What happens when I run this?
``` > getHttpStream <html> ... </html> ```
-
Okay, so it fetches a webpage and prints it.
-
What do we want to do next? Turning the binary stream into text and splitting the text into lines? Clearly, since the first snippet has type
IO ()
and notIO ByteString
, we're not going to be able to do anything to the binary stream. So my next goal is to modify the first snippet so that it has typeIO ByteString
. -
I add the type annotation
getHttpStream :: IO PB.ByteString
and look at the error messages.withManager
returns the wrong type, but I can't tell if it's because it is a polymorphic function and its body has the wrong type, or if it's because it always returnsIO ()
. -
Looking for
withManager
in the documentation forPipes.Core
andPipes
. It's not there. -
No matter, I can just ask ghci.
``` :t withManager withManager :: ManagerSettings -> (Manager -> IO a) -> IO a ```
-
Great, it's the body which has the wrong type. Same question for
withHTTP
:``` :t withHTTP withHTTP :: Request -> Manager -> (Response (Producer PB.ByteString IO ()) -> IO a) -> IO a ```
-
Okay, we're getting close: the body receives a something-something-ByteString and returns an
IO a
, I just need to somehow extract the ByteString from the something-something-ByteString and return that instead of()
. Still, while we're at it, what's the type ofrunEffect
?``` :t runEffect runEffect :: Monad m => Effect m r -> m r ```
-
No idea what this
Effect
thing is, butm r
is definitely becomingIO ()
, so I should probably try to modify the body which is being passed torunEffect
so that it has typeEffect IO ByteString
instead ofEffect IO ()
. I think this is more likely to work than to try to extract a ByteString from the something-something-ByteString, because the fact thatrunEffect
exists and is used in this example is a sign that it's probably not possible to access the ByteString without it. -
At this point, I predict that
responseBody resp
probably has typeEffect IO ByteString
, thatPB.stdout
probably has typeEffect IO ()
, and that>->
feeds one into the other. Except this can't really work, because the type ofPB.stdout
would have to specify that it expects aByteString
somehow. MaybeByteString -> Effect IO ()
? But in that case,>->
would be a strange choice of name, since it looks closer to(>=>)
than to(>>=)
. Anyway, let's look at the types of those three pieces and figure out what's going on:``` :t responseBody (undefined :: Response (Producer PB.ByteString IO ())) responseBody (undefined :: Response (Producer PB.ByteString IO ())) :: Producer PB.ByteString IO () :t (>->) (>->) :: Monad m => Pipes.Core.Proxy a' a () b m r -> Pipes.Core.Proxy () b c' c m r -> Pipes.Core.Proxy a' a c' c m r :t PB.stdout PB.stdout :: MonadIO m => Pipes.Core.Proxy () PB.ByteString y' y m () ```
-
Wow, those are some pretty long types, and
Effect m r
is nowhere to be found. MaybeEffect
is a type synonym?``` :info Effect type Effect = Pipes.Core.Proxy X () () X -- Defined in `Pipes.Core' ```
-
Yup, it's a type synonym, and I bet
Producer
is one too:``` :info Producer type Producer b = Pipes.Core.Proxy X () () b -- Defined in `Pipes.Core' ```
-
Okay, this is a bit tricky, because there are so many type variables. Let's see. We want an
Effect IO ByteString
, which desugars toProxy X () () X IO ByteString
, and we have aProducer ByteString IO ()
, which desugars toProxy X () () ByteString IO ()
. I don't know what those mysteriousX
s are for, but clearly two of those type variables must be input and output types. We know that(>->)
pipes the output ofresponseBody resp
into the input ofPB.stdout
, and we know that the intermediate type is ByteString, so looking at the position where ByteString occur in the type of those two expressions, I conclude thatProxy
's second type variable is the input type, and the fourth type variable is the output type. -
The last two type variables
m
andr
are clearly the monad under which theProxy
objects execute the computations they represent, and the type of the value returned by the entire pipeline once it has completed. -
I think I need a value of type
Proxy X ByteString () X IO ByteString
. This would be a computation which consumes a ByteString as input and, while it wouldn't produce any output which could be fed to anotherProxy
, it would cause the pipeline to terminate with that ByteString as a final result. If only hoogle supported Pipes, then I could just search for this type... -
Double-checking the type of
(>->)
, I don't think this could work after all, because(>->)
doesn't allow ther
to be changed, and theResponse
object forcesr
to be()
. Maybe we are supposed to do everything inside the Pipes ecosystem, without ever returning values to theIO
level? I guess I should read the documentation. -
Reading the few lines of hackage documentation for the
Proxy
type. -
Oooh, nice! Having input and output types for both upstream and downstream makes a lot of sense to me, because I came up with a similar design a few months ago. I wonder if Gabriel is also using a swapping semantics instead of independent
awaitUp
andyieldUp
commands? -
Reading further down, the
(~>)
and(>~)
operators catch my eye, as they allow the result typer
to be changed. -
Okay, I think I've got it. The type of
await
isConsumer' a m a
, which expands toProxy () a y' y m a
, which almost matches theProxy X ByteString () X IO ByteString
I am looking for. The first type variable is a()
instead of anX
, but I'm not even sure why I need anX
in the first place, I just copied it from the signature ofEffect
. -
Now that I see in the documentation that this
X
stands forV.Void
, I am doubly certain that I got it. In my swapping scheme,Void
was used to indicate the ends of the pipeline, and()
was used when I wanted a swap to be interpreted as a unidirectional transfer: if one side offers a()
in exchange for a value, the end result is that a value was transmitted to that side. Since I want to receive the ByteString from theResponse
object, I definitely want a()
here. -
The only nagging detail left is... do I want
(~>)
or(>~)
? Their documentation helpfully point out that I can combine aProducer
with anEffect
, anEffect
with aConsumer
, etc., but they don't mention my case, combiningProducer
with aConsumer'
. -
Okay, I probably don't want
(~>)
, as it is composing functions instead ofProxy
s. -
Whatever, with all of those type synonym and with the polymorphism of
await
(It's aConsumer'
, not aConsumer
), there is probably a case which covers the composition I want. Let's try it! -
Changing
>-> PB.stdout
to>~ await
.``` Couldn't match type `PB.ByteString' with `X' Expected type: Response (Pipes.Core.Proxy X () () X IO ()) Actual type: Response (Producer PB.ByteString IO ()) In the first argument of `responseBody', namely `resp' ```
-
Er, that's not the part of the code which I expected to cause a type error.
-
Adding the type annotations
(responseBody resp :: Producer PB.ByteString IO ())
and(await :: Consumer' PB.ByteString IO PB.ByteString)
.``` Couldn't match type `PB.ByteString' with `X' Expected type: Pipes.Core.Proxy X () () X IO () Actual type: Producer PB.ByteString IO () In the first argument of `(>~)' Couldn't match type `PB.ByteString' with `()' Expected type: Pipes.Core.Proxy () () () X IO PB.ByteString Actual type: Pipes.Core.Proxy () PB.ByteString () X IO PB.ByteString In the second argument of `(>~)' ```
-
Okay, now I need to figure out whether this means I need to tweak things a bit more, or if I am going in the wrong direction altogether.
-
Expanding the
Producer
type synonyms in order to better understand the type error.``` Couldn't match type `PB.ByteString' with `X' Expected type: Proxy X () () X IO () Actual type: Proxy X () () PB.ByteString IO () In the first argument of `(>~)' Couldn't match type `PB.ByteString' with `()' Expected type: Proxy () () () X IO PB.ByteString Actual type: Proxy () PB.ByteString () X IO PB.ByteString In the second argument of `(>~)' ```
-
This error message is telling me that
(>~)
isn't doing what I want at all. It doesn't want the first argument to send its ByteString down the pipeline, and it doesn't want the second argument to receive it! -
Re-reading the documentation for
(>~)
. -
"loops over p replacing each await with draw"? Well, the right-hand side consists of exactly one await, so I guess what I did was the same as if I had removed the
>-> PB.stdout
altogether. -
Okay, time for a new approach. Since both computations return an
r
, I wonder which of the two values is used when they are composed with(>->)
? -
Adding the
OverloadedStrings
extension, replacingresponseBody resp
withfmap (const "foo") (responseBody resp)
, and replacing(>~)
with(>->)
. -
Finally, this typechecks! I have a computation of type
IO PB.ByteString
! -
Running the computation yields the source of example.com, no "foo" to be seen. Still, this can't be the canonical way to do this; what if the return type was a value which was difficult to construct? Let's try doing everything inside the Pipes system instead.
-
In particular, everything including the output should be done inside the Pipes system. Since we're going to output something other than a single
ByteString
, I need something more general thanPB.stdout
. -
Reverting to the initial version, the one with
>-> PB.stdout
and typeIO ()
. -
Since
Proxy
is a monad transformer, it shouldn't be too hard to create a customProxy
computation which receives and prints any kind of input.``` printResult :: Show a => Consumer a IO () printResult = do x <- await lift $ print x ```
-
Replacing
PB.stdout
withprintResult
. The string representation of the source of example.com is printed. Good! -
Okay, now that I can plug in arbitrary IO code inside a
Proxy
pipeline, I feel like the rest of this exercise is going to be a walk in the park. -
Next snippet:
Pipes.Text.Encoding.decodeUtf8
. I probably need to install yet another package. Based on the name of the previous packges, I guess this one is called "pipes-text"? -
cabal install pipes-text
-
Looks good. What is the type of this new snippet?
``` :t decodeUtf8 decodeUtf8 :: Monad m => Pipes.Core.Producer ByteString m r -> Pipes.Core.Producer Text m (Pipes.Core.Producer ByteString m r) ```
-
Weird, I would have expected some kind of transducer accepting a ByteString and producing a Text, not a function returning a producer of producers. Let's look at the documentation.
-
Oh my, the documentation talks about lenses now. Sounds like the
Pipes
ecosystem requires quite a bit of prior knowledge from its users! -
"[...] if the first bytes are already un-decodable, the whole ByteString producer will be returned". I'm pretty sure that won't happen, so I want to discard the
Producer ByteString
returned by theProducer Text
. That's what thefmap (const ())
you've been told to use must be doing. -
Wait, but what happens if all bytes are decodable? Does it return a producer which doesn't produce any bytes? I would have expected to receive an
Either (Producer ByteString IO r) r
, not unconditionally receiving a producer. Maybe in case of success,decodeUtf8
doesn't return at all? -
Let's test this.
``` asUtf8 :: Producer PB.ByteString IO () -> Producer Text IO () asUtf8 p = do _ <- decodeUtf8 p lift $ putStrLn "decodeUtf8 has returned" ```
-
Replacing
responseBody resp
withasUtf8 (responseBody resp)
. The string "decodeUtf8" is not printed. -
The monadic semantics of
Proxy
seem really weird. I knew it was possible for monads such asMaybe
to abort in case of failure, but now the computation is aborting in case of success? What? -
Maybe the upstream computation doesn't execute until the end in the case where the downstream computation doesn't force it to continue evaluating (pull model). Let's test this hypothesis:
``` produce3 :: Producer Int IO () produce3 = do lift $ putStrLn "producing 1." yield 1 lift $ putStrLn "producing 2." yield 2 lift $ putStrLn "producing 3." yield 3 lift $ putStrLn "done producing." consume2 :: Consumer Int IO () consume2 = do lift $ putStrLn "consuming once:" x <- await lift $ printf "received %d. consuming twice:\n" x y <- await lift $ printf "received %d. done consuming.\n" y -- | -- >>> main -- consuming once: -- producing 1. -- received 1. consuming twice: -- producing 2. -- received 2. done consuming. main :: IO () main = runEffect (produce3 >-> consume2) ```
-
Youhoo! Not only was my hypothesis correct, but this compiled the first time. I'm getting the hang of the Pipes library!
-
Alright, last snippet:
Pipes.Text.lines
. What is its type?``` :t Pipes.Text.lines Pipes.Text.lines :: (Monad m, Functor f, Profunctor p) => p (FreeT (Pipes.Core.Producer Text m) m r) (f (FreeT (Pipes.Core.Producer Text m) m r)) -> p (Pipes.Core.Producer Text m r) (f (Pipes.Core.Producer Text m r)) ```
-
Free monads and profunctors?! Just as I thought the Pipes library wasn't that hard...
-
I think I know what
FreeT
andProfunctor
do, but let's just read the documentation, in case they already simplify the type for common cases. -
Okay, looks like I just need to apply
view
toPipes.Text.Lines
in order to obtain what I hope is aPipe
. But where isview
defined? -
The example imports
Lens.Family
, which is probably from one of the lens packages. But which one? -
google "Lens.Family", discover that one of the lens packages is called "lens-family". Probably this one :)
-
cabal install lens-family
``` :t view Pipes.Text.lines view Pipes.Text.lines :: Monad m => Pipes.Core.Producer Text m r -> FreeT (Pipes.Core.Producer Text m) m r ```
-
I don't understand why all those
Pipes
primitives are functions which return producers and stuff. In order to be composed inside aProxy
pipeline, shouldn't almost everything be aProxy something a something b m r
instead of ana -> something Producer b
? -
Okay, how do I convert this
FreeT
into a[Text]
? I know that a free monad is a tree obtained by nesting occurrences of a given functor, which in this case isProducer Text m
, I guess. So we have a tree of producers, wow. -
Oh, wait, but this isn't just a free monad, it is a free monad transformer. So, in addition to the abilities of the inner monad, it can also do, er... what's the computational interpretation of a free monad?
-
Maybe it would be easier to figure out the container interpretation of a monad transformer. Maybe just container composition?
runFreeT
has typeFreeT f m a -> m (FreeF f a (FreeT f m a))
, so it's anm
-shaped container ofFreeF something
values, which are themselvesf
-shaped containers ofFreeT
values, which is what we're trying to understand in the first place. I think it just means that them
andf
alternate:FreeT f m a ~ m (f (m (f ... a)))
. -
Okay, so in our case the
f
is a producer, and them
is IO. So I should be able to run anIO
action which returns a producer, produce its values until it returns anotherIO
action, from which I can obtain the next producer, and so on. -
Let's write a utility function to do just that:
``` free2list :: Monad m => FreeT (Producer b m) m r -> Producer b m r free2list freeProducers = do freeProducer <- lift $ runFreeT freeProducers case freeProducer of Pure r -> return r Free p -> do nextProducers <- p free2list nextProducers ```
-
I must admit that it took me a while to get the above to compile. In the end, it is replacing the abstract
m
,b
andr
variables with concrete types which made the error messages much easier to understand, and allowed me to complete the implementation. -
Replacing
asUtf8 (responseBody resp)
withfree2list (view lines (asUtf8 (responseBody resp)))
. Now the output is only the first line of example.com, which probably means that my line splitting is okay, but my pretty printing is not, becauseprintResult
stops after the first result. -
How should
printResult
know how many lines to fetch? What happens if it fetches too much? Let's find out.``` printAllResults :: Show a => Int -> Consumer a IO () printAllResults i = do x <- await lift $ printf "%d %s\n" i (show x) printAllResults (i + 1) ```
-
When too many lines are being fetched,
decodeUtf8
does return, even though the text didn't contain any invalid byte sequences.``` 48 [...] 49 "</body>" 50 "</html>" decodeUtf8 has returned () ```
-
I still don't know how to distinguish between
decodeUtf8
returning a producer because there was an error, and returning a producer because all the bytes were successfully parsed. In case of success, I guess the producer will not produce any more bytes, but how do we check this? So far, the only thing we have done with producers is to call them, causing them to produce, but not to return their values. What happens if we try to await a value from the producer inside the monadic code?``` asUtf8 :: Producer PB.ByteString IO () -> Producer Text IO () asUtf8 p = do p' <- decodeUtf8 p p' >-> do _ <- await lift $ putStrLn "decodeUtf8 couldn't parse the text" ```
-
I run the resulting producer, and I try to await a value from it. If I succeed, it means it was non-empty, so there was an error. It works!
Here is the full code:
```
module Main where
import Prelude hiding (lines)
import Lens.Family
import Pipes
import Pipes.HTTP
import Pipes.Text
import Pipes.Text.Encoding
import qualified Pipes.ByteString as PB
getHttpStream :: IO ()
getHttpStream = do
req <- parseUrl "http://www.example.com/"
withManager tlsManagerSettings $ \m ->
withHTTP req m $ \resp -> do
runEffect $ free2list (view lines (asUtf8 (responseBody resp)))
>-> printAllResults 1
asUtf8 :: Producer PB.ByteString IO ()
-> Producer Text IO ()
asUtf8 p = do
p' <- decodeUtf8 p
p' >-> do _ <- await
lift $ putStrLn "decodeUtf8 couldn't parse the text"
free2list :: Monad m
=> FreeT (Producer b m) m r
-> Producer b m r
free2list freeProducers = do
freeProducer <- lift $ runFreeT freeProducers
case freeProducer of
Pure r -> return r
Free p -> do nextProducers <- p
free2list nextProducers
printResult :: Show a => Consumer a IO ()
printResult = do
x <- await
lift $ print x
printAllResults :: Show a => Int -> Consumer a IO ()
printAllResults i = do
x <- await
lift $ putStrLn $ show i ++ " " ++ show x
printAllResults (i + 1)
main :: IO ()
main = getHttpStream >>= print
```
My goal was to combine the OP's snippets together, with no particular concern for the output, but yes, the intent of the full code was to prefix each line with its line number. Thanks for fixing and explaining my defect!