-
-
Save glureau-betclic/ec1e86a3206112459ac659b226d817c1 to your computer and use it in GitHub Desktop.
// Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/ | |
import io.reactivex.Single | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.functions.BiFunction | |
import io.reactivex.functions.Consumer | |
import io.reactivex.schedulers.Schedulers | |
import java.io.File | |
import java.io.IOException | |
import java.util.concurrent.CountDownLatch | |
import kotlin.random.Random | |
fun main() { | |
val latch = CountDownLatch(1) | |
UploadFilesUseCase() | |
.uploadFiles(Consumer { state -> | |
println("Resulting state $state received on " + Thread.currentThread().name) | |
latch.countDown() | |
}) | |
latch.await() | |
} | |
class UploadFilesUseCase { | |
enum class OperationState { UPLOADED, FAILED } | |
private var disposable: Disposable? = null | |
fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) { | |
if (disposable?.isRunning == true) { | |
// Log attempt | |
return | |
} | |
disposable = Single.zip( | |
ioSingleFrom(::processAndMergeFilesOfTypeA), | |
ioSingleFrom(::processAndMergeFilesOfTypeB), | |
BiFunction<File, File, File> { aResult: File, bResult: File -> | |
compressMergedFiles(aResult, bResult) | |
}) | |
.flatMap(::uploadResultSingle) | |
.doOnEvent { _, _ -> deleteTempDir() } // Success or Error | |
.retry(2) | |
.map { OperationState.UPLOADED } | |
.onErrorReturnItem(OperationState.FAILED) | |
.observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android | |
.subscribe(listenerOnUiThread) | |
} | |
} | |
private val Disposable.isRunning | |
get() = isDisposed.not() | |
// Code mapping a non Rx library to a Rx application. So not required if using Rx wrappers with your libs. | |
private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable(method).subscribeOn(Schedulers.io()) | |
private fun processAndMergeFilesOfTypeA() = workAndReturnFile("A") | |
private fun processAndMergeFilesOfTypeB() = workAndReturnFile("B") | |
private fun compressMergedFiles(fileA: File, fileB: File) = workAndReturnFile("C") | |
private fun uploadResultSingle(compressedFile: File) = ioSingleFrom { uploadResult(compressedFile) } | |
// ------------------------------------------------------------------------------------- | |
// Dummy implementations for you to be able to understand the threading | |
// ------------------------------------------------------------------------------------- | |
fun workAndReturnFile(path: String): File { | |
println("Thread: " + Thread.currentThread().name + " start working on $path") | |
Thread.sleep(1000) | |
randomizedErrors(path) | |
println("Thread: " + Thread.currentThread().name + " $path DONE") | |
return File(path) | |
} | |
private fun uploadResult(compressedFile: File) { | |
println("Uploading on: " + Thread.currentThread().name + " ($compressedFile)") | |
Thread.sleep(2000) | |
randomizedErrors(compressedFile.path) | |
println("Uploaded from: " + Thread.currentThread().name + " ($compressedFile)") | |
} | |
private fun randomizedErrors(path: String) { | |
if (Random.nextInt(0, 3) == 0) { | |
println("-------- Unlucky crash appears while processing $path") | |
throw IOException("random error") | |
} | |
} | |
private fun deleteTempDir() { | |
println("Deleting files...") | |
} | |
/** | |
* Example of output: | |
* | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-1 A DONE | |
-------- Unlucky crash appears while processing B | |
Deleting files... | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 A DONE | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Thread: RxCachedThreadScheduler-2 start working on C | |
Thread: RxCachedThreadScheduler-2 C DONE | |
Uploading on: RxCachedThreadScheduler-1 (C) | |
Uploaded from: RxCachedThreadScheduler-1 (C) | |
Deleting files... | |
Resulting state UPLOADED received on RxComputationThreadPool-1 | |
* | |
* And if 3 unlucky errors: | |
* | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 start working on B | |
-------- Unlucky crash appears while processing A | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Deleting files... | |
Thread: RxCachedThreadScheduler-2 start working on A | |
Thread: RxCachedThreadScheduler-3 start working on B | |
-------- Unlucky crash appears while processing B | |
Thread: RxCachedThreadScheduler-2 A DONE | |
Deleting files... | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Thread: RxCachedThreadScheduler-1 A DONE | |
Thread: RxCachedThreadScheduler-1 start working on C | |
Thread: RxCachedThreadScheduler-1 C DONE | |
Uploading on: RxCachedThreadScheduler-3 (C) | |
-------- Unlucky crash appears while processing C | |
Deleting files... | |
Resulting state FAILED received on RxComputationThreadPool-1 | |
*/ |
if (disposable?.isDisposed == true) {
I don't think this is safe. Once you successfully run this method, it will forever be silently no-op.
I think the no-op should occur only while disposable is available, and otherwise set disposable to null
in doFinally {
.
Damn, you're right, thanks! Next time I'll write some unit tests before posting a public gist 🤣
I had some issues with setting the nullability on disposable when I provide streams that can be consumed in multiple places. Sometimes a stream is completed (end of stream, RxAndroidLifecycle = terminal event), sometimes finishes with an error (terminal event), and sometimes it's manually disposed (dispose() -> no terminal event), and if the 2 methods doOnTerminate+doOnDispose are not defined to null the disposable, then it depends of the type of cancellation.
To avoid that, I usually prefer an extension function like
private val Disposable.isRunning
get() = isDisposed.not()
And a condition like this:
if (disposable?.isRunning == true) {
// Log attempt
return
}
I find it more explicit on the intent (if it's already running, then stop), and I don't have to care about releasing a Disposable object (low memory footprint anyway).
What do you think?
Gist is now fixed, but for future readers, this is nothing more than a support for a Twitter conversation. 😉
this is equivalent to
You might even be able to replace it with just