Last active
February 23, 2020 21:26
-
-
Save ogomaemmanuel/26d8b550c937825723e40780ad42ad8c to your computer and use it in GitHub Desktop.
CDC Mysql Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
server.port=8045 | |
#spring.datasource.initialization-mode=always | |
spring.datasource.url=jdbc:mysql://localhost:33060/debezium | |
spring.datasource.username=homestead | |
spring.datasource.password=secret |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
plugins { | |
java | |
groovy | |
id("org.springframework.boot") version "2.1.5.RELEASE" | |
} | |
apply(plugin = "io.spring.dependency-management") | |
group = "pl.dk" | |
version = "0.0.1-SNAPSHOT" | |
repositories { | |
mavenCentral() | |
} | |
configure<JavaPluginConvention> { | |
sourceCompatibility = JavaVersion.VERSION_11 | |
targetCompatibility = JavaVersion.VERSION_11 | |
} | |
dependencies { | |
annotationProcessor("org.projectlombok:lombok") | |
implementation("org.springframework.boot:spring-boot-starter") | |
implementation("org.springframework.data:spring-data-jdbc") | |
implementation("com.zaxxer:HikariCP") | |
implementation("org.codehaus.groovy:groovy-all:2.5.7") | |
implementation("org.projectlombok:lombok") | |
compile("mysql:mysql-connector-java:8.0.19") | |
implementation("io.debezium:debezium-embedded:0.9.5.Final") | |
implementation("io.debezium:debezium-connector-mysql:0.9.5.Final") | |
testImplementation("org.springframework.boot:spring-boot-starter-test") | |
testImplementation("org.spockframework:spock-spring:1.3-groovy-2.5") | |
testImplementation("org.testcontainers:spock:1.11.3") | |
testImplementation("org.testcontainers:postgresql:1.11.3") | |
testImplementation("org.awaitility:awaitility-groovy:3.1.6") | |
} | |
tasks.withType(Wrapper::class.java).configureEach { | |
gradleVersion = "6.1.1" | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pl.dk.debeziumdemo.watch; | |
import io.debezium.config.Configuration; | |
import io.debezium.data.Envelope.Operation; | |
import io.debezium.embedded.EmbeddedEngine; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.kafka.connect.data.Struct; | |
import org.apache.kafka.connect.source.SourceRecord; | |
import javax.annotation.PostConstruct; | |
import javax.annotation.PreDestroy; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.Executors; | |
import static io.debezium.data.Envelope.FieldName.AFTER; | |
import static io.debezium.data.Envelope.FieldName.OPERATION; | |
import static java.util.List.copyOf; | |
@Slf4j | |
class OrdersWatch { | |
private final Configuration debeziumConfiguration; | |
private final Executor executor = Executors.newSingleThreadExecutor(); | |
private EmbeddedEngine engine; | |
private final List<Struct> changesCaptured = new ArrayList<>(); | |
OrdersWatch(Configuration debeziumConfiguration) { | |
this.debeziumConfiguration = debeziumConfiguration; | |
} | |
void start() { | |
engine = EmbeddedEngine.create() | |
.using(debeziumConfiguration) | |
.notifying(this::handleEvent) | |
.build(); | |
executor.execute(engine); | |
} | |
@PostConstruct | |
void postConstrutct() { | |
start(); | |
} | |
@PreDestroy | |
void stop() { | |
if (engine != null) { | |
engine.stop(); | |
} | |
} | |
private void handleEvent(SourceRecord sourceRecord) { | |
System.out.println(sourceRecord.toString()); | |
} | |
List<Struct> getChangesCaptured() { | |
return copyOf(changesCaptured); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pl.dk.debeziumdemo.watch; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import io.debezium.connector.mysql.MySqlConnectorConfig; | |
import io.debezium.relational.history.MemoryDatabaseHistory; | |
@Configuration | |
class WatchConfig { | |
@Bean | |
OrdersWatch ordersWatch() { | |
return new OrdersWatch(debeziumConfiguration()); | |
} | |
io.debezium.config.Configuration debeziumConfiguration() { | |
return io.debezium.config.Configuration.create() | |
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector") | |
.with("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore") | |
.with("offset.flush.interval.ms", 60000) | |
.with("name", "orders-mysql-connector") | |
.with("database.server.name", "orders") | |
.with("database.hostname", "localhost") | |
.with("database.port",33060) | |
.with("database.server.id",184054) | |
.with("database.user", "homestead") | |
.with("database.password", "secret") | |
.with("database.dbname", "debezium") | |
.with("table.whitelist", "debezium.students") | |
.with("database.whitelist","debezium") | |
.with("schemas.enable", false) | |
.with(MySqlConnectorConfig.DATABASE_HISTORY, MemoryDatabaseHistory.class.getName()) | |
// .with("snapshot.mode", "never") | |
.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment