Skip to content

Instantly share code, notes, and snippets.

@wreulicke
Last active January 12, 2018 14:36
Show Gist options
  • Save wreulicke/2182ddfb13d86076a4ba45a4bde2d8a4 to your computer and use it in GitHub Desktop.
Save wreulicke/2182ddfb13d86076a4ba45a4bde2d8a4 to your computer and use it in GitHub Desktop.
RxJava入門した
  • resilient-examples   failsafe, vavr, hystrix, vert.x の比較のコードが載っている
package com.github.wreulicke.spring;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.schedulers.Schedulers;
import net.jodah.failsafe.CircuitBreaker;
import net.jodah.failsafe.Execution;
import net.jodah.failsafe.RetryPolicy;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;
/**
 * Demo controller that calls its own {@code /backend/{number}} endpoint through a Retrofit
 * client and protects that call with a Failsafe retry policy and a circuit breaker shared by
 * all requests, glued together with RxJava.
 */
@RestController
@Slf4j
public class ComplexController {

    /**
     * Port used when {@code server.port} is not configured.
     * NOTE(review): assumes the embedded server then listens on Spring Boot's default 8080.
     */
    private static final int DEFAULT_PORT = 8080;

    /**
     * Opens after 2 failures, moves to half-open 1 second after opening, and closes again
     * after 5 successes while half-open.
     */
    private final CircuitBreaker circuitBreaker = new CircuitBreaker()
        .withFailureThreshold(2)
        .withSuccessThreshold(5)
        .withDelay(1, TimeUnit.SECONDS);

    /** Retrofit client pointing back at this application's own HTTP port. */
    private final MyService myService;

    /**
     * Builds the Retrofit client against {@code http://localhost:<port>/}.
     *
     * @param serverProperties source of the configured server port
     */
    ComplexController(ServerProperties serverProperties) {
        // getPort() is a nullable Integer; concatenating null would yield "localhost:null".
        Integer configuredPort = serverProperties.getPort();
        int port = configuredPort != null ? configuredPort : DEFAULT_PORT;
        log.info("port is {}", port);
        Retrofit retrofit = new Retrofit.Builder()
            .baseUrl("http://localhost:" + port + "/")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(JacksonConverterFactory.create())
            .build();
        myService = retrofit.create(MyService.class);
    }

    /**
     * Fake backend: succeeds only for numbers in the range 5..10 (inclusive).
     *
     * @param number value taken from the request path
     * @return a single-entry map {@code message -> "success:<number>"}
     * @throws RuntimeException when {@code number} is below 5 or above 10
     */
    @GetMapping("/backend/{number}")
    public Map<String, String> backend(@PathVariable("number") int number) {
        boolean accepted = number >= 5 && number < 11;
        if (!accepted) {
            throw new RuntimeException(number + " is received");
        }
        return Collections.singletonMap("message", "success:" + number);
    }

    /**
     * Calls the backend through {@link MyService}, retrying up to 2 times with a 100 ms delay.
     * A success is recorded on the circuit breaker; a failure is recorded only once the retry
     * budget is exhausted. When the breaker does not allow execution the call is skipped.
     *
     * @param number value forwarded to the backend
     * @return a {@link Single} emitting the backend response, or failing with a
     *         {@link RuntimeException} whose message is {@code "circuit is open."} or
     *         {@code "io failed."}
     */
    @GetMapping("/frontend/{number}")
    public Single<Map<String, String>> frontend(@PathVariable("number") int number) {
        if (!circuitBreaker.allowsExecution()) {
            log.info("circuit is open. {}", number);
            return Single.error(new RuntimeException("circuit is open."));
        }

        RetryPolicy retryPolicy = new RetryPolicy()
            .withMaxRetries(2)
            .withDelay(100, TimeUnit.MILLISECONDS);
        // One Execution per request: it tracks how many attempts this request has used.
        Execution execution = new Execution(retryPolicy);

        log.info("starting... {}", number);
        return Single.create((SingleOnSubscribe<Map<String, String>>) emitter ->
                // Tie the inner subscription to the emitter so that disposing the outer
                // Single also cancels the in-flight backend call.
                emitter.setDisposable(
                    myService.call(number)
                        .subscribeOn(Schedulers.computation())
                        .subscribe((body, error) -> {
                            if (error != null) {
                                log.error("io failed. {}", number);
                                // Keep the original failure as the cause for diagnostics.
                                emitter.onError(new RuntimeException("io failed.", error));
                            } else {
                                log.info("io succeeded. {}", number);
                                circuitBreaker.recordSuccess();
                                emitter.onSuccess(body);
                            }
                        })))
            .retryWhen(attempts -> attempts.flatMap(failure -> retryOrTrip(execution, failure)))
            // Run the initial subscription on the io scheduler.
            .subscribeOn(Schedulers.io())
            // Deliver the response on the computation scheduler.
            .observeOn(Schedulers.computation());
    }

    /**
     * Decides what happens after a failed attempt: either schedule a retry after the policy's
     * wait time, or give up and record the failure on the circuit breaker.
     */
    private Flowable<Long> retryOrTrip(Execution execution, Throwable failure) {
        if (execution.canRetryOn(failure)) {
            // Emit after the policy's delay on the io scheduler, which triggers the retry.
            return Single.timer(
                    execution.getWaitTime().toMillis(),
                    TimeUnit.MILLISECONDS, Schedulers.io())
                .toFlowable();
        }
        // Retries are exhausted, so count this request as a failure on the circuit breaker.
        circuitBreaker.recordFailure(failure);
        return Flowable.error(failure);
    }

    /**
     * Translates any exception raised by this controller into a 500 response.
     *
     * @param e the exception that was thrown
     * @return a single-entry map {@code message -> e.getMessage()}
     */
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    @ExceptionHandler
    public Map<String, String> handler(Exception e) {
        return Collections.singletonMap("message", e.getMessage());
    }
}
package com.github.wreulicke.spring;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Drives {@code /frontend/{number}} over real HTTP on the fixed port 8080 and checks the
 * message returned at each step of the circuit breaker's closed / open / half-open cycle.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
@TestPropertySource(properties = "server.port=8080")
public class ComplexControllerTest {

    private static final String IO_FAILED = "io failed.";
    private static final String CIRCUIT_OPEN = "circuit is open.";

    @Autowired
    TestRestTemplate testRestTemplate;

    @Test
    public void rxjava() throws InterruptedException {
        // The circuit breaker starts out closed, so the calls are executed and fail.
        expectMessage(1, IO_FAILED);
        expectMessage(2, IO_FAILED);

        // Two failures have been recorded, so the breaker is now open.
        for (int number = 3; number <= 5; number++) {
            expectMessage(number, CIRCUIT_OPEN);
        }

        // After waiting 2 seconds the breaker has moved to half-open.
        Thread.sleep(2000);

        // The next five calls succeed.
        for (int number = 6; number <= 10; number++) {
            expectMessage(number, "success:" + number);
        }

        // This call fails, but the five successes above have already closed the breaker.
        expectMessage(11, IO_FAILED);

        // Had the breaker still been half-open at request 11, that failure would have opened
        // it again and this request would report "circuit is open."; it is closed, so the
        // call is executed and fails normally.
        expectMessage(12, IO_FAILED);

        // Two failures in a row, so the breaker is open again.
        expectMessage(13, CIRCUIT_OPEN);
    }

    /** Calls {@code /frontend/{number}} and asserts the "message" entry of the JSON body. */
    @SuppressWarnings("unchecked")
    private void expectMessage(int number, String expected) {
        Map<String, String> body =
            testRestTemplate.getForObject("/frontend/{number}", Map.class, number);
        assertThat(body).contains(entry("message", expected));
    }
}
package com.github.wreulicke.spring;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.SingleOnSubscribe;
import io.reactivex.schedulers.Schedulers;
import net.jodah.failsafe.CircuitBreaker;
import net.jodah.failsafe.Execution;
import net.jodah.failsafe.RetryPolicy;
@RestController
@Slf4j
public class MyController {
// 2回失敗で開く
// 半開時に5回成功するとサーキットブレイカーが閉じる
// 開いたあとに半開に移るのが 1秒後
private final CircuitBreaker circuitBreaker = new CircuitBreaker()
.withFailureThreshold(2)
.withSuccessThreshold(5)
.withDelay(1, TimeUnit.SECONDS);
@GetMapping("/test")
public Observable<String> test() {
return Observable.just("test");
}
@GetMapping("/rxjava")
public Single<String> rxjava() {
RetryPolicy retryPolicy = new RetryPolicy()
.withMaxRetries(2)
.withDelay(100, TimeUnit.MILLISECONDS);
Execution execution = new Execution(retryPolicy);
if (circuitBreaker.allowsExecution() == false) {
log.info("circuit is open.");
return Single.error(new RuntimeException("circuit is open."));
}
log.info("starting...");
return Single.create((SingleOnSubscribe<String>) ob -> {
log.error("error occured.");
ob.onError(new RuntimeException("io failed."));
// 本来は成功時にサーキットブレイカーに成功を記録させる。
// ob.onSuccess("test");
// circuitBreaker.recordSuccess();
}).retryWhen(attempts -> {
return attempts.flatMap(failure -> {
if (execution.canRetryOn(failure)) {
// io スレッドで 100ミリ秒後に実行
return Single.timer(
execution.getWaitTime().toMillis(),
TimeUnit.MILLISECONDS, Schedulers.io())
.toFlowable();
} else {
// リトライ出来なかったので、サーキットブレイカーに失敗を蓄積
circuitBreaker.recordFailure(failure);
return Flowable.error(failure);
}
});
})
// 最初の処理は io スレッドで
.subscribeOn(Schedulers.io())
// レスポンスを返すのは comutationのスレッドで
.observeOn(Schedulers.computation());
}
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
@ExceptionHandler
public Map<String, String> handler(Exception e) {
return Collections.singletonMap("message", e.getMessage());
}
}
package com.github.wreulicke.spring;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Exercises {@code MyController} over real HTTP on a random port: the plain {@code /test}
 * endpoint and the always-failing {@code /rxjava} endpoint guarded by a circuit breaker.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class MyControllerTest {

    private static final String IO_FAILED = "io failed.";
    private static final String CIRCUIT_OPEN = "circuit is open.";

    @Autowired
    TestRestTemplate testRestTemplate;

    @Test
    public void test() {
        assertThat(testRestTemplate.getForObject("/test", String.class)).isEqualTo("test");
    }

    @Test
    public void rxjava() throws InterruptedException {
        // The circuit breaker starts out closed, so the work runs and fails.
        expectMessage(IO_FAILED);
        expectMessage(IO_FAILED);

        // Two failures have been recorded, so the breaker is now open.
        for (int call = 0; call < 4; call++) {
            expectMessage(CIRCUIT_OPEN);
        }

        // After waiting 2 seconds the breaker has moved to half-open.
        Thread.sleep(2000);

        // Half-open lets the work run again, and it fails once more.
        expectMessage(IO_FAILED);

        // A failure while half-open opens the breaker again.
        expectMessage(CIRCUIT_OPEN);
    }

    /** Calls {@code /rxjava} and asserts the "message" entry of the JSON body. */
    @SuppressWarnings("unchecked")
    private void expectMessage(String expected) {
        Map<String, String> body = testRestTemplate.getForObject("/rxjava", Map.class);
        assertThat(body).contains(entry("message", expected));
    }
}
package com.github.wreulicke.spring;
import java.util.Map;
import io.reactivex.Single;
import retrofit2.http.GET;
import retrofit2.http.Path;
/**
 * Retrofit client interface for the {@code /backend/{number}} endpoint.
 */
public interface MyService {
/**
 * Issues {@code GET /backend/{number}}.
 *
 * @param number value substituted for the {@code number} path segment
 * @return a {@link Single} emitting the response body as a string-to-string map
 */
@GET("/backend/{number}")
Single<Map<String, String>> call(@Path("number") int number);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment