Skip to content

Instantly share code, notes, and snippets.

@ogomaemmanuel
Last active February 23, 2020 21:26
Show Gist options
  • Save ogomaemmanuel/26d8b550c937825723e40780ad42ad8c to your computer and use it in GitHub Desktop.
Save ogomaemmanuel/26d8b550c937825723e40780ad42ad8c to your computer and use it in GitHub Desktop.
CDC MySQL Example
# HTTP port the application listens on.
server.port=8045
# Datasource initialization on startup is left disabled (kept commented out).
#spring.datasource.initialization-mode=always
# JDBC connection for the MySQL schema "debezium" on localhost:33060.
spring.datasource.url=jdbc:mysql://localhost:33060/debezium
# NOTE(review): credentials are stored in plain text here; presumably local development values - confirm before reuse.
spring.datasource.username=homestead
spring.datasource.password=secret
// Build script (Gradle Kotlin DSL) for the Debezium CDC demo application.
plugins {
java
// Groovy plugin is applied alongside Java; the test dependencies below reference Spock (a Groovy test framework).
groovy
id("org.springframework.boot") version "2.1.5.RELEASE"
}
// Applied by id without an explicit version; several dependencies below are declared without versions.
apply(plugin = "io.spring.dependency-management")
group = "pl.dk"
version = "0.0.1-SNAPSHOT"
repositories {
mavenCentral()
}
// Compile for and target Java 11.
configure<JavaPluginConvention> {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
dependencies {
    // Lombok is a compile-time tool: it runs as an annotation processor and its
    // annotations are only needed on the compile classpath, so it is declared
    // compileOnly instead of being shipped with the application.
    compileOnly("org.projectlombok:lombok")
    annotationProcessor("org.projectlombok:lombok")

    // Application runtime
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.data:spring-data-jdbc")
    implementation("com.zaxxer:HikariCP")
    implementation("org.codehaus.groovy:groovy-all:2.5.7")
    // Was declared with the deprecated `compile` configuration (removed in
    // Gradle 7); `implementation` keeps the driver on the compile and runtime
    // classpaths of this project.
    implementation("mysql:mysql-connector-java:8.0.19")

    // Change data capture: embedded Debezium engine plus the MySQL connector
    implementation("io.debezium:debezium-embedded:0.9.5.Final")
    implementation("io.debezium:debezium-connector-mysql:0.9.5.Final")

    // Tests
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.spockframework:spock-spring:1.3-groovy-2.5")
    testImplementation("org.testcontainers:spock:1.11.3")
    // NOTE(review): the application is configured for MySQL, yet the PostgreSQL
    // Testcontainers module is used here - confirm this is intentional.
    testImplementation("org.testcontainers:postgresql:1.11.3")
    testImplementation("org.awaitility:awaitility-groovy:3.1.6")
}
// Pin the Gradle version used when the wrapper scripts are (re)generated.
tasks.withType<Wrapper>().configureEach {
    gradleVersion = "6.1.1"
}
package pl.dk.debeziumdemo.watch;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope.Operation;
import io.debezium.embedded.EmbeddedEngine;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.List.copyOf;
/**
 * Runs an embedded Debezium engine on a dedicated background thread and keeps
 * the "after" row image of every data change event the engine delivers.
 *
 * <p>Lifecycle: {@link #start()} is invoked through {@code @PostConstruct} and
 * {@link #stop()} through {@code @PreDestroy}. {@code stop()} also shuts the
 * worker executor down, so an instance cannot be started again once stopped.
 *
 * <p>Events arrive on the engine thread while {@link #getChangesCaptured()} may
 * be called from any thread, so the captured list is a synchronized list.
 */
@Slf4j
class OrdersWatch {

    private final Configuration debeziumConfiguration;

    /** Single worker thread that runs the engine; shut down in {@link #stop()}. */
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private EmbeddedEngine engine;

    /** Row images captured so far; written by the engine thread, read by callers. */
    private final List<Struct> changesCaptured = Collections.synchronizedList(new ArrayList<>());

    /**
     * @param debeziumConfiguration connector and engine settings handed to the embedded engine
     */
    OrdersWatch(Configuration debeziumConfiguration) {
        this.debeziumConfiguration = debeziumConfiguration;
    }

    /**
     * Builds the engine from the supplied configuration and submits it to the
     * worker executor. Returns immediately; events are delivered asynchronously
     * to {@code handleEvent}.
     */
    void start() {
        engine = EmbeddedEngine.create()
                .using(debeziumConfiguration)
                .notifying(this::handleEvent)
                .build();
        executor.execute(engine);
    }

    /** Container hook: starts the engine once the bean has been constructed. */
    @PostConstruct
    void postConstrutct() {
        start();
    }

    /**
     * Stops the engine if one was started and shuts the worker executor down so
     * its thread does not keep the JVM alive after the bean is destroyed.
     */
    @PreDestroy
    void stop() {
        if (engine != null) {
            engine.stop();
        }
        // shutdown() does not interrupt the engine task; the worker thread ends
        // once that task returns.
        executor.shutdown();
    }

    /**
     * Logs the incoming record and, when its value carries a non-null
     * {@code after} structure, stores that structure.
     *
     * <p>Records whose value is not a {@link Struct}, or whose schema has no
     * {@code after} field, are only logged; reading a missing field from a
     * {@code Struct} would throw.
     */
    private void handleEvent(SourceRecord sourceRecord) {
        log.info("Change event received: {}", sourceRecord);

        Object rawValue = sourceRecord.value();
        if (!(rawValue instanceof Struct)) {
            return;
        }
        Struct value = (Struct) rawValue;
        if (value.schema().field(AFTER) == null) {
            return;
        }
        Struct after = value.getStruct(AFTER);
        if (after != null) {
            changesCaptured.add(after);
        }
    }

    /**
     * @return an unmodifiable snapshot of the row images captured so far
     */
    List<Struct> getChangesCaptured() {
        return copyOf(changesCaptured);
    }
}
package pl.dk.debeziumdemo.watch;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.history.MemoryDatabaseHistory;
/**
 * Spring configuration that exposes the {@link OrdersWatch} bean together with
 * the Debezium settings it is constructed with.
 */
@Configuration
class WatchConfig {

    /**
     * @return an {@link OrdersWatch} built from {@link #debeziumConfiguration()}
     */
    @Bean
    OrdersWatch ordersWatch() {
        final io.debezium.config.Configuration connectorSettings = debeziumConfiguration();
        return new OrdersWatch(connectorSettings);
    }

    /**
     * Assembles the settings for the embedded engine and the MySQL connector.
     * All values are fixed in code. The {@code snapshot.mode} option is not set.
     *
     * @return the assembled Debezium configuration
     */
    io.debezium.config.Configuration debeziumConfiguration() {
        // Engine-level settings.
        final String connectorClass = "io.debezium.connector.mysql.MySqlConnector";
        final String offsetStore = "org.apache.kafka.connect.storage.MemoryOffsetBackingStore";
        final int offsetFlushIntervalMs = 60000;
        final String connectorName = "orders-mysql-connector";
        final String logicalServerName = "orders";

        // Connection to the source database.
        final String host = "localhost";
        final int port = 33060;
        final int serverId = 184054;
        final String user = "homestead";
        final String password = "secret";
        final String databaseName = "debezium";

        // Capture filters and remaining options.
        final String capturedTables = "debezium.students";
        final String capturedDatabases = "debezium";
        final boolean schemasEnabled = false;
        final String historyImplementation = MemoryDatabaseHistory.class.getName();

        return io.debezium.config.Configuration.create()
                .with("connector.class", connectorClass)
                .with("offset.storage", offsetStore)
                .with("offset.flush.interval.ms", offsetFlushIntervalMs)
                .with("name", connectorName)
                .with("database.server.name", logicalServerName)
                .with("database.hostname", host)
                .with("database.port", port)
                .with("database.server.id", serverId)
                .with("database.user", user)
                .with("database.password", password)
                .with("database.dbname", databaseName)
                .with("table.whitelist", capturedTables)
                .with("database.whitelist", capturedDatabases)
                .with("schemas.enable", schemasEnabled)
                .with(MySqlConnectorConfig.DATABASE_HISTORY, historyImplementation)
                .build();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment