Created
May 20, 2011 20:31
-
-
Save purefn/983737 to your computer and use it in GitHub Desktop.
print the first 10 chunks of input
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.{Failure => _, _} | |
import Scalaz._ | |
import effects._ | |
import iteratees._ | |
import java.io._ | |
object Head { | |
def main(args: Array[String]) { | |
val enum = enumStream[Seq[Byte], IO]((1 to 50).toStream.map(i => ("line " + i + "\n").getBytes.toSeq)) | |
val enee = take[Seq[Byte], IO, Unit](10) | |
val iter = enee(iterOutputStream(System.out)) | |
println("=== without join ===") | |
val nonJoinedIo = iter <<: enum flatMap (_.run) flatMap (_.run) except printStackTrace | |
nonJoinedIo.unsafePerformIO | |
println("=== with join ===") | |
val joinedIo = joinI(iter) <<: enum flatMap (_.run) except printStackTrace | |
joinedIo.unsafePerformIO | |
} | |
def enumStream[C, M[_]](s: Stream[C]) = new Enumerator[C, M] { | |
def apply[A](iter: Iteratee[C, M, A])(implicit m: Monad[M]) = { | |
def loop(s: Stream[C], iter: Iteratee[C, M, A]): M[Iteratee[C, M, A]] = s match { | |
case x #:: xs => iter.fold( | |
done = (_, _) => iter.pure[M], | |
error = (_, _) => iter.pure[M], | |
cont = k => loop(xs, k(Chunk(x)))) | |
case _ => iter.pure[M] | |
} | |
loop(s, iter) | |
} | |
} | |
def iterOutputStream(os: OutputStream): Iteratee[Seq[Byte], IO, Unit] = { | |
val write: Seq[Byte] => IO[Unit] = bs => IO(rw => (rw, os.write(bs.toArray))) | |
def step: Input[Seq[Byte]] => Iteratee[Seq[Byte], IO, Unit] = in => in match { | |
case EmptyChunk() => Cont(step) | |
case Chunk(c) => FlattenI(write(c) >>=| Cont(step).pure[IO]) | |
case EOF(None) => Done((), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
Cont(step) | |
} | |
def printStackTrace(e: Throwable) = IO(rw => (rw, e.printStackTrace)) | |
implicit def iterateeAsMA[A, M[_], B](iter: Iteratee[A, M, B]) = ma[({type λ[α]=Iteratee[A, M, α]})#λ, B](iter) | |
/** | |
* Read n elements from a stream and apply the given iteratee to the | |
* stream of the read elements. Unless the stream is terminated early, we | |
* read exactly n elements, even if the iteratee has accepted fewer. | |
*/ | |
def take[C: EmptyChunk, M[_], A](n: Int): Enumeratee[C, C, M, A] = new Enumeratee[C, C, M, A] { | |
def apply(iter: Iteratee[C, M, A])(implicit m: Monad[M]): Iteratee[C, M, Iteratee[C, M, A]] = { | |
def step(n: Int, k: Input[C] => Iteratee[C, M, A]): Input[C] => Iteratee[C, M, Iteratee[C, M, A]] = in => in match { | |
case EmptyChunk() => Cont(step(n, k)) | |
case Chunk(c) => take(n-1) apply k(in) | |
case EOF(None) => Done(k(in), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
if (n <= 0) Done(iter, EmptyChunk[C]) | |
else FlattenI(iter.fold[Iteratee[C, M, Iteratee[C, M, A]]]( | |
done = (a, in) => | |
(drop(n) >>=| Done(Done(a, EmptyChunk[C]), EmptyChunk[C])).pure, | |
error = (err, in) => Done(Failure[C, M, A](err, in), EmptyChunk[C]).pure, | |
cont = k => | |
if (n == 0) Done(Cont(k), EmptyChunk[C]).pure | |
else Cont(step(n, k)).pure | |
)) | |
} | |
} | |
def drop[C, M[_]: Monad](n: Int): Iteratee[C, M, Unit] = { | |
def step(n: Int): Input[C] => Iteratee[C, M, Unit] = in => in match { | |
case Chunk(c) => Cont(step(n-1)) | |
case EOF(None) => Done((), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
Cont(step(n)) | |
} | |
def joinI[CFrom: EmptyChunk, CTo, M[_]: Monad, A](outer: Iteratee[CFrom, M, Iteratee[CTo, M, A]]): Iteratee[CFrom, M, A] = { | |
val onDone: (A, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (a, i) => Done(a, EmptyChunk[CFrom]).pure | |
val onError: (Error, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (e, _) => Failure(e, EmptyChunk[CFrom]).pure | |
outer flatMap { inner => FlattenI(inner.fold( | |
done = onDone, | |
error = onError, | |
cont = k => k(EOF(None)).fold( | |
done = onDone, | |
error = onError, | |
cont = k => sys.error("joinI: Divergent iteratee!")))) | |
} | |
} | |
} |
Grr. Ok, found out a little more. It actually looks like joinI might be ok. The write in the iterOutputStream for "line 10" is only actually happening once. Now I'm starting to suspect the problem is at a lower level, like in the IO monad or something.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This prints
When using joinI, "line 10" is printed multiple times. It is only printed once when not using joinI which makes me pretty confident that 'take' is working as it should be and joinI is the problem.