Skip to content

Instantly share code, notes, and snippets.

@annagapuz
Last active July 11, 2024 15:01
Show Gist options
  • Save annagapuz/58e50e47540a3d579163a96f3cdd2787 to your computer and use it in GitHub Desktop.
Save annagapuz/58e50e47540a3d579163a96f3cdd2787 to your computer and use it in GitHub Desktop.
SSEWithSpringBootWebFluxAndOAuth
spring.security.oauth2.client.registration.dealer.client-id=HELLO
spring.security.oauth2.client.registration.dealer.client-secret=HELLO
spring.security.oauth2.client.registration.dealer.authorization-grant-type=client_credentials
spring.security.oauth2.client.registration.dealer.scope=https://fhlbnonprodb2c.onmicrosoft.com/dnwindow-apis/.default
spring.security.oauth2.client.registration.dealer.client-authentication-method=client_secret_post
spring.security.oauth2.client.registration.dealer.provider=azure
spring.security.oauth2.client.provider.azure.token-uri=https://fhlbnonprodb2c.b2clogin.com/21dca27d-1390-46a4-b18e-468db144f3e1/b2c_1a_9_signup_signin/oauth2/v2.0/token
spring.main.web-application-type=reactive
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.security.oauth2.client.ReactiveOAuth2AuthorizedClientManager;
import org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.netty.http.client.HttpClient;
import reactor.util.retry.Retry;
import java.io.IOException;
import java.time.LocalTime;
import java.util.List;
import static org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction.clientRegistrationId;
@Component
public class DNWStreamingClient {
private static final Logger LOG = LoggerFactory.getLogger(DNWStreamingClient.class);
private final WebClient webClient;
public DNWStreamingClient(@Autowired WebClient.Builder webClientBuilder, ReactiveOAuth2AuthorizedClientManager authorizedClientManager) {
ServerOAuth2AuthorizedClientExchangeFilterFunction oauth = new ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager);
this.webClient = webClientBuilder
.defaultHeaders(headers -> headers.setAccept(List.of(MediaType.ALL)))
.filter(oauth)
.build();
}
@EventListener(ApplicationReadyEvent.class)
public void connectToStreamAsSSE() throws InterruptedException {
final String apiUrl = https://api.fhlb-np.com/dnwindow/v1/orders/stream;
Flux<ServerSentEvent<String>> eventStream = createEventStream(apiUrl);
// This is a non-blocking subscription that should continually repeat due
// to the use of retry and repeat
// subscribe(eventStream);
// This is a blocking subscription that should continually repeat due
// to the use of retry and repeat
// In the event that the subscription ends, the outer while loop will trigger
// the subscribe once again
ServerSentEvent<String> finalEvent = null;
while (true) {
LOG.info("Stream is being initiated ... ");
finalEvent = subscribeAndBlock(eventStream);
LOG.info("Final event is {}", finalEvent);
}
}
private Flux<ServerSentEvent<String>> createEventStream(String uri) {
LOG.info("Initiating stream to {} ...", uri);
ParameterizedTypeReference<ServerSentEvent<String>> eventType = new ParameterizedTypeReference<>() {
};
return webClient
.get()
.uri(uri)
.attributes(clientRegistrationId("dealer"))
.retrieve()
.onStatus(HttpStatusCode::is5xxServerError, clientResponse -> clientResponse.bodyToMono(String.class).map(body -> {
LOG.error("A 5xx server error was returned: {}", body);
return new HttpServerErrorException(clientResponse.statusCode(), "HTTP server error was returned");
}))
.onStatus(HttpStatusCode::is4xxClientError, clientResponse -> clientResponse.bodyToMono(String.class).map(body -> {
LOG.error("A 4xx bad request error was returned: {}", body);
return new HttpClientErrorException(clientResponse.statusCode(), "HTTP server error was returned");
}))
.bodyToFlux(eventType);
}
private ServerSentEvent<String> subscribeAndBlock(Flux<ServerSentEvent<String>> eventStream) {
return eventStream
// handles error
.retryWhen(Retry.indefinitely()
.doBeforeRetry(retrySignal ->
LOG.info("Will retry because [{} -> {}]", retrySignal.failure().getClass().getName(), retrySignal.failure().getMessage()))
.doAfterRetry(retrySignal -> LOG.info("Retry attempt #{}", retrySignal.totalRetries()))
.filter(throwable -> throwable instanceof HttpServerErrorException ||
throwable instanceof WebClientRequestException || // includes ReadTimeoutException
throwable instanceof WebClientResponseException || // includes HTTP 200 with a connection reset
throwable instanceof IOException) // includes premature close
)
.repeat(() -> {
LOG.info("Will repeat as subscription completed with a HTTP 200");
return true;
})
.doOnSubscribe(subscription -> LOG.info("Subscribed"))
.doOnCancel(() -> LOG.info("Subscription canceled"))
.doOnComplete(() -> LOG.info("Subscription complete"))
.doOnTerminate(() -> LOG.info("Subscription terminated"))
.doOnError(throwable -> LOG.error("Subscription ended in error", throwable))
.doFinally(signalType -> LOG.info("Final subscription state is {}", signalType))
.doOnNext(sse -> LOG.info("Got SSE --- Time: {} - event: name[{}], id [{}], data[{}] ", LocalTime.now(), sse.event(), sse.id(), sse.data()))
.blockLast();
}
private Disposable subscribe(Flux<ServerSentEvent<String>> eventStream) {
return eventStream
// handles error
.retryWhen(Retry.indefinitely()
.doBeforeRetry(retrySignal ->
LOG.info("Will retry because [{} -> {}]", retrySignal.failure().getClass().getName(), retrySignal.failure().getMessage()))
.doAfterRetry(retrySignal -> LOG.info("Retry attempt #{}", retrySignal.totalRetries()))
.filter(throwable -> throwable instanceof HttpServerErrorException ||
throwable instanceof WebClientRequestException || // includes ReadTimeoutException
throwable instanceof WebClientResponseException || // includes HTTP 200 with a connection reset
throwable instanceof IOException) // includes premature close
)
.repeat(() -> {
LOG.info("Will repeat as subscription completed with a HTTP 200");
return true;
})
.doOnSubscribe(subscription -> LOG.info("Subscribed"))
.doOnCancel(() -> LOG.info("Subscription canceled"))
.doOnComplete(() -> LOG.info("Subscription complete"))
.doOnTerminate(() -> LOG.info("Subscription terminated"))
.doOnError(throwable -> LOG.error("Subscription ended in error", throwable))
.doFinally(signalType -> LOG.info("Final subscription state is {}", signalType))
.subscribe(
content -> LOG.info("Got SSE --- Time: {} - event: name[{}], id [{}], data[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> LOG.error("Error receiving SSE", error),
() -> LOG.info("Completed!!!")
);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gaps.sample</groupId>
<artifactId>spring-web-client</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>3.2.5</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-client</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<!-- You must have Apache Maven 3.6.x or above for Spring Boot dependency management to work -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.gaps.sample.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringWebClientApplication {
public static void main(String[] args) {
SpringApplication.run(SpringWebClientApplication.class, args);
}
}
package com.gaps.sample.client;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.oauth2.client.*;
import org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository;
import org.springframework.security.web.server.SecurityWebFilterChain;
import static org.springframework.security.config.Customizer.withDefaults;
@Configuration
@EnableWebFluxSecurity
public class WebSecurityConfig {
@Bean
public SecurityWebFilterChain securityFilterChain(ServerHttpSecurity http) {
http
.authorizeExchange(authz -> authz
.anyExchange().permitAll()
)
.oauth2Client(withDefaults());
return http.build();
}
@Bean
public ReactiveOAuth2AuthorizedClientManager authorizedClientManager(
ReactiveClientRegistrationRepository clientRegistrationRepository,
ReactiveOAuth2AuthorizedClientService authorizedClientService) {
ReactiveOAuth2AuthorizedClientProvider authorizedClientProvider =
ReactiveOAuth2AuthorizedClientProviderBuilder.builder()
.clientCredentials()
.build();
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager authorizedClientManager =
new AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(
clientRegistrationRepository, authorizedClientService);
authorizedClientManager.setAuthorizedClientProvider(authorizedClientProvider);
return authorizedClientManager;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment