Skip to content

Instantly share code, notes, and snippets.

@zhEdward
Last active December 23, 2016 03:02
Show Gist options
  • Save zhEdward/ff76772896c2382ea4421be76ac5498c to your computer and use it in GitHub Desktop.
Save zhEdward/ff76772896c2382ea4421be76ac5498c to your computer and use it in GitHub Desktop.
使用 ReactiveX实现 EventBus 事件总线
package which.package.the.DemoActivity.located;
import android.app.Activity;
import android.os.Bundle;
import android.os.PersistableBundle;
import rx.Observer;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.subscriptions.CompositeSubscription;
/**
 * Demonstrates how to publish to and subscribe from {@code RxBus} inside an Activity.
 * <p>
 * Subscriptions are created in {@link #onCreate(Bundle)} and released in
 * {@link #onDestroy()}, so the Activity is not kept alive by the bus after it is destroyed.
 */
public class DemoActivity extends Activity {

    /** Example event payload that is posted on the bus. */
    public static class DummyEvent {
        int var1;
        String var2;
    }

    /** Collects every bus subscription so they can all be released together. */
    private final CompositeSubscription rxSubscription = new CompositeSubscription();

    /**
     * Registers the demo subscribers and then emits one {@link DummyEvent}.
     * <p>
     * The single-argument overload is overridden on purpose: the
     * {@code onCreate(Bundle, PersistableBundle)} overload is only invoked by the framework
     * for activities declared with {@code android:persistableMode}, and its inherited
     * implementation delegates to this method.
     *
     * @param savedInstanceState previously saved state, or {@code null}
     */
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

        // Pattern 1: full Observer, receives every object posted on the bus.
        Subscription s1 = RxBus.getDefault().asObservable()
                .observeOn(AndroidSchedulers.mainThread()) // Android specific: deliver on the UI thread
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onCompleted() {
                        // The bus never completes in this demo; nothing to do.
                    }

                    @Override
                    public void onError(Throwable e) {
                        // Handle or log the error here; do not silently ignore it in real code.
                    }

                    @Override
                    public void onNext(Object o) {
                        // do something
                    }
                });

        // Pattern 2: only react to events of one type.
        Subscription s2 = RxBus.getDefault().asObservable()
                .observeOn(AndroidSchedulers.mainThread()) // Android specific: deliver on the UI thread
                .ofType(DummyEvent.class)
                .subscribe(new Action1<DummyEvent>() {
                    @Override
                    public void call(DummyEvent dummyEvent) {
                        // do something
                    }
                });

        // Pattern 3: lambda form (requires Java 8 language support). No ofType filter and no
        // observeOn here, so it receives every object on whichever thread calls send().
        Subscription s3 = RxBus.getDefault().asObservable()
                .subscribe(dummy -> {
                    // do something
                });

        // Track all subscriptions with one wrapper; individual unsubscribe() calls
        // (s1.unsubscribe(), ...) are the alternative when no wrapper is used.
        rxSubscription.add(s1);
        rxSubscription.add(s2);
        rxSubscription.add(s3);

        // Emit only after the subscribers are registered: the bus is backed by a
        // PublishSubject, which does not replay items sent before subscription.
        RxBus.getDefault().send(new DummyEvent());
    }

    /** Releases every bus subscription created in {@link #onCreate(Bundle)}. */
    @Override
    protected void onDestroy() {
        rxSubscription.clear();
        super.onDestroy();
    }
}
package which.package.the.Rxbus.located;
import com.jakewharton.rxrelay.PublishRelay;
import com.jakewharton.rxrelay.Relay;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
/**
 * A small event bus implemented on top of RxJava.
 * <p>
 * Courtesy: https://gist.github.com/benjchristensen/04eef9ca0851f3a5d7bf
 * <p>
 * Forked from https://github.com/kaushikgopal/RxJava-Android-Samples/blob/master/app/src/main/java/com/morihacky/android/rxjava/rxbus/RxBus.java
 */
public class RxBus {

    /** Lazily created shared instance; volatile so the unsynchronized read sees the published object. */
    private static volatile RxBus defaultInstance;

    /**
     * Serialized relay built from the com.jakewharton.rxrelay library.
     * <p>
     * Requires {@code compile 'com.jakewharton.rxrelay:rxrelay:1.2.0'} in the Gradle build.
     */
    @Deprecated
    private final Relay<Object, Object> relayBus = PublishRelay.create().toSerialized();

    /**
     * Serialized {@link PublishSubject} that backs the bus.
     * <p>
     * A PublishSubject only hands an observer the items emitted after that observer
     * subscribed. The risk: anything sent between the creation of the subject and the
     * first subscription may be lost. To guarantee delivery of every item, either build
     * the Observable with {@code create} and give it "cold" behaviour manually (start
     * emitting only once all observers have subscribed), or use a ReplaySubject instead.
     * <p>
     * For plain event-bus use there is no real need for PublishRelay.
     */
    private final Subject<Object, Object> subjectBus = new SerializedSubject<>(PublishSubject.create());

    /**
     * Returns the shared bus, creating it on first use.
     *
     * @return the process-wide default {@code RxBus}
     */
    public static RxBus getDefault() {
        RxBus existing = defaultInstance;
        if (existing != null) {
            return existing;
        }
        synchronized (RxBus.class) {
            // Re-check under the lock: another thread may have created it meanwhile.
            if (defaultInstance == null) {
                defaultInstance = new RxBus();
            }
            return defaultInstance;
        }
    }

    // -------------------------------------- relay-based variant (rxrelay library)

    /**
     * Posts an object on the relay-based bus.
     *
     * @param o the event to publish
     */
    @Deprecated
    public void sendRy(Object o) {
        relayBus.call(o);
    }

    /**
     * Exposes the relay-based bus as an observable.
     *
     * @return an Observable view of the relay
     */
    @Deprecated
    public Observable<Object> asRyObservable() {
        return relayBus.asObservable();
    }

    /**
     * Reports whether the relay-based bus currently has observers.
     *
     * @return the relay's {@code hasObservers()} result
     */
    @Deprecated
    public boolean hasRyObservers() {
        return relayBus.hasObservers();
    }

    // -------------------------------------- subject-based variant (plain rxjava library)

    /**
     * Posts an object on the subject-based bus.
     *
     * @param o the event to publish
     */
    public void send(Object o) {
        subjectBus.onNext(o);
    }

    /**
     * Exposes the subject-based bus as an observable.
     *
     * @return an Observable view of the subject
     */
    public Observable<Object> asObservable() {
        return subjectBus.asObservable();
    }

    /**
     * Reports whether the subject-based bus currently has observers.
     *
     * @return the subject's {@code hasObservers()} result
     */
    public boolean hasObserver() {
        return subjectBus.hasObservers();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment