Last active
August 16, 2016 12:03
-
-
Save kariyayo/4f9ac960707c862a3f7930cc40760326 to your computer and use it in GitHub Desktop.
Reactive Extension(ReactiveX, Rx)に入門したときのコード http://bati11blog.hatenablog.com/entry/2015/04/30/170343
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.subjects.BehaviorSubject; | |
public class DataBindingSample { | |
public void start() { | |
ViewModel viewModel = new ViewModel("default"); | |
View view1 = new View(); | |
System.out.println("######################"); | |
System.out.println("view1: " + view1.value); | |
viewModel.bind(view1); | |
System.out.println("######################"); | |
System.out.println("view1: " + view1.value); | |
viewModel.set("hoge"); | |
System.out.println("######################"); | |
System.out.println("view1: " + view1.value); | |
View view2 = new View(); | |
viewModel.bind(view2); | |
System.out.println("######################"); | |
System.out.println("view1: " + view1.value); | |
System.out.println("view2: " + view2.value); | |
viewModel.set("fuga"); | |
System.out.println("######################"); | |
System.out.println("view1: " + view1.value); | |
System.out.println("view2: " + view2.value); | |
} | |
private static class View { | |
public String value; | |
} | |
private static class ViewModel { | |
private BehaviorSubject<String> behaviorSubject; | |
private String s; | |
public ViewModel(String s) { | |
this.s = s; | |
this.behaviorSubject = BehaviorSubject.create(s); | |
} | |
public String get() { | |
return s; | |
} | |
public void set(String s) { | |
this.s = s; | |
behaviorSubject.onNext(s); | |
} | |
public void bind(View view) { | |
behaviorSubject.subscribe(s -> view.value = s); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.subjects.PublishSubject; | |
import java.io.BufferedReader; | |
import java.io.IOException; | |
import java.io.InputStreamReader; | |
public class EventBusSample { | |
private int observerCount = 0; | |
public void start() { | |
Observable<String> observable = fromSystemInEvent(); | |
PublishSubject<String> subject = PublishSubject.create(); | |
subject.subscribe(s -> { | |
if (s.equalsIgnoreCase("add")) { | |
observerCount = observerCount + 1; | |
int observerId = observerCount; | |
subject.subscribe(string -> System.out.println("[" + observerId + "] " + string)); | |
} | |
}); | |
observable.subscribe(subject); | |
} | |
private Observable<String> fromSystemInEvent() { | |
return Observable.create(subscriber -> { | |
InputStreamReader inputStreamReader = new InputStreamReader(System.in); | |
try (BufferedReader in = new BufferedReader(inputStreamReader)) { | |
while (true) { | |
if (subscriber.isUnsubscribed() == false) { | |
System.out.print("> "); | |
String s = new String(in.readLine()); | |
subscriber.onNext(s); | |
} else { | |
break; | |
} | |
} | |
} catch (IOException e) { | |
subscriber.onError(e); | |
} | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.squareup.okhttp.Response; | |
import java.io.IOException; | |
import java.util.List; | |
import java.util.concurrent.CompletableFuture; | |
public class MultiAsyncHttpClient extends HttpClient { | |
@Override | |
public void exec(List<String> urls) throws IOException { | |
for (final String url : urls) { | |
CompletableFuture<Response>future = asyncHttpCall(url); | |
future.thenAccept(response -> { | |
System.out.println("[" + url + "] OK!"); | |
}); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.List; | |
public class MultiSyncHttpClient extends HttpClient { | |
@Override | |
public void exec(List<String> urls) throws IOException { | |
Instant start = Instant.now(); | |
for (String url : urls) { | |
syncHttpCall(url); | |
System.out.println("[" + url + "] OK!"); | |
} | |
System.out.println("TIME: " + Duration.between(start, Instant.now()).toMillis()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<title>RxJs</title> | |
<script src="http://code.jquery.com/jquery-1.11.2.js"></script> | |
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.lite.js"></script> | |
<script> | |
$(function(){ | |
var inputObservable = Rx.Observable.fromEvent($('#user_name'), 'input'); | |
inputObservable.subscribe(function(e) { | |
var count = $('#user_name').val().length; | |
$('#count_text').html(count); | |
}); | |
inputObservable.subscribe(function(e) { | |
console.log("hoge"); | |
}); | |
Rx.Observable.fromEvent($('#input_button'), 'click') | |
.map(function(e) { | |
var userName = $('#user_name').val(); | |
if (userName) return userName | |
else return null | |
}) | |
.filter(function(userName) { | |
return userName != null; | |
}) | |
.flatMap(function(userName) { | |
return Rx.Observable.fromPromise($.getJSON("https://api.github.com/users/" + userName)); | |
}) | |
.subscribe(function(response) { | |
console.log(response); | |
$('#result_text').html(response.login); | |
$('#result_img').attr('src', response.avatar_url); | |
}); | |
}); | |
</script> | |
</head> | |
<body> | |
<p id="count_text"></p> | |
<input type="text" id="user_name" /> | |
<input type="button" id="input_button" value="click!" /> | |
<p id="result_text"></p> | |
<img id="result_img" /> | |
</body> | |
</html> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx.Observable; | |
import rx.Observer; | |
import rx.schedulers.Schedulers; | |
import java.util.stream.IntStream; | |
public class SimpleSample { | |
public static void start() { | |
rangeObservable(); | |
System.out.println("\n############"); | |
myObservable(); | |
System.out.println("\n############"); | |
myObserver(); | |
System.out.println("\n############"); | |
threadCheck(); | |
} | |
private static void rangeObservable() { | |
Observable.range(1, 20) | |
.filter(n -> n % 3 == 0) | |
.map(n -> String.format("[%02d] ", n)) | |
.subscribe(System.out::print); | |
} | |
private static void myObservable() { | |
Observable<Integer> observable = Observable.create(subscriber -> { | |
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x)); | |
subscriber.onCompleted(); | |
}); | |
observable.filter(n -> n % 3 == 0) | |
.map(n -> String.format("[%02d] ", n)) | |
.subscribe(System.out::print); | |
} | |
private static void myObserver() { | |
Observable<Integer> observable = Observable.create(subscriber -> { | |
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x)); | |
subscriber.onCompleted(); | |
}); | |
Observer<Integer> observer = new Observer<Integer>() { | |
@Override | |
public void onCompleted() { | |
System.out.print("COMPLETE!"); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.print("ERROR!"); | |
} | |
@Override | |
public void onNext(Integer integer) { | |
if (integer % 3 == 0) System.out.print(String.format("[%02d] ", integer)); | |
} | |
}; | |
observable.subscribe(observer); | |
} | |
private static void threadCheck() { | |
Observable<Integer> observable = Observable.create(subscriber -> { | |
System.out.println(Thread.currentThread().getName() + ": observable.call"); | |
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x)); | |
subscriber.onCompleted(); | |
}); | |
Observer<String> observer = new Observer<String>() { | |
@Override | |
public void onCompleted() { | |
System.out.print("[" + Thread.currentThread().getName() + ": COMPLETE!]"); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.print("ERROR!"); | |
} | |
@Override | |
public void onNext(String s) { | |
System.out.print("[" + Thread.currentThread().getName() + ": " + s + "] "); | |
} | |
}; | |
observable | |
.subscribeOn(Schedulers.newThread()) | |
.map(x -> Thread.currentThread().getName() + ": " + x.toString()) | |
.observeOn(Schedulers.computation()) | |
.subscribe(observer); | |
try { | |
Thread.sleep(3000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.squareup.okhttp.OkHttpClient; | |
import com.squareup.okhttp.Request; | |
import com.squareup.okhttp.Response; | |
import java.io.IOException; | |
import java.util.List; | |
import java.util.concurrent.CompletableFuture; | |
public class SingleAsyncHttpClient extends HttpClient { | |
@Override | |
public void exec(List<String> urls) throws IOException { | |
final String url = urls.get(0); | |
CompletableFuture<Response> future = asyncHttpCall(url); | |
future.thenAccept(response -> { | |
System.out.println("[" + url + "] OK!"); | |
}); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.List; | |
public class SingleSyncHttpClient extends HttpClient { | |
@Override | |
public void exec(List<String> urls) throws IOException { | |
final String url = urls.get(0); | |
syncHttpCall(url); | |
System.out.println("[" + url + "] OK!"); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.squareup.okhttp.OkHttpClient; | |
import com.squareup.okhttp.Request; | |
import com.squareup.okhttp.Response; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.time.Instant; | |
import java.util.List; | |
import java.util.concurrent.CompletableFuture; | |
public class StreamHttpClient extends HttpClient { | |
@Override | |
public void exec(List<String> urls) throws IOException { | |
final Instant start = Instant.now(); | |
Observable<Response> observable = Observable.empty(); | |
for (String url : urls) { | |
observable = observable.concatWith(Observable.from(asyncHttpCall(url))); | |
} | |
observable | |
.subscribeOn(Schedulers.io()) | |
.observeOn(Schedulers.computation()) | |
.zipWith(Observable.from(urls), (r, url) -> new Pair<>(r, url)) | |
.map(pair -> "[" + pair._2 + "] OK! ") | |
.subscribe(new Subscriber<String>() { | |
@Override | |
public void onCompleted() { | |
System.out.println("TIME: " + Duration.between(start, Instant.now()).toMillis()); | |
} | |
@Override | |
public void onError(Throwable e) { | |
} | |
@Override | |
public void onNext(String s) { | |
System.out.println(s); | |
} | |
}); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment