Last active
September 10, 2020 12:44
-
-
Save winster/19e11fc9baf3784b71e48ad56fbbac73 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Slf4j | |
public class CacheTest { | |
private static Cache<ItineraryRequest, ItineraryResponse> caffeineWebclientCache; | |
@BeforeAll | |
public static void setup(){ | |
caffeineWebclientCache = Caffeine.newBuilder().maximumSize(100) | |
.expireAfterWrite(Duration.ofSeconds(60)).build(); | |
} | |
@BeforeEach | |
public void before() { | |
webclient_execution_count = 0; | |
caffeineWebclientCache.invalidateAll(); | |
} | |
int webclient_execution_count = 0; | |
@Test | |
void webclientRequest() | |
{ | |
final String tripId = "Trip999"; | |
final String system = "fra"; | |
final String server = "World"; | |
ItineraryRequest itineraryRequest = new ItineraryRequest(); | |
itineraryRequest.setTripId(tripId); | |
itineraryRequest.setSystem(system); | |
itineraryRequest.setServer(server); | |
ItineraryResponse itineraryResponse = new ItineraryResponse(); | |
itineraryResponse.setTripId(itineraryRequest.getTripId()); | |
itineraryResponse.setSystem(itineraryRequest.getSystem()); | |
try { | |
String bookingDataResponseJson = new ObjectMapper().writeValueAsString(itineraryResponse); | |
ItineraryResponse tripResponseExpected = | |
new ObjectMapper().readValue(bookingDataResponseJson, | |
ItineraryResponse.class); | |
ClientResponse clientResponse = | |
ClientResponse.create(HttpStatus.OK) | |
.header("Content-Type", "application/json") | |
.body(bookingDataResponseJson) | |
.build(); | |
WebClient webClient = WebClient.builder() | |
.exchangeFunction( | |
clientRequest -> Mono.just( | |
clientResponse)) | |
.build(); | |
RetryBackoffSpec fixedRetry = | |
Retry.fixedDelay(2L, Duration.ofMillis(10L)) | |
.filter(throwable -> throwable | |
instanceof Exception); | |
Mono<ItineraryResponse> bookingDataMono = | |
createWebClient(webClient, itineraryRequest, fixedRetry); | |
Mono<ItineraryResponse> bookingDataMono_1 = | |
createWebClient(webClient, itineraryRequest, fixedRetry); | |
Flux<ItineraryResponse> bookingDataFlux = bookingDataMono.concatWith(bookingDataMono_1); | |
StepVerifier.create(bookingDataFlux) | |
.consumeNextWith(tripResponseActual -> { | |
assertEquals(tripResponseActual.getTripId(), | |
tripResponseExpected.getTripId()); | |
assertThat(webclient_execution_count).isEqualTo(1); | |
assertThat(caffeineWebclientCache.getIfPresent(itineraryRequest)).isNull(); | |
}).consumeNextWith(tripResponseActual -> { | |
assertThat(webclient_execution_count).isEqualTo(2); | |
assertThat(caffeineWebclientCache.getIfPresent(itineraryRequest)).isNull(); | |
}).verifyComplete(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
@Test | |
void webclientRequest_Cached() | |
{ | |
final String tripId = "Trip121"; | |
final String system = "RERER"; | |
final String server = "World"; | |
ItineraryRequest itineraryRequest = new ItineraryRequest(); | |
itineraryRequest.setTripId(tripId); | |
itineraryRequest.setSystem(system); | |
itineraryRequest.setServer(server); | |
ItineraryResponse itineraryResponse = new ItineraryResponse(); | |
itineraryResponse.setTripId(itineraryRequest.getTripId()); | |
itineraryResponse.setSystem(itineraryRequest.getSystem()); | |
try { | |
String bookingDataResponseJson = new ObjectMapper().writeValueAsString(itineraryResponse); | |
ItineraryResponse tripResponseExpected = | |
new ObjectMapper().readValue(bookingDataResponseJson, | |
ItineraryResponse.class); | |
ClientResponse clientResponse = | |
ClientResponse.create(HttpStatus.OK) | |
.header("Content-Type", "application/json") | |
.body(bookingDataResponseJson) | |
.build(); | |
WebClient webClient = WebClient.builder() | |
.exchangeFunction( | |
clientRequest -> Mono.just( | |
clientResponse)) | |
.build(); | |
RetryBackoffSpec fixedRetry = | |
Retry.fixedDelay(2L, Duration.ofMillis(10L)) | |
.filter(throwable -> throwable | |
instanceof Exception); | |
Mono<ItineraryResponse> bookingDataMono = | |
createWebclientCached(webClient, itineraryRequest, fixedRetry); | |
Mono<ItineraryResponse> bookingDataMono_1 = | |
createWebclientCached(webClient, itineraryRequest, fixedRetry); | |
Flux<ItineraryResponse> bookingDataFlux = bookingDataMono.concatWith(bookingDataMono_1); | |
StepVerifier.create(bookingDataFlux) | |
.consumeNextWith(tripResponseActual -> { | |
assertEquals(tripResponseActual.getTripId(), | |
tripResponseExpected.getTripId()); | |
assertThat(webclient_execution_count).isEqualTo(1); | |
assertThat(caffeineWebclientCache.getIfPresent(itineraryRequest)).isNotNull(); | |
assertThat(caffeineWebclientCache.getIfPresent(itineraryRequest).getTripId()).isEqualTo(tripResponseExpected.getTripId()); | |
}).consumeNextWith(tripResponseActual -> { | |
assertThat(webclient_execution_count).isEqualTo(1); | |
assertThat(caffeineWebclientCache.getIfPresent(itineraryRequest)).isNotNull(); | |
}).verifyComplete(); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
} | |
Mono<ItineraryResponse> createWebclientCached(WebClient webClient, ItineraryRequest itineraryRequest, RetryBackoffSpec fixedRetry) | |
{ | |
return CacheMono | |
.lookup(key -> Mono.justOrEmpty(caffeineWebclientCache.getIfPresent(key)) | |
.map(Signal::next), | |
itineraryRequest) | |
.onCacheMissResume(() -> createWebClient(webClient, itineraryRequest, fixedRetry)) | |
.andWriteWith((key, signal) -> | |
Mono.fromRunnable(() -> { | |
log.info(" 1 :: AndWriteWith {} {}", key, signal.get()); | |
Optional.ofNullable(signal.get()) | |
.ifPresent(value -> | |
caffeineWebclientCache.put(key, value));})); | |
} | |
private Mono<ItineraryResponse> createWebClient(WebClient webClient, ItineraryRequest itineraryRequest, RetryBackoffSpec fixedRetry) { | |
return webClient | |
.post() | |
.uri("http://dummy") | |
.body(Mono.just(itineraryRequest), ItineraryRequest.class).exchange() | |
.flatMap(clientResponse -> { | |
HttpStatus statusCode = clientResponse.statusCode(); | |
if (statusCode.is4xxClientError() | |
|| statusCode.is5xxServerError()) { | |
return Mono.error(Exception::new); | |
} | |
++webclient_execution_count; | |
return clientResponse.bodyToMono(ItineraryResponse.class); | |
}) | |
.onErrorResume(TimeoutException.class, e -> Mono.error(Exception::new)) | |
.onErrorResume(ConnectException.class, e -> {log.error("Connect Exception");return Mono.error(Exception::new);}) | |
.doOnCancel(() -> { | |
log.error("Cancelled. Who did this and why?"); | |
}) | |
.retryWhen(fixedRetry); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment