Last active
April 11, 2017 15:10
-
-
Save pommedeterresautee/6752846 to your computer and use it in GitHub Desktop.
Here is a simple implementation of a wrapper that executes an Observable inside a dedicated Fragment.
Its main purpose is to handle screen rotation while the Observable is still executing asynchronously.
In my application several Activities implement the Observer Interface, without Fragment, so this implementation is built with that in mind. Of course, it can be upd…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pommedeterresautee.rxtest; | |
import android.content.Intent; | |
import android.os.Bundle; | |
import android.app.Activity; | |
import android.view.Menu; | |
import android.widget.TextView; | |
import rx.Observer; | |
public class Activity1 extends Activity implements Observer<String> { | |
@Override | |
protected void onCreate(Bundle savedInstanceState) { | |
super.onCreate(savedInstanceState); | |
setContentView(R.layout.act); | |
((TextView)findViewById(R.id.main)).setText("Activity 1"); | |
ObservableFragmentWrapper.register(this, new FunctionDumb()); | |
} | |
@Override | |
public void onCompleted() { | |
((TextView)findViewById(R.id.main)).setText("Finished"); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
((TextView)findViewById(R.id.main)).setText(throwable.getLocalizedMessage()); | |
} | |
@Override | |
public void onNext(String s) { | |
((TextView)findViewById(R.id.main)).setText(s); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pommedeterresautee.rxtest; | |
import com.google.common.base.Charsets; | |
import com.google.common.hash.Hashing; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.subscriptions.Subscriptions; | |
public class FunctionDumb implements Observable.OnSubscribeFunc<String> { | |
@Override | |
public Subscription onSubscribe(Observer<? super String> observer) { | |
for (int i =0 ; i < 10 ; i++){ | |
try { | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
observer.onError(e); | |
} | |
observer.onNext("Iteration n" + String.valueOf(i)); | |
} | |
return Subscriptions.empty(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pommedeterresautee.rxtest; | |
import android.app.Activity; | |
import android.app.Fragment; | |
import android.app.FragmentManager; | |
import android.os.Bundle; | |
import java.lang.ref.WeakReference; | |
import java.util.LinkedList; | |
import rx.Observable; | |
import rx.Observer; | |
import rx.Subscription; | |
import rx.android.concurrency.AndroidSchedulers; | |
import rx.concurrency.Schedulers; | |
/** | |
* Helper class to manage an Observable during the rotation of the screen. | |
* | |
* @param <T> Type of the result. | |
*/ | |
public class ObservableFragmentWrapper<T> extends Fragment { | |
private static final String TAG = "observable_task"; | |
private WeakReference<Observer<T>> mCallbacks; | |
private Observable.OnSubscribeFunc<T> mObservable; | |
private LinkedList<T> mCache = new LinkedList<T>(); | |
private Throwable mCacheError; | |
private boolean mCompleted = false; | |
private Subscription mSubscription; | |
/** | |
* Register an observable to make it survive a rotation screen. | |
* | |
* @param activity an activity implementing Observer interface | |
* @param function a task to execute | |
* @param <T2> the type of result | |
* @return the fragment wrapping the task | |
*/ | |
public static <T2> ObservableFragmentWrapper<T2> register(Activity activity, Observable.OnSubscribeFunc<T2> function) { | |
FragmentManager fm = activity.getFragmentManager(); | |
ObservableFragmentWrapper<T2> mObservableFragmentWrapper = (ObservableFragmentWrapper<T2>) fm.findFragmentByTag(TAG); | |
if (mObservableFragmentWrapper == null) { | |
mObservableFragmentWrapper = new ObservableFragmentWrapper<T2>(function); | |
fm | |
.beginTransaction() | |
.add(mObservableFragmentWrapper, TAG) | |
.commit(); | |
mObservableFragmentWrapper.startTask(); | |
} | |
return mObservableFragmentWrapper; | |
} | |
public static <T2> ObservableFragmentWrapper<T2> getFragment(Activity activity) { | |
return (ObservableFragmentWrapper<T2>) activity | |
.getFragmentManager() | |
.findFragmentByTag(TAG); | |
} | |
public ObservableFragmentWrapper(Observable.OnSubscribeFunc<T> obs) { | |
this.mObservable = obs; | |
} | |
public Subscription getSubscription() { | |
return mSubscription; | |
} | |
public Observable.OnSubscribeFunc<T> getObservable() { | |
return mObservable; | |
} | |
/** | |
* Will be called at the first execution or at each rotation. * | |
* | |
* @param activity | |
*/ | |
@Override | |
public void onAttach(Activity activity) { | |
super.onAttach(activity); | |
if (activity instanceof Observer) { | |
mCallbacks = new WeakReference<Observer<T>>((Observer<T>) activity); | |
while (mCache.size() > 0 && isCallBackNotNull()) { | |
mCallbacks.get().onNext(mCache.poll()); | |
} | |
if (mCacheError != null && isCallBackNotNull()) { | |
mCallbacks.get().onError(mCacheError); | |
} else if (mCompleted && isCallBackNotNull()) { | |
mCallbacks.get().onCompleted(); | |
} | |
} else { | |
throw new IllegalArgumentException("The Activity " + activity.getClass().getSimpleName() + " doesn't implement the Observer interface."); | |
} | |
} | |
/** | |
* This method will only be called once when the retained | |
* Fragment is first created. | |
*/ | |
@Override | |
public void onCreate(Bundle savedInstanceState) { | |
super.onCreate(savedInstanceState); | |
setRetainInstance(true); | |
} | |
@Override | |
public void onDetach() { | |
super.onDetach(); | |
mCallbacks = null; | |
} | |
private void startTask() { | |
mSubscription = Observable | |
.create(mObservable) | |
.subscribeOn(Schedulers.newThread()) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.subscribe(new ProxyObserver()); | |
} | |
private boolean isCallBackNotNull(){ | |
return mCallbacks != null && mCallbacks.get() != null; | |
} | |
private class ProxyObserver implements Observer<T> { | |
@Override | |
public void onCompleted() { | |
if (isCallBackNotNull()) { | |
mCallbacks.get().onCompleted(); | |
} else { | |
mCompleted = true; | |
} | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
if (isCallBackNotNull()) { | |
mCallbacks.get().onError(throwable); | |
} else { | |
mCacheError = throwable; | |
} | |
} | |
@Override | |
public void onNext(T o) { | |
if (mCallbacks != null && mCache.size() == 0) { | |
mCallbacks.get().onNext(o); | |
} else { | |
mCache.push(o); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment