Last active
August 2, 2023 08:37
-
-
Save kakai248/d3ac349cf2aa54da7a935fc1ab23024b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class UploadFilesUseCase( | |
private val schedulerProvider: SchedulerProvider, | |
private val httpManager: HttpManager | |
) { | |
private var operation: Completable? = null | |
fun uploadFiles(): Completable = synchronized(this) { | |
operation | |
?: (doUploadFiles() | |
.doFinally { operation = null } | |
.cache() | |
.also { operation = it }) | |
} | |
private fun doUploadFiles(): Completable = | |
Singles | |
.zip( | |
processAndMergeFilesOfTypeA().subscribeOn(schedulerProvider.io), | |
processAndMergeFilesOfTypeB().subscribeOn(schedulerProvider.io) | |
) | |
.flatMap { (fileA, fileB) -> compressMergedFiles(fileA, fileB) } | |
.flatMap(::uploadFileToServer) | |
.ignoreElement() | |
.doOnComplete(::deleteTempDir) | |
.doOnError { deleteTempDir() } | |
.retry(MAX_RETRIES) | |
.observeOn(schedulerProvider.ui) | |
private fun uploadFileToServer(archive: File) = | |
httpManager.uploadFiles(archive) | |
.map { response -> | |
if (response.code / 100 != 2) { | |
throw OperationFailedException() | |
} | |
} | |
private fun processAndMergeFilesOfTypeA(): Single<File> = Single.just(File("")) | |
private fun processAndMergeFilesOfTypeB(): Single<File> = Single.just(File("")) | |
private fun compressMergedFiles(fileA: File, fileB: File): Single<File> = Single.just(File("")) | |
private fun deleteTempDir() {} | |
companion object { | |
private const val MAX_RETRIES = 3L | |
} | |
} | |
class HttpManager { | |
fun uploadFiles(archive: File): Single<Response> = Single.just(Response(200, byteArrayOf())) | |
} | |
class Response( | |
val code: Int, | |
val body: ByteArray | |
) | |
class OperationFailedException : Throwable() |
Looks really dope, even though I don't know enough RxJava to make sure it implements the same requirements.
Could you also add hooks for logging of exceptions (potentially thrown during processing) and of concurrent invocation attemps?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A more reactive alternative without synchronization and extra state. Kind of a pseudo code sketch of an idea, so forgive me if something is not really working.