-
-
Save rsuniev/1006900 to your computer and use it in GitHub Desktop.
print the first 10 chunks of input
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.{Failure => _, _} | |
import Scalaz._ | |
import effects._ | |
import iteratees._ | |
import java.io._ | |
object Head { | |
def main(args: Array[String]) { | |
val enum = enumStream[Seq[Byte], IO]((1 to 50).toStream.map(i => ("line " + i + "\n").getBytes.toSeq)) | |
val enee = take[Seq[Byte], IO, Unit](10) | |
val iter = enee(iterOutputStream(System.out)) | |
println("=== without join ===") | |
val nonJoinedIo = iter <<: enum flatMap (_.run) flatMap (_.run) except printStackTrace | |
nonJoinedIo.unsafePerformIO | |
println("=== with join ===") | |
val joinedIo = joinI(iter) <<: enum flatMap (_.run) except printStackTrace | |
joinedIo.unsafePerformIO | |
} | |
def enumStream[C, M[_]](s: Stream[C]) = new Enumerator[C, M] { | |
def apply[A](iter: Iteratee[C, M, A])(implicit m: Monad[M]) = { | |
def loop(s: Stream[C], iter: Iteratee[C, M, A]): M[Iteratee[C, M, A]] = s match { | |
case x #:: xs => iter.fold( | |
done = (_, _) => iter.pure[M], | |
error = (_, _) => iter.pure[M], | |
cont = k => loop(xs, k(Chunk(x)))) | |
case _ => iter.pure[M] | |
} | |
loop(s, iter) | |
} | |
} | |
def iterOutputStream(os: OutputStream): Iteratee[Seq[Byte], IO, Unit] = { | |
val write: Seq[Byte] => IO[Unit] = bs => IO(rw => (rw, os.write(bs.toArray))) | |
def step: Input[Seq[Byte]] => Iteratee[Seq[Byte], IO, Unit] = in => in match { | |
case EmptyChunk() => Cont(step) | |
case Chunk(c) => FlattenI(write(c) >>=| Cont(step).pure[IO]) | |
case EOF(None) => Done((), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
Cont(step) | |
} | |
def printStackTrace(e: Throwable) = IO(rw => (rw, e.printStackTrace)) | |
implicit def iterateeAsMA[A, M[_], B](iter: Iteratee[A, M, B]) = ma[({type λ[α]=Iteratee[A, M, α]})#λ, B](iter) | |
/** | |
* Read n elements from a stream and apply the given iteratee to the | |
* stream of the read elements. Unless the stream is terminated early, we | |
* read exactly n elements, even if the iteratee has accepted fewer. | |
*/ | |
def take[C: EmptyChunk, M[_], A](n: Int): Enumeratee[C, C, M, A] = new Enumeratee[C, C, M, A] { | |
def apply(iter: Iteratee[C, M, A])(implicit m: Monad[M]): Iteratee[C, M, Iteratee[C, M, A]] = { | |
def step(n: Int, k: Input[C] => Iteratee[C, M, A]): Input[C] => Iteratee[C, M, Iteratee[C, M, A]] = in => in match { | |
case EmptyChunk() => Cont(step(n, k)) | |
case Chunk(c) => take(n-1) apply k(in) | |
case EOF(None) => Done(k(in), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
if (n <= 0) Done(iter, EmptyChunk[C]) | |
else FlattenI(iter.fold[Iteratee[C, M, Iteratee[C, M, A]]]( | |
done = (a, in) => | |
(drop(n) >>=| Done(Done(a, EmptyChunk[C]), EmptyChunk[C])).pure, | |
error = (err, in) => Done(Failure[C, M, A](err, in), EmptyChunk[C]).pure, | |
cont = k => | |
if (n == 0) Done(Cont(k), EmptyChunk[C]).pure | |
else Cont(step(n, k)).pure | |
)) | |
} | |
} | |
def drop[C, M[_]: Monad](n: Int): Iteratee[C, M, Unit] = { | |
def step(n: Int): Input[C] => Iteratee[C, M, Unit] = in => in match { | |
case Chunk(c) => Cont(step(n-1)) | |
case EOF(None) => Done((), in) | |
case EOF(Some(err)) => Failure(err, in) | |
} | |
Cont(step(n)) | |
} | |
def joinI[CFrom: EmptyChunk, CTo, M[_]: Monad, A](outer: Iteratee[CFrom, M, Iteratee[CTo, M, A]]): Iteratee[CFrom, M, A] = { | |
val onDone: (A, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (a, i) => Done(a, EmptyChunk[CFrom]).pure | |
val onError: (Error, Input[CTo]) => M[Iteratee[CFrom, M, A]] = (e, _) => Failure(e, EmptyChunk[CFrom]).pure | |
outer flatMap { inner => FlattenI(inner.fold( | |
done = onDone, | |
error = onError, | |
cont = k => k(EOF(None)).fold( | |
done = onDone, | |
error = onError, | |
cont = k => sys.error("joinI: Divergent iteratee!")))) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment