Skip to content

Instantly share code, notes, and snippets.

@adsr
Created July 13, 2017 17:29
Show Gist options
  • Select an option

  • Save adsr/2d415077351e2d2cea1ee4230125ff65 to your computer and use it in GitHub Desktop.

Select an option

Save adsr/2d415077351e2d2cea1ee4230125ff65 to your computer and use it in GitHub Desktop.
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
index f23d902..ebf0464 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
@@ -60,6 +60,7 @@
private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);
+ private static final int EVENT_QUEUE_CAPACITY = 1024 * 32;
private final boolean recordSchemaChangesInSourceRecords;
private final RecordMakers recordMakers;
@@ -79,6 +80,9 @@
private final AtomicLong totalRecordCounter = new AtomicLong();
private volatile Map<String, ?> lastOffset = null;
private com.github.shyiko.mysql.binlog.GtidSet gtidSet;
+ // Hand-off queue between the binlog client's listener (producer) and the event processor (consumer).
+ // Typed as Event so that take() yields something handleEvent(Event) accepts without a cast.
+ private BlockingQueue<Event> eventQueue;
+ // Runnable that takes events off the queue and passes them to handleEvent(Event).
+ private EventProcessor eventProcessor;
+ // Thread that runs the event processor; started in doStart() and joined in doStop().
+ private Thread eventProcessorThread;
/**
* Create a binlog reader.
@@ -96,12 +100,17 @@ public BinlogReader(String name, MySqlTaskContext context) {
// Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
+ // Set up the bounded hand-off queue, the processor that drains it, and the thread the processor runs on ...
+ eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_CAPACITY);
+ eventProcessor = new EventProcessor(this, eventQueue);
+ eventProcessorThread = new Thread(eventProcessor, "event-processor");
+
// Set up the log reader ...
client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password());
client.setServerId(context.serverId());
client.setSSLMode(sslModeFor(context.sslMode()));
client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
- client.registerEventListener(this::handleEvent);
+ client.registerEventListener(this::queueEvent);
client.registerLifecycleListener(new ReaderThreadLifecycleListener());
if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent);
@@ -242,6 +251,9 @@ protected void doStart() {
context.hostname() + ":" + context.port() + " with user '" + context.username() + "': " + e.getMessage(), e);
}
}
+
+ // Start event processor thread
+ eventProcessorThread.start();
}
@Override
@@ -249,7 +261,10 @@ protected void doStop() {
try {
if (isRunning()) {
logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset);
+ // Ask the processor loop to exit, then interrupt its thread in case it is blocked in take() on an
+ // empty queue. A null "wake-up" element cannot be used: BlockingQueue implementations reject null.
+ eventProcessor.setRunning(false);
+ eventProcessorThread.interrupt();
client.disconnect();
+ try {
+     eventProcessorThread.join();
+ } catch (InterruptedException e) {
+     // Keep the interrupt visible to the caller rather than losing it while waiting for the processor
+     Thread.currentThread().interrupt();
+ }
}
cleanupResources();
} catch (IOException e) {
@@ -299,6 +314,10 @@ protected void ignoreEvent(Event event) {
logger.trace("Ignoring event due to missing handler: {}", event);
}
+ /**
+  * Place an event on the event queue. {@link BlockingQueue#put} waits for space when the queue is
+  * full, so this call may block the calling thread.
+  *
+  * @param event the event to enqueue
+  */
+ protected void queueEvent(Event event) {
+     try {
+         eventQueue.put(event);
+     } catch (InterruptedException e) {
+         // put() throws a checked exception that this method does not declare; restore the
+         // interrupt status and report the event that could not be queued.
+         Thread.currentThread().interrupt();
+         logger.warn("Interrupted while queueing binlog event, event was not queued: {}", event);
+     }
+ }
+
protected void handleEvent(Event event) {
if (event == null) return;
@@ -710,6 +729,30 @@ protected SSLMode sslModeFor(SecureConnectionMode mode) {
return null;
}
+ /**
+  * Takes events off a queue one at a time and passes each to {@link BinlogReader#handleEvent(Event)}.
+  * The loop ends once {@link #setRunning(boolean) setRunning(false)} has been called and the current
+  * iteration finishes, or as soon as the running thread is interrupted while waiting for an event.
+  */
+ protected final class EventProcessor implements Runnable {
+     // volatile: written by the thread that requests the stop, read by the thread executing run()
+     private volatile boolean running = true;
+     private final BinlogReader reader;
+     private final BlockingQueue<Event> eventQueue;
+
+     /**
+      * @param reader the reader whose {@code handleEvent} method receives each event
+      * @param eventQueue the queue to take events from
+      */
+     public EventProcessor(BinlogReader reader, BlockingQueue<Event> eventQueue) {
+         this.reader = reader;
+         this.eventQueue = eventQueue;
+     }
+
+     /**
+      * @param running {@code false} to make {@link #run()} exit before taking another event
+      */
+     public void setRunning(boolean running) {
+         this.running = running;
+     }
+
+     @Override
+     public void run() {
+         while (running) {
+             try {
+                 reader.handleEvent(eventQueue.take());
+             } catch (InterruptedException e) {
+                 // take() only throws when this thread is interrupted; treat that as a request to stop.
+                 // Looping again with the flag restored would make take() throw immediately, so return.
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         }
+     }
+ }
+
protected final class ReaderThreadLifecycleListener implements LifecycleListener {
@Override
public void onDisconnect(BinaryLogClient client) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment