Skip to content

Instantly share code, notes, and snippets.

@glureau-betclic
Created May 6, 2020 16:10
Show Gist options
  • Save glureau-betclic/22c67ad77c3be512b8882b9bc1a07c78 to your computer and use it in GitHub Desktop.
Save glureau-betclic/22c67ad77c3be512b8882b9bc1a07c78 to your computer and use it in GitHub Desktop.
TechYourChance / Concurrency Frameworks in Android are overrated / Kotlin+RxJava
// Original code: https://www.techyourchance.com/concurrency-frameworks-overrated-android/
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import io.reactivex.functions.BiFunction
import io.reactivex.functions.Consumer
import io.reactivex.schedulers.Schedulers
import java.io.File
import java.io.IOException
import java.util.concurrent.CountDownLatch
import kotlin.random.Random
fun main() {
val latch = CountDownLatch(1)
UploadFilesUseCase()
.uploadFiles(Consumer { state ->
println("Resulting state $state received on " + Thread.currentThread().name)
latch.countDown()
})
latch.await()
}
class UploadFilesUseCase {
enum class OperationState { UPLOADED, FAILED }
private var disposable: Disposable? = null
fun uploadFiles(listenerOnUiThread: Consumer<OperationState>) {
if (disposable?.isDisposed == true) {
// Log attempt
return
}
disposable = Observable.combineLatest(
ioObservableFrom(::processAndMergeFilesOfTypeA),
ioObservableFrom(::processAndMergeFilesOfTypeB),
BiFunction<File, File, File> { aResult: File, bResult: File ->
compressMergedFiles(aResult, bResult)
})
.switchMap(::uploadResultObservable)
.doOnTerminate { deleteTempDir() } // Success or Error
.retry(2)
.map { OperationState.UPLOADED }
.onErrorReturnItem(OperationState.FAILED)
.observeOn(Schedulers.computation()) // Or .observeOn(AndroidSchedulers.mainThread()) when on Android
.subscribe(listenerOnUiThread)
}
}
// Code mapping a non Rx library to a Rx application. So not required if using Rx wrappers with your libs.
private fun <T> ioObservableFrom(method: () -> T) = Observable.create<T> { emitter ->
try {
emitter.onNext(method())
emitter.onComplete()
} catch (throwable: Throwable) {
emitter.onError(throwable)
}
}.subscribeOn(Schedulers.io())
private fun processAndMergeFilesOfTypeA() = workAndReturnFile("A")
private fun processAndMergeFilesOfTypeB() = workAndReturnFile("B")
private fun compressMergedFiles(fileA: File, fileB: File) = workAndReturnFile("C")
private fun uploadResultObservable(compressedFile: File) = ioObservableFrom { uploadResult(compressedFile) }
// -------------------------------------------------------------------------------------
// Dummy implementations for you to be able to understand the threading
// -------------------------------------------------------------------------------------
fun workAndReturnFile(path: String): File {
println("Thread: " + Thread.currentThread().name + " start working on $path")
Thread.sleep(1000)
randomizedErrors(path)
println("Thread: " + Thread.currentThread().name + " $path DONE")
return File(path)
}
private fun uploadResult(compressedFile: File) {
println("Uploading on: " + Thread.currentThread().name + " ($compressedFile)")
Thread.sleep(2000)
randomizedErrors(compressedFile.path)
println("Uploaded from: " + Thread.currentThread().name + " ($compressedFile)")
}
private fun randomizedErrors(path: String) {
if (Random.nextInt(0, 3) == 0) {
println("-------- Unlucky crash appears while processing $path")
throw IOException("random error")
}
}
private fun deleteTempDir() {
println("Deleting files...")
}
/**
* Example of output:
*
Thread: RxCachedThreadScheduler-2 start working on B
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-1 A DONE
-------- Unlucky crash appears while processing B
Deleting files...
Thread: RxCachedThreadScheduler-1 start working on A
Thread: RxCachedThreadScheduler-2 start working on B
Thread: RxCachedThreadScheduler-1 A DONE
Thread: RxCachedThreadScheduler-2 B DONE
Thread: RxCachedThreadScheduler-2 start working on C
Thread: RxCachedThreadScheduler-2 C DONE
Uploading on: RxCachedThreadScheduler-1 (C)
Uploaded from: RxCachedThreadScheduler-1 (C)
Deleting files...
Resulting state UPLOADED received on RxComputationThreadPool-1
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment