Created
August 4, 2014 15:27
-
-
Save benjchristensen/f310cd4329b9e1977714 to your computer and use it in GitHub Desktop.
BufferExample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
public class BufferExample { | |
public static void main(String args[]) { | |
// buffer every 500ms | |
hotStream().buffer(500, TimeUnit.MILLISECONDS).take(10).toBlocking().forEach(System.out::println); | |
// buffer 10 items at a time | |
hotStream().buffer(10).take(10).toBlocking().forEach(System.out::println); | |
} | |
/** | |
* This is an artificial source to demonstrate an infinite stream that bursts intermittently | |
*/ | |
public static Observable<Integer> hotStream() { | |
return Observable.create((Subscriber<? super Integer> s) -> { | |
while (!s.isUnsubscribed()) { | |
// burst some number of items | |
for (int i = 0; i < Math.random() * 20; i++) { | |
s.onNext(i); | |
} | |
try { | |
// sleep for a random amount of time | |
// NOTE: Only using Thread.sleep here as an artificial demo. | |
Thread.sleep((long) (Math.random() * 1000)); | |
} catch (Exception e) { | |
// do nothing | |
} | |
} | |
}).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sample output => buffer(500, TimeUnit.MILLISECONDS)
[0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2]
[]
[0, 1]
[0, 1, 2, 3]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3, 4, 5, 0, 1, 2]
[]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[0, 1, 2, 3, 4, 5, 6, 7, 8]
// sample output => buffer(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 0]
[1, 2, 3, 4, 5, 0, 1, 2, 3, 4]
[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
[5, 0, 1, 2, 3, 4, 5, 0, 1, 2]
[3, 4, 0, 1, 2, 3, 0, 1, 2, 3]
[0, 0, 1, 2, 3, 4, 5, 6, 0, 1]
[2, 3, 0, 1, 2, 3, 4, 5, 6, 7]
[8, 0, 1, 2, 3, 0, 1, 2, 3, 4]
[5, 0, 0, 1, 2, 3, 4, 5, 6, 7]
[8, 9, 10, 11, 0, 1, 2, 3, 4, 5]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment