Skip to content

Instantly share code, notes, and snippets.

Tests run: 4, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 1.207 sec <<< FAILURE! - in io.debezium.connector.mysql.MySqlConnectorRegressionIT
shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot(io.debezium.connector.mysql.MySqlConnectorRegressionIT) Time elapsed: 0.992 sec <<< FAILURE!
org.junit.ComparisonFailure: expected:<[0]> but was:<[1]>
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.fest.assertions.ConstructorInvoker.newInstance(ConstructorInvoker.java:36)
at org.fest.assertions.ComparisonFailureFactory.newComparisonFailure(ComparisonFailureFactory.java:60)
at org.fest.assertions.ComparisonFailureFactory.comparisonFailure(ComparisonFailureFactory.java:46)
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
index def91ed..9b7040a 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java
@@ -28,6 +28,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
+import io.debezium.relational.topic.ByTableTopicMapper;
diff --git a/run.sh b/run.sh
index 248ca9c..4e5cf03 100755
--- a/run.sh
+++ b/run.sh
@@ -117,6 +117,8 @@ for d in $all_mysqlds; do
./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; USE etsy_index"
./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; CREATE TABLE etsy_index.test (k int primary key, v1 tinyint default '1', v2 int null, v3 int null default '3', v4 int not null default '4', v5 int not null)"
./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; CREATE TABLE etsy_index.test2 (k int primary key, v1 tinyint default '1', v2 int null, v3 int null default '3', v4 int not null default '4', v5 int not null)"
+
+ ./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; CREATE TABLE test.test (k int primary key, v varchar(32))"
diff --git a/etsy-partitioner/Makefile b/etsy-partitioner/Makefile
index bcae918..51a1530 100644
--- a/etsy-partitioner/Makefile
+++ b/etsy-partitioner/Makefile
@@ -1,7 +1,7 @@
all: com/etsy/EtsyPartitioner.class
com/etsy/EtsyPartitioner.class: com/etsy/EtsyPartitioner.java
- javac -cp ../kafka/clients/build/libs/kafka-clients-0.10.2.0.jar:../kafka/connect/api/build/libs/connect-api-0.10.2.0.jar com/etsy/EtsyPartitioner.java
+ javac -cp ../kafka/clients/build/libs/kafka-clients-0.10.3.0-SNAPSHOT.jar:../kafka/connect/api/build/libs/connect-api-0.10.3.0-SNAPSHOT.jar com/etsy/EtsyPartitioner.java
diff --git a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java
index 5cdfcab..eb05251 100644
--- a/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java
+++ b/debezium-core/src/main/java/io/debezium/relational/topic/TopicMappers.java
@@ -28,10 +28,11 @@ public TopicMappers(Configuration config, String[] topicMapperAliases) {
if (topicMapperAliases != null) {
for (String alias : topicMapperAliases) {
TopicMapper topicMapper = config.getInstance("topic.mappers." + alias + ".type", TopicMapper.class);
- if (topicMapper != null) {
- topicMappers.add(topicMapper);
diff --git a/run.sh b/run.sh
index 90499ae..12e8f05 100755
--- a/run.sh
+++ b/run.sh
@@ -100,25 +100,26 @@ done
# Make db tables
info "Making db tables..."
for d in $all_mysqlds; do
- ./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; CREATE DATABASE etsy_shard_001"
- ./workspace/mysql/bin/mysql -uroot -proot --protocol=tcp -P${listen_port[$d]} -e "set sql_log_bin=0; USE etsy_shard_001"
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 2e85cde..6caa6e8 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -18,23 +18,12 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
+import com.github.shyiko.mysql.binlog.event.*;
import org.apache.kafka.connect.errors.ConnectException;
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 2e85cde..4fc1159 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -21,8 +21,6 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
-import com.github.shyiko.mysql.binlog.BinaryLogClient;
-import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 2e85cde..d9f4788 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -118,6 +118,12 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
+ } else if (event.getHeader().getEventType() == EventType.TABLE_METADATA) {
+ TableMetadataEventData tableMetadataEvent = event.getData();
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index 6caa6e8..d9f4788 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -18,12 +18,23 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
-import com.github.shyiko.mysql.binlog.event.*;
import org.apache.kafka.connect.errors.ConnectException;