Skip to content

Instantly share code, notes, and snippets.

@pommedeterresautee
Last active April 11, 2017 15:10
Show Gist options
  • Save pommedeterresautee/6752846 to your computer and use it in GitHub Desktop.
Save pommedeterresautee/6752846 to your computer and use it in GitHub Desktop.
Here is a simple implementation of a wrapper that executes an Observable in a dedicated Fragment. Its main purpose is to handle screen rotation during the asynchronous execution of an Observable. In my application several Activities implement the Observer interface directly, without a Fragment, so this implementation is built with that in mind. Of course, it can be upd…
package com.pommedeterresautee.rxtest;
import android.content.Intent;
import android.os.Bundle;
import android.app.Activity;
import android.view.Menu;
import android.widget.TextView;
import rx.Observer;
/**
 * Demo activity that shows, in a single {@link TextView}, every event emitted by a
 * {@link FunctionDumb} task registered through {@link ObservableFragmentWrapper}.
 */
public class Activity1 extends Activity implements Observer<String> {

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.act);
        display("Activity 1");
        ObservableFragmentWrapper.register(this, new FunctionDumb());
    }

    /** Shows the final status once the task reports completion. */
    @Override
    public void onCompleted() {
        display("Finished");
    }

    /**
     * Shows the failure reason.
     *
     * @param throwable the error reported by the task
     */
    @Override
    public void onError(Throwable throwable) {
        String message = throwable.getLocalizedMessage();
        // getLocalizedMessage() may be null; fall back to toString() so the
        // error is still visible instead of rendering as empty text.
        display(message != null ? message : throwable.toString());
    }

    /**
     * Shows the latest value emitted by the task.
     *
     * @param s the emitted value
     */
    @Override
    public void onNext(String s) {
        display(s);
    }

    /** Writes {@code text} into the view identified by {@code R.id.main}. */
    private void display(CharSequence text) {
        ((TextView) findViewById(R.id.main)).setText(text);
    }
}
package com.pommedeterresautee.rxtest;
import com.google.common.base.Charsets;
import com.google.common.hash.Hashing;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
/**
 * Sample slow task: emits ten strings, pausing two seconds before each one,
 * then signals completion.
 */
public class FunctionDumb implements Observable.OnSubscribeFunc<String> {

    private static final int ITERATIONS = 10;
    private static final long DELAY_MS = 2000L;

    /**
     * Runs the emission loop synchronously on the calling thread.
     *
     * @param observer receiver of the emitted values and of the terminal event
     * @return an empty subscription (the loop has already finished when this returns)
     */
    @Override
    public Subscription onSubscribe(Observer<? super String> observer) {
        for (int i = 0; i < ITERATIONS; i++) {
            try {
                Thread.sleep(DELAY_MS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop: onError is a
                // terminal event, so nothing may be emitted after it.
                Thread.currentThread().interrupt();
                observer.onError(e);
                return Subscriptions.empty();
            }
            observer.onNext("Iteration n" + i);
        }
        // A finite sequence must end with onCompleted so observers learn it is done.
        observer.onCompleted();
        return Subscriptions.empty();
    }
}
package com.pommedeterresautee.rxtest;
import android.app.Activity;
import android.app.Fragment;
import android.app.FragmentManager;
import android.os.Bundle;
import java.lang.ref.WeakReference;
import java.util.LinkedList;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.android.concurrency.AndroidSchedulers;
import rx.concurrency.Schedulers;
/**
 * Helper class to manage an Observable during the rotation of the screen.
 * <p>
 * The task is subscribed on a new thread and its events are observed on the Android
 * main thread. Events are forwarded to the attached activity, which must implement
 * {@link Observer}. Items that arrive while no observer is reachable are buffered and
 * replayed, oldest first, the next time an activity attaches.
 *
 * @param <T> Type of the result.
 */
public class ObservableFragmentWrapper<T> extends Fragment {
    private static final String TAG = "observable_task";

    /** Weak link to the attached activity; {@code null} while detached. */
    private WeakReference<Observer<T>> mCallbacks;
    private Observable.OnSubscribeFunc<T> mObservable;
    /** Items received while no observer was reachable, oldest first. */
    private final LinkedList<T> mCache = new LinkedList<T>();
    /** Error received while no observer was reachable, if any. */
    private Throwable mCacheError;
    /** Set when completion was received while no observer was reachable. */
    private boolean mCompleted = false;
    private Subscription mSubscription;

    /**
     * Register an observable to make it survive a rotation screen.
     * <p>
     * If a wrapper is already present under the internal tag it is reused and
     * {@code function} is ignored, unless that wrapper has no task yet (it was created
     * through the no-arg constructor), in which case {@code function} is adopted and started.
     *
     * @param activity an activity implementing Observer interface
     * @param function a task to execute
     * @param <T2>     the type of result
     * @return the fragment wrapping the task
     */
    public static <T2> ObservableFragmentWrapper<T2> register(Activity activity, Observable.OnSubscribeFunc<T2> function) {
        FragmentManager fm = activity.getFragmentManager();
        // Only this class adds fragments under TAG, so the cast is safe up to erasure.
        @SuppressWarnings("unchecked")
        ObservableFragmentWrapper<T2> wrapper = (ObservableFragmentWrapper<T2>) fm.findFragmentByTag(TAG);
        if (wrapper == null) {
            wrapper = new ObservableFragmentWrapper<T2>(function);
            fm.beginTransaction()
                    .add(wrapper, TAG)
                    .commit();
            wrapper.startTask();
        } else if (wrapper.mObservable == null) {
            // Instance built via the no-arg constructor: it has no task, so start this one.
            wrapper.mObservable = function;
            wrapper.startTask();
        }
        return wrapper;
    }

    /**
     * Looks up the wrapper previously registered on {@code activity}.
     *
     * @param activity the activity whose fragment manager is searched
     * @param <T2>     the type of result
     * @return the registered wrapper, or {@code null} if none is found
     */
    public static <T2> ObservableFragmentWrapper<T2> getFragment(Activity activity) {
        @SuppressWarnings("unchecked")
        ObservableFragmentWrapper<T2> wrapper = (ObservableFragmentWrapper<T2>) activity
                .getFragmentManager()
                .findFragmentByTag(TAG);
        return wrapper;
    }

    /**
     * Public empty constructor so the framework can re-instantiate the fragment.
     * A wrapper created this way has no task until {@link #register} supplies one.
     */
    public ObservableFragmentWrapper() {
    }

    /**
     * @param obs the task this wrapper will run
     */
    public ObservableFragmentWrapper(Observable.OnSubscribeFunc<T> obs) {
        this.mObservable = obs;
    }

    /**
     * @return the subscription of the running task, or {@code null} if it was never started
     */
    public Subscription getSubscription() {
        return mSubscription;
    }

    /**
     * @return the task wrapped by this fragment
     */
    public Observable.OnSubscribeFunc<T> getObservable() {
        return mObservable;
    }

    /**
     * Will be called at the first execution or at each rotation.
     * Stores the activity as the current observer and replays buffered events to it:
     * first the buffered items, then a buffered error or completion if there is one.
     *
     * @param activity the activity to attach to; must implement {@link Observer}
     * @throws IllegalArgumentException if {@code activity} does not implement {@link Observer}
     */
    @Override
    public void onAttach(Activity activity) {
        super.onAttach(activity);
        if (!(activity instanceof Observer)) {
            throw new IllegalArgumentException("The Activity " + activity.getClass().getSimpleName() + " doesn't implement the Observer interface.");
        }
        // The element type cannot be checked at runtime because of erasure.
        @SuppressWarnings("unchecked")
        Observer<T> attached = (Observer<T>) activity;
        mCallbacks = new WeakReference<Observer<T>>(attached);

        drainCache();
        Observer<T> observer = currentObserver();
        if (observer == null) {
            return;
        }
        if (mCacheError != null) {
            observer.onError(mCacheError);
        } else if (mCompleted) {
            observer.onCompleted();
        }
    }

    /**
     * This method will only be called once when the retained
     * Fragment is first created.
     */
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setRetainInstance(true);
    }

    /** Drops the observer so the detached activity is no longer called. */
    @Override
    public void onDetach() {
        super.onDetach();
        mCallbacks = null;
    }

    /** Releases the task's subscription when the fragment itself is destroyed. */
    @Override
    public void onDestroy() {
        if (mSubscription != null) {
            mSubscription.unsubscribe();
        }
        super.onDestroy();
    }

    /** Subscribes to the task on a new thread and routes its events to the main thread. */
    private void startTask() {
        mSubscription = Observable
                .create(mObservable)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new ProxyObserver());
    }

    /**
     * Takes a strong reference to the observer once, so a check followed by a call
     * cannot race with the weak reference being cleared.
     *
     * @return the reachable observer, or {@code null} if detached or collected
     */
    private Observer<T> currentObserver() {
        return mCallbacks == null ? null : mCallbacks.get();
    }

    /** Delivers buffered items, oldest first, for as long as an observer is reachable. */
    private void drainCache() {
        while (!mCache.isEmpty()) {
            Observer<T> observer = currentObserver();
            if (observer == null) {
                return;
            }
            observer.onNext(mCache.poll());
        }
    }

    /** Receives the task's events and either forwards them or buffers them. */
    private class ProxyObserver implements Observer<T> {
        @Override
        public void onCompleted() {
            // Buffered items must reach the observer before the terminal event.
            drainCache();
            Observer<T> observer = currentObserver();
            if (observer != null) {
                observer.onCompleted();
            } else {
                mCompleted = true;
            }
        }

        @Override
        public void onError(Throwable throwable) {
            drainCache();
            Observer<T> observer = currentObserver();
            if (observer != null) {
                observer.onError(throwable);
            } else {
                mCacheError = throwable;
            }
        }

        @Override
        public void onNext(T o) {
            // Append at the tail and drain from the head: arrival order is preserved
            // whether the item is delivered now or after a later re-attach.
            mCache.add(o);
            drainCache();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment