Skip to content

Instantly share code, notes, and snippets.

@austynmahoney
Last active September 26, 2016 15:15
Show Gist options
  • Save austynmahoney/f440444477ad0ec3ca32 to your computer and use it in GitHub Desktop.
Save austynmahoney/f440444477ad0ec3ca32 to your computer and use it in GitHub Desktop.
RxJava Refreshable Pattern
public abstract class CachedRefreshable<P, T> extends Refreshable<P, T> {
protected abstract Observable<T> getSourceObservable(P parameters);
/**
* Return the Observable that gets data from a cached source.
*
* @return Observable from cache item, or null if the cache misses.
*/
protected abstract Observable<T> getCachedObservable(P parameters);
/**
* When source data is acquired, this function will be used as a callback so that the acquired data can be cached.
*/
protected abstract void updateCache(P parameters, T data);
/**
* When data comes from a miscellaneous source, this provides a mechanism to broadcast those changes to subscribers.
*/
public void setValue(P parameters, T data) {
updateCache(parameters, data);
mInternalObserver.onNext(data);
}
/**
* @return Cache-enabled Observable without providing parameters.
*/
public Observable<T> getObservable() {
return getObservable(null);
}
/**
* @param parameters Parameters for the action being observed by this Refreshable.
* @return Cache-enabled Observable with the specified parameters.
*/
public Observable<T> getObservable(P parameters) {
return getObservable(parameters, true);
}
/**
* Provides an {@link Observable} that will never call {@link rx.Observer#onCompleted() Observer#onComplete}, leaving the subscription
* open all the time. <p>This allows all observers to always be subscribed, even if the underlying {@link Observable} changes (e.g.
* network request or cached data).
*
* @param parameters parameters Parameters for the action being performed by this Refreshable. This is used as a key to get the proper
* {@link Observable} from a map of possible options.
* @param allowCache Whether to check for a cached copy of the data being observed.
*/
public Observable<T> getObservable(P parameters, boolean allowCache) {
final Observable<T> cachedObservable;
if (allowCache && (cachedObservable = getCachedObservable(parameters)) != null) {
//unsubscribe();
// Use the cache
cachedObservable.subscribe(mInternalObserver);
} else {
Subscription subscription = mSourceSubscriptions.get(parameters);
if (subscription == null || subscription.isUnsubscribed()) {
// Subscribe to server
updateSubscription(parameters);
}
}
return mSubject;
}
@Override
protected void updateSubscription(final P parameters) {
Subscription subscription = getSourceObservable(parameters).doOnNext(new Action1<T>() {
@Override
public void call(T t) {
updateCache(parameters, t);
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mInternalObserver);
mSourceSubscriptions.put(parameters, subscription);
}
}
@Singleton
public class CurrentUserCachedRefreshable extends CachedRefreshable<Void, UserProfile> {
@Inject UserManager mUserManager;
@Override
protected boolean isCompletable() {
return true;
}
@Inject
public CurrentUserCachedRefreshable() {
}
@Override
protected Observable<UserProfile> getSourceObservable(Void parameters) {
return mWelbeService.getCurrentUser().map(new Func1<UserResponse, UserProfile>() {
@Override
public UserProfile call(UserResponse userResponse) {
return userResponse.userProfile;
}
});
}
@Override
protected Observable<UserProfile> getCachedObservable(Void parameters) {
return mUserManager.getUserProfileObservable();
}
@Override
protected void updateCache(Void parameters, UserProfile data) {
mUserManager.updateUserProfile(data);
}
}
public class LogActivityRefreshable extends Refreshable<List<LogActivityRequest>, LoggingResponse> {
/* Retrofit web service interface */
@Inject WebService mWebService;
@Inject CurrentUserRefreshable mCurrentUserRefreshable;
@Override
protected Observable<LoggingResponse> getSourceObservable(final List<LogActivityRequest> requests) {
return mCurrentUserRefreshable.getObservable().flatMap(new Func1<UserProfile, Observable<LoggingResponse>>() {
@Override
public Observable<LoggingResponse> call(UserProfile userProfile) {
// We need the user ID before we continue
List<Observable<LoggingResponse>> observables = new ArrayList<>(requests.size());
// Create a Retrofit web service Observable for each request
for (LogActivityRequest request : requests) {
observables.add(mWebService.logActivity(userProfile.id, request));
}
return Observable.merge(observables);
}
});
}
}
public abstract class Refreshable<P, T> {
/**
* Holds subscriptions based per parameters.
*/
protected Map<P, Subscription> mSourceSubscriptions = new HashMap<>();
// internal observation
protected final Subject<T, T> mSubject = BehaviorSubject.create();
protected final Observer<T> mInternalObserver = new Observer<T>() {
@Override
public void onCompleted() {
// internalObserver's job is to skip onComplete events.
}
@Override
public void onError(Throwable e) {
mSubject.onError(e);
}
@Override
public void onNext(T data) {
mSubject.onNext(data);
}
};
/**
* Return the Observable that gets data from an original source, such as a network or database. Useful with Retrofit, where Observables
* are returned.
*/
protected abstract Observable<T> getSourceObservable(P parameters);
/**
* @return Cache-enabled Observable without providing parameters.
*/
public Observable<T> getObservable() {
return getObservable(null);
}
/**
* Provides an {@link Observable} that will never call {@link rx.Observer#onCompleted() Observer#onComplete}, leaving the subscription
* open all the time. <p>This allows all observers to always be subscribed, even if the underlying {@link Observable} changes (e.g.
* network request or cached data).
*
* @param parameters parameters Parameters for the action being performed by this Refreshable. This is used as a key to get the proper
* {@link Observable} from a map of possible options.
*/
public Observable<T> getObservable(P parameters) {
getSourceObservable(parameters)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mInternalObserver);
return mSubject;
}
public void refreshFromSource(P parameters) {
unsubscribe(parameters);
updateSubscription(parameters);
}
private void unsubscribe(P parameters) {
Subscription subscription = mSourceSubscriptions.get(parameters);
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
mSourceSubscriptions.remove(parameters);
}
}
protected void updateSubscription(final P parameters) {
Subscription subscription = getObservable(parameters)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mInternalObserver);
mSourceSubscriptions.put(parameters, subscription);
}
}
@Kolyall
Copy link

Kolyall commented Sep 26, 2016

Seems Refreshable is not full

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment