Skip to content

Instantly share code, notes, and snippets.

@stucchio
Created September 11, 2014 16:35
Show Gist options
  • Save stucchio/e096ed7bc7b9a7c130e1 to your computer and use it in GitHub Desktop.
Save stucchio/e096ed7bc7b9a7c130e1 to your computer and use it in GitHub Desktop.
Stream benchmark
package benchmark
import scala.concurrent._
import scala.concurrent.ExecutionContext
import scalaz._
import Scalaz._
import scalaz.stream._
import scalaz.stream.async._
import scalaz.concurrent.{Task, Strategy}
import scala.concurrent.duration._
/** Benchmark that pushes `Wrapper` messages through a scalaz-stream async
  * queue from a producer running on the global execution context, consumes
  * them on the calling thread, and prints the elapsed wall-clock time.
  */
object StreamBenchmark {

  /** `Catchable` instance for `List`.
    *
    * `attempt` wraps every element as a right (success) value; `fail`
    * discards the supplied error and returns the empty list.
    */
  implicit object ListCatchable extends Catchable[List] {
    def attempt[A](f: List[A]): List[Throwable \/ A] =
      f.map(x => (\/-(x): Throwable \/ A))

    def fail[A](err: Throwable): List[A] = Nil
  }

  /** Runs one send/receive round and prints start time, end time and delta
    * (all in milliseconds, taken from `System.currentTimeMillis`).
    *
    * A producer enqueues `Benchmark.MAX + 1` messages and then closes the
    * queue; the consumer folds the dequeued messages and counts them,
    * stamping `finishTime` when the running count equals `Benchmark.MAX`.
    *
    * Any exception thrown by the producer is rethrown here once consumption
    * has finished.
    */
  def sendReceiveTest: Unit = {
    val q = async.unboundedQueue[Wrapper]
    val intermediate = q.dequeue.scan(Wrapper(0, 0))((state, w) => state + w)

    // NOTE(review): written from inside the stream step and read after
    // `result.run.run` returns; presumably the blocking `run` makes the write
    // visible to this thread — confirm against the scalaz Task implementation.
    var finishTime: Long = 0
    val finishCount = Benchmark.MAX
    val result = intermediate.scan(0L) { (count, _) =>
      if (count == finishCount) {
        finishTime = System.currentTimeMillis
      }
      count + 1
    }

    val startTime: Long = System.currentTimeMillis()

    // Keep a handle on the producer so its completion (and any failure) can
    // be observed instead of being silently dropped.
    val producer = scala.concurrent.Future {
      println("Started production")
      try {
        // Plain while loop kept on purpose: this is the measured hot path.
        var i: Long = 0
        while (i < Benchmark.MAX + 1) {
          q.enqueueOne(Wrapper(i, -i)).run
          i += 1
        }
      } finally {
        // Always close the queue, otherwise a failed producer would leave the
        // consumer below blocked forever.
        q.close.run
      }
      println("Finished production")
    }(ExecutionContext.global)

    println("Starting consumption")
    result.run.run
    println("Finished consumption")

    // Wait for the producer deterministically rather than sleeping for a
    // fixed 10 seconds; this also surfaces any producer exception.
    scala.concurrent.Await.result(producer, scala.concurrent.duration.Duration.Inf)

    println(s"Start time: $startTime")
    println(s"End time: $finishTime")
    println(s"Delta: ${finishTime - startTime}")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment