Last active
September 12, 2017 08:51
-
-
Save Takhion/5c99828084af49baeb8b8a321959f2fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable | |
import rx.subjects.BehaviorSubject | |
import SingletonObservable.ItemWrapper | |
/**
 * Position in the un/subscription sequence of a [SingletonObservable].
 *
 * Counting begins at [SingletonObservable.firstSubscriptionIndex], which stands for 'unsubscribed'; each following value flips
 * to the other state ('subscribed', then 'unsubscribed', and so on).
 * [SingletonObservable.isSubscribed] tells which of the two states a given [SubscriptionIndex] represents.
 *
 * A stream keeps the same index for its whole lifetime. That lifetime ends the first time either
 * [Observable.doOnTerminate] or [Observable.doOnUnsubscribe] fires.
 */
typealias SubscriptionIndex = Long
/**
 * Exposes [observable] and [observableWithSubscription], which allow only one subscriber at a time to consume an observable
 * created each time with [createObservable], while other subscribers will queue and wait their turn.
 *
 * In order to allow observers to coordinate over un/subscriptions, it also exposes:
 * + [isCurrentlySubscribed]
 * + [subscriptionIndex]
 * + [subscriptionIndexObservable]
 * + [observeUnsubscription]
 * + [observeResubscription]
 *
 * If [createObservable] (or the internal transformation applied to its result) throws while a subscriber is taking its turn,
 * the turn is released again before the exception is propagated, so queued subscribers are not left waiting forever.
 *
 * @param createObservable factory invoked once for every subscriber that obtains its turn
 */
class SingletonObservable<T>(
    private val createObservable: () -> Observable<T>
) {

    companion object {

        /**
         * The initial value of [subscriptionIndex]; it represents the 'unsubscribed' state.
         *
         * @see [SubscriptionIndex]
         */
        const val firstSubscriptionIndex: SubscriptionIndex = Long.MIN_VALUE

        /** Remainder (modulo 2) shared by every index that represents the 'unsubscribed' state. */
        private const val unsubscribedRemainder = firstSubscriptionIndex % 2

        /**
         * Returns `true` when [subscriptionIndex] represents the 'subscribed' state, i.e. when its parity differs from the
         * parity of [firstSubscriptionIndex].
         *
         * @see [SubscriptionIndex]
         */
        fun isSubscribed(subscriptionIndex: SubscriptionIndex) =
            subscriptionIndex % 2 != unsubscribedRemainder
    }

    /**
     * Used to synchronize access to the subscription.
     */
    private val subscriptionLock = Any()

    /**
     * The current index; written only by [toggleSubscription].
     *
     * @see [SubscriptionIndex]
     */
    @Volatile
    var subscriptionIndex: SubscriptionIndex = firstSubscriptionIndex
        private set

    /**
     * Emits the latest [subscriptionIndex] and every successive value.
     * Must be declared after [subscriptionIndex], whose initial value seeds the subject.
     */
    private val subscriptionIndexSubject: BehaviorSubject<SubscriptionIndex> =
        BehaviorSubject.create(subscriptionIndex)

    /**
     * Emits the latest [subscriptionIndex] and every successive value.
     * Honours backpressure by using [Observable.onBackpressureLatest].
     */
    val subscriptionIndexObservable: Observable<SubscriptionIndex> =
        subscriptionIndexSubject.onBackpressureLatest()

    /**
     * Returns whether any observer is currently consuming this [SingletonObservable].
     */
    val isCurrentlySubscribed: Boolean
        get() = isSubscribed(subscriptionIndex)

    /**
     * Emits the items of the observable produced by [createObservable], unchanged.
     *
     * @see [SingletonObservable]
     */
    val observable: Observable<T> =
        singletonObservable(this::observable) { observable, _ -> observable }

    /**
     * Like [observable], but every item is wrapped in an [ItemWrapper] carrying the [SubscriptionIndex] of the stream that
     * emitted it and a reference to this instance.
     *
     * @see [SingletonObservable]
     */
    val observableWithSubscription: Observable<ItemWrapper<T, T>> =
        singletonObservable(this::observableWithSubscription) { observable, subscriptionIndex ->
            observable.map { item -> ItemWrapper(item, subscriptionIndex, this) }
        }

    /**
     * Wraps [createObservable] using [subscriptionLock] to guard access to the un/subscriptions (see [SingletonObservable]).
     *
     * On every subscription (the work is deferred through [Observable.defer]):
     * + if the current index is 'subscribed', the subscription to [selfReference] is delayed until
     *   [observeUnsubscription] completes for that index, after which the same check runs again;
     * + otherwise the index is advanced to 'subscribed' and a new observable is created, which releases the turn
     *   through [unsubscribe] when it terminates or is unsubscribed.
     *
     * @param [selfReference] reference to the property that will hold the generated observable
     * @param [transformObservable] transformation to be applied right after creating each new observable through
     * [createObservable]
     */
    private inline fun <R> singletonObservable(
        crossinline selfReference: () -> Observable<R>,
        crossinline transformObservable: (Observable<T>, SubscriptionIndex) -> Observable<R>
    ): Observable<R> =
        Observable.defer<R> {
            synchronized(subscriptionLock) {
                val index = subscriptionIndex
                if (isSubscribed(index)) {
                    selfReference().delaySubscription(observeUnsubscription(index))
                } else {
                    val newIndex = toggleSubscription()
                    try {
                        transformObservable(createObservable(), newIndex)
                            .doOnTerminate { unsubscribe(newIndex) }
                            .doOnUnsubscribe { unsubscribe(newIndex) }
                    } catch (error: Throwable) {
                        // The turn was already taken above, but no stream exists whose termination could release it:
                        // release it here, otherwise every later subscriber would queue forever.
                        unsubscribe(newIndex)
                        throw error
                    }
                }
            }
        }

    /**
     * Unsubscribes from [subscriptionIndex] if it's not already unsubscribed.
     *
     * The equality check makes the call idempotent, which matters because both `doOnTerminate` and `doOnUnsubscribe`
     * may invoke it for the same stream.
     */
    private fun unsubscribe(subscriptionIndex: SubscriptionIndex) {
        synchronized(subscriptionLock) {
            if (this.subscriptionIndex == subscriptionIndex) {
                toggleSubscription()
            }
        }
    }

    /**
     * Toggles between the 'unsubscribed'/'subscribed' subscription state by advancing the [subscriptionIndex],
     * then publishes the new value through [subscriptionIndexSubject].
     *
     * Both call sites in this class invoke it while holding [subscriptionLock].
     *
     * @return the new index
     */
    private fun toggleSubscription(): SubscriptionIndex {
        val oldIndex = subscriptionIndex
        return (oldIndex + 1).also { newIndex ->
            // Update the field before notifying, so observers reacting to the emission read the new value.
            subscriptionIndex = newIndex
            subscriptionIndexSubject.onNext(newIndex)
        }
    }

    /**
     * Returns an [Observable] that emits nothing and completes when the subscription related to the provided
     * [subscriptionIndex] is unsubscribed (or immediately if it already happened).
     *
     * It's generally easier and safer to use [ItemWrapper.observeUnsubscription].
     */
    fun observeUnsubscription(subscriptionIndex: SubscriptionIndex) =
        observeUntilSubscription(subscriptionIndex + 1)

    /**
     * Returns an [Observable] that emits nothing and completes when the subscription related to the provided
     * [subscriptionIndex] is unsubscribed and followed by another subscription (or immediately if it already happened).
     *
     * It's generally easier and safer to use [ItemWrapper.observeResubscription].
     */
    fun observeResubscription(subscriptionIndex: SubscriptionIndex) =
        observeUntilSubscription(subscriptionIndex + 2)

    /**
     * Returns an [Observable] that emits nothing and completes when [subscriptionIndex] is reached (or immediately if it
     * already happened).
     */
    private fun observeUntilSubscription(subscriptionIndex: SubscriptionIndex): Observable<Nothing> =
        subscriptionIndexSubject
            // Completes on the first emitted index that is not lower than the target.
            .takeWhile { it < subscriptionIndex }
            .ignoreElements()
            .cast(Nothing::class.java)

    /**
     * Wraps [item] with the [subscriptionIndex] of the stream from which it's emitted through the [source]'s
     * [observableWithSubscription].
     */
    data class ItemWrapper<out T, out R>(
        val item: R,
        val subscriptionIndex: SubscriptionIndex,
        val source: SingletonObservable<out T>
    ) {

        /**
         * @see [SingletonObservable.observeUnsubscription]
         */
        fun observeUnsubscription() = source.observeUnsubscription(subscriptionIndex)

        /**
         * @see [SingletonObservable.observeResubscription]
         */
        fun observeResubscription() = source.observeResubscription(subscriptionIndex)

        /**
         * Returns a new [ItemWrapper] with a different [item] but the same [subscriptionIndex] and [source].
         */
        fun <R2> withItem(item: R2): ItemWrapper<T, R2> =
            ItemWrapper(
                item = item,
                subscriptionIndex = subscriptionIndex,
                source = source)
    }
}
/**
 * Flat-maps over the wrapped items while keeping the wrapping intact.
 *
 * Every incoming [ItemWrapper.item] is passed to [transform] (through [Observable.flatMap]); each element of the resulting
 * flattened sequence is then re-wrapped with [ItemWrapper.withItem], so it keeps the [ItemWrapper.subscriptionIndex] and
 * [ItemWrapper.source] of the wrapper it originated from.
 */
inline fun <T, R, R2> Observable<ItemWrapper<T, R>>.flatMapItem(
    crossinline transform: (item: R) -> Observable<R2>
): Observable<ItemWrapper<T, R2>> =
    flatMap { original ->
        val transformed = transform(original.item)
        transformed.map { newItem -> original.withItem(newItem) }
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment