Last active
June 13, 2016 13:48
-
-
Save caoilte/3963b9158a55ca333d0a5018c95dceca to your computer and use it in GitHub Desktop.
scalaz-stream example of problem for Stack Overflow question
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.{ExecutorService, Executors}

import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.async.mutable.Queue
/** Shared building blocks for the demo pipelines: a source, two slow stages and a printing sink. */
object Processes {

  /** Emits the integers 0 until 10, each rendered as a string. */
  val source: Process[Task, String] =
    Process.emitAll(0 until 10).map(n => n.toString())

  /** First slow stage: appends "1" to each element via `Utils.simulatedWork` (100 ms per element). */
  val slowProcessor1: Channel[Task, String, String] =
    Utils.simulatedWork(100, _ + "1")

  /** Second slow stage: appends "2" to each element via `Utils.simulatedWork` (100 ms per element). */
  val slowProcessor2: Channel[Task, String, String] =
    Utils.simulatedWork(100, _ + "2")

  /** Sink whose effect prints each received element with `println`. */
  val sink: Sink[Task, String] =
    Process.constant((line: String) => Task(println(line)))
}
/** Baseline variant: all stages are chained into one process. */
object Simple {
  import Processes._

  /** Task that pipes the source through both slow stages into the sink and runs it for its effects. */
  val batchJob: Task[Unit] = {
    val pipeline = source.through(slowProcessor1).through(slowProcessor2).to(sink)
    pipeline.run
  }
}
/** Queue variant: stage one feeds a bounded queue, stage two drains it, both as separate tasks. */
object WithQueue {
  import Processes._

  /** Queue of strings created with a bound of 5; sits between the two slow stages. */
  val buffer: Queue[String] = async.boundedQueue[String](5, false)

  /** Channel that enqueues each incoming element onto `buffer`. */
  val enqueueOnBuffer: Channel[Task, String, Unit] =
    channel.lift((item: String) => buffer.enqueueOne(item))

  /** Producer side: source -> slowProcessor1 -> buffer, closing the buffer on completion. */
  val slowProcess1AndEnqueueOnBuffer: Process[Task, Unit] = {
    val producer = source.through(slowProcessor1).through(enqueueOnBuffer)
    producer.onComplete(Process.eval(buffer.close))
  }

  /** Combines the producer task and the consumer task (buffer -> slowProcessor2 -> sink). */
  val batchJob: Task[Seq[Unit]] = {
    val produce: Task[Unit] = slowProcess1AndEnqueueOnBuffer.run
    val consume: Task[Unit] = buffer.dequeue.through(slowProcessor2).to(sink).run
    Task.gatherUnordered(Seq(produce, consume))
  }
}
/** Helpers for simulating slow work and for timing the execution of a task. */
object Utils {
  // Initialise really large ExecutorService to ensure Thread.sleep doesn't block work.
  // The type is spelled out explicitly: implicit definitions without a declared type make
  // implicit resolution fragile in Scala 2 and are rejected by Scala 3.
  implicit val es: ExecutorService =
    Executors.newFixedThreadPool(15, Strategy.DefaultDaemonThreadFactory)

  /**
   * Builds a channel that, for every input, sleeps and then applies `work`.
   *
   * @param millis how long each element's task sleeps (via `Thread.sleep`) before doing the work
   * @param work   transformation applied to the input once the sleep has finished
   * @return a channel mapping each input string to a task producing the transformed string
   */
  def simulatedWork(millis: Long, work: String => String): Channel[Task, String, String] = {
    val delayedTask: String => Task[String] = input => Task {
      Thread.sleep(millis)
      work(input)
    }
    channel.lift(delayedTask)
  }

  /**
   * Runs `task` synchronously and prints the elapsed wall-clock time in milliseconds.
   *
   * @param task the task to run; its result is discarded
   */
  def runTaskTimed(task: Task[_]): Unit = {
    val startNanos = System.nanoTime
    task.run
    // Dividing by 1e6 converts nanoseconds to (fractional) milliseconds.
    val elapsedMillis = (System.nanoTime - startNanos) / 1e6
    println(s"time: ${elapsedMillis}ms")
  }
}
/** Entry point: times the simple pipeline, then the queue-based pipeline. */
object Demo extends App {

  // `job` is by-name so the job expression is only evaluated after the banner has been printed,
  // keeping the same evaluation order as printing and then calling runTaskTimed directly.
  private def announceAndTime(banner: String)(job: => Task[_]): Unit = {
    println(banner)
    Utils.runTaskTimed(job)
  }

  announceAndTime("First I will run the simple batch job")(Simple.batchJob)
  println()
  announceAndTime("Now I will run the batch job utilising a queue")(WithQueue.batchJob)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment