Last active
May 6, 2020 21:54
-
-
Save glureau-betclic/ec1e86a3206112459ac659b226d817c1 to your computer and use it in GitHub Desktop.
TechYourChance / Concurrency Frameworks in Android are overrated / Kotlin+RxJava
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/ | |
import io.reactivex.Single | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.functions.BiFunction | |
import io.reactivex.functions.Consumer | |
import io.reactivex.schedulers.Schedulers | |
import java.io.File | |
import java.io.IOException | |
import java.util.concurrent.CountDownLatch | |
import kotlin.random.Random | |
fun main() { | |
val latch = CountDownLatch(1) | |
UploadFilesUseCase() | |
.uploadFiles(Consumer { state -> | |
println("Resulting state $state received on " + Thread.currentThread().name) | |
latch.countDown() | |
}) | |
latch.await() | |
} | |
class UploadFilesUseCase { | |
enum class OperationState { UPLOADED, FAILED } | |
private var disposable: Disposable? = null | |
fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) { | |
if (disposable?.isRunning == true) { | |
// Log attempt | |
return | |
} | |
disposable = Single.zip( | |
ioSingleFrom(::processAndMergeFilesOfTypeA), | |
ioSingleFrom(::processAndMergeFilesOfTypeB), | |
BiFunction<File, File, File> { aResult: File, bResult: File -> | |
compressMergedFiles(aResult, bResult) | |
}) | |
.flatMap(::uploadResultSingle) | |
.doOnEvent { _, _ -> deleteTempDir() } // Success or Error | |
.retry(2) | |
.map { OperationState.UPLOADED } | |
.onErrorReturnItem(OperationState.FAILED) | |
.observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android | |
.subscribe(listenerOnUiThread) | |
} | |
} | |
private val Disposable.isRunning | |
get() = isDisposed.not() | |
// Code mapping a non Rx library to a Rx application. So not required if using Rx wrappers with your libs. | |
private fun <T> ioSingleFrom(method: () -> T) = Single.fromCallable(method).subscribeOn(Schedulers.io()) | |
private fun processAndMergeFilesOfTypeA() = workAndReturnFile("A") | |
private fun processAndMergeFilesOfTypeB() = workAndReturnFile("B") | |
private fun compressMergedFiles(fileA: File, fileB: File) = workAndReturnFile("C") | |
private fun uploadResultSingle(compressedFile: File) = ioSingleFrom { uploadResult(compressedFile) } | |
// ------------------------------------------------------------------------------------- | |
// Dummy implementations for you to be able to understand the threading | |
// ------------------------------------------------------------------------------------- | |
fun workAndReturnFile(path: String): File { | |
println("Thread: " + Thread.currentThread().name + " start working on $path") | |
Thread.sleep(1000) | |
randomizedErrors(path) | |
println("Thread: " + Thread.currentThread().name + " $path DONE") | |
return File(path) | |
} | |
private fun uploadResult(compressedFile: File) { | |
println("Uploading on: " + Thread.currentThread().name + " ($compressedFile)") | |
Thread.sleep(2000) | |
randomizedErrors(compressedFile.path) | |
println("Uploaded from: " + Thread.currentThread().name + " ($compressedFile)") | |
} | |
private fun randomizedErrors(path: String) { | |
if (Random.nextInt(0, 3) == 0) { | |
println("-------- Unlucky crash appears while processing $path") | |
throw IOException("random error") | |
} | |
} | |
private fun deleteTempDir() { | |
println("Deleting files...") | |
} | |
/** | |
* Example of output: | |
* | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-1 A DONE | |
-------- Unlucky crash appears while processing B | |
Deleting files... | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 A DONE | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Thread: RxCachedThreadScheduler-2 start working on C | |
Thread: RxCachedThreadScheduler-2 C DONE | |
Uploading on: RxCachedThreadScheduler-1 (C) | |
Uploaded from: RxCachedThreadScheduler-1 (C) | |
Deleting files... | |
Resulting state UPLOADED received on RxComputationThreadPool-1 | |
* | |
* And if 3 unlucky errors: | |
* | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 start working on B | |
-------- Unlucky crash appears while processing A | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Deleting files... | |
Thread: RxCachedThreadScheduler-2 start working on A | |
Thread: RxCachedThreadScheduler-3 start working on B | |
-------- Unlucky crash appears while processing B | |
Thread: RxCachedThreadScheduler-2 A DONE | |
Deleting files... | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Thread: RxCachedThreadScheduler-1 A DONE | |
Thread: RxCachedThreadScheduler-1 start working on C | |
Thread: RxCachedThreadScheduler-1 C DONE | |
Uploading on: RxCachedThreadScheduler-3 (C) | |
-------- Unlucky crash appears while processing C | |
Deleting files... | |
Resulting state FAILED received on RxComputationThreadPool-1 | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Damn, you're right, thanks! Next time I'll write some unit tests before posting a public gist 🤣
I had some issues with setting the nullability on disposable when I provide streams that can be consumed in multiple places. Sometimes a stream is completed (end of stream, RxAndroidLifecycle = terminal event), sometimes finishes with an error (terminal event), and sometimes it's manually disposed (dispose() -> no terminal event), and if the 2 methods doOnTerminate+doOnDispose are not defined to null the disposable, then it depends of the type of cancellation.
To avoid that, I usually prefer an extension function like
And a condition like this:
I find it more explicit on the intent (if it's already running, then stop), and I don't have to care about releasing a Disposable object (low memory footprint anyway).
What do you think?
Gist is now fixed, but for future readers, this is nothing more than a support for a Twitter conversation. 😉