- resilient-examples failsafe, vavr, hystrix, vert.x の比較のコードが載っている
Last active
January 12, 2018 14:36
-
-
Save wreulicke/2182ddfb13d86076a4ba45a4bde2d8a4 to your computer and use it in GitHub Desktop.
RxJava入門した
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.wreulicke.spring; | |
import java.util.Collections; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.boot.autoconfigure.web.ServerProperties; | |
import org.springframework.http.HttpStatus; | |
import org.springframework.web.bind.annotation.ExceptionHandler; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.PathVariable; | |
import org.springframework.web.bind.annotation.ResponseStatus; | |
import org.springframework.web.bind.annotation.RestController; | |
import io.reactivex.Flowable; | |
import io.reactivex.Single; | |
import io.reactivex.SingleOnSubscribe; | |
import io.reactivex.schedulers.Schedulers; | |
import net.jodah.failsafe.CircuitBreaker; | |
import net.jodah.failsafe.Execution; | |
import net.jodah.failsafe.RetryPolicy; | |
import retrofit2.Retrofit; | |
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory; | |
import retrofit2.converter.jackson.JacksonConverterFactory; | |
@RestController | |
@Slf4j | |
public class ComplexController { | |
private MyService myService; | |
ComplexController(ServerProperties serverProperties) { | |
Integer port = serverProperties.getPort(); | |
log.info("port is {}", port); | |
Retrofit retrofit = new Retrofit.Builder() | |
.baseUrl("http://localhost:" + port + "/") | |
.addCallAdapterFactory(RxJava2CallAdapterFactory.create()) | |
.addConverterFactory(JacksonConverterFactory.create()) | |
.build(); | |
myService = retrofit.create(MyService.class); | |
} | |
private final CircuitBreaker circuitBreaker = new CircuitBreaker() | |
.withFailureThreshold(2) | |
.withSuccessThreshold(5) | |
.withDelay(1, TimeUnit.SECONDS); | |
@GetMapping("/backend/{number}") | |
public Map<String, String> backend(@PathVariable("number") int number) { | |
if (number < 5) { | |
throw new RuntimeException(number + " is received"); | |
} | |
if (number < 11) { | |
return Collections.singletonMap("message", "success:" + number); | |
} | |
throw new RuntimeException(number + " is received"); | |
} | |
@GetMapping("/frontend/{number}") | |
public Single<Map<String, String>> frontend(@PathVariable("number") int number) { | |
RetryPolicy retryPolicy = new RetryPolicy() | |
.withMaxRetries(2) | |
.withDelay(100, TimeUnit.MILLISECONDS); | |
Execution execution = new Execution(retryPolicy); | |
if (circuitBreaker.allowsExecution() == false) { | |
log.info("circuit is open. {}", number); | |
return Single.error(new RuntimeException("circuit is open.")); | |
} | |
log.info("starting... {}", number); | |
return Single.create((SingleOnSubscribe<Map<String, String>>) ob -> { | |
myService.call(number) | |
.subscribeOn(Schedulers.computation()) | |
.subscribe((map, e) -> { | |
if (e != null) { | |
log.error("io failed. {}", number); | |
ob.onError(new RuntimeException("io failed.")); | |
} else { | |
log.error("io scceeded. {}", number); | |
circuitBreaker.recordSuccess(); | |
ob.onSuccess(map); | |
} | |
}); | |
}).retryWhen(attempts -> { | |
return attempts.flatMap(failure -> { | |
if (execution.canRetryOn(failure)) { | |
// io スレッドで 100ミリ秒後に実行 | |
return Single.timer( | |
execution.getWaitTime().toMillis(), | |
TimeUnit.MILLISECONDS, Schedulers.io()) | |
.toFlowable(); | |
} else { | |
// リトライ出来なかったので、サーキットブレイカーに失敗を蓄積 | |
circuitBreaker.recordFailure(failure); | |
return Flowable.error(failure); | |
} | |
}); | |
}) | |
// 最初の処理は io スレッドで | |
.subscribeOn(Schedulers.io()) | |
// レスポンスを返すのは comutationのスレッドで | |
.observeOn(Schedulers.computation()); | |
} | |
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) | |
@ExceptionHandler | |
public Map<String, String> handler(Exception e) { | |
return Collections.singletonMap("message", e.getMessage()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.wreulicke.spring; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.assertj.core.api.Assertions.entry; | |
import java.util.Map; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; | |
import org.springframework.boot.test.web.client.TestRestTemplate; | |
import org.springframework.test.context.TestPropertySource; | |
import org.springframework.test.context.junit4.SpringRunner; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
@RunWith(SpringRunner.class) | |
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) | |
@TestPropertySource(properties = "server.port=8080") | |
public class ComplexControllerTest { | |
@Autowired | |
TestRestTemplate testRestTemplate; | |
@SuppressWarnings("unchecked") | |
@Test | |
public void rxjava() throws InterruptedException { | |
// サーキットブレイカーはこのタイミングだと閉じている | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 1)) | |
.contains(entry("message", "io failed.")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 2)) | |
.contains(entry("message", "io failed.")); | |
// 2回失敗したので開いている | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 3)) | |
.contains(entry("message", "circuit is open.")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 4)) | |
.contains(entry("message", "circuit is open.")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 5)) | |
.contains(entry("message", "circuit is open.")); | |
// 2秒待つと半開に移る | |
Thread.sleep(2000); | |
// 次は5回成功する | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 6)) | |
.contains(entry("message", "success:6")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 7)) | |
.contains(entry("message", "success:7")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 8)) | |
.contains(entry("message", "success:8")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 9)) | |
.contains(entry("message", "success:9")); | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 10)) | |
.contains(entry("message", "success:10")); | |
// 次は失敗するが、上記5回の成功によってサーキットブレイカーが復帰し(閉じ)ている | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 11)) | |
.contains(entry("message", "io failed.")); | |
// 上のタイミングで、サーキットブレイカーが半開であったなら、上記呼び出しによって | |
// サーキットブレイカーが閉じており、次は "circuit is open."を記録するはずであるが | |
// この状態ではまだ閉じているので、その結果にはならない。 | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 12)) | |
.contains(entry("message", "io failed.")); | |
// 2回失敗したのでまたサーキットブレイカーが閉じる | |
assertThat(testRestTemplate.getForObject("/frontend/{number}", Map.class, 13)) | |
.contains(entry("message", "circuit is open.")); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.wreulicke.spring; | |
import java.util.Collections; | |
import java.util.Map; | |
import java.util.concurrent.TimeUnit; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.http.HttpStatus; | |
import org.springframework.web.bind.annotation.ExceptionHandler; | |
import org.springframework.web.bind.annotation.GetMapping; | |
import org.springframework.web.bind.annotation.ResponseStatus; | |
import org.springframework.web.bind.annotation.RestController; | |
import io.reactivex.Flowable; | |
import io.reactivex.Single; | |
import io.reactivex.SingleOnSubscribe; | |
import io.reactivex.schedulers.Schedulers; | |
import net.jodah.failsafe.CircuitBreaker; | |
import net.jodah.failsafe.Execution; | |
import net.jodah.failsafe.RetryPolicy; | |
@RestController | |
@Slf4j | |
public class MyController { | |
// 2回失敗で開く | |
// 半開時に5回成功するとサーキットブレイカーが閉じる | |
// 開いたあとに半開に移るのが 1秒後 | |
private final CircuitBreaker circuitBreaker = new CircuitBreaker() | |
.withFailureThreshold(2) | |
.withSuccessThreshold(5) | |
.withDelay(1, TimeUnit.SECONDS); | |
@GetMapping("/test") | |
public Observable<String> test() { | |
return Observable.just("test"); | |
} | |
@GetMapping("/rxjava") | |
public Single<String> rxjava() { | |
RetryPolicy retryPolicy = new RetryPolicy() | |
.withMaxRetries(2) | |
.withDelay(100, TimeUnit.MILLISECONDS); | |
Execution execution = new Execution(retryPolicy); | |
if (circuitBreaker.allowsExecution() == false) { | |
log.info("circuit is open."); | |
return Single.error(new RuntimeException("circuit is open.")); | |
} | |
log.info("starting..."); | |
return Single.create((SingleOnSubscribe<String>) ob -> { | |
log.error("error occured."); | |
ob.onError(new RuntimeException("io failed.")); | |
// 本来は成功時にサーキットブレイカーに成功を記録させる。 | |
// ob.onSuccess("test"); | |
// circuitBreaker.recordSuccess(); | |
}).retryWhen(attempts -> { | |
return attempts.flatMap(failure -> { | |
if (execution.canRetryOn(failure)) { | |
// io スレッドで 100ミリ秒後に実行 | |
return Single.timer( | |
execution.getWaitTime().toMillis(), | |
TimeUnit.MILLISECONDS, Schedulers.io()) | |
.toFlowable(); | |
} else { | |
// リトライ出来なかったので、サーキットブレイカーに失敗を蓄積 | |
circuitBreaker.recordFailure(failure); | |
return Flowable.error(failure); | |
} | |
}); | |
}) | |
// 最初の処理は io スレッドで | |
.subscribeOn(Schedulers.io()) | |
// レスポンスを返すのは comutationのスレッドで | |
.observeOn(Schedulers.computation()); | |
} | |
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR) | |
@ExceptionHandler | |
public Map<String, String> handler(Exception e) { | |
return Collections.singletonMap("message", e.getMessage()); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.wreulicke.spring; | |
import static org.assertj.core.api.Assertions.assertThat; | |
import static org.assertj.core.api.Assertions.entry; | |
import java.util.Map; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.test.context.SpringBootTest; | |
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; | |
import org.springframework.boot.test.web.client.TestRestTemplate; | |
import org.springframework.test.context.junit4.SpringRunner; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
@RunWith(SpringRunner.class) | |
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) | |
public class MyControllerTest { | |
@Autowired | |
TestRestTemplate testRestTemplate; | |
@Test | |
public void test() { | |
String actual = testRestTemplate.getForObject("/test", String.class); | |
assertThat(actual) | |
.isEqualTo("test"); | |
} | |
@SuppressWarnings("unchecked") | |
@Test | |
public void rxjava() throws InterruptedException { | |
// サーキットブレイカーはこのタイミングだと閉じている | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "io failed.")); | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "io failed.")); | |
// 2回失敗したので開いている | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "circuit is open.")); | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "circuit is open.")); | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "circuit is open.")); | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "circuit is open.")); | |
// 2秒待つと半開に移る | |
Thread.sleep(2000); | |
// 半開になったので処理が実行されるがまたもや失敗。 | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "io failed.")); | |
// 半開時に失敗したので サーキットブレイカーは開く | |
assertThat(testRestTemplate.getForObject("/rxjava", Map.class)) | |
.contains(entry("message", "circuit is open.")); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.wreulicke.spring; | |
import java.util.Map; | |
import io.reactivex.Single; | |
import retrofit2.http.GET; | |
import retrofit2.http.Path; | |
public interface MyService { | |
@GET("/backend/{number}") | |
Single<Map<String, String>> call(@Path("number") int number); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment