Created
May 6, 2020 16:10
-
-
Save glureau-betclic/22c67ad77c3be512b8882b9bc1a07c78 to your computer and use it in GitHub Desktop.
TechYourChance / Concurrency Frameworks in Android are overrated / Kotlin+RxJava
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/ | |
import io.reactivex.Observable | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.functions.BiFunction | |
import io.reactivex.functions.Consumer | |
import io.reactivex.schedulers.Schedulers | |
import java.io.File | |
import java.io.IOException | |
import java.util.concurrent.CountDownLatch | |
import kotlin.random.Random | |
/**
 * Demo entry point: starts one upload and blocks the calling thread until the
 * listener has received the resulting state.
 */
fun main() {
    // Released by the listener; the result is delivered asynchronously on another thread.
    val resultReceived = CountDownLatch(1)
    val listener = Consumer<UploadFilesUseCase.OperationState> { state ->
        println("Resulting state $state received on ${Thread.currentThread().name}")
        resultReceived.countDown()
    }
    UploadFilesUseCase().uploadFiles(listener)
    resultReceived.await()
}
/**
 * Processes two groups of files in parallel, compresses the merged results,
 * uploads the compressed file and reports the outcome to a listener.
 */
class UploadFilesUseCase {

    /** Final outcome reported to the listener. */
    enum class OperationState { UPLOADED, FAILED }

    // Subscription of the most recently started upload; null until the first call.
    private var disposable: Disposable? = null

    /**
     * Starts the upload pipeline unless a previous one is still running.
     *
     * The pipeline is attempted once and retried up to two more times on error.
     * The listener receives [OperationState.UPLOADED] on success, or
     * [OperationState.FAILED] when every attempt has failed.
     *
     * @param listenerOnUiThread callback receiving the resulting state; it is
     * invoked on the scheduler passed to `observeOn` below.
     */
    fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) {
        // A subscription that exists and is NOT yet disposed means an upload is
        // still in flight. The Disposable returned by subscribe() reports
        // isDisposed == true once the stream has terminated, so checking for
        // `== true` here would block every call after the first upload finished
        // while letting concurrent uploads through.
        if (disposable?.isDisposed == false) {
            // Log attempt
            return
        }
        disposable = Observable.combineLatest(
            ioObservableFrom(::processAndMergeFilesOfTypeA),
            ioObservableFrom(::processAndMergeFilesOfTypeB),
            BiFunction<File, File, File> { aResult: File, bResult: File ->
                compressMergedFiles(aResult, bResult)
            }
        )
            .switchMap(::uploadResultObservable)
            // Placed before retry(), so cleanup runs after every attempt
            // (failed or successful), not only after the final one.
            .doOnTerminate { deleteTempDir() } // Success or Error
            .retry(2)
            .map { OperationState.UPLOADED }
            .onErrorReturnItem(OperationState.FAILED)
            .observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android
            .subscribe(listenerOnUiThread)
    }
}
// Adapter that bridges a non-Rx (blocking) library into an Rx pipeline. Not needed when your libraries already ship Rx wrappers.
/**
 * Wraps a blocking call in an [Observable] that is subscribed on the IO scheduler.
 *
 * The observable emits the value returned by [method] and then completes. If
 * [method] throws, the throwable is signalled as an error instead.
 *
 * @param method the blocking work to execute on subscription.
 * @return an observable emitting the single result of [method].
 */
private fun <T> ioObservableFrom(method: () -> T) = Observable.create<T> { emitter ->
    try {
        emitter.onNext(method())
        emitter.onComplete()
    } catch (throwable: Throwable) {
        // The downstream may already have disposed this emitter (for example a
        // sibling source failed first and the blocked call was interrupted).
        // onError() on a disposed emitter forwards the throwable to the global
        // RxJavaPlugins error handler as an UndeliverableException; tryOnError()
        // drops it quietly in that case and behaves like onError() otherwise.
        emitter.tryOnError(throwable)
    }
}.subscribeOn(Schedulers.io())
/** Processing step for the type-A files; delegates to the dummy [workAndReturnFile] with path "A". */
private fun processAndMergeFilesOfTypeA(): File = workAndReturnFile("A")
/** Processing step for the type-B files; delegates to the dummy [workAndReturnFile] with path "B". */
private fun processAndMergeFilesOfTypeB(): File = workAndReturnFile("B")
/**
 * Compression step combining both merged files.
 *
 * Dummy implementation: [fileA] and [fileB] are not read; the result is always
 * whatever [workAndReturnFile] returns for path "C".
 */
private fun compressMergedFiles(fileA: File, fileB: File): File = workAndReturnFile("C")
/** Wraps the blocking [uploadResult] call for [compressedFile] in an observable via [ioObservableFrom]. */
private fun uploadResultObservable(compressedFile: File): Observable<Unit> =
    ioObservableFrom { uploadResult(compressedFile) }
// ------------------------------------------------------------------------------------- | |
// Dummy implementations for you to be able to understand the threading | |
// ------------------------------------------------------------------------------------- | |
/**
 * Simulates a blocking unit of work: logs the current thread, sleeps for one
 * second, possibly fails through [randomizedErrors], then logs completion.
 *
 * @param path label of the work item, also used as the path of the returned file.
 * @return a [File] pointing at [path].
 * @throws IOException when the simulated random failure triggers.
 */
fun workAndReturnFile(path: String): File {
    val threadName = Thread.currentThread().name
    println("Thread: $threadName start working on $path")
    Thread.sleep(1000)
    randomizedErrors(path)
    println("Thread: $threadName $path DONE")
    return File(path)
}
/**
 * Simulates a blocking upload: logs the current thread, sleeps for two
 * seconds, possibly fails through [randomizedErrors], then logs completion.
 *
 * @param compressedFile the file pretended to be uploaded.
 * @throws IOException when the simulated random failure triggers.
 */
private fun uploadResult(compressedFile: File) {
    val threadName = Thread.currentThread().name
    println("Uploading on: $threadName ($compressedFile)")
    Thread.sleep(2000)
    randomizedErrors(compressedFile.path)
    println("Uploaded from: $threadName ($compressedFile)")
}
/**
 * Fails at random to simulate flaky work: when a draw from 0, 1 or 2 comes up
 * 0, logs the failure and throws.
 *
 * @param path label of the work item, used only in the log line.
 * @throws IOException with message "random error" when the draw is 0.
 */
private fun randomizedErrors(path: String) {
    val unlucky = Random.nextInt(0, 3) == 0
    if (!unlucky) return

    println("-------- Unlucky crash appears while processing $path")
    throw IOException("random error")
}
/** Dummy cleanup step: only logs that the files are being deleted. */
private fun deleteTempDir() = println("Deleting files...")
/** | |
* Example of output: | |
* | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-1 A DONE | |
-------- Unlucky crash appears while processing B | |
Deleting files... | |
Thread: RxCachedThreadScheduler-1 start working on A | |
Thread: RxCachedThreadScheduler-2 start working on B | |
Thread: RxCachedThreadScheduler-1 A DONE | |
Thread: RxCachedThreadScheduler-2 B DONE | |
Thread: RxCachedThreadScheduler-2 start working on C | |
Thread: RxCachedThreadScheduler-2 C DONE | |
Uploading on: RxCachedThreadScheduler-1 (C) | |
Uploaded from: RxCachedThreadScheduler-1 (C) | |
Deleting files... | |
Resulting state UPLOADED received on RxComputationThreadPool-1 | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment