Created
June 27, 2020 06:29
-
-
Save likhoman/3eb818bfd136762ce47a9c55000a1c65 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.github.likhoman.mtls.client.ws; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.context.ApplicationEvent; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.integration.annotation.ServiceActivator; | |
import org.springframework.integration.channel.QueueChannel; | |
import org.springframework.integration.config.EnableIntegration; | |
import org.springframework.integration.event.inbound.ApplicationEventListeningMessageProducer; | |
import org.springframework.integration.stomp.ReactorNettyTcpStompSessionManager; | |
import org.springframework.integration.stomp.StompSessionManager; | |
import org.springframework.integration.stomp.event.StompIntegrationEvent; | |
import org.springframework.integration.stomp.inbound.StompInboundChannelAdapter; | |
import org.springframework.integration.stomp.outbound.StompMessageHandler; | |
import org.springframework.integration.support.converter.PassThruMessageConverter; | |
import org.springframework.messaging.MessageHandler; | |
import org.springframework.messaging.PollableChannel; | |
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient; | |
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; | |
@Configuration | |
@EnableIntegration | |
public class StompConfiguration { | |
@Value("${stomp.host:127.0.0.1}") | |
private String stompHost; | |
@Value("${stomp.port:61613}") | |
private int stompPort; | |
@Value("${stomp.topic}") | |
private String topic; | |
@Bean | |
public ReactorNettyTcpStompClient stompClient() { | |
var stompClient = new ReactorNettyTcpStompClient(stompHost, stompPort); | |
stompClient.setMessageConverter(new PassThruMessageConverter()); | |
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); | |
taskScheduler.afterPropertiesSet(); | |
stompClient.setTaskScheduler(taskScheduler); | |
stompClient.setReceiptTimeLimit(5000); | |
return stompClient; | |
} | |
@Bean | |
public StompSessionManager stompSessionManager() { | |
var stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient()); | |
stompSessionManager.setAutoReceipt(true); | |
return stompSessionManager; | |
} | |
@Bean | |
public PollableChannel stompInputChannel() { | |
return new QueueChannel(); | |
} | |
@Bean | |
public StompInboundChannelAdapter stompInboundChannelAdapter() { | |
var adapter = | |
new StompInboundChannelAdapter(stompSessionManager(), "/topic/" + topic); | |
adapter.setOutputChannel(stompInputChannel()); | |
return adapter; | |
} | |
@Bean | |
@ServiceActivator(inputChannel = "stompOutputChannel") | |
public MessageHandler stompMessageHandler() { | |
var handler = new StompMessageHandler(stompSessionManager()); | |
handler.setDestination("/topic/" + topic); | |
return handler; | |
} | |
@Bean | |
public PollableChannel stompEvents() { | |
return new QueueChannel(); | |
} | |
@Bean | |
public PollableChannel stompErrors() { | |
return new QueueChannel(); | |
} | |
@Bean | |
public ApplicationListener<ApplicationEvent> stompEventListener() { | |
var producer = new ApplicationEventListeningMessageProducer(); | |
producer.setEventTypes(StompIntegrationEvent.class); | |
producer.setOutputChannel(stompEvents()); | |
producer.setErrorChannel(stompErrors()); | |
return producer; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment