Skip to content

Instantly share code, notes, and snippets.

@rsuniev
Forked from purefn/Head.scala
Created June 3, 2011 18:39
Show Gist options
  • Save rsuniev/1006900 to your computer and use it in GitHub Desktop.
Save rsuniev/1006900 to your computer and use it in GitHub Desktop.
print the first 10 chunks of input
import scalaz.{Failure => _, _}
import Scalaz._
import effects._
import iteratees._
import java.io._
object Head {
def main(args: Array[String]) {
val enum = enumStream[Seq[Byte], IO]((1 to 50).toStream.map(i => ("line " + i + "\n").getBytes.toSeq))
val enee = take[Seq[Byte], IO, Unit](10)
val iter = enee(iterOutputStream(System.out))
println("=== without join ===")
val nonJoinedIo = iter <<: enum flatMap (_.run) flatMap (_.run) except printStackTrace
nonJoinedIo.unsafePerformIO
println("=== with join ===")
val joinedIo = joinI(iter) <<: enum flatMap (_.run) except printStackTrace
joinedIo.unsafePerformIO
}
def enumStream[C, M[_]](s: Stream[C]) = new Enumerator[C, M] {
def apply[A](iter: Iteratee[C, M, A])(implicit m: Monad[M]) = {
def loop(s: Stream[C], iter: Iteratee[C, M, A]): M[Iteratee[C, M, A]] = s match {
case x #:: xs => iter.fold(
done = (_, _) => iter.pure[M],
error = (_, _) => iter.pure[M],
cont = k => loop(xs, k(Chunk(x))))
case _ => iter.pure[M]
}
loop(s, iter)
}
}
def iterOutputStream(os: OutputStream): Iteratee[Seq[Byte], IO, Unit] = {
val write: Seq[Byte] => IO[Unit] = bs => IO(rw => (rw, os.write(bs.toArray)))
def step: Input[Seq[Byte]] => Iteratee[Seq[Byte], IO, Unit] = in => in match {
case EmptyChunk() => Cont(step)
case Chunk(c) => FlattenI(write(c) >>=| Cont(step).pure[IO])
case EOF(None) => Done((), in)
case EOF(Some(err)) => Failure(err, in)
}
Cont(step)
}
def printStackTrace(e: Throwable) = IO(rw => (rw, e.printStackTrace))
implicit def iterateeAsMA[A, M[_], B](iter: Iteratee[A, M, B]) = ma[({type λ[α]=Iteratee[A, M, α]})#λ, B](iter)
/**
* Read n elements from a stream and apply the given iteratee to the
* stream of the read elements. Unless the stream is terminated early, we
* read exactly n elements, even if the iteratee has accepted fewer.
*/
def take[C: EmptyChunk, M[_], A](n: Int): Enumeratee[C, C, M, A] = new Enumeratee[C, C, M, A] {
def apply(iter: Iteratee[C, M, A])(implicit m: Monad[M]): Iteratee[C, M, Iteratee[C, M, A]] = {
def step(n: Int, k: Input[C] => Iteratee[C, M, A]): Input[C] => Iteratee[C, M, Iteratee[C, M, A]] = in => in match {
case EmptyChunk() => Cont(step(n, k))
case Chunk(c) => take(n-1) apply k(in)
case EOF(None) => Done(k(in), in)
case EOF(Some(err)) => Failure(err, in)
}
if (n <= 0) Done(iter, EmptyChunk[C])
else FlattenI(iter.fold[Iteratee[C, M, Iteratee[C, M, A]]](
done = (a, in) =>
(drop(n) >>=| Done(Done(a, EmptyChunk[C]), EmptyChunk[C])).pure,
error = (err, in) => Done(Failure[C, M, A](err, in), EmptyChunk[C]).pure,
cont = k =>
if (n == 0) Done(Cont(k), EmptyChunk[C]).pure
else Cont(step(n, k)).pure
))
}
}
def drop[C, M[_]: Monad](n: Int): Iteratee[C, M, Unit] = {
def step(n: Int): Input[C] => Iteratee[C, M, Unit] = in => in match {
case Chunk(c) => Cont(step(n-1))
case EOF(None) => Done((), in)
case EOF(Some(err)) => Failure(err, in)
}
Cont(step(n))
}
def joinI[CFrom: EmptyChunk, CTo, M[_]: Monad, A](outer: Iteratee[CFrom, M, Iteratee[CTo, M, A]]): Iteratee[CFrom, M, A] = {
val onDone: (A, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (a, i) => Done(a, EmptyChunk[CFrom]).pure
val onError: (Error, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (e, _) => Failure(e, EmptyChunk[CFrom]).pure
outer flatMap { inner => FlattenI(inner.fold(
done = onDone,
error = onError,
cont = k => k(EOF(None)).fold(
done = onDone,
error = onError,
cont = k => sys.error("joinI: Divergent iteratee!"))))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment