Skip to content

Instantly share code, notes, and snippets.

@josergdev
Created January 8, 2025 21:56
Show Gist options
  • Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.
Save josergdev/7fe6d90ecd535c5901bc87d12885467b to your computer and use it in GitHub Desktop.
Outbox with Spring Integration
package dev.joserg.outboxintegration;
import org.postgresql.jdbc.PgConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.jdbc.channel.PostgresChannelMessageTableSubscriber;
import org.springframework.integration.jdbc.channel.PostgresSubscribableChannel;
import org.springframework.integration.jdbc.store.JdbcChannelMessageStore;
import org.springframework.integration.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider;
import org.springframework.integration.store.ChannelMessageStore;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.sql.DataSource;
import java.io.Serializable;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.UUID;
import java.util.random.RandomGenerator;
import static dev.joserg.outboxintegration.OutboxIntegrationApplication.SpringConfiguration.Channel.*;
@SpringBootApplication
public class OutboxIntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(OutboxIntegrationApplication.class, args);
}
@RestController
public static class GreetingController {
private final Publisher publisher;
public GreetingController(Publisher publisher) {
this.publisher = publisher;
}
@GetMapping("/greet")
public String greet() {
this.publisher.publish(new GreetingMessage(ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME)));
return "Hello!";
}
}
@MessagingGateway
public interface Publisher {
@Gateway(requestChannel = "journalChannel")
void publish(GreetingMessage greetingMessage);
}
public record GreetingMessage(String instant) implements Serializable {
}
@Component
public static class KafkaPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class);
private final RandomGenerator randomGenerator = RandomGenerator.getDefault();
public void publish(Message<GreetingMessage> greetingMessage) {
if (randomGenerator.nextBoolean()) {
LOGGER.info("Sending to kafka: {}", greetingMessage);
} else {
throw new RuntimeException("Kafka server down");
}
}
}
@Component
public static class RabbitPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitPublisher.class);
private final RandomGenerator randomGenerator = RandomGenerator.getDefault();
public void publish(Message<GreetingMessage> greetingMessage) {
if (randomGenerator.nextBoolean()) {
LOGGER.info("Sending to rabbit: {}", greetingMessage);
} else {
throw new RuntimeException("rabbit server down");
}
}
}
@Component
public static class JournalPublisher {
private static final Logger LOGGER = LoggerFactory.getLogger(JournalPublisher.class);
private final RandomGenerator randomGenerator = RandomGenerator.getDefault();
public void publish(Message<GreetingMessage> greetingMessage) {
if (randomGenerator.nextBoolean()) {
LOGGER.info("Sending to journal: {}", greetingMessage);
} else {
throw new RuntimeException("journal server down");
}
}
}
@Configuration
public static class SpringConfiguration {
public enum Channel {
JOURNAL, KAFKA, RABBIT;
public static UUID id(Channel channel) {
return switch (channel) {
case JOURNAL -> UUID.fromString("00000000-0000-0000-0000-000000000001");
case KAFKA -> UUID.fromString("00000000-0000-0000-0000-000000000002");
case RABBIT -> UUID.fromString("00000000-0000-0000-0000-000000000003");
};
}
public UUID id() {
return id(this);
}
}
@Bean
@ConfigurationProperties("spring.datasource")
public DataSource dataSource() {
return DataSourceBuilder.create().build();
}
@Bean
public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
JdbcChannelMessageStore jdbcChannelMessageStore(DataSource dataSource) {
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber messageStoreSubscriber(DataSource dataSource) {
return new PostgresChannelMessageTableSubscriber(() -> dataSource.getConnection().unwrap(PgConnection.class));
}
@Bean
QueueChannel journalChannel(ChannelMessageStore channelMessageStore) {
return MessageChannels.queue(channelMessageStore, JOURNAL.id()).getObject();
}
@Bean
public PostgresSubscribableChannel journalSubscription(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore,
DataSourceTransactionManager dataSourceTransactionManager) {
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, JOURNAL.id(), subscriber);
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
return postgresSubscribableChannel;
}
@Bean
QueueChannel kafkaChannel(ChannelMessageStore channelMessageStore) {
return MessageChannels.queue(channelMessageStore, KAFKA.id()).getObject();
}
@Bean
public PostgresSubscribableChannel kafkaSubscription(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore,
DataSourceTransactionManager dataSourceTransactionManager) {
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, KAFKA.id(), subscriber);
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
return postgresSubscribableChannel;
}
@Bean
QueueChannel rabbitChannel(ChannelMessageStore channelMessageStore) {
return MessageChannels.queue(channelMessageStore, RABBIT.id()).getObject();
}
@Bean
public PostgresSubscribableChannel rabbitSubscription(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore,
DataSourceTransactionManager dataSourceTransactionManager) {
var postgresSubscribableChannel = new PostgresSubscribableChannel(messageStore, RABBIT.id(), subscriber);
postgresSubscribableChannel.setTransactionManager(dataSourceTransactionManager);
return postgresSubscribableChannel;
}
@Bean
public IntegrationFlow journalFlow(@Qualifier("journalSubscription") PostgresSubscribableChannel journalSubscription,
@Qualifier("kafkaChannel") QueueChannel kafkaChannel,
@Qualifier("rabbitChannel") QueueChannel rabbitChannel,
JournalPublisher journalPublisher) {
return IntegrationFlow
.from(journalSubscription)
.<GreetingMessage>handle(
(payload, headers) -> {
var message = MessageBuilder.createMessage(payload, headers);
journalPublisher.publish(message);
return message;
},
configurer -> configurer.requiresReply(true))
.routeToRecipients(configurer -> configurer
.recipient(rabbitChannel)
.recipient(kafkaChannel))
.get();
}
@Bean
public IntegrationFlow kafkaFlow(@Qualifier("kafkaSubscription") PostgresSubscribableChannel kafkaSubscription,
KafkaPublisher kafkaPublisher) {
return IntegrationFlow.from(kafkaSubscription)
.handle(message -> kafkaPublisher.publish((Message<GreetingMessage>) message))
.get();
}
@Bean
public IntegrationFlow rabbitFlow(@Qualifier("rabbitSubscription") PostgresSubscribableChannel rabbitSubscription,
RabbitPublisher rabbitPublisher) {
return IntegrationFlow.from(rabbitSubscription)
.handle(message -> rabbitPublisher.publish((Message<GreetingMessage>) message))
.get();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment