Skip to content

Instantly share code, notes, and snippets.

@murano500k
Last active March 29, 2017 09:58
Show Gist options
  • Save murano500k/f7efee9a093c038c2a538d9adc3f309f to your computer and use it in GitHub Desktop.
Save murano500k/f7efee9a093c038c2a538d9adc3f309f to your computer and use it in GitHub Desktop.
RxAndroid_samples
{
BackpressureMode.NONE,
BackpressureMode.ERROR,
BackpressureMode.BUFFER,
BackpressureMode.DROP,
BackpressureMode.LATEST
}
/*
To address the first issue from our change, we can make use of any of the blocking operators available to an Observable such as blockingFirst() and blockingNext(). Essentially, both of these operators will block until an item is emitted downstream: blockingFirst() will return the first element emitted and finish, whereas blockingNext() will return an Iterable which allows you to perform a for-each loop on the underlying data (each iteration through the loop will block).
A side-effect of using a blocking operation that is important to be aware of, though, is that exceptions are thrown on the calling thread rather than being passed to an observer’s onError() method.
Using a blocking operator to change the method signature back to a List<User>, our snippet would now look like this:
*/
public List<User> getUsersWithBlogs() {
return Observable.fromIterable(UserCache.getAllUsers())
.filter(user -> user.blog != null && !user.blog.isEmpty())
.sorted((user1, user2) -> user1.name.compareTo(user2.name))
.toList()
.blockingGet();
}
//From:
apiClient.updateMyData(myUpdatedData)
.subscribe(myData -> {
// handle data fetched successfully and API call completed
}, throwable -> {
// handle error event
}, () -> {
// handle completion - what we actually care about
});
//To:
apiClient.updateMyData(myUpdatedData)
.subscribe(() -> {
// handle completion
}, throwable -> {
// handle error
});
//Or:
apiClient.updateMyData(myUpdatedData)
.andThen(performOtherOperation()) // a Single<OtherResult>
.subscribe(otherResult -> {
// handle otherResult
}, throwable -> {
// handle error
});
public Observable<Boolean> enablePushNotifications(boolean enable) {
return Observable.fromCallable(() -> sharedPrefs
.edit()
.putBoolean(KEY_PUSH_NOTIFICATIONS_PREFS, enable)
.commit());
}
import android.bluetooth.le.BluetoothLeScanner;
import android.bluetooth.le.ScanCallback;
import android.bluetooth.le.ScanResult;
import android.support.annotation.NonNull;
import rx.Emitter;
import rx.Observable;
import java.util.List;
public class RxBluetoothScanner {
public static class ScanResultException extends RuntimeException {
public ScanResultException(int errorCode) {
super("Bluetooth scan failed. Error code: " + errorCode);
}
}
private RxBluetoothScanner() {
}
@NonNull
public static Observable<ScanResult> scan(@NonNull final BluetoothLeScanner scanner) {
return Observable.fromEmitter(scanResultEmitter -> {
final ScanCallback scanCallback = new ScanCallback() {
@Override
public void onScanResult(int callbackType, @NonNull ScanResult result) {
scanResultEmitter.onNext(result);
}
@Override
public void onBatchScanResults(@NonNull List<ScanResult> results) {
for (ScanResult r : results) {
scanResultEmitter.onNext(r);
}
}
@Override
public void onScanFailed(int errorCode) {
scanResultEmitter.onError(new ScanResultException(errorCode));
}
};
scanResultEmitter.setCancellation(() -> scanner.stopScan(scanCallback));
scanner.startScan(scanCallback);
}, Emitter.BackpressureMode.BUFFER);
}
}
//From:
public List<User> getUsersWithBlogs() {
final List<User> allUsers = UserCache.getAllUsers();
final List<User> usersWithBlogs = new ArrayList<>();
for (User user : allUsers) {
if (user.blog != null && !user.blog.isEmpty()) {
usersWithBlogs.add(user);
}
}
Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name));
return usersWithBlogs;
}
//To:
public Observable<User> getUsersWithBlogs() {
return Observable.fromIterable(UserCache.getAllUsers())
.filter(user -> user.blog != null && !user.blog.isEmpty())
.sorted((user1, user2) -> user1.name.compareTo(user2.name));
}
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query)) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This will terminate upstream (ie. we will stop receiving text view changes after an error!)
}
public Observable<SearchResult> search(@NotNull EditText searchView) {
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time
.map(CharSequence::toString)
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker)
.switchMap(query -> searchService.query(query) // Take the latest observable from upstream and unsubscribe from any previous subscriptions
.onErrorResumeNext(Observable.empty()); // <-- This fixes the problem since the error is not seen by the upstream observable
}
public void touchEventHandler(@NotNull View view) {
final ConnectedObservable<MotionEvent> motionEventObservable = RxView.touches(view).publish();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
// Connect the source observable to begin emitting events
subscriptions.add(motionEventObservable.connect());
}
public void touchEventHandler(@NotNull View view) {
final Observable<MotionEvent> motionEventObservable = RxView.touches(view).share();
// Capture down events
final Observable<MotionEvent> downEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN);
// Capture up events
final Observable<MotionEvent> upEventsObservable = motionEventObservable
.filter(event -> event.getAction() == MotionEvent.ACTION_UP);
// Show a red circle at the position where the down event ocurred
subscriptions.add(downEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.RED)));
// Show a blue circle at the position where the up event ocurred
subscriptions.add(upEventsObservable.subscribe(event ->
view.showCircle(event.getX(), event.getY(), Color.BLUE)));
}
//From:
apiClient.getMyData()
.subscribe(myData -> {
// handle data fetched successfully
}, throwable -> {
// handle error event
}, () -> {
// handle on complete event
});
//To:
apiClient.getMyData()
.subscribe(myData -> {
// handle data fetched successfully and API call completed
}, throwable -> {
// handle error event
});
public Observable<byte[]> readFile(@NonNull FileInputStream stream) {
final SyncOnSubscribe<FileInputStream, byte[]> fileReader = SyncOnSubscribe.createStateful(
() -> stream,
(stream, output) -> {
try {
final byte[] buffer = new byte[BUFFER_SIZE];
int count = stream.read(buffer);
if (count < 0) {
output.onCompleted();
} else {
output.onNext(buffer);
}
} catch (IOException error) {
output.onError(error);
}
return stream;
},
s -> IOUtil.closeSilently(s));
return Observable.create(fileReader);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment