EDIT: This has become soon obsolete when Observable.fromAsync()
has been released in RxJava
. An issue from JakeWarthon here: ReactiveX/RxJava#4177 generated a pull request which is being implemented / discussed here: ReactiveX/RxJava#4179 . See below on how to use it.
I'm learning RxJava
. Many Rx-libraries out there are using Obvservable.create()
with an inline OnSubscribe
implementation to wrap legacy APIs, like this one for the Android GoogleMap API:
class MapFragmentMapReadyOnSubscribe implements Observable.OnSubscribe<GoogleMap> {
final MapFragment fragment;
MapFragmentMapReadyOnSubscribe(MapFragment fragment) {
this.fragment = fragment;
}
@Override public void call(final Subscriber<? super GoogleMap> subscriber) {
MainThreadSubscription.verifyMainThread();
OnMapReadyCallback callback = new OnMapReadyCallback() {
@Override public void onMapReady(GoogleMap googleMap) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(googleMap);
}
}
};
fragment.getMapAsync(callback);
}
}
According to Dávid Karnok this is a very bad practice, quoting him: "What about unsubscription and backpressure? No amount of operators can fix that for this source".
I wrote to him and following his input I tried to create something generic to help wrapping legacy API by writing something that take into account back pressure. I'm not sure at all this is ACTUALLY better, it's a work in progress.
EDIT: this is is the new way you are supposed to do that:
class MyActivity extends AppCompatActivity {
@Override
public Observable<GoogleMap> getGoogleMapObservale(GoogleMapFragment mapFragment) {
return Observable<GoogleMap> = Observable.fromAsync(emitter -> {
OnMapReadyCallback callback = new OnMapReadyCallback() {
@Override
public void onMapReady(GoogleMap googleMap) {
emitter.onNext(googleMap);
emitter.onCompleted();
}
};
mapFragment.getMapAsync(callback);
}, AsyncEmitter.BackpressureMode.BUFFER);
}
}
This snipped is to be used for those API that return a SINGLE value or fail ONCE. More or less like a Promise. It does not support multiple returned values or both values and errors. It does, however, support APIs that return you something that can be used to cancel the request.
Copy the class into your code and use it to create your Observable
s.
To create the observable from the Map API above you would do:
@NonNull
private Observable<GoogleMap> mapObservable(MapFragment mapFragment) {
return RxWrapCallbackAPI.wrapAPI(
(successCallback, failCallback) -> mapFragment.getMapAsync(successCallback::onSuccess));
}
In general suppose you have an API like this one:
public interface API<DATA> {
void nonCancelableMethod(Callback<DATA> callback) throws Exception;
Cancelable cancelableMethod(Callback<DATA> callback) throws Exception;
// The callback
interface Callback<DATA> {
void onDataReady(DATA data, Exception error);
void onDataFail(Exception error);
}
// this allow you to cancel the request
interface Cancelable {
void cancel() throws Exception; // note this can throw a Checked Exception
}
}
When you need to wrap an API that has no way of canceling it's execution:
// Non Cancelable API wrapping
Observable<Data> nonCancelableObservable = RxWrapCallbackAPI.wrapAPI(
// successCallback and failCallback are provided by the RxWrapCallbackAPI
// Here you call your API as you normally would,
// it will be executed when you subscribe to the observable
(successCallback, failCallback) -> api.nonCancelableMethod(new Callback<Data>() {
@Override
public void onDataReady(Data data, Exception error) {
// this is your API callback
if (error == null) {
// we have a non error result, pass it to the successCallback
successCallback.onSuccess(data);
} else {
// we have an error :( pass it to the error callback
failCallback.onFailed(error);
}
}
@Override
public void onDataFail(Exception error) {
// another method of your API, it's an error, so use the failCallback
failCallback.onFailed(error);
}
})
);
And when your legacy API do expose some method to cancel it it can be automatically used when there are no more subscriptions to your Observable, just use the wrapAPICancelable()
method in that case:
// Cancelable API wrapping
Observable<Data> cancelableObservable = RxWrapCallbackAPI.wrapAPICancelable(
// successCallback and failCallback are provided by the RxWrapCallbackAPI
// Here you call your API as you normally would,
// it will be executed when you subscribe to the observable
// the only difference is that your API here return something to cancel the execution
(successCallback, failCallback) -> api.cancelableMethod(new Callback<Data>() {
@Override
public void onDataReady(Data data, Exception error) {
// this is your API callback
if (error == null) {
// we have a non error result, pass it to the successCallback
successCallback.onSuccess(data);
} else {
// we have an error :( pass it to the error callback
failCallback.onFailed(error);
}
}
@Override
public void onDataFail(Exception error) {
// another method of your API, it's an error, so use the failCallback
failCallback.onFailed(error);
}
}),
// cancelable here is the object your API returned
// you just need to do whatever your API require to cancel the execution
cancelable -> () -> cancelable.cancel()
);
This should be flexible on any asynchrouns API with a callback.
Not supported:
- api providing multiple values via different callbacks or at different times
- api that may not return nor fail