Last active
August 29, 2015 14:05
-
-
Save jcfandino/fd47277ada821f51a9d4 to your computer and use it in GitHub Desktop.
RxJava batches, window vs. buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static org.junit.Assert.assertTrue; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.Before; | |
import org.junit.Test; | |
import rx.Observable; | |
import rx.functions.Action1; | |
import rx.functions.Func1; | |
public class RxJavaTest { | |
private final class UpdateCountdowns implements Action1<Observable<String>> { | |
String name; | |
protected UpdateCountdowns(String aName) { | |
name = aName; | |
} | |
@Override | |
public void call(Observable<String> w) { | |
batchesCount.countDown(); | |
w.subscribe(new Action1<String>() { | |
public void call(String s) { | |
System.out.println(name + " - " + hashCode() + ": " + s); | |
itemsCount.countDown(); | |
} | |
}); | |
} | |
} | |
@Before | |
public void setUp() throws Exception { | |
} | |
Observable<String> observable = Observable.from("1", "2", "3", "4", "5"); | |
final CountDownLatch itemsCount = new CountDownLatch(5); | |
final CountDownLatch batchesCount = new CountDownLatch(3); | |
Func1<String, String> delayOn4 = new Func1<String, String>() { | |
public String call(String s) { | |
if ("4".equals(s)) { | |
try { | |
Thread.sleep(10); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
return s; | |
} | |
}; | |
@Test | |
public void testBatchingUsingWindow() throws InterruptedException { | |
observable.map(delayOn4).window(100, TimeUnit.MILLISECONDS, 5) | |
.subscribe(new UpdateCountdowns("window")); | |
assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS)); | |
assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS)); | |
} | |
@Test | |
public void testBatchingUsingBuffer() throws InterruptedException { | |
observable.map(delayOn4).buffer(10, TimeUnit.MILLISECONDS, 2) | |
.map(new Func1<List<String>, Observable<String>>() { | |
public Observable<String> call(List<String> l) { | |
return Observable.from(l); | |
} | |
}).subscribe(new UpdateCountdowns("buffer")); | |
assertTrue(itemsCount.await(100, TimeUnit.MILLISECONDS)); | |
assertTrue(batchesCount.await(100, TimeUnit.MILLISECONDS)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment