Last active
September 26, 2016 15:15
-
-
Save austynmahoney/f440444477ad0ec3ca32 to your computer and use it in GitHub Desktop.
RxJava Refreshable Pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class CachedRefreshable<P, T> extends Refreshable<P, T> { | |
protected abstract Observable<T> getSourceObservable(P parameters); | |
/** | |
* Return the Observable that gets data from a cached source. | |
* | |
* @return Observable from cache item, or null if the cache misses. | |
*/ | |
protected abstract Observable<T> getCachedObservable(P parameters); | |
/** | |
* When source data is acquired, this function will be used as a callback so that the acquired data can be cached. | |
*/ | |
protected abstract void updateCache(P parameters, T data); | |
/** | |
* When data comes from a miscellaneous source, this provides a mechanism to broadcast those changes to subscribers. | |
*/ | |
public void setValue(P parameters, T data) { | |
updateCache(parameters, data); | |
mInternalObserver.onNext(data); | |
} | |
/** | |
* @return Cache-enabled Observable without providing parameters. | |
*/ | |
public Observable<T> getObservable() { | |
return getObservable(null); | |
} | |
/** | |
* @param parameters Parameters for the action being observed by this Refreshable. | |
* @return Cache-enabled Observable with the specified parameters. | |
*/ | |
public Observable<T> getObservable(P parameters) { | |
return getObservable(parameters, true); | |
} | |
/** | |
* Provides an {@link Observable} that will never call {@link rx.Observer#onCompleted() Observer#onComplete}, leaving the subscription | |
* open all the time. <p>This allows all observers to always be subscribed, even if the underlying {@link Observable} changes (e.g. | |
* network request or cached data). | |
* | |
* @param parameters parameters Parameters for the action being performed by this Refreshable. This is used as a key to get the proper | |
* {@link Observable} from a map of possible options. | |
* @param allowCache Whether to check for a cached copy of the data being observed. | |
*/ | |
public Observable<T> getObservable(P parameters, boolean allowCache) { | |
final Observable<T> cachedObservable; | |
if (allowCache && (cachedObservable = getCachedObservable(parameters)) != null) { | |
//unsubscribe(); | |
// Use the cache | |
cachedObservable.subscribe(mInternalObserver); | |
} else { | |
Subscription subscription = mSourceSubscriptions.get(parameters); | |
if (subscription == null || subscription.isUnsubscribed()) { | |
// Subscribe to server | |
updateSubscription(parameters); | |
} | |
} | |
return mSubject; | |
} | |
@Override | |
protected void updateSubscription(final P parameters) { | |
Subscription subscription = getSourceObservable(parameters).doOnNext(new Action1<T>() { | |
@Override | |
public void call(T t) { | |
updateCache(parameters, t); | |
} | |
}).subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(mInternalObserver); | |
mSourceSubscriptions.put(parameters, subscription); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Singleton | |
public class CurrentUserCachedRefreshable extends CachedRefreshable<Void, UserProfile> { | |
@Inject UserManager mUserManager; | |
@Override | |
protected boolean isCompletable() { | |
return true; | |
} | |
@Inject | |
public CurrentUserCachedRefreshable() { | |
} | |
@Override | |
protected Observable<UserProfile> getSourceObservable(Void parameters) { | |
return mWelbeService.getCurrentUser().map(new Func1<UserResponse, UserProfile>() { | |
@Override | |
public UserProfile call(UserResponse userResponse) { | |
return userResponse.userProfile; | |
} | |
}); | |
} | |
@Override | |
protected Observable<UserProfile> getCachedObservable(Void parameters) { | |
return mUserManager.getUserProfileObservable(); | |
} | |
@Override | |
protected void updateCache(Void parameters, UserProfile data) { | |
mUserManager.updateUserProfile(data); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class LogActivityRefreshable extends Refreshable<List<LogActivityRequest>, LoggingResponse> { | |
/* Retrofit web service interface */ | |
@Inject WebService mWebService; | |
@Inject CurrentUserRefreshable mCurrentUserRefreshable; | |
@Override | |
protected Observable<LoggingResponse> getSourceObservable(final List<LogActivityRequest> requests) { | |
return mCurrentUserRefreshable.getObservable().flatMap(new Func1<UserProfile, Observable<LoggingResponse>>() { | |
@Override | |
public Observable<LoggingResponse> call(UserProfile userProfile) { | |
// We need the user ID before we continue | |
List<Observable<LoggingResponse>> observables = new ArrayList<>(requests.size()); | |
// Create a Retrofit web service Observable for each request | |
for (LogActivityRequest request : requests) { | |
observables.add(mWebService.logActivity(userProfile.id, request)); | |
} | |
return Observable.merge(observables); | |
} | |
}); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class Refreshable<P, T> { | |
/** | |
* Holds subscriptions based per parameters. | |
*/ | |
protected Map<P, Subscription> mSourceSubscriptions = new HashMap<>(); | |
// internal observation | |
protected final Subject<T, T> mSubject = BehaviorSubject.create(); | |
protected final Observer<T> mInternalObserver = new Observer<T>() { | |
@Override | |
public void onCompleted() { | |
// internalObserver's job is to skip onComplete events. | |
} | |
@Override | |
public void onError(Throwable e) { | |
mSubject.onError(e); | |
} | |
@Override | |
public void onNext(T data) { | |
mSubject.onNext(data); | |
} | |
}; | |
/** | |
* Return the Observable that gets data from an original source, such as a network or database. Useful with Retrofit, where Observables | |
* are returned. | |
*/ | |
protected abstract Observable<T> getSourceObservable(P parameters); | |
/** | |
* @return Cache-enabled Observable without providing parameters. | |
*/ | |
public Observable<T> getObservable() { | |
return getObservable(null); | |
} | |
/** | |
* Provides an {@link Observable} that will never call {@link rx.Observer#onCompleted() Observer#onComplete}, leaving the subscription | |
* open all the time. <p>This allows all observers to always be subscribed, even if the underlying {@link Observable} changes (e.g. | |
* network request or cached data). | |
* | |
* @param parameters parameters Parameters for the action being performed by this Refreshable. This is used as a key to get the proper | |
* {@link Observable} from a map of possible options. | |
*/ | |
public Observable<T> getObservable(P parameters) { | |
getSourceObservable(parameters) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(mInternalObserver); | |
return mSubject; | |
} | |
public void refreshFromSource(P parameters) { | |
unsubscribe(parameters); | |
updateSubscription(parameters); | |
} | |
private void unsubscribe(P parameters) { | |
Subscription subscription = mSourceSubscriptions.get(parameters); | |
if (subscription != null && !subscription.isUnsubscribed()) { | |
subscription.unsubscribe(); | |
mSourceSubscriptions.remove(parameters); | |
} | |
} | |
protected void updateSubscription(final P parameters) { | |
Subscription subscription = getObservable(parameters) | |
.subscribeOn(Schedulers.io()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(mInternalObserver); | |
mSourceSubscriptions.put(parameters, subscription); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It seems the Refreshable class is incomplete.