Skip to content

Instantly share code, notes, and snippets.

@pradeepbn
Created November 16, 2021 01:25
Show Gist options
  • Save pradeepbn/a033cae7171f9e6da9a7f92737f843b9 to your computer and use it in GitHub Desktop.
Save pradeepbn/a033cae7171f9e6da9a7f92737f843b9 to your computer and use it in GitHub Desktop.
Diff to reproduce shutdown sequence issue
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 4af6eec83..23dd02bd3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -37,10 +37,7 @@ import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.function.Consumer;
import lombok.AccessLevel;
@@ -129,6 +126,8 @@ public class BookieRequestProcessor implements RequestProcessor {
private final ByteBufAllocator allocator;
private final boolean throttleReadResponses;
+ public CountDownLatch shutdownSignal;
+
public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger,
SecurityHandlerFactory shFactory, ByteBufAllocator allocator) throws SecurityException {
@@ -202,6 +201,7 @@ public class BookieRequestProcessor implements RequestProcessor {
int maxReads = serverCfg.getMaxReadsInProgressLimit();
readsSemaphore = maxReads > 0 ? new Semaphore(maxReads, true) : null;
+ shutdownSignal = new CountDownLatch(1);
}
protected void onAddRequestStart(Channel channel) {
@@ -265,6 +265,13 @@ public class BookieRequestProcessor implements RequestProcessor {
@Override
public void close() {
LOG.info("Closing RequestProcessor");
+ shutdownSignal.countDown();
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException e) {
+ // Interrupted during the artificial 5-second delay; ignore it and continue with executor shutdown below.
+ }
+
shutdownExecutor(writeThreadPool);
shutdownExecutor(readThreadPool);
if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == null) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index a563cc002..94900768e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
@@ -55,12 +56,15 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
// Stats
protected final OpStatsLogger readStats;
protected final OpStatsLogger reqStats;
+ public CountDownLatch shutdownSignal;
public ReadEntryProcessorV3(Request request,
Channel channel,
BookieRequestProcessor requestProcessor,
ExecutorService fenceThreadPool) {
super(request, channel, requestProcessor);
+ this.shutdownSignal = requestProcessor.shutdownSignal;
+
requestProcessor.onReadRequestStart(channel);
this.readRequest = request.getReadRequest();
@@ -242,6 +246,15 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
@Override
public void safeRun() {
+ try {
+ System.out.println("Wait for shutdown signal in readEntryProcessorV3");
+ this.shutdownSignal.await();
+ System.out.println("Received shutdown signal in readEntryProcessorV3");
+
+ } catch (InterruptedException e) {
+ // Interrupted while waiting on the shutdown latch; ignore it and fall through to normal read handling.
+ }
+
requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment