Skip to content

Instantly share code, notes, and snippets.

@purefn
Created May 20, 2011 20:31
Show Gist options
  • Save purefn/983737 to your computer and use it in GitHub Desktop.
Save purefn/983737 to your computer and use it in GitHub Desktop.
print the first 10 chunks of input
import scalaz.{Failure => _, _}
import Scalaz._
import effects._
import iteratees._
import java.io._
object Head {
def main(args: Array[String]) {
val enum = enumStream[Seq[Byte], IO]((1 to 50).toStream.map(i => ("line " + i + "\n").getBytes.toSeq))
val enee = take[Seq[Byte], IO, Unit](10)
val iter = enee(iterOutputStream(System.out))
println("=== without join ===")
val nonJoinedIo = iter <<: enum flatMap (_.run) flatMap (_.run) except printStackTrace
nonJoinedIo.unsafePerformIO
println("=== with join ===")
val joinedIo = joinI(iter) <<: enum flatMap (_.run) except printStackTrace
joinedIo.unsafePerformIO
}
def enumStream[C, M[_]](s: Stream[C]) = new Enumerator[C, M] {
def apply[A](iter: Iteratee[C, M, A])(implicit m: Monad[M]) = {
def loop(s: Stream[C], iter: Iteratee[C, M, A]): M[Iteratee[C, M, A]] = s match {
case x #:: xs => iter.fold(
done = (_, _) => iter.pure[M],
error = (_, _) => iter.pure[M],
cont = k => loop(xs, k(Chunk(x))))
case _ => iter.pure[M]
}
loop(s, iter)
}
}
def iterOutputStream(os: OutputStream): Iteratee[Seq[Byte], IO, Unit] = {
val write: Seq[Byte] => IO[Unit] = bs => IO(rw => (rw, os.write(bs.toArray)))
def step: Input[Seq[Byte]] => Iteratee[Seq[Byte], IO, Unit] = in => in match {
case EmptyChunk() => Cont(step)
case Chunk(c) => FlattenI(write(c) >>=| Cont(step).pure[IO])
case EOF(None) => Done((), in)
case EOF(Some(err)) => Failure(err, in)
}
Cont(step)
}
def printStackTrace(e: Throwable) = IO(rw => (rw, e.printStackTrace))
implicit def iterateeAsMA[A, M[_], B](iter: Iteratee[A, M, B]) = ma[({type λ[α]=Iteratee[A, M, α]})#λ, B](iter)
/**
* Read n elements from a stream and apply the given iteratee to the
* stream of the read elements. Unless the stream is terminated early, we
* read exactly n elements, even if the iteratee has accepted fewer.
*/
def take[C: EmptyChunk, M[_], A](n: Int): Enumeratee[C, C, M, A] = new Enumeratee[C, C, M, A] {
def apply(iter: Iteratee[C, M, A])(implicit m: Monad[M]): Iteratee[C, M, Iteratee[C, M, A]] = {
def step(n: Int, k: Input[C] => Iteratee[C, M, A]): Input[C] => Iteratee[C, M, Iteratee[C, M, A]] = in => in match {
case EmptyChunk() => Cont(step(n, k))
case Chunk(c) => take(n-1) apply k(in)
case EOF(None) => Done(k(in), in)
case EOF(Some(err)) => Failure(err, in)
}
if (n <= 0) Done(iter, EmptyChunk[C])
else FlattenI(iter.fold[Iteratee[C, M, Iteratee[C, M, A]]](
done = (a, in) =>
(drop(n) >>=| Done(Done(a, EmptyChunk[C]), EmptyChunk[C])).pure,
error = (err, in) => Done(Failure[C, M, A](err, in), EmptyChunk[C]).pure,
cont = k =>
if (n == 0) Done(Cont(k), EmptyChunk[C]).pure
else Cont(step(n, k)).pure
))
}
}
def drop[C, M[_]: Monad](n: Int): Iteratee[C, M, Unit] = {
def step(n: Int): Input[C] => Iteratee[C, M, Unit] = in => in match {
case Chunk(c) => Cont(step(n-1))
case EOF(None) => Done((), in)
case EOF(Some(err)) => Failure(err, in)
}
Cont(step(n))
}
def joinI[CFrom: EmptyChunk, CTo, M[_]: Monad, A](outer: Iteratee[CFrom, M, Iteratee[CTo, M, A]]): Iteratee[CFrom, M, A] = {
val onDone: (A, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (a, i) => Done(a, EmptyChunk[CFrom]).pure
val onError: (Error, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (e, _) => Failure(e, EmptyChunk[CFrom]).pure
outer flatMap { inner => FlattenI(inner.fold(
done = onDone,
error = onError,
cont = k => k(EOF(None)).fold(
done = onDone,
error = onError,
cont = k => sys.error("joinI: Divergent iteratee!"))))
}
}
}
@purefn
Copy link
Author

purefn commented May 20, 2011

This prints

=== without join ===
line 1
line 2
line 3
line 4
line 5
line 6
line 7
line 8
line 9
line 10
=== with join ===
line 1
line 2
line 3
line 4
line 5
line 6
line 7
line 8
line 9
line 10
line 10
line 10
line 10
line 10
line 10

When using joinI, "line 10" is printed multiple times. It is only printed once when not using joinI which makes me pretty confident that 'take' is working as it should be and joinI is the problem.

@purefn
Copy link
Author

purefn commented May 20, 2011

Grr. Ok, found out a little more. It actually looks like joinI might be ok. The write in the iterOutputStream for "line 10" is only actually happening once. Now I'm starting to suspect the problem is at a lower level, like in the IO monad or something.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment