Created
July 13, 2017 17:29
-
-
Save adsr/2d415077351e2d2cea1ee4230125ff65 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
| index f23d902..ebf0464 100644 | |
| --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
| +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java | |
| @@ -60,6 +60,7 @@ | |
| private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5); | |
| private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1); | |
| + private static final int EVENT_QUEUE_CAPACITY = 1024 * 32; | |
| private final boolean recordSchemaChangesInSourceRecords; | |
| private final RecordMakers recordMakers; | |
| @@ -79,6 +80,9 @@ | |
| private final AtomicLong totalRecordCounter = new AtomicLong(); | |
| private volatile Map<String, ?> lastOffset = null; | |
| private com.github.shyiko.mysql.binlog.GtidSet gtidSet; | |
| + // Buffer between the binlog client's listener (queueEvent) and the EventProcessor that calls handleEvent | |
| + private BlockingQueue<Event> eventQueue; | |
| + // Runnable that takes events off eventQueue, and the thread it runs on | |
| + private EventProcessor eventProcessor; | |
| + private Thread eventProcessorThread; | |
| /** | |
| * Create a binlog reader. | |
| @@ -96,12 +100,17 @@ public BinlogReader(String name, MySqlTaskContext context) { | |
| // Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour... | |
| pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS); | |
| + // Create the bounded queue that buffers binlog events, and the processor (plus its thread) that drains it | |
| + eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_CAPACITY); | |
| + eventProcessor = new EventProcessor(this, eventQueue); | |
| + eventProcessorThread = new Thread(eventProcessor, "event-processor"); | |
| + | |
| // Set up the log reader ... | |
| client = new BinaryLogClient(context.hostname(), context.port(), context.username(), context.password()); | |
| client.setServerId(context.serverId()); | |
| client.setSSLMode(sslModeFor(context.sslMode())); | |
| client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE)); | |
| - client.registerEventListener(this::handleEvent); | |
| + client.registerEventListener(this::queueEvent); | |
| client.registerLifecycleListener(new ReaderThreadLifecycleListener()); | |
| if (logger.isDebugEnabled()) client.registerEventListener(this::logEvent); | |
| @@ -242,6 +251,9 @@ protected void doStart() { | |
| context.hostname() + ":" + context.port() + " with user '" + context.username() + "': " + e.getMessage(), e); | |
| } | |
| } | |
| + | |
| + // Start event processor thread | |
| + eventProcessorThread.start(); | |
| } | |
| @Override | |
| @@ -249,7 +261,10 @@ protected void doStop() { | |
| try { | |
| if (isRunning()) { | |
| logger.debug("Stopping binlog reader, last recorded offset: {}", lastOffset); | |
| + // Ask the processor loop to exit, then interrupt its thread in case it is blocked in take(). | |
| + // A null element cannot be used as a wake-up sentinel: BlockingQueue.put(null) throws NullPointerException. | |
| + eventProcessor.setRunning(false); | |
| + eventProcessorThread.interrupt(); | |
| client.disconnect(); | |
| + try { | |
| + eventProcessorThread.join(); | |
| + } catch (InterruptedException e) { | |
| + // Interrupted while waiting for the processor to finish; restore the flag for the caller | |
| + Thread.currentThread().interrupt(); | |
| + } | |
| } | |
| cleanupResources(); | |
| } catch (IOException e) { | |
| @@ -299,6 +314,10 @@ protected void ignoreEvent(Event event) { | |
| logger.trace("Ignoring event due to missing handler: {}", event); | |
| } | |
| + /** | |
| + * Place an event on {@code eventQueue} so that it is handled by the event processor rather than by the calling thread. | |
| + * This call blocks while the queue has no free capacity. | |
| + * | |
| + * @param event the event to enqueue; {@code null} is ignored | |
| + */ | |
| + protected void queueEvent(Event event) { | |
| + // Blocking queues reject null elements, and handleEvent ignores null events anyway | |
| + if (event == null) return; | |
| + try { | |
| + eventQueue.put(event); | |
| + } catch (InterruptedException e) { | |
| + // put() throws the checked InterruptedException; the event is not enqueued in that case. | |
| + // Restore the interrupt flag so the calling thread can observe it. | |
| + Thread.currentThread().interrupt(); | |
| + } | |
| + } | |
| + | |
| protected void handleEvent(Event event) { | |
| if (event == null) return; | |
| @@ -710,6 +729,30 @@ protected SSLMode sslModeFor(SecureConnectionMode mode) { | |
| return null; | |
| } | |
| + /** | |
| + * A {@link Runnable} that repeatedly takes events from a queue and passes each one to | |
| + * {@link BinlogReader#handleEvent(Event)}, until it is told to stop or its thread is interrupted. | |
| + */ | |
| + protected final class EventProcessor implements Runnable { | |
| + // volatile so that a setRunning(false) call from another thread is seen by the loop in run() | |
| + private volatile boolean running = true; | |
| + private final BinlogReader reader; | |
| + private final BlockingQueue<Event> eventQueue; | |
| + | |
| + /** | |
| + * @param reader the reader whose {@code handleEvent} method receives each event | |
| + * @param eventQueue the queue from which events are taken | |
| + */ | |
| + public EventProcessor(BinlogReader reader, BlockingQueue<Event> eventQueue) { | |
| + this.reader = reader; | |
| + this.eventQueue = eventQueue; | |
| + } | |
| + | |
| + /** | |
| + * @param running {@code false} to make the loop in {@link #run()} exit before taking another event | |
| + */ | |
| + public void setRunning(boolean running) { | |
| + this.running = running; | |
| + } | |
| + | |
| + @Override | |
| + public void run() { | |
| + while (running) { | |
| + try { | |
| + // take() blocks until an event is available | |
| + reader.handleEvent(eventQueue.take()); | |
| + } catch (InterruptedException e) { | |
| + // Treat interruption as a request to stop: restore the flag and leave the loop | |
| + Thread.currentThread().interrupt(); | |
| + running = false; | |
| + } | |
| + } | |
| + } | |
| + } | |
| + | |
| protected final class ReaderThreadLifecycleListener implements LifecycleListener { | |
| @Override | |
| public void onDisconnect(BinaryLogClient client) { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment