Skip to content

Instantly share code, notes, and snippets.

@odbol
Created November 1, 2016 18:00
Show Gist options
  • Save odbol/00f299c7a6b418eb8e5781e5ec08b90e to your computer and use it in GitHub Desktop.
Save odbol/00f299c7a6b418eb8e5781e5ec08b90e to your computer and use it in GitHub Desktop.
A class for translating Android's BroadcastReceiver events into an RxJava Observable. Automatically unregisters the BroadcastReciever when you unsubscribe.
package com.odbol.rx;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;
/**
* Creates an Observable that can respond to Android Broadcast intents.
* Handles registering and unregistering from the BroadcastReceiver for you.
*/
public abstract class RxBroadcastReceiver<T> implements Observable.OnSubscribe<T> {
private BroadcastReceiver mReceiver;
private final Context mContext;
private final IntentFilter mIntentFilter;
private Subscriber<? super T> mSubscriber;
public RxBroadcastReceiver(Context ctx, IntentFilter filter) {
mContext = ctx.getApplicationContext();
mIntentFilter = filter;
}
/***
* Convenience function to create an IntentFilter that filters for the given actions.
*
* @param ctx
* @param actions One or more actions to filter for.
*/
public RxBroadcastReceiver(Context ctx, String... actions) {
mContext = ctx.getApplicationContext();
mIntentFilter = new IntentFilter();
for (String action : actions) {
mIntentFilter.addAction(action);
}
}
/***
* Receives the broadcasts you registered for.
*
* You should call onNext, onCompleted, or onError from here.
*
* @param intent
*/
public abstract void onReceive(Intent intent);
/***
* Called when subscription starts. This is where you should start any initial processing to
* trigger broadcasts to be later received by onReceive().
*
*/
public abstract void onSubscribe();
/***
* Called when all listeners are unregistered and this object is going to be destroyed.
*/
protected void onUnregister() {
// no-op in default implementation
}
/***
* Call to report an error to all subscribers and then unregister all listeners.
*
* Must be called after onSubscribe().
* @param msg
*/
protected void onError(String msg) {
onError(new Exception(msg));
}
/***
* Call to report an error to all subscribers and then unregister all listeners.
*
* Must be called after onSubscribe().
*
* @param e The error to pass on.
*/
protected void onError(Throwable e) {
if (mSubscriber == null) {
throw new IllegalArgumentException("onError called before onSubscribe()");
}
onError(mSubscriber, e);
}
/***
* Call to report a result to all subscribers.
*
* Must be called after onSubscribe().
*
* @param result
*/
protected void onNext(T result) {
if (mSubscriber == null) {
throw new IllegalArgumentException("onNext called before onSubscribe()");
}
onNext(mSubscriber, result);
}
/***
* Call to report onCompleted to all subscribers and then unregister all listeners.
*
* Must be called after onSubscribe().
*/
protected void onCompleted() {
if (mSubscriber == null) {
throw new IllegalArgumentException("onCompleted called before onSubscribe()");
}
onCompleted(mSubscriber);
}
/***
* Call to report the FINAL result to all subscribers, send an onCompleted(),
* and unregister all listeners.
*
* Must be called after onSubscribe().
*
* @param result
*/
protected void onSuccess(T result) {
if (mSubscriber == null) {
throw new IllegalArgumentException("onSuccess called before onSubscribe()");
}
onNext(mSubscriber, result);
onCompleted(mSubscriber);
}
private void registerListener(final BroadcastReceiver rec) {
mReceiver = rec;
mContext.registerReceiver(mReceiver, mIntentFilter);
}
private void unregisterListener() {
if (mReceiver != null) {
mContext.unregisterReceiver(mReceiver);
mReceiver = null;
}
onUnregister();
}
private void onError(Subscriber<? super T> subscriber, Throwable e) {
//Log.e(TAG, e.getMessage());
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
unregisterListener();
}
private void onNext(Subscriber<? super T> subscriber, T result) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(result);
}
}
private void onCompleted(Subscriber<? super T> subscriber) {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
unregisterListener();
}
@Override
public void call(final Subscriber<? super T> subscriber) {
mSubscriber = subscriber;
registerListener(new BroadcastReceiver() {
@Override
public void onReceive(Context context, Intent intent) {
RxBroadcastReceiver.this.onReceive(intent);
}
});
// unregister when we've been unsubscribed from, so we don't leak listeners
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
unregisterListener();
}
}));
onSubscribe();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment