Created
November 10, 2013 08:34
-
-
Save pchlupacek/7395481 to your computer and use it in GitHub Desktop.
Memory leak example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.stream | |
import org.scalacheck.Prop._ | |
import org.scalacheck.Properties | |
import scala.concurrent.SyncVar | |
import scalaz.\/ | |
import scalaz.stream.Process._ | |
import scalaz.stream.async.mutable.Queue | |
import java.util.concurrent.{TimeUnit, Executors} | |
/**
 * Memory-leak reproduction for scalaz-stream, packaged as a ScalaCheck property.
 *
 * Two async queues are merged, chunked, and reduced to the head of each chunk.
 * The resulting stream is combined with a boolean signal through `wye.interrupt`
 * and run asynchronously. The queues are then fed with `count` elements, after
 * which the signal is re-set to `false` once per second while the property
 * waits for the stream's final result.
 */
object LeakTest extends Properties("leak") {

  property("find-a-leak") = secure {
    // Initial pause before any work starts
    // (presumably to leave time for attaching a profiler -- TODO confirm).
    Thread.sleep(10000)
    println("starting")

    val count = 500000
    // Chunk size of the merged stream, and also the interval at which feeding progress is printed.
    val step = count / 100

    // Two async queues; both are fed by the loop below while the stream is already running.
    val (qA, qsA) = async.queue[String]
    val (qB, qsB) = async.queue[String]
    val sigTerm = async.signal[Boolean]
    val result = new SyncVar[Throwable \/ Unit]

    // Sources of A and B merged together, then chunked and flat-mapped so that only the
    // head of every chunk is emitted, limiting the number of resulting elements.
    val mergedChunked = (qsA merge qsB).chunk(step).flatMap { v =>
      /*suspend(emit(println(v))).drain ++*/ emit(v.headOption.getOrElse("LAST EMPTY"))
    }

    // Stream that interrupts on the left side. In this test the signal is only ever
    // set to false, never to true.
    sigTerm.discrete.wye(mergedChunked)(wye.interrupt).run.runAsync(result.put)
    sigTerm.set(false).run

    println("Staring to feed")

    // Enqueues element `i` into `q` and reports progress once every `step` elements.
    def put(q: Queue[String], i: Int): Unit = {
      q.enqueue(i.toString)
      if (i % step == 0) println("fed " + (i / step) + "pct")
    }

    // Plain side-effecting loop: no `yield`, so no 500000-element collection of Unit
    // is materialised (and retained) just to be thrown away.
    for (i <- 0 until count) {
      // Every third element goes to queue B, everything else to queue A.
      if (i % 3 == 0) put(qB, i) else put(qA, i)
    }
    println("Done with feeding")

    // Refresh the signal every single second while waiting for the result.
    val scheduler = Executors.newScheduledThreadPool(1)
    try {
      scheduler.scheduleAtFixedRate(new Runnable {
        def run(): Unit = {
          println("Refreshing")
          sigTerm.set(false).run
        }
      }, 0, 1, TimeUnit.SECONDS)

      // Just awaiting the final result (timeout given in milliseconds); prints None on timeout.
      println(result.get(3000000))
    } finally {
      // Stop the refresher so its non-daemon thread does not outlive the property.
      scheduler.shutdownNow()
    }
    true
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment