Last active
July 11, 2024 15:01
-
-
Save annagapuz/58e50e47540a3d579163a96f3cdd2787 to your computer and use it in GitHub Desktop.
SSEWithSpringBootWebFluxAndOAuth
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring.security.oauth2.client.registration.dealer.client-id=HELLO | |
spring.security.oauth2.client.registration.dealer.client-secret=HELLO | |
spring.security.oauth2.client.registration.dealer.authorization-grant-type=client_credentials | |
spring.security.oauth2.client.registration.dealer.scope=https://fhlbnonprodb2c.onmicrosoft.com/dnwindow-apis/.default | |
spring.security.oauth2.client.registration.dealer.client-authentication-method=client_secret_post | |
spring.security.oauth2.client.registration.dealer.provider=azure | |
spring.security.oauth2.client.provider.azure.token-uri=https://fhlbnonprodb2c.b2clogin.com/21dca27d-1390-46a4-b18e-468db144f3e1/b2c_1a_9_signup_signin/oauth2/v2.0/token | |
spring.main.web-application-type=reactive |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.netty.handler.timeout.ReadTimeoutHandler; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.context.event.ApplicationReadyEvent; | |
import org.springframework.context.event.EventListener; | |
import org.springframework.core.ParameterizedTypeReference; | |
import org.springframework.http.HttpStatusCode; | |
import org.springframework.http.MediaType; | |
import org.springframework.http.codec.ServerSentEvent; | |
import org.springframework.security.oauth2.client.ReactiveOAuth2AuthorizedClientManager; | |
import org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction; | |
import org.springframework.stereotype.Component; | |
import org.springframework.web.client.HttpClientErrorException; | |
import org.springframework.web.client.HttpServerErrorException; | |
import org.springframework.web.reactive.function.client.WebClient; | |
import org.springframework.web.reactive.function.client.WebClientRequestException; | |
import org.springframework.web.reactive.function.client.WebClientResponseException; | |
import reactor.core.Disposable; | |
import reactor.core.publisher.Flux; | |
import reactor.netty.http.client.HttpClient; | |
import reactor.util.retry.Retry; | |
import java.io.IOException; | |
import java.time.LocalTime; | |
import java.util.List; | |
import static org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction.clientRegistrationId; | |
@Component | |
public class DNWStreamingClient { | |
private static final Logger LOG = LoggerFactory.getLogger(DNWStreamingClient.class); | |
private final WebClient webClient; | |
public DNWStreamingClient(@Autowired WebClient.Builder webClientBuilder, ReactiveOAuth2AuthorizedClientManager authorizedClientManager) { | |
ServerOAuth2AuthorizedClientExchangeFilterFunction oauth = new ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager); | |
this.webClient = webClientBuilder | |
.defaultHeaders(headers -> headers.setAccept(List.of(MediaType.ALL))) | |
.filter(oauth) | |
.build(); | |
} | |
@EventListener(ApplicationReadyEvent.class) | |
public void connectToStreamAsSSE() throws InterruptedException { | |
final String apiUrl = https://api.fhlb-np.com/dnwindow/v1/orders/stream; | |
Flux<ServerSentEvent<String>> eventStream = createEventStream(apiUrl); | |
// This is a non-blocking subscription that should continually repeat due | |
// to the use of retry and repeat | |
// subscribe(eventStream); | |
// This is a blocking subscription that should continually repeat due | |
// to the use of retry and repeat | |
// In the event that the subscription ends, the outer while loop will trigger | |
// the subscribe once again | |
ServerSentEvent<String> finalEvent = null; | |
while (true) { | |
LOG.info("Stream is being initiated ... "); | |
finalEvent = subscribeAndBlock(eventStream); | |
LOG.info("Final event is {}", finalEvent); | |
} | |
} | |
private Flux<ServerSentEvent<String>> createEventStream(String uri) { | |
LOG.info("Initiating stream to {} ...", uri); | |
ParameterizedTypeReference<ServerSentEvent<String>> eventType = new ParameterizedTypeReference<>() { | |
}; | |
return webClient | |
.get() | |
.uri(uri) | |
.attributes(clientRegistrationId("dealer")) | |
.retrieve() | |
.onStatus(HttpStatusCode::is5xxServerError, clientResponse -> clientResponse.bodyToMono(String.class).map(body -> { | |
LOG.error("A 5xx server error was returned: {}", body); | |
return new HttpServerErrorException(clientResponse.statusCode(), "HTTP server error was returned"); | |
})) | |
.onStatus(HttpStatusCode::is4xxClientError, clientResponse -> clientResponse.bodyToMono(String.class).map(body -> { | |
LOG.error("A 4xx bad request error was returned: {}", body); | |
return new HttpClientErrorException(clientResponse.statusCode(), "HTTP server error was returned"); | |
})) | |
.bodyToFlux(eventType); | |
} | |
private ServerSentEvent<String> subscribeAndBlock(Flux<ServerSentEvent<String>> eventStream) { | |
return eventStream | |
// handles error | |
.retryWhen(Retry.indefinitely() | |
.doBeforeRetry(retrySignal -> | |
LOG.info("Will retry because [{} -> {}]", retrySignal.failure().getClass().getName(), retrySignal.failure().getMessage())) | |
.doAfterRetry(retrySignal -> LOG.info("Retry attempt #{}", retrySignal.totalRetries())) | |
.filter(throwable -> throwable instanceof HttpServerErrorException || | |
throwable instanceof WebClientRequestException || // includes ReadTimeoutException | |
throwable instanceof WebClientResponseException || // includes HTTP 200 with a connection reset | |
throwable instanceof IOException) // includes premature close | |
) | |
.repeat(() -> { | |
LOG.info("Will repeat as subscription completed with a HTTP 200"); | |
return true; | |
}) | |
.doOnSubscribe(subscription -> LOG.info("Subscribed")) | |
.doOnCancel(() -> LOG.info("Subscription canceled")) | |
.doOnComplete(() -> LOG.info("Subscription complete")) | |
.doOnTerminate(() -> LOG.info("Subscription terminated")) | |
.doOnError(throwable -> LOG.error("Subscription ended in error", throwable)) | |
.doFinally(signalType -> LOG.info("Final subscription state is {}", signalType)) | |
.doOnNext(sse -> LOG.info("Got SSE --- Time: {} - event: name[{}], id [{}], data[{}] ", LocalTime.now(), sse.event(), sse.id(), sse.data())) | |
.blockLast(); | |
} | |
private Disposable subscribe(Flux<ServerSentEvent<String>> eventStream) { | |
return eventStream | |
// handles error | |
.retryWhen(Retry.indefinitely() | |
.doBeforeRetry(retrySignal -> | |
LOG.info("Will retry because [{} -> {}]", retrySignal.failure().getClass().getName(), retrySignal.failure().getMessage())) | |
.doAfterRetry(retrySignal -> LOG.info("Retry attempt #{}", retrySignal.totalRetries())) | |
.filter(throwable -> throwable instanceof HttpServerErrorException || | |
throwable instanceof WebClientRequestException || // includes ReadTimeoutException | |
throwable instanceof WebClientResponseException || // includes HTTP 200 with a connection reset | |
throwable instanceof IOException) // includes premature close | |
) | |
.repeat(() -> { | |
LOG.info("Will repeat as subscription completed with a HTTP 200"); | |
return true; | |
}) | |
.doOnSubscribe(subscription -> LOG.info("Subscribed")) | |
.doOnCancel(() -> LOG.info("Subscription canceled")) | |
.doOnComplete(() -> LOG.info("Subscription complete")) | |
.doOnTerminate(() -> LOG.info("Subscription terminated")) | |
.doOnError(throwable -> LOG.error("Subscription ended in error", throwable)) | |
.doFinally(signalType -> LOG.info("Final subscription state is {}", signalType)) | |
.subscribe( | |
content -> LOG.info("Got SSE --- Time: {} - event: name[{}], id [{}], data[{}] ", | |
LocalTime.now(), content.event(), content.id(), content.data()), | |
error -> LOG.error("Error receiving SSE", error), | |
() -> LOG.info("Completed!!!") | |
); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns="http://maven.apache.org/POM/4.0.0" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>com.gaps.sample</groupId> | |
<artifactId>spring-web-client</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<properties> | |
<java.version>17</java.version> | |
<maven.compiler.source>17</maven.compiler.source> | |
<maven.compiler.target>17</maven.compiler.target> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> | |
<spring-boot.version>3.2.5</spring-boot.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-webflux</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-oauth2-client</artifactId> | |
</dependency> | |
</dependencies> | |
<dependencyManagement> | |
<dependencies> | |
<!-- You must have Apache Maven 3.6.x or above for Spring Boot dependency management to work --> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-dependencies</artifactId> | |
<version>${spring-boot.version}</version> | |
<type>pom</type> | |
<scope>import</scope> | |
</dependency> | |
</dependencies> | |
</dependencyManagement> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.gaps.sample.client; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
@SpringBootApplication | |
public class SpringWebClientApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(SpringWebClientApplication.class, args); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.gaps.sample.client; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; | |
import org.springframework.security.config.web.server.ServerHttpSecurity; | |
import org.springframework.security.oauth2.client.*; | |
import org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository; | |
import org.springframework.security.web.server.SecurityWebFilterChain; | |
import static org.springframework.security.config.Customizer.withDefaults; | |
@Configuration | |
@EnableWebFluxSecurity | |
public class WebSecurityConfig { | |
@Bean | |
public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) { | |
http | |
.authorizeExchange(authz -> authz | |
.anyExchange().permitAll() | |
) | |
.oauth2Client(withDefaults()); | |
return http.build(); | |
} | |
@Bean | |
public ReactiveOAuth2AuthorizedClientManager authorizedClientManager( | |
ReactiveClientRegistrationRepository clientRegistrationRepository, | |
ReactiveOAuth2AuthorizedClientService authorizedClientService) { | |
ReactiveOAuth2AuthorizedClientProvider authorizedClientProvider = | |
ReactiveOAuth2AuthorizedClientProviderBuilder.builder() | |
.clientCredentials() | |
.build(); | |
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager authorizedClientManager = | |
new AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager( | |
clientRegistrationRepository, authorizedClientService); | |
authorizedClientManager.setAuthorizedClientProvider(authorizedClientProvider); | |
return authorizedClientManager; | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment