Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active June 28, 2016 13:10
Show Gist options
  • Select an option

  • Save alexandru/55b46a140391c5d40b4a2199f8d0bae2 to your computer and use it in GitHub Desktop.

Select an option

Save alexandru/55b46a140391c5d40b4a2199f8d0bae2 to your computer and use it in GitHub Desktop.
/*
* Copyright (c) 2014-2016 by its authors. Some rights reserved.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monix.reactive.internal.builders
import monix.execution.Ack.{Stop, Continue}
import monix.execution.cancelables.CompositeCancelable
import monix.execution.{Ack, Cancelable}
import monix.reactive.Observable
import monix.reactive.observers.Subscriber
import scala.concurrent.{Future, Promise}
/** Observable that emits the elements of `obsA1` and `obsA2` in alternation,
  * starting with `obsA1`.
  *
  * Each source is gated by a promise (`pauseA1` / `pauseA2`): a source may
  * only push an element downstream once its gate has completed with
  * `Continue`. Emitting an element closes the emitter's own gate and opens the
  * other source's gate with the downstream acknowledgement. Once one source
  * has completed, the other one is no longer gated and streams its remaining
  * elements directly.
  *
  * An error from either source is forwarded downstream immediately and stops
  * both sources. `onComplete` is signaled downstream after both sources have
  * completed.
  *
  * @param obsA1 the source that is allowed to emit first
  * @param obsA2 the source that emits second
  */
private[reactive] final class Interleave2Observable[+A]
  (obsA1: Observable[A], obsA2: Observable[A]) extends Observable[A] { self =>

  // NOTE(review): not referenced anywhere in this class; looks like a
  // leftover. Kept so that the class's members are unchanged — confirm
  // nothing else reads it, then remove.
  val idx = new java.util.concurrent.atomic.AtomicInteger(0)

  def unsafeSubscribeFn(out: Subscriber[A]): Cancelable = {
    import out.scheduler

    // MUST BE synchronized by `self`
    var isDone = false
    // MUST BE synchronized by `self`
    var downstreamAck = Continue : Future[Ack]
    // MUST BE synchronized by `self`.
    // Gate for obsA1: it may only emit once this promise completes with
    // `Continue`. It starts out open, so obsA1 emits first.
    var pauseA1 = Promise.successful(Continue : Ack)
    // MUST BE synchronized by `self`.
    // Gate for obsA2: it starts out closed, until obsA1 has emitted
    // (or completed).
    var pauseA2 = Promise[Ack]()
    // MUST BE synchronized by `self`
    var completedCount = 0
    // MUST BE synchronized by `self`.
    // Once a source has completed, the other one must stop closing its own
    // gate, because nobody is left to reopen it.
    var isA1Completed = false
    var isA2Completed = false
    var lastAck1 = Continue : Future[Ack]
    var lastAck2 = Continue : Future[Ack]

    def signalOnError(ex: Throwable): Unit =
      self.synchronized {
        if (!isDone) {
          isDone = true
          out.onError(ex)
          downstreamAck = Stop
          // Release both gates with `Stop`, so that pending elements are dropped
          pauseA1.tryCompleteWith(Stop)
          pauseA2.tryCompleteWith(Stop)
        }
      }

    // MUST BE synchronized by `self`
    def signalOnComplete(ack: Future[Ack]): Unit = {
      @inline def rawOnComplete(): Unit =
        self.synchronized(if (!isDone) {
          isDone = true
          out.onComplete()
        })

      val shouldComplete = !isDone && {
        completedCount += 1
        completedCount >= 2
      }

      // Downstream completes only after the last element was acknowledged
      if (shouldComplete)
        ack.syncOnContinue(rawOnComplete())
    }

    val composite = CompositeCancelable()

    composite += obsA1.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler = out.scheduler

      def onNext(elem: A): Future[Ack] = self.synchronized {
        @inline def sendSignal(a: A): Future[Ack] = self.synchronized {
          if (isDone) Stop else {
            downstreamAck = out.onNext(a)
            // Close our own gate only while obsA2 is still around to reopen
            // it; otherwise the next element would wait forever.
            if (!isA2Completed) pauseA1 = Promise[Ack]()
            pauseA2.tryCompleteWith(downstreamAck)
            downstreamAck
          }
        }

        // Pausing A1 until obsA2 allows us to send
        lastAck1 = pauseA1.future.syncTryFlatten.syncFlatMap {
          case Continue => sendSignal(elem)
          case Stop => Stop
        }
        lastAck1
      }

      def onError(ex: Throwable): Unit =
        signalOnError(ex)

      def onComplete(): Unit = self.synchronized {
        lastAck1.syncOnContinue {
          // This callback may run asynchronously, outside of the enclosing
          // synchronized block, hence the monitor is acquired again.
          self.synchronized {
            isA1Completed = true
            signalOnComplete(lastAck1)
            // Permanently open the gate of obsA2
            pauseA2.trySuccess(Continue)
            pauseA2 = Promise.successful(Continue)
          }
        }
      }
    })

    composite += obsA2.unsafeSubscribeFn(new Subscriber[A] {
      implicit val scheduler = out.scheduler

      def onNext(elem: A): Future[Ack] = self.synchronized {
        @inline def sendSignal(a: A): Future[Ack] = self.synchronized {
          if (isDone) Stop else {
            downstreamAck = out.onNext(a)
            // Close our own gate only while obsA1 is still around to reopen
            // it; otherwise the next element would wait forever.
            if (!isA1Completed) pauseA2 = Promise[Ack]()
            pauseA1.tryCompleteWith(downstreamAck)
            downstreamAck
          }
        }

        // Pausing A2 until obsA1 allows us to send
        lastAck2 = pauseA2.future.syncTryFlatten.syncFlatMap {
          case Continue => sendSignal(elem)
          case Stop => Stop
        }
        lastAck2
      }

      def onError(ex: Throwable): Unit =
        signalOnError(ex)

      def onComplete(): Unit = self.synchronized {
        lastAck2.syncOnContinue {
          // This callback may run asynchronously, outside of the enclosing
          // synchronized block, hence the monitor is acquired again.
          self.synchronized {
            isA2Completed = true
            signalOnComplete(lastAck2)
            // Permanently open the gate of obsA1
            pauseA1.trySuccess(Continue)
            pauseA1 = Promise.successful(Continue)
          }
        }
      }
    })

    composite
  }
}
/*
* Copyright (c) 2014-2016 by its authors. Some rights reserved.
* See the project homepage at: https://monix.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package monix.reactive.internal.operators
import monix.execution.Ack.Continue
import monix.reactive.exceptions.DummyException
import monix.reactive.subjects.PublishSubject
import monix.reactive.{Observable, Observer}
import monix.execution.FutureUtils.extensions._
import scala.concurrent.Future
import scala.concurrent.duration.Duration.Zero
import scala.concurrent.duration._
/** Tests for `Observable.interleave` applied to two sources. */
object Interleave2Suite extends BaseOperatorSuite {
  // Both sources emit `sourceCount` elements each
  def count(sourceCount: Int) = 2 * sourceCount

  // Twice the sum of 0 until sourceCount
  def sum(sourceCount: Int) = (sourceCount - 1) * sourceCount

  def createObservable(sourceCount: Int) = {
    val numbers = Observable.range(0, sourceCount)
    val interleaved = Observable.interleave(numbers, numbers)
    Option(Sample(interleaved, count(sourceCount), sum(sourceCount), Zero, Zero))
  }

  def observableInError(sourceCount: Int, ex: Throwable) = None

  def brokenUserCodeObservable(sourceCount: Int, ex: Throwable) = None

  override def cancelableObservables(): Seq[Sample] = {
    val slowLeft = Observable.range(0, 10).delayOnNext(1.second)
    val slowRight = Observable.range(0, 10).delayOnNext(1.second)
    val interleaved = Observable.interleave(slowLeft, slowRight)
    Seq(Sample(interleaved, 0, 0, 0.seconds, 0.seconds))
  }

  test("self starts before other and finishes before other") { implicit s =>
    val first = PublishSubject[Int]()
    val second = PublishSubject[Int]()
    var received = Vector.empty[Int]
    var wasCompleted = false

    Observable.interleave(first, second).unsafeSubscribeFn(new Observer[Int] {
      def onNext(elem: Int) = {
        received = received :+ elem
        Continue
      }

      def onError(ex: Throwable) = ()

      def onComplete() = {
        wasCompleted = true
      }
    })

    first.onNext(1)
    s.tick()
    assertEquals(received, Vector(1))

    second.onNext(2)
    s.tick()
    assertEquals(received, Vector(1,2))

    // A second consecutive element from `second` must wait for `first`
    second.onNext(4)
    s.tick()
    assertEquals(received, Vector(1,2))

    first.onNext(3)
    s.tick()
    val onlyFirstThree = received == Vector(1,2,3)
    val alsoFourth = received == Vector(1,2,3,4)
    assert(onlyFirstThree || alsoFourth)

    first.onComplete()
    s.tick()
    assert(!wasCompleted)

    second.onComplete()
    s.tick()
    assert(wasCompleted)
  }

  test("self signals error and interrupts the stream before it starts") { implicit s =>
    val first = PublishSubject[Int]()
    val second = PublishSubject[Int]()
    var wasThrown: Throwable = null
    var wasCanceled = false
    var received = 0

    val secondTracked = second.doOnDownstreamStop { wasCanceled = true }
    val interleaved = Observable.interleave(first, secondTracked)

    interleaved.unsafeSubscribeFn(new Observer[Int] {
      def onNext(elem: Int) = {
        received = elem
        Continue
      }

      def onError(ex: Throwable) = {
        wasThrown = ex
      }

      def onComplete() = ()
    })

    first.onError(DummyException("dummy"))
    assertEquals(wasThrown, DummyException("dummy"))

    second.onNext(2)
    s.tickOne()
    assertEquals(received, 0)
    assert(wasCanceled)
  }

  test("other signals error and interrupts the stream before it starts") { implicit s =>
    val failing = PublishSubject[Int]()
    val emitting = PublishSubject[Int]()
    var wasThrown: Throwable = null
    var wasCanceled = false
    var received = 0

    val emittingTracked = emitting.doOnDownstreamStop { wasCanceled = true }
    val interleaved = Observable.interleave(emittingTracked, failing)

    interleaved.unsafeSubscribeFn(new Observer[Int] {
      def onNext(elem: Int) = {
        received = elem
        Continue
      }

      def onError(ex: Throwable) = {
        wasThrown = ex
      }

      def onComplete() = ()
    })

    failing.onError(DummyException("dummy"))
    assertEquals(wasThrown, DummyException("dummy"))

    emitting.onNext(2)
    s.tickOne()
    assertEquals(received, 0)
    assert(wasCanceled)
  }

  test("should not back-pressure self.onError") { implicit s =>
    val first = PublishSubject[Int]()
    val second = PublishSubject[Int]()
    var wasThrown: Throwable = null

    val interleaved = Observable.interleave(first, second)
    interleaved.unsafeSubscribeFn(new Observer[Int] {
      // Slow consumer: every element is acknowledged after one second
      def onNext(elem: Int) =
        Future.delayedResult(1.second)(Continue)

      def onComplete() = ()

      def onError(ex: Throwable) = {
        wasThrown = ex
      }
    })

    first.onNext(1)
    second.onNext(2)
    first.onError(DummyException("dummy"))

    s.tick()
    assertEquals(wasThrown, DummyException("dummy"))
    s.tick(2.second)
  }

  test("should not back-pressure other.onError") { implicit s =>
    val first = PublishSubject[Int]()
    val second = PublishSubject[Int]()
    var wasThrown: Throwable = null

    val interleaved = Observable.interleave(first, second)
    interleaved.unsafeSubscribeFn(new Observer[Int] {
      // Slow consumer: every element is acknowledged after one second
      def onNext(elem: Int) =
        Future.delayedResult(1.second)(Continue)

      def onComplete() = ()

      def onError(ex: Throwable) = {
        wasThrown = ex
      }
    })

    first.onNext(1)
    second.onNext(2)
    second.onError(DummyException("dummy"))

    s.tick()
    assertEquals(wasThrown, DummyException("dummy"))
    s.tick(2.second)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment