Created
April 28, 2016 08:16
-
-
Save channingwalton/545d9f58662172a97e32e518adac8d3a to your computer and use it in GitHub Desktop.
Some concurrency testing for Rx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package reactive | |
import java.util.concurrent.TimeUnit | |
import rx.lang.scala.{Observable, Subject} | |
import scala.concurrent.duration.Duration | |
/**
 * The problem: can two threads banging away from two sources be merged and have predictable results?
 *
 * In the first example we use 'merge'.
 *
 * In the second we use a subject into which the generators push values.
 *
 * The test is to see if a buffer collecting the resulting values has everything we expect.
 *
 * The results show that merging is successful, but using a Subject is not.
 *
 * This first example merges two 10 ms interval sources, collects whatever arrives during a
 * one-second window, and then checks for each source that the values received are exactly
 * 0, 1, 2, ... in that order (i.e. nothing was lost or reordered).
 */
object RxScalaTinkering1 extends App {

  /** A value `v` tagged with the name `g` of the generator that produced it. */
  case class Gen(g: String, v: Long)

  // Written only by the subscriber callback below; the newest element is at the head.
  var buffer: List[Gen] = Nil

  val generator1: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g1", x))

  val generator2: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g2", x))

  val sub = generator1.merge(generator2).subscribe(v => buffer = v :: buffer)

  // Sampling window: let both generators run for about a second.
  Thread.sleep(1000)

  // Release the subscription so the interval timers stop feeding `buffer` once the window is
  // over; otherwise they keep running (and `buffer` keeps growing) for as long as the JVM lives.
  sub.unsubscribe()

  // `buffer` was built by prepending, so reverse it to get arrival order.
  val result = buffer.reverse
  val g1 = result.collect { case Gen("g1", v) => v }
  val g2 = result.collect { case Gen("g2", v) => v }

  println(g1.zipWithIndex)
  println(g2.zipWithIndex)
  println(s"g1 ${inSequence(g1)}")
  println(s"g2 ${inSequence(g2)}")

  /** True when `values` is exactly 0, 1, 2, ... with no gaps, duplicates or reordering. */
  private def inSequence(values: List[Long]): Boolean =
    values.zipWithIndex.forall { case (v, i) => v == i }
}
/**
 * Using a Subject.
 *
 * Both generators push their values straight into one shared `Subject`, whose single subscriber
 * prepends each value to `buffer`. After a one-second window the program checks, per generator,
 * whether the collected values are exactly 0, 1, 2, ... in order.
 *
 * NOTE(review): this variant is the one expected to fail that check. Presumably the two interval
 * sources call `subject.onNext` from different scheduler threads and a plain Subject does not
 * serialize those calls, so the unsynchronised `buffer = v :: buffer` update can lose values —
 * confirm against the Rx Observable Contract. The race is the point of the demonstration, so it
 * is intentionally left in place.
 */
object RxScalaTinkering2 extends App {

  /** A value `v` tagged with the name `g` of the generator that produced it. */
  case class Gen(g: String, v: Long)

  // Deliberately a plain var with no synchronisation; the newest element is at the head.
  var buffer: List[Gen] = Nil

  val subject = Subject[Gen]()
  val subscription = subject.subscribe(v => buffer = v :: buffer)

  val generator1: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g1", x))
  private val feed1 = generator1.subscribe(subject.onNext _)

  val generator2: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g2", x))
  private val feed2 = generator2.subscribe(subject.onNext _)

  // Sampling window: let both generators run for about a second.
  Thread.sleep(1000)

  // Release every subscription so the interval timers stop feeding `buffer` once the window is
  // over; otherwise they keep running (and `buffer` keeps growing) for as long as the JVM lives.
  feed1.unsubscribe()
  feed2.unsubscribe()
  subscription.unsubscribe()

  // `buffer` was built by prepending, so reverse it to get arrival order.
  val result = buffer.reverse
  val g1 = result.collect { case Gen("g1", v) => v }
  val g2 = result.collect { case Gen("g2", v) => v }

  println(g1.zipWithIndex)
  println(g2.zipWithIndex)
  println(s"g1 ${inSequence(g1)}")
  println(s"g2 ${inSequence(g2)}")

  /** True when `values` is exactly 0, 1, 2, ... with no gaps, duplicates or reordering. */
  private def inSequence(values: List[Long]): Boolean =
    values.zipWithIndex.forall { case (v, i) => v == i }
}
/**
 * Using two subjects and merging is successful.
 *
 * Each generator pushes into its own `Subject`; the two subjects are then combined with `merge`
 * and a single subscriber prepends every value to `buffer`. After a one-second window the
 * program checks, per generator, whether the collected values are exactly 0, 1, 2, ... in order.
 */
object RxScalaTinkering3 extends App {

  /** A value `v` tagged with the name `g` of the generator that produced it. */
  case class Gen(g: String, v: Long)

  // Written only by the subscriber callback below; the newest element is at the head.
  var buffer: List[Gen] = Nil

  val subject1 = Subject[Gen]()
  val subject2 = Subject[Gen]()
  val subscription = subject1.merge(subject2).subscribe(v => buffer = v :: buffer)

  val generator1: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g1", x))
  private val feed1 = generator1.subscribe(subject1.onNext _)

  val generator2: Observable[Gen] =
    Observable.interval(Duration(10, TimeUnit.MILLISECONDS)).map(x => Gen("g2", x))
  private val feed2 = generator2.subscribe(subject2.onNext _)

  // Sampling window: let both generators run for about a second.
  Thread.sleep(1000)

  // Release every subscription so the interval timers stop feeding `buffer` once the window is
  // over; otherwise they keep running (and `buffer` keeps growing) for as long as the JVM lives.
  feed1.unsubscribe()
  feed2.unsubscribe()
  subscription.unsubscribe()

  // `buffer` was built by prepending, so reverse it to get arrival order.
  val result = buffer.reverse
  val g1 = result.collect { case Gen("g1", v) => v }
  val g2 = result.collect { case Gen("g2", v) => v }

  println(g1.zipWithIndex)
  println(g2.zipWithIndex)
  println(s"g1 ${inSequence(g1)}")
  println(s"g2 ${inSequence(g2)}")

  /** True when `values` is exactly 0, 1, 2, ... with no gaps, duplicates or reordering. */
  private def inSequence(values: List[Long]): Boolean =
    values.zipWithIndex.forall { case (v, i) => v == i }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment