Created
December 2, 2018 21:42
-
-
Save naturalwarren/7e5a04befcf253ca5d2facbac6b07113 to your computer and use it in GitHub Desktop.
A custom CallAdapter.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A [CallAdapter] that wraps the stream produced by [delegateAdapter] so that every
 * emission is a [CoinbaseResponse] and recognised failures are delivered as items
 * rather than as stream errors.
 *
 * Exactly one of the `is*` flags is expected to be `true`; it tells [adapt] which
 * reactive type the delegate returns and therefore which type to hand back. If none
 * is set, [adapt] throws [IllegalStateException].
 *
 * @param successBodyType the type reported by [responseType].
 * @param delegateAdapter the adapter that actually turns the [Call] into a reactive stream.
 * @param errorConverterFactory converter applied to a non-empty HTTP error body.
 * @param isObservable `true` when the delegate returns an [Observable].
 * @param isFlowable `true` when the delegate returns a [Flowable].
 * @param isSingle `true` when the delegate returns a [Single].
 * @param isMaybe `true` when the delegate returns a [Maybe].
 */
internal class CoinbaseRxJavaCallAdapter(
    private val successBodyType: Type,
    private val delegateAdapter: CallAdapter<Any, Any>,
    private val errorConverterFactory: Converter<ResponseBody, Any?>,
    private val isObservable: Boolean,
    private val isFlowable: Boolean,
    private val isSingle: Boolean,
    private val isMaybe: Boolean
) : CallAdapter<Any, Any> {

    /**
     * Adapts [call] through [delegateAdapter], normalises the result to an [Observable],
     * wraps items and recognised errors in [CoinbaseResponse], then converts back to the
     * reactive type selected by the boolean flags.
     *
     * @return an [Observable], [Flowable], [Single] or [Maybe] of [CoinbaseResponse].
     * @throws IllegalStateException if none of the stream-type flags is set.
     */
    override fun adapt(call: Call<Any>): Any {
        val stream = delegateAdapter.adapt(call)

        // Work on a single reactive type so the wrapping logic is written only once.
        @Suppress("UNCHECKED_CAST") // Types are checked with boolean flags.
        val observableStream = when {
            isObservable -> stream as Observable<Any>
            isFlowable -> (stream as Flowable<Any>).toObservable()
            isSingle -> (stream as Single<Any>).toObservable()
            isMaybe -> (stream as Maybe<Any>).toObservable()
            else -> throw IllegalStateException("Unrecognized stream type.")
        }

        val coinbaseStream = observableStream
            // Successful item: only the first CoinbaseResponse argument is populated.
            .map { CoinbaseResponse<Any, Any>(it, null, null) }
            // Explicit SAM constructor, as in the original, to pick the Function overload.
            .onErrorResumeNext(
                Function<Throwable, Observable<CoinbaseResponse<Any, Any>>> { throwable ->
                    Observable.just(toFailureResponse(throwable))
                }
            )

        // Hand the caller back the same reactive type the delegate produced.
        return when {
            isObservable -> coinbaseStream
            isFlowable -> coinbaseStream.toFlowable(BackpressureStrategy.LATEST)
            isSingle -> coinbaseStream.singleOrError()
            isMaybe -> coinbaseStream.singleElement()
            else -> throw IllegalStateException("Unrecognized stream type.")
        }
    }

    override fun responseType(): Type = successBodyType

    /**
     * Maps a stream failure to a [CoinbaseResponse] item.
     *
     * - [IOException]: the throwable is passed as the third constructor argument.
     * - [HttpException]: the error body, converted with [errorConverterFactory], is passed
     *   as the second argument; it is `null` when the response or its error body is absent
     *   or when the body reports a content length of zero.
     *
     * @throws IllegalStateException for any other throwable, with that throwable as cause.
     */
    private fun toFailureResponse(throwable: Throwable): CoinbaseResponse<Any, Any> =
        when (throwable) {
            is IOException -> CoinbaseResponse<Any, Any>(null, null, throwable)
            is HttpException -> {
                // response() may be null; treat that the same as a missing error body
                // instead of dereferencing it unconditionally.
                val errorBody = throwable.response()
                    ?.errorBody()
                    ?.takeUnless { it.contentLength() == 0L }
                    ?.let { errorConverterFactory.convert(it) }
                CoinbaseResponse<Any, Any>(null, errorBody, null)
            }
            // Keep the original failure attached so it is not lost when diagnosing.
            else -> throw IllegalStateException("Unrecognized exception.", throwable)
        }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment