Skip to content

Instantly share code, notes, and snippets.

@joshcough
Last active August 29, 2015 14:28
Show Gist options
  • Save joshcough/75d9be7e68f5f19f81d9 to your computer and use it in GitHub Desktop.
Save joshcough/75d9be7e68f5f19f81d9 to your computer and use it in GitHub Desktop.
package processes
import scalaz.concurrent.Task
import scalaz.stream._
/**
 * Small demo driver for [[StreamingServer]]: feeds it a source of the integers
 * 1 to 10 and a writer that prints every value it receives.
 */
object TestServer {

  def main(args: Array[String]): Unit = {
    // Pure source emitting 1, 2, ..., 10.
    val numbers = Process.emitAll(1 to 10)

    // Sink that prints each incoming value from inside a Task.
    val printSink: Sink[Task, Int] =
      sink.lift((n: Int) => Task(println("i=" + n)))

    // Writer stage: pipe whatever the server hands us into the printing sink.
    val writer: Process[Task, Int] => Process[Task, Unit] =
      values => values.to(printSink)

    // main loop
    StreamingServer.serve(numbers)(writer)
  }
}
/**
 * Connects a reader process to a writer stage through a bounded in-memory queue.
 */
object StreamingServer {

  /**
   * Runs the server: `reader` is repeated indefinitely and its output is
   * enqueued; `writer` is applied to a process that drains the queue. Both
   * sides are merged with `wye.mergeHaltBoth` and the result is run
   * synchronously on the calling thread via `run.run`.
   *
   * The defaults reproduce the previously hard-coded values, so existing
   * `serve(reader)(writer)` call sites behave exactly as before.
   *
   * NOTE(review): sizing arguments are handed to scalaz-stream unchanged and
   * not validated here; the library documents non-positive values as
   * "unbounded" for both `boundedQueue` and `mergeN` -- confirm against the
   * scalaz-stream version in use.
   *
   * @param reader      source process whose output is fed into the queue
   * @param queueSize   capacity passed to `async.boundedQueue` (default 100)
   * @param readerCount `maxOpen` passed to `merge.mergeN` for the reader side (default 10)
   * @param writerCount `maxOpen` passed to `merge.mergeN` for the writer side (default 10)
   * @param writer      builds the consuming process from the stream of dequeued elements
   * @tparam A element type carried from reader to writer
   */
  def serve[A](reader: Process[Task, A],
               queueSize: Int = 100,
               readerCount: Int = 10,
               writerCount: Int = 10)
              (writer: Process[Task, A] => Process[Task, Unit]): Unit = {

    // Repeats `p` forever and merges the copies, with `maxOpen` forwarded to mergeN.
    def merger[T](maxOpen: Int)(p: Process[Task, T]): Process[Task, T] =
      merge.mergeN(maxOpen)(Process.emit(p).repeat)

    val queue = async.boundedQueue[A](queueSize)

    // dequeueAvailable yields batches; flatten them back into single elements.
    val emitter = queue.dequeueAvailable.flatMap(Process.emitAll)

    val readers = merger(readerCount)(reader) to queue.enqueue
    val writers = merger(writerCount)(writer(emitter))

    // Run both sides together; mergeHaltBoth stops the server when either side halts.
    val server = readers.wye(writers)(wye.mergeHaltBoth)
    server.run.run
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment