Created
April 28, 2015 14:33
-
-
Save passsy/9a6b5fe7fbec78789bd5 to your computer and use it in GitHub Desktop.
A Presenter for use with RxAndroid that delays delivering emissions to the View until the View is ready
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* The MIT License (MIT) | |
* | |
* Copyright (c) 2014 Konstantin Mikheev sirstripy-at-gmail-com | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in all | |
* copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |
* SOFTWARE. | |
*/ | |
package net.grandcentrix.rx; | |
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
/** | |
* This operator delays onNext, onComplete and onError emissions until a True value received from a | |
* given observable. When the given observable emits False, the operator starts delaying emissions | |
* again. | |
* <p/> | |
* semaphoreLatest variant drops older not emitted onNext value if a new value has been received. | |
* <p/> | |
* semaphoreLatestCache keeps the latest value after emission and sends it on each True value from a | |
* given observable received. This variant never emits onCompleted. | |
* | |
* @param <T> a type of onNext value | |
* @author konmik https://github.com/konmik/nucleus/blob/53ecd398cec6e85b58545b4d30cfa961470d9f68/nucleus-example-with-tests/src/main/java/nucleus/example/main/MainPresenter.java | |
*/ | |
public class OperatorSemaphore<T> implements Observable.Operator<T, T> { | |
private boolean cache; | |
private Observable<Boolean> go; | |
private boolean latest; | |
private OperatorSemaphore(Observable<Boolean> go) { | |
this.go = go; | |
} | |
private OperatorSemaphore(Observable<Boolean> go, boolean latest) { | |
this.go = go; | |
this.latest = latest; | |
} | |
private OperatorSemaphore(Observable<Boolean> go, boolean latest, boolean cache) { | |
this.go = go; | |
this.latest = latest; | |
this.cache = cache; | |
} | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> child) { | |
return new Subscriber<T>() { | |
boolean deliverCompleted; | |
boolean deliverError; | |
Throwable error; | |
boolean hasCache; | |
boolean isOpen; | |
ArrayList<T> next = new ArrayList<>(); | |
T nextCache; | |
@Override | |
public void onCompleted() { | |
if (!cache) { | |
deliverCompleted = true; | |
tick(false); | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
error = e; | |
deliverError = true; | |
tick(false); | |
} | |
@Override | |
public void onNext(T o) { | |
if (latest) { | |
next.clear(); | |
} | |
next.add(o); | |
tick(false); | |
} | |
@Override | |
public void onStart() { | |
super.onStart(); | |
add(go.subscribe(new Action1<Boolean>() { | |
@Override | |
public void call(Boolean aBoolean) { | |
isOpen = aBoolean; | |
tick(cache); | |
} | |
})); | |
child.add(this); | |
} | |
void tick(boolean deliverCache) { | |
if (!isUnsubscribed() && isOpen) { | |
while (next.size() > 0) { | |
T value = next.remove(0); | |
child.onNext(value); | |
deliverCache = false; | |
if (cache) { | |
nextCache = value; | |
hasCache = true; | |
} | |
} | |
if (deliverCache && hasCache) { | |
child.onNext(nextCache); | |
} | |
if (deliverCompleted) { | |
child.onCompleted(); | |
unsubscribe(); | |
} | |
if (deliverError) { | |
child.onError(error); | |
unsubscribe(); | |
} | |
} | |
} | |
}; | |
} | |
/** | |
* Returns an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
* | |
* @param go an operator that controls emission. | |
* @param <T> a type of onNext value. | |
* @return an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
*/ | |
public static <T> OperatorSemaphore<T> semaphore(Observable<Boolean> go) { | |
return new OperatorSemaphore<>(go); | |
} | |
/** | |
* Returns an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
* <p/> | |
* This variant drops older not emitted value if a new value has been received. | |
* | |
* @param go an operator that controls emission. | |
* @param <T> a type of onNext value. | |
* @return an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
* <p/> | |
* This variant drops older not emitted value if a new value has been received. | |
*/ | |
public static <T> OperatorSemaphore<T> semaphoreLatest(Observable<Boolean> go) { | |
return new OperatorSemaphore<>(go, true); | |
} | |
/** | |
* Returns an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
* <p/> | |
* This variant drops older not emitted value if a new value has been received. | |
* <p/> | |
* It also keeps the latest value after emission and sends it on each True value from a given | |
* observable received. This variant never emits onCompleted. | |
* | |
* @param go an operator that controls emission. | |
* @param <T> a type of onNext value. | |
* @return an operator that delays onNext, onComplete and onError emissions until a True value | |
* received from a given observable. When the given observable emits False, the operator starts | |
* delaying emissions again. | |
* <p/> | |
* This variant drops older not emitted value if a new value has been received. | |
* <p/> | |
* It also keeps the latest value after emission and sends it on each True value from a given | |
* observable received. This variant never emits onCompleted. | |
*/ | |
public static <T> OperatorSemaphore<T> semaphoreLatestCache(Observable<Boolean> go) { | |
return new OperatorSemaphore<>(go, true, true); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.grandcentrix.rx; | |
import android.os.Bundle; | |
import android.support.v4.app.Fragment; | |
import android.view.LayoutInflater; | |
import android.view.View; | |
import android.view.ViewGroup; | |
import net.grandcentrix.rx.OperatorSemaphore; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.subjects.BehaviorSubject; | |
import rx.subscriptions.CompositeSubscription; | |
/** | |
* Represents the Presenter of the popular Model-View-Presenter design pattern. | |
* <p/> | |
* The presenter connects the View V to a model which don't know each other. The View is passive and | |
* provides this Presenter with events from the UI. It's an RxPresenter because it works with {@link | |
* rx.Observable} from RxJava to communicate with the View. | |
* <p/> | |
* Created by pascalwelsch on 4/17/15. | |
*/ | |
public abstract class RxPresenter<V> { | |
private final V mView; | |
private CompositeSubscription mUiSubscriptions = new CompositeSubscription(); | |
private BehaviorSubject<Boolean> mViewReady = BehaviorSubject.create(false); | |
public RxPresenter(final V view) { | |
mView = view; | |
} | |
/** | |
* @return the view of this presenter | |
*/ | |
protected V getView() { | |
return mView; | |
} | |
/** | |
* add your subscriptions for View events to this method to get them automatically cleaned up in | |
* {@link #sleep()}. typically call this in {@link #wakeUp()} where you subscribe to the UI | |
* events | |
*/ | |
protected void manageViewSubscription(final Subscription subscription) { | |
mUiSubscriptions.add(subscription); | |
} | |
/** | |
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view | |
* become available. getView() is guaranteed to be != null during all emissions. This | |
* transformer can only be used on application's main thread. | |
* <p/> | |
* If the transformer receives a next value while the previous value has not been delivered, the | |
* previous value will be dropped. | |
* <p/> | |
* The transformer will duplicate the latest onNext emission in case if a view has been | |
* reattached. | |
* <p/> | |
* This operator ignores onComplete emission and never sends one. | |
* <p/> | |
* Use this operator when you need to show updatable data that needs to be cached in memory. | |
* | |
* @param <T> a type of onNext value. | |
* @return the delaying operator. | |
*/ | |
public <T> Observable.Transformer<T, T> deliverLatestCacheToView() { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(Observable<T> observable) { | |
return observable.lift(OperatorSemaphore.<T>semaphoreLatestCache(isViewReady())); | |
} | |
}; | |
} | |
/** | |
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view | |
* become available. getView() is guaranteed to be != null during all emissions. This | |
* transformer can only be used on application's main thread. | |
* <p/> | |
* If this transformer receives a next value while the previous value has not been delivered, | |
* the previous value will be dropped. | |
* <p/> | |
* Use this operator when you need to show updatable data. | |
* | |
* @param <T> a type of onNext value. | |
* @return the delaying operator. | |
*/ | |
public <T> Observable.Transformer<T, T> deliverLatestToView() { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(Observable<T> observable) { | |
return observable.lift(OperatorSemaphore.<T>semaphoreLatest(isViewReady())); | |
} | |
}; | |
} | |
/** | |
* Returns a transformer that will delay onNext, onError and onComplete emissions unless a view | |
* become available. getView() is guaranteed to be != null during all emissions. This | |
* transformer can only be used on application's main thread. | |
* <p/> | |
* Use this operator if you need to deliver *all* emissions to a view, in example when you're | |
* sending items into adapter one by one. | |
* | |
* @param <T> a type of onNext value. | |
* @return the delaying operator. | |
*/ | |
public <T> Observable.Transformer<T, T> deliverToView() { | |
return new Observable.Transformer<T, T>() { | |
@Override | |
public Observable<T> call(Observable<T> observable) { | |
return observable.lift(OperatorSemaphore.<T>semaphore(isViewReady())); | |
} | |
}; | |
} | |
/** | |
* completes all observables of this presenter. Should be called when the view is about to die | |
* and will never come back. | |
* <p/> | |
* call this in {@link Fragment#onDestroy()} | |
* <p/> | |
* complete all {@link rx.Observer}, i.e. BehaviourSubjects with {@link Observer#onCompleted()} | |
* to unsubscribe all observers | |
*/ | |
public void destroy() { | |
mViewReady.onNext(false); | |
} | |
/** | |
* call sleep as the opposite of {@link #wakeUp()} to unsubscribe all observers listening to the | |
* UI observables of the view. Calling sleep in {@link Fragment#onDestroyView()} makes sense | |
* because observing a discarded view does not. | |
*/ | |
public void sleep() { | |
mViewReady.onNext(false); | |
// unsubscribe all UI subscriptions created in wakeUp() and added | |
// via manageViewSubscription(Subscription) | |
mUiSubscriptions.unsubscribe(); | |
// there is no reuse possible. recreation works fine | |
mUiSubscriptions = new CompositeSubscription(); | |
} | |
/** | |
* when calling wakeUp the presenter starts to observe the observables of the View. | |
* <p/> | |
* Call this in a Fragment after {@link Fragment#onCreateView(LayoutInflater, ViewGroup, | |
* Bundle)} and after you created and published all observables the presenter will use. At the | |
* end of {@link Fragment#onViewCreated(View, Bundle)} is an appropriate place. | |
*/ | |
public void wakeUp() { | |
mViewReady.onNext(true); | |
} | |
/** | |
* Observable of the view state. The View is ready to receive calls after calling {@link | |
* #wakeUp()} and before calling {@link #sleep()}. | |
*/ | |
private Observable<Boolean> isViewReady() { | |
return mViewReady.asObservable().distinctUntilChanged(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* usage of the RxPresenter in a Fragment | |
*/ | |
public class ViewFragment extends Fragment { | |
private RxPresenter mPresenter; | |
@Override | |
public void onAttach(final Activity activity) { | |
super.onAttach(activity); | |
if (mPresenter == null) { | |
mPresenter = new SomePresenter(this, activity.getApplicationContext()); | |
} | |
} | |
@Override | |
public void onDestroy() { | |
super.onDestroy(); | |
mPresenter.destroy(); | |
} | |
@Override | |
public void onDestroyView() { | |
super.onDestroyView(); | |
mPresenter.sleep(); | |
} | |
@Override | |
public void onViewCreated(final View view, final Bundle savedInstanceState) { | |
super.onViewCreated(view, savedInstanceState); | |
mPresenter.wakeUp(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment