Created
January 8, 2025 21:56
-
-
Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.
Outbox with Spring Integration
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.joserg.outboxintegration; | |
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
import org.springframework.integration.store.ChannelMessageStore;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.sql.DataSource;
import java.io.Serializable;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.random.RandomGenerator;

import static dev.joserg.outboxintegration.OutboxIntegrationApplication.SpringConfiguration.Channel.*;
@SpringBootApplication | |
public class OutboxIntegrationApplication { | |
public static void main(String[] args) { | |
SpringApplication.run(OutboxIntegrationApplication.class, args); | |
} | |
@RestController | |
public static class GreetingController { | |
private final Publisher publisher; | |
public GreetingController(Publisher publisher) { | |
this.publisher = publisher; | |
} | |
@GetMapping("/greet") | |
public String greet() { | |
this.publisher.publish(new GreetingMessage(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME))); | |
return "Hello!"; | |
} | |
} | |
@MessagingGateway | |
public interface Publisher { | |
@Gateway(requestChannel = "journalChannel") | |
void publish(GreetingMessage greetingMessage); | |
} | |
/**
 * Payload that travels through the messaging channels of this application.
 *
 * <p>NOTE(review): presumably {@link Serializable} because the JDBC-backed message
 * store serializes payloads - confirm before removing the interface.
 *
 * @param instant timestamp as text; the controller in this file supplies the current
 *                UTC date-time in ISO format
 */
public record GreetingMessage(String instant) implements Serializable {
}
@Component | |
public static class KafkaPublisher { | |
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class); | |
private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); | |
public void publish(Message<GreetingMessage> greetingMessage) { | |
if (randomGenerator.nextBoolean()) { | |
LOGGER.info("Sending to kafka: {}", greetingMessage); | |
} else { | |
throw new RuntimeException("Kafka server down"); | |
} | |
} | |
} | |
@Component | |
public static class RabbitPublisher { | |
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitPublisher.class); | |
private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); | |
public void publish(Message<GreetingMessage> greetingMessage) { | |
if (randomGenerator.nextBoolean()) { | |
LOGGER.info("Sending to rabbit: {}", greetingMessage); | |
} else { | |
throw new RuntimeException("rabbit server down"); | |
} | |
} | |
} | |
@Component | |
public static class JournalPublisher { | |
private static final Logger LOGGER = LoggerFactory.getLogger(JournalPublisher.class); | |
private final RandomGenerator randomGenerator = RandomGenerator.getDefault(); | |
public void publish(Message<GreetingMessage> greetingMessage) { | |
if (randomGenerator.nextBoolean()) { | |
LOGGER.info("Sending to journal: {}", greetingMessage); | |
} else { | |
throw new RuntimeException("journal server down"); | |
} | |
} | |
} | |
@Configuration | |
public static class SpringConfiguration { | |
/**
 * Logical channels of the pipeline. Each constant is tied to a fixed UUID, which
 * the channel beans of this configuration pass to the message store and to the
 * subscribable channels as the channel's identifier.
 */
public enum Channel {
    JOURNAL("00000000-0000-0000-0000-000000000001"),
    KAFKA("00000000-0000-0000-0000-000000000002"),
    RABBIT("00000000-0000-0000-0000-000000000003");

    /** Parsed once per constant instead of on every lookup. */
    private final UUID id;

    Channel(String id) {
        this.id = UUID.fromString(id);
    }

    /**
     * Static form of {@link #id()}.
     *
     * @param channel the channel to look up
     * @return the fixed identifier of {@code channel}
     * @throws NullPointerException if {@code channel} is {@code null}
     */
    public static UUID id(Channel channel) {
        return channel.id;
    }

    /**
     * @return the fixed identifier of this channel
     */
    public UUID id() {
        return id;
    }
}
@Bean | |
@ConfigurationProperties("spring.datasource") | |
public DataSource dataSource() { | |
return DataSourceBuilder.create().build(); | |
} | |
@Bean | |
public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) { | |
return new DataSourceTransactionManager(dataSource); | |
} | |
@Bean | |
JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) { | |
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource); | |
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider()); | |
return jdbcChannelMessageStore; | |
} | |
@Bean | |
public PostgresChannelMessageTableSubscriber messageStoreSubscriber(DataSource dataSource) { | |
return new PostgresChannelMessageTableSubscriber(() -> dataSource.getConnection().unwrap(PgConnection.class)); | |
} | |
@Bean | |
QueueChannel journalChannel(ChannelMessageStore channelMessageStore) { | |
return MessageChannels.queue(channelMessageStore, JOURNAL.id()).getObject(); | |
} | |
@Bean | |
public PostgresSubscribableChannel journalSubscription( | |
PostgresChannelMessageTableSubscriber subscriber, | |
JdbcChannelMessageStore messageStore, | |
DataSourceTransactionManager dataSourceTransactionManager) { | |
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, JOURNAL.id(), subscriber); | |
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); | |
return postgresSubscribableChannel; | |
} | |
@Bean | |
QueueChannel kafkaChannel(ChannelMessageStore channelMessageStore) { | |
return MessageChannels.queue(channelMessageStore, KAFKA.id()).getObject(); | |
} | |
@Bean | |
public PostgresSubscribableChannel kafkaSubscription( | |
PostgresChannelMessageTableSubscriber subscriber, | |
JdbcChannelMessageStore messageStore, | |
DataSourceTransactionManager dataSourceTransactionManager) { | |
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, KAFKA.id(), subscriber); | |
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); | |
return postgresSubscribableChannel; | |
} | |
@Bean | |
QueueChannel rabbitChannel(ChannelMessageStore channelMessageStore) { | |
return MessageChannels.queue(channelMessageStore, RABBIT.id()).getObject(); | |
} | |
@Bean | |
public PostgresSubscribableChannel rabbitSubscription( | |
PostgresChannelMessageTableSubscriber subscriber, | |
JdbcChannelMessageStore messageStore, | |
DataSourceTransactionManager dataSourceTransactionManager) { | |
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, RABBIT.id(), subscriber); | |
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager); | |
return postgresSubscribableChannel; | |
} | |
@Bean | |
public IntegrationFlow journalFlow(@Qualifier("journalSubscription") PostgresSubscribableChannel journalSubscription, | |
@Qualifier("kafkaChannel") QueueChannel kafkaChannel, | |
@Qualifier("rabbitChannel") QueueChannel rabbitChannel, | |
JournalPublisher journalPublisher) { | |
return IntegrationFlow | |
.from(journalSubscription) | |
.<GreetingMessage>handle( | |
(payload, headers) -> { | |
var message = MessageBuilder.createMessage(payload, headers); | |
journalPublisher.publish(message); | |
return message; | |
}, | |
configurer -> configurer.requiresReply(true)) | |
.routeToRecipients(configurer -> configurer | |
.recipient(rabbitChannel) | |
.recipient(kafkaChannel)) | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow kafkaFlow(@Qualifier("kafkaSubscription") PostgresSubscribableChannel kafkaSubscription, | |
KafkaPublisher kafkaPublisher) { | |
return IntegrationFlow.from(kafkaSubscription) | |
.handle(message -> kafkaPublisher.publish((Message<GreetingMessage>) message)) | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow rabbitFlow(@Qualifier("rabbitSubscription") PostgresSubscribableChannel rabbitSubscription, | |
RabbitPublisher rabbitPublisher) { | |
return IntegrationFlow.from(rabbitSubscription) | |
.handle(message -> rabbitPublisher.publish((Message<GreetingMessage>) message)) | |
.get(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment