Created
November 16, 2021 01:25
-
-
Save pradeepbn/a033cae7171f9e6da9a7f92737f843b9 to your computer and use it in GitHub Desktop.
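Diff to reproduce the shutdown sequence issue: ReadEntryProcessorV3.safeRun() blocks on a CountDownLatch that BookieRequestProcessor.close() releases, after which close() sleeps for 5 seconds before shutting down the request thread pools.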
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4af6eec83..23dd02bd3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -37,10 +37,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.function.Consumer;
import lombok.AccessLevel;
@@ -129,6 +126,8 @@ public class BookieRequestProcessor implements RequestProcessor {
private final ByteBufAllocator allocator;
private final boolean throttleReadResponses;
+ public CountDownLatch shutdownSignal;
+
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
@@ -202,6 +201,7 @@ public class BookieRequestProcessor implements RequestProcessor {
int maxReads = serverCfg.getMaxReadsInProgressLimit();
readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
+ shutdownSignal = new CountDownLatch(1);
}
protected void onAddRequestStart(Channel channel) {
@@ -265,6 +265,13 @@ public class BookieRequestProcessor implements RequestProcessor {
@Override
public void close() {
LOG.info("Closing RequestProcessor");
+ shutdownSignal.countDown();
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ // pass
+ }
+
shutdownExecutor(writeThreadPool);
shutdownExecutor(readThreadPool);
if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == null) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index a563cc002..94900768e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -55,12 +56,15 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
// Stats
protected final OpStatsLogger readStats;
protected final OpStatsLogger reqStats;
+ public CountDownLatch shutdownSignal;
public ReadEntryProcessorV3(Request request,
Channel channel,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool) {
super(request, channel, requestProcessor);
+ this.shutdownSignal = requestProcessor.shutdownSignal;
+
requestProcessor.onReadRequestStart(channel);
this.readRequest = request.getReadRequest();
@@ -242,6 +246,15 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
@Override
public void safeRun() {
+ try {
+ System.out.println("Wait for shutdown signal in readEntryProcessorV3");
+ this.shutdownSignal.await();
+ System.out.println("Received shutdown signal in readEntryProcessorV3");
+
+ } catch (InterruptedException e) {
+ // pass
+ }
+
requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment