Created
November 20, 2018 14:03
-
-
Save alorma/6471342a52c9042e39b80787b81b7dea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.reactivex.disposables.Disposable | |
import kotlinx.coroutines.CancellableContinuation | |
import kotlinx.coroutines.Dispatchers | |
import kotlinx.coroutines.suspendCancellableCoroutine | |
import kotlinx.coroutines.withContext | |
import rx.Subscription | |
import kotlin.coroutines.resume | |
import kotlin.coroutines.resumeWithException | |
import io.reactivex.Completable as Rx2Completable | |
import io.reactivex.Flowable as Rx2Flowable | |
import io.reactivex.Observable as Rx2Observable | |
import io.reactivex.Single as Rx2Single | |
import rx.Completable as Rx1Completable | |
import rx.Observable as Rx1Observable | |
import rx.Single as Rx1Single | |
suspend inline fun <T> withIO( | |
crossinline block: () -> T | |
): T = withContext(Dispatchers.IO) { block() } | |
suspend fun <T> Rx1Observable<T>.await(): T = suspendCancellableCoroutine { continuation -> | |
val subscription = subscribe({ | |
continuation.resume(it) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnSubscribe(continuation, subscription) | |
} | |
suspend fun <T> Rx2Observable<T>.await(): T = suspendCancellableCoroutine { continuation -> | |
val disposable = subscribe({ | |
continuation.resume(it) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnDispose(continuation, disposable) | |
} | |
suspend fun <T> Rx2Flowable<T>.await(): T = suspendCancellableCoroutine { continuation -> | |
val disposable = subscribe({ | |
continuation.resume(it) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnDispose(continuation, disposable) | |
} | |
suspend fun <T> Rx1Single<T>.await(): T = suspendCancellableCoroutine { continuation -> | |
val subscription = subscribe({ | |
continuation.resume(it) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnSubscribe(continuation, subscription) | |
} | |
suspend fun <T> Rx2Single<T>.await(): T = suspendCancellableCoroutine { continuation -> | |
val disposable = subscribe({ | |
continuation.resume(it) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnDispose(continuation, disposable) | |
} | |
suspend fun Rx1Completable.await(): Unit = suspendCancellableCoroutine { continuation -> | |
val subscription = subscribe({ | |
continuation.resume(Unit) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnSubscribe(continuation, subscription) | |
} | |
suspend fun Rx2Completable.await(): Unit = suspendCancellableCoroutine { continuation -> | |
val disposable = subscribe({ | |
continuation.resume(Unit) | |
}, { | |
continuation.resumeWithException(it) | |
}) | |
addOnDispose(continuation, disposable) | |
} | |
private fun <T> addOnSubscribe(continuation: CancellableContinuation<T>, subscription: Subscription) { | |
continuation.invokeOnCancellation { subscription.unsubscribe() } | |
} | |
private fun <T> addOnDispose(continuation: CancellableContinuation<T>, disposable: Disposable) { | |
continuation.invokeOnCancellation { disposable.dispose() } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment