Spring Boot -> Kafka Stream -> TimeScaleDB over JTA
Create a standard JTA-based Spring Boot project so that your services support @Transactional.
I avoid using JPA entities here: the JPA specification has the limitation that every mapped entity must have an ID.
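Instead of an @Entity, a plain POJO (or a Java record) is all the JdbcTemplate batch insert below needs. A minimal sketch, assuming hypothetical fields (time, deviceId, position); the real model depends on your topic payload:

import java.time.OffsetDateTime;

// Plain value class: no @Entity, no @Id (field names are illustrative assumptions).
public class YourModel {

    private OffsetDateTime time;   // event timestamp, maps to a TIMESTAMPTZ column
    private String deviceId;
    private String position;

    public OffsetDateTime getTime() { return time; }
    public String getDeviceId() { return deviceId; }
    public String getPosition() { return position; }

    public void setTime(OffsetDateTime time) { this.time = time; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public void setPosition(String position) { this.position = position; }
}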
package com.boxbay.wms.bi.batch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @author vuru
 * Date: 30.09.2020
 * Time: 06:33
 */
@SuppressWarnings("SqlDialectInspection")
@Repository
@Transactional
public class KafkaToTimeScaleBatchingService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    // Used by the consumer factory below; assumed here to come from configuration
    // (the property key is an assumption).
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    private final String QUERY = """
            <your query>
            """;

    public int[] batchInsert(final List<YourModel> moves) {
        return this.jdbcTemplate.batchUpdate(QUERY,
                new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        // <your value sets, e.g. ps.setString(1, ...)>
                    }

                    @Override
                    public int getBatchSize() {
                        return moves.size();
                    }
                });
    }
    @KafkaListener(containerFactory = "yourContainerFactory",
            id = "yourID",
            clientIdPrefix = "yourPref",
            topics = "yourTopics",
            autoStartup = "true",
            groupId = "yourGrp"
    )
    @Transactional
    public void processMaterialTracking(List<ConsumerRecord<String, YourModel>> records) {
        try {
            batchInsert(records.stream().map(ConsumerRecord::value).collect(Collectors.toList()));
        } catch (Exception e) {
            // <handle the exception: decide whether to roll back; a rollback means the
            //  same batch of records is delivered again>
        }
    }
@Bean("yourContainerFactory") | |
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, YourModel>> | |
deviceMovesListener( | |
@Qualifier("yourConsumerFactory") ConsumerFactory<String, YourModel> consumerFactory | |
) { | |
ConcurrentKafkaListenerContainerFactory<String, DeviceMoveTracking> factory = | |
new ConcurrentKafkaListenerContainerFactory<>(); | |
factory.setConsumerFactory(consumerFactory); | |
factory.setConcurrency(5); | |
factory.setBatchListener(true); | |
factory.getContainerProperties().setPollTimeout(3000); | |
return factory; | |
} | |
@Bean("yourConsumerFactory") | |
public ConsumerFactory<String, DeviceMoveTracking> consumerFactoryDeviceMoves() { | |
Map<String, Object> props = new HashMap<>(); | |
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); | |
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | |
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YourModelSerde.class); | |
props.put(ConsumerConfig.ISOLATION_LEVEL_DOC, "read_committed"); | |
return new DefaultKafkaConsumerFactory<>(props); | |
} | |
} |
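To make the <your query> and value-set placeholders concrete: a minimal sketch of the field and method inside KafkaToTimeScaleBatchingService, assuming a hypothetical TimescaleDB hypertable device_moves(time TIMESTAMPTZ, device_id TEXT, position TEXT) and the hypothetical YourModel accessors sketched above; adapt the statement and the setters to your own schema.

    // Hypothetical hypertable, e.g.:
    //   CREATE TABLE device_moves (time TIMESTAMPTZ NOT NULL, device_id TEXT, position TEXT);
    //   SELECT create_hypertable('device_moves', 'time');
    private final String QUERY = """
            INSERT INTO device_moves (time, device_id, position)
            VALUES (?, ?, ?)
            """;

    public int[] batchInsert(final List<YourModel> moves) {
        return this.jdbcTemplate.batchUpdate(QUERY,
                new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        YourModel move = moves.get(i);
                        ps.setObject(1, move.getTime());      // OffsetDateTime -> TIMESTAMPTZ
                        ps.setString(2, move.getDeviceId());
                        ps.setString(3, move.getPosition());
                    }

                    @Override
                    public int getBatchSize() {
                        return moves.size();
                    }
                });
    }

One JDBC batch per Kafka poll keeps the whole insert inside the listener's single @Transactional boundary, so the catch block in the listener can decide whether to roll back and have the same batch delivered again.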