Last active
March 29, 2017 09:58
-
-
Save murano500k/f7efee9a093c038c2a538d9adc3f309f to your computer and use it in GitHub Desktop.
RxAndroid_samples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
BackpressureMode.NONE, | |
BackpressureMode.ERROR, | |
BackpressureMode.BUFFER, | |
BackpressureMode.DROP, | |
BackpressureMode.LATEST | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
To address the first issue from our change, we can make use of any of the blocking operators available to an Observable such as blockingFirst() and blockingNext(). Essentially, both of these operators will block until an item is emitted downstream: blockingFirst() will return the first element emitted and finish, whereas blockingNext() will return an Iterable which allows you to perform a for-each loop on the underlying data (each iteration through the loop will block). | |
A side-effect of using a blocking operation that is important to be aware of, though, is that exceptions are thrown on the calling thread rather than being passed to an observer’s onError() method. | |
Using a blocking operator to change the method signature back to a List<User>, our snippet would now look like this: | |
*/ | |
public List<User> getUsersWithBlogs() { | |
return Observable.fromIterable(UserCache.getAllUsers()) | |
.filter(user -> user.blog != null && !user.blog.isEmpty()) | |
.sorted((user1, user2) -> user1.name.compareTo(user2.name)) | |
.toList() | |
.blockingGet(); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//From: | |
apiClient.updateMyData(myUpdatedData) | |
.subscribe(myData -> { | |
// handle data fetched successfully and API call completed | |
}, throwable -> { | |
// handle error event | |
}, () -> { | |
// handle completion - what we actually care about | |
}); | |
//To: | |
apiClient.updateMyData(myUpdatedData) | |
.subscribe(() -> { | |
// handle completion | |
}, throwable -> { | |
// handle error | |
}); | |
//Or: | |
apiClient.updateMyData(myUpdatedData) | |
.andThen(performOtherOperation()) // a Single<OtherResult> | |
.subscribe(otherResult -> { | |
// handle otherResult | |
}, throwable -> { | |
// handle error | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public Observable<Boolean> enablePushNotifications(boolean enable) { | |
return Observable.fromCallable(() -> sharedPrefs | |
.edit() | |
.putBoolean(KEY_PUSH_NOTIFICATIONS_PREFS, enable) | |
.commit()); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import android.bluetooth.le.BluetoothLeScanner; | |
import android.bluetooth.le.ScanCallback; | |
import android.bluetooth.le.ScanResult; | |
import android.support.annotation.NonNull; | |
import rx.Emitter; | |
import rx.Observable; | |
import java.util.List; | |
public class RxBluetoothScanner { | |
public static class ScanResultException extends RuntimeException { | |
public ScanResultException(int errorCode) { | |
super("Bluetooth scan failed. Error code: " + errorCode); | |
} | |
} | |
private RxBluetoothScanner() { | |
} | |
@NonNull | |
public static Observable<ScanResult> scan(@NonNull final BluetoothLeScanner scanner) { | |
return Observable.fromEmitter(scanResultEmitter -> { | |
final ScanCallback scanCallback = new ScanCallback() { | |
@Override | |
public void onScanResult(int callbackType, @NonNull ScanResult result) { | |
scanResultEmitter.onNext(result); | |
} | |
@Override | |
public void onBatchScanResults(@NonNull List<ScanResult> results) { | |
for (ScanResult r : results) { | |
scanResultEmitter.onNext(r); | |
} | |
} | |
@Override | |
public void onScanFailed(int errorCode) { | |
scanResultEmitter.onError(new ScanResultException(errorCode)); | |
} | |
}; | |
scanResultEmitter.setCancellation(() -> scanner.stopScan(scanCallback)); | |
scanner.startScan(scanCallback); | |
}, Emitter.BackpressureMode.BUFFER); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//From: | |
public List<User> getUsersWithBlogs() { | |
final List<User> allUsers = UserCache.getAllUsers(); | |
final List<User> usersWithBlogs = new ArrayList<>(); | |
for (User user : allUsers) { | |
if (user.blog != null && !user.blog.isEmpty()) { | |
usersWithBlogs.add(user); | |
} | |
} | |
Collections.sort(usersWithBlogs, (user1, user2) -> user1.name.compareTo(user2.name)); | |
return usersWithBlogs; | |
} | |
//To: | |
public Observable<User> getUsersWithBlogs() { | |
return Observable.fromIterable(UserCache.getAllUsers()) | |
.filter(user -> user.blog != null && !user.blog.isEmpty()) | |
.sorted((user1, user2) -> user1.name.compareTo(user2.name)); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public Observable<SearchResult> search(@NotNull EditText searchView) { | |
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time | |
.map(CharSequence::toString) | |
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes | |
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1 | |
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker) | |
.switchMap(query -> searchService.query(query)) // Take the latest observable from upstream and unsubscribe from any previous subscriptions | |
.onErrorResumeNext(Observable.empty()); // <-- This will terminate upstream (ie. we will stop receiving text view changes after an error!) | |
} | |
public Observable<SearchResult> search(@NotNull EditText searchView) { | |
return RxTextView.textChanges(searchView) // In production, share this text view observable, don't create a new one each time | |
.map(CharSequence::toString) | |
.debounce(500, TimeUnit.MILLISECONDS) // Avoid getting spammed with key stroke changes | |
.filter(s -> s.length() > 1) // Only interested in queries of length greater than 1 | |
.observeOn(workerScheduler) // Next set of operations will be network so switch to an IO Scheduler (or worker) | |
.switchMap(query -> searchService.query(query) // Take the latest observable from upstream and unsubscribe from any previous subscriptions | |
.onErrorResumeNext(Observable.empty()); // <-- This fixes the problem since the error is not seen by the upstream observable | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public void touchEventHandler(@NotNull View view) { | |
final ConnectedObservable<MotionEvent> motionEventObservable = RxView.touches(view).publish(); | |
// Capture down events | |
final Observable<MotionEvent> downEventsObservable = motionEventObservable | |
.filter(event -> event.getAction() == MotionEvent.ACTION_DOWN); | |
// Capture up events | |
final Observable<MotionEvent> upEventsObservable = motionEventObservable | |
.filter(event -> event.getAction() == MotionEvent.ACTION_UP); | |
// Show a red circle at the position where the down event ocurred | |
subscriptions.add(downEventsObservable.subscribe(event -> | |
view.showCircle(event.getX(), event.getY(), Color.RED))); | |
// Show a blue circle at the position where the up event ocurred | |
subscriptions.add(upEventsObservable.subscribe(event -> | |
view.showCircle(event.getX(), event.getY(), Color.BLUE))); | |
// Connect the source observable to begin emitting events | |
subscriptions.add(motionEventObservable.connect()); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//From: | |
apiClient.getMyData() | |
.subscribe(myData -> { | |
// handle data fetched successfully | |
}, throwable -> { | |
// handle error event | |
}, () -> { | |
// handle on complete event | |
}); | |
//To: | |
apiClient.getMyData() | |
.subscribe(myData -> { | |
// handle data fetched successfully and API call completed | |
}, throwable -> { | |
// handle error event | |
}); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public Observable<byte[]> readFile(@NonNull FileInputStream stream) { | |
final SyncOnSubscribe<FileInputStream, byte[]> fileReader = SyncOnSubscribe.createStateful( | |
() -> stream, | |
(stream, output) -> { | |
try { | |
final byte[] buffer = new byte[BUFFER_SIZE]; | |
int count = stream.read(buffer); | |
if (count < 0) { | |
output.onCompleted(); | |
} else { | |
output.onNext(buffer); | |
} | |
} catch (IOException error) { | |
output.onError(error); | |
} | |
return stream; | |
}, | |
s -> IOUtil.closeSilently(s)); | |
return Observable.create(fileReader); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment