Created
July 9, 2017 21:15
-
-
Save sebaslogen/73a3f2707410bab4a989716513bd8ede to your computer and use it in GitHub Desktop.
LiveObservable: Android lifecycle aware wrapper for an RxJava Observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.arch.lifecycle.Lifecycle | |
import android.arch.lifecycle.LifecycleObserver | |
import android.arch.lifecycle.LifecycleOwner | |
import android.arch.lifecycle.OnLifecycleEvent | |
import com.jakewharton.rxrelay.BehaviorRelay | |
import rx.Observable | |
import rx.android.schedulers.AndroidSchedulers | |
import rx.functions.Action1 | |
import rx.subscriptions.CompositeSubscription | |
/**
 * Android lifecycle-aware wrapper around an RxJava [Observable] (the source).
 *
 * Any number of observers can register; all of them receive the same items, which are fanned out
 * through a [BehaviorRelay].
 *
 * The source Observable is subscribed only while at least one registered [LifecycleOwner] is in an
 * active lifecycle state (STARTED or RESUMED). While subscribed, items are forwarded to the observers
 * of active owners through the relay.
 *
 * Once every registered owner is inactive (neither STARTED nor RESUMED) the source is unsubscribed so
 * it can free its resources; no more items are emitted until an owner becomes active again.
 *
 * All mutations of the internal bookkeeping are guarded by `synchronized(lifecycleOwners)`.
 */
class LiveObservable<T : Any>(private val source: Observable<T>) : LifecycleObserver {

    /** Observers registered per lifecycle owner. Also used as the lock for every state change. */
    private val lifecycleOwners = mutableMapOf<LifecycleOwner, Set<Action1<T>>>()

    /** Holds the (single) subscription of [valueRelay] to [source]. */
    private val internalSubscriptions = CompositeSubscription()

    /** Subscriptions of the registered observers to [valueRelay], grouped by their lifecycle owner. */
    private val externalSubscriptions = mutableMapOf<LifecycleOwner, CompositeSubscription>()

    /** Relay through which the source items are shared with the observers. */
    private val valueRelay: BehaviorRelay<T?> = BehaviorRelay.create()

    /**
     * Registers [observer] to receive items while [lifecycleOwner] is in an active state.
     *
     * The first registration for an owner attaches this object to the owner's lifecycle; the actual
     * subscription then happens from the ON_START callback ([subscribeForUpdates]).
     *
     * @param lifecycleOwner owner whose lifecycle controls when [observer] is subscribed.
     * @param observer callback invoked with every non-null item passing through the relay.
     */
    fun observeWhileStarted(lifecycleOwner: LifecycleOwner, observer: Action1<T>) {
        synchronized(lifecycleOwners) {
            val knownObservers = lifecycleOwners[lifecycleOwner]
            lifecycleOwners[lifecycleOwner] = knownObservers.orEmpty() + observer
            lifecycleOwner.lifecycle.addObserver(this)

            // For an owner that was already registered, addObserver() above is a no-op and no ON_START
            // is delivered again, so a new observer joining an already-active owner has to be
            // subscribed here; otherwise it would stay silent until the next stop/start cycle.
            // Observers already present in the set are skipped to avoid a double subscription.
            val isLateObserver = knownObservers != null && observer !in knownObservers
            if (isLateObserver && isActiveState(lifecycleOwner.lifecycle)) {
                subscribeExternalObserver(lifecycleOwner, observer)
            }
        }
    }

    /**
     * ON_START callback: makes sure the source is subscribed and subscribes every observer
     * registered for [lifecycleOwner] to the relay.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_START)
    fun subscribeForUpdates(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            subscribeToSourceObservable()
            subscribeExternalObservers(lifecycleOwner)
        }
    }

    /**
     * Subscribes the relay to the source Observable unless a source subscription is already held,
     * so only the first owner becoming active triggers the subscription.
     *
     * Items are delivered to the relay on the Android main thread.
     */
    private fun subscribeToSourceObservable() {
        if (!internalSubscriptions.hasSubscriptions()) {
            // NOTE(review): only an onNext handler (the relay) is supplied here, so an error emitted
            // by the source is not handled by this class - confirm sources are error-free.
            val subscription = source
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(valueRelay)
            internalSubscriptions.add(subscription)
        }
    }

    /** Subscribes every observer registered for [activeLifecycleOwner] to the relay. */
    private fun subscribeExternalObservers(activeLifecycleOwner: LifecycleOwner) {
        lifecycleOwners[activeLifecycleOwner]?.forEach { observer ->
            subscribeExternalObserver(activeLifecycleOwner, observer)
        }
    }

    /**
     * Subscribes a single [observer] to the relay and records the subscription under
     * [lifecycleOwner] so it can be released when that owner stops or is destroyed.
     */
    private fun subscribeExternalObserver(lifecycleOwner: LifecycleOwner, observer: Action1<T>) {
        val subscription = valueRelay
            .asObservable()
            .filterNotNull()
            .subscribe(observer)
        externalSubscriptions
            .getOrPut(lifecycleOwner) { CompositeSubscription() }
            .add(subscription)
    }

    /**
     * ON_STOP callback: releases the source subscription if no registered owner is active any more
     * and unsubscribes the observers of [lifecycleOwner] from the relay.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    fun unsubscribeForUpdates(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            unsubscribeFromSourceObservable()
            unsubscribeExternalObservers(lifecycleOwner)
        }
    }

    /**
     * Stops receiving items and unsubscribes from the source Observable, but only when none of the
     * registered owners is in an active state.
     */
    private fun unsubscribeFromSourceObservable() {
        if (lifecycleOwners.keys.none { isActiveState(it.lifecycle) }) internalSubscriptions.clear()
    }

    /**
     * Unsubscribes the observers of [inactiveLifecycleOwner] from the relay. The owner's
     * CompositeSubscription is cleared (not disposed) so it can be reused on the next ON_START.
     */
    private fun unsubscribeExternalObservers(inactiveLifecycleOwner: LifecycleOwner) {
        lifecycleOwners[inactiveLifecycleOwner]?.let {
            externalSubscriptions[inactiveLifecycleOwner]?.clear()
        }
    }

    /**
     * ON_DESTROY callback: detaches from the owner's lifecycle and forgets everything held for it.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun unregisterObserver(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            lifecycleOwner.lifecycle.removeObserver(this)
            lifecycleOwners.remove(lifecycleOwner)
            // Drop the per-owner subscriptions as well: keeping the map entry would retain a strong
            // reference to the destroyed owner for as long as this LiveObservable lives.
            externalSubscriptions.remove(lifecycleOwner)?.unsubscribe()
            // If this method is invoked directly while the owner was still active, the source may now
            // have no active owner left; re-check so it does not stay subscribed needlessly.
            unsubscribeFromSourceObservable()
        }
    }

    /** Returns the value currently held by the relay, or `null` when it holds none. */
    fun value(): T? = valueRelay.value

    /** An owner counts as active when its lifecycle state is at least STARTED. */
    private fun isActiveState(lifecycle: Lifecycle) = lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)

    /** Drops `null` items; the trailing `map` only re-types the stream to the non-null element type. */
    private fun <T : Any> Observable<T?>.filterNotNull(): Observable<T> = this.filter { it != null }.map { it }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment