Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Last active February 14, 2016 11:20
Show Gist options
  • Save benjchristensen/4679246 to your computer and use it in GitHub Desktop.
Save benjchristensen/4679246 to your computer and use it in GitHub Desktop.
Example of using RxJava in Groovy to compose nested asynchronous calls.
package rx.examples.groovy;
import rx.observables.Observable
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.functions.Func1;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
class VideoExample {
static void main(String[] args) {
VideoExample v = new VideoExample();
println("---- sequence of video dictionaries ----")
v.getVideoGridForDisplay(1).subscribe(
{ videoDictionary -> // onNext
// this will print the dictionary for each video
// and is a good representation of how progressive rendering could work
println(videoDictionary) },
{ exception -> // onError
println("Error: " + exception) },
{ // onCompleted
v.executor.shutdownNow();
});
v = new VideoExample();
v.getVideoGridForDisplay(1).toList().subscribe(
{ videoDictionaryList -> // onNext
// this will be called once with a list
// and demonstrates how a sequence can be combined
// for document style responses (most webservices)
println("\n ---- single list of video dictionaries ----\n" + videoDictionaryList) },
{ exception -> // onError
println("Error: " + exception) },
{ // onCompleted
v.executor.shutdownNow();
});
}
/**
* Demonstrate how Rx is used to compose Observables together such as
* how a web service would to generate a JSON response.
*
* The simulated methods for the metadata represent different services
* that are often backed by network calls.
*
* This will return a sequence of dictionaries such as this:
*
* [id:1000, title:video-1000-title, length:5428, bookmark:0,
* rating:[actual:4, average:3, predicted:0]]
*/
def Observable getVideoGridForDisplay(userId) {
getListOfLists(userId).mapMany({ VideoList list ->
// for each VideoList we want to fetch the videos
list.getVideos()
.take(10) // we only want the first 10 of each list
.mapMany({ Video video ->
// for each video we want to fetch metadata
def m = video.getMetadata().map({ Map<String, String> md ->
// transform to the data and format we want
return [title: md.get("title"),
length: md.get("duration")]
})
def b = video.getBookmark(userId).map({ position ->
return [bookmark: position]
})
def r = video.getRating(userId).map({ VideoRating rating ->
return [rating:
[actual: rating.getActualStarRating(),
average: rating.getAverageStarRating(),
predicted: rating.getPredictedStarRating()]]
})
// compose these together
return Observable.zip(m, b, r, {
metadata, bookmark, rating ->
// now transform to complete dictionary of data
// we want for each Video
return [id: video.videoId] << metadata << bookmark << rating
})
})
})
}
/**
* Retrieve a list of lists of videos (grid).
*
* Observable<VideoList> is the "push" equivalent to List<VideoList>
*/
def Observable<VideoList> getListOfLists(userId) {
return Observable.create({ observer ->
// this will happen on a separate thread as it requires a network call
executor.execute(new Runnable() {
def void run() {
// simulate network latency
Thread.sleep(180);
for(i in 0..15) {
observer.onNext(new VideoList(i))
}
observer.onCompleted();
}
})
})
}
/**
* Represents a list of videos as part of a grid (list of lists).
*/
class VideoList {
int listPosition;
VideoList(int position) {
this.listPosition = position
}
def String getListName() {
return "ListName-" + listPosition
}
def Integer getListPosition() {
return listPosition
}
def Observable<Video> getVideos() {
return Observable.create({ observer ->
// we already have the videos once a list is loaded
// so we won't launch another thread but return
// the sequence of videos via push
for(i in 0..50) {
observer.onNext(new Video((listPosition*1000)+i))
}
observer.onCompleted();
})
}
}
class Video {
int videoId;
Video(int videoId) {
this.videoId = videoId;
}
// synchronous
def Observable<Map<String, String>> getMetadata() {
// simulate fetching metadata from an in-memory cache
// so it will not asynchronously execute on a thread but
// immediately return an Observable with the data
return Observable.create({ observer ->
observer.onNext([
title: "video-" + videoId + "-title",
actors: ["actor1", "actor2"],
duration: 5428])
observer.onCompleted();
});
}
// asynchronous
def Observable<Integer> getBookmark(userId) {
// simulate fetching the bookmark for this user
// that specifies the last played position if
// this video has been played before
return Observable.create({ observer ->
// this will happen on a separate thread as it requires a network call
executor.execute(new Runnable() {
def void run() {
// simulate network latency
Thread.sleep(4);
if(randint(6) > 1) {
// most of the time they haven't watched a movie
// so the position is 0
observer.onNext(randint(0));
} else {
observer.onNext(randint(4000));
}
observer.onCompleted();
}
})
})
}
// asynchronous
def Observable<VideoRating> getRating(userId) {
// simulate fetching the VideoRating for this user
return Observable.create({ observer ->
// this will happen on a separate thread as it requires a network call
executor.execute(new Runnable() {
def void run() {
// simulate network latency
Thread.sleep(10);
observer.onNext(new VideoRating(videoId, userId))
observer.onCompleted();
}
})
})
}
}
class VideoRating {
int videoId, userId
VideoRating(videoId, userId) {
this.videoId = videoId;
this.userId = userId;
}
def Integer getPredictedStarRating() {
return randint(5)
}
def Integer getAverageStarRating() {
return randint(4)
}
def Integer getActualStarRating() {
return randint(5)
}
}
ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
def randint(int max) {
return Math.round(Math.random() * max)
}
def combine( Map... m ) {
m.collectMany { it.entrySet() }.inject( [:] ) { result, e ->
result << [ (e.key):e.value + ( result[ e.key ] ?: 0 ) ]
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment