Skip to content

Instantly share code, notes, and snippets.

@kariyayo
Last active August 16, 2016 12:03
Show Gist options
  • Save kariyayo/4f9ac960707c862a3f7930cc40760326 to your computer and use it in GitHub Desktop.
Save kariyayo/4f9ac960707c862a3f7930cc40760326 to your computer and use it in GitHub Desktop.
Reactive Extension(ReactiveX, Rx)に入門したときのコード http://bati11blog.hatenablog.com/entry/2015/04/30/170343
import rx.subjects.BehaviorSubject;
public class DataBindingSample {
public void start() {
ViewModel viewModel = new ViewModel("default");
View view1 = new View();
System.out.println("######################");
System.out.println("view1: " + view1.value);
viewModel.bind(view1);
System.out.println("######################");
System.out.println("view1: " + view1.value);
viewModel.set("hoge");
System.out.println("######################");
System.out.println("view1: " + view1.value);
View view2 = new View();
viewModel.bind(view2);
System.out.println("######################");
System.out.println("view1: " + view1.value);
System.out.println("view2: " + view2.value);
viewModel.set("fuga");
System.out.println("######################");
System.out.println("view1: " + view1.value);
System.out.println("view2: " + view2.value);
}
private static class View {
public String value;
}
private static class ViewModel {
private BehaviorSubject<String> behaviorSubject;
private String s;
public ViewModel(String s) {
this.s = s;
this.behaviorSubject = BehaviorSubject.create(s);
}
public String get() {
return s;
}
public void set(String s) {
this.s = s;
behaviorSubject.onNext(s);
}
public void bind(View view) {
behaviorSubject.subscribe(s -> view.value = s);
}
}
}
import rx.Observable;
import rx.subjects.PublishSubject;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class EventBusSample {
private int observerCount = 0;
public void start() {
Observable<String> observable = fromSystemInEvent();
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(s -> {
if (s.equalsIgnoreCase("add")) {
observerCount = observerCount + 1;
int observerId = observerCount;
subject.subscribe(string -> System.out.println("[" + observerId + "] " + string));
}
});
observable.subscribe(subject);
}
private Observable<String> fromSystemInEvent() {
return Observable.create(subscriber -> {
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
try (BufferedReader in = new BufferedReader(inputStreamReader)) {
while (true) {
if (subscriber.isUnsubscribed() == false) {
System.out.print("> ");
String s = new String(in.readLine());
subscriber.onNext(s);
} else {
break;
}
}
} catch (IOException e) {
subscriber.onError(e);
}
});
}
}
import com.squareup.okhttp.Response;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class MultiAsyncHttpClient extends HttpClient {
@Override
public void exec(List<String> urls) throws IOException {
for (final String url : urls) {
CompletableFuture<Response>future = asyncHttpCall(url);
future.thenAccept(response -> {
System.out.println("[" + url + "] OK!");
});
}
}
}
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
public class MultiSyncHttpClient extends HttpClient {
@Override
public void exec(List<String> urls) throws IOException {
Instant start = Instant.now();
for (String url : urls) {
syncHttpCall(url);
System.out.println("[" + url + "] OK!");
}
System.out.println("TIME: " + Duration.between(start, Instant.now()).toMillis());
}
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>RxJs</title>
<script src="http://code.jquery.com/jquery-1.11.2.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.lite.js"></script>
<script>
$(function(){
var inputObservable = Rx.Observable.fromEvent($('#user_name'), 'input');
inputObservable.subscribe(function(e) {
var count = $('#user_name').val().length;
$('#count_text').html(count);
});
inputObservable.subscribe(function(e) {
console.log("hoge");
});
Rx.Observable.fromEvent($('#input_button'), 'click')
.map(function(e) {
var userName = $('#user_name').val();
if (userName) return userName
else return null
})
.filter(function(userName) {
return userName != null;
})
.flatMap(function(userName) {
return Rx.Observable.fromPromise($.getJSON("https://api.github.com/users/" + userName));
})
.subscribe(function(response) {
console.log(response);
$('#result_text').html(response.login);
$('#result_img').attr('src', response.avatar_url);
});
});
</script>
</head>
<body>
<p id="count_text"></p>
<input type="text" id="user_name" />
<input type="button" id="input_button" value="click!" />
<p id="result_text"></p>
<img id="result_img" />
</body>
</html>
import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;
import java.util.stream.IntStream;
public class SimpleSample {
public static void start() {
rangeObservable();
System.out.println("\n############");
myObservable();
System.out.println("\n############");
myObserver();
System.out.println("\n############");
threadCheck();
}
private static void rangeObservable() {
Observable.range(1, 20)
.filter(n -> n % 3 == 0)
.map(n -> String.format("[%02d] ", n))
.subscribe(System.out::print);
}
private static void myObservable() {
Observable<Integer> observable = Observable.create(subscriber -> {
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x));
subscriber.onCompleted();
});
observable.filter(n -> n % 3 == 0)
.map(n -> String.format("[%02d] ", n))
.subscribe(System.out::print);
}
private static void myObserver() {
Observable<Integer> observable = Observable.create(subscriber -> {
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x));
subscriber.onCompleted();
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.print("COMPLETE!");
}
@Override
public void onError(Throwable e) {
System.out.print("ERROR!");
}
@Override
public void onNext(Integer integer) {
if (integer % 3 == 0) System.out.print(String.format("[%02d] ", integer));
}
};
observable.subscribe(observer);
}
private static void threadCheck() {
Observable<Integer> observable = Observable.create(subscriber -> {
System.out.println(Thread.currentThread().getName() + ": observable.call");
IntStream.range(1, 20).forEach(x -> subscriber.onNext(x));
subscriber.onCompleted();
});
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
System.out.print("[" + Thread.currentThread().getName() + ": COMPLETE!]");
}
@Override
public void onError(Throwable e) {
System.out.print("ERROR!");
}
@Override
public void onNext(String s) {
System.out.print("[" + Thread.currentThread().getName() + ": " + s + "] ");
}
};
observable
.subscribeOn(Schedulers.newThread())
.map(x -> Thread.currentThread().getName() + ": " + x.toString())
.observeOn(Schedulers.computation())
.subscribe(observer);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class SingleAsyncHttpClient extends HttpClient {
@Override
public void exec(List<String> urls) throws IOException {
final String url = urls.get(0);
CompletableFuture<Response> future = asyncHttpCall(url);
future.thenAccept(response -> {
System.out.println("[" + url + "] OK!");
});
}
}
import java.io.IOException;
import java.util.List;
public class SingleSyncHttpClient extends HttpClient {
@Override
public void exec(List<String> urls) throws IOException {
final String url = urls.get(0);
syncHttpCall(url);
System.out.println("[" + url + "] OK!");
}
}
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class StreamHttpClient extends HttpClient {
@Override
public void exec(List<String> urls) throws IOException {
final Instant start = Instant.now();
Observable<Response> observable = Observable.empty();
for (String url : urls) {
observable = observable.concatWith(Observable.from(asyncHttpCall(url)));
}
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.zipWith(Observable.from(urls), (r, url) -> new Pair<>(r, url))
.map(pair -> "[" + pair._2 + "] OK! ")
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("TIME: " + Duration.between(start, Instant.now()).toMillis());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment