Created
March 28, 2020 07:40
-
-
Save aikar/00b51b642d578061fdbc897174f5c515 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 26d9deef41eb09dd70394e15124c26f2c546252f | |
Author: Aikar <[email protected]> | |
Date: Fri Mar 27 20:57:32 2020 -0400 | |
Improve behavior of main thread blocking chunk load/gens | |
diff --git a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
index 78bd238f4c..f28e563a43 100644 | |
--- a/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
+++ b/src/main/java/com/destroystokyo/paper/io/PrioritizedTaskQueue.java | |
@@ -72,8 +72,14 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask | |
* This can also be thrown if the queue has shutdown. | |
*/ | |
public void add(final T task) throws IllegalStateException { | |
- task.onQueue(this); | |
- this.queues[task.getPriority()].add(task); | |
+ add(task, false); | |
+ } | |
+ public void add(final T task, boolean allowReenqueue) throws IllegalStateException { | |
+ int priority = task.getPriority(); | |
+ if (priority != COMPLETING_PRIORITY) { | |
+ task.onQueue(this, allowReenqueue); | |
+ this.queues[priority].add(task); | |
+ } | |
if (this.shutdown.get()) { | |
// note: we're not actually sure at this point if our task will go through | |
throw new IllegalStateException("Queue has shutdown, refusing to execute task " + IOUtil.genericToString(task)); | |
@@ -251,7 +257,10 @@ public class PrioritizedTaskQueue<T extends PrioritizedTaskQueue.PrioritizedTask | |
} | |
void onQueue(final PrioritizedTaskQueue queue) { | |
- if (this.queue.getAndSet(queue) != null) { | |
+ onQueue(queue, false); | |
+ } | |
+ void onQueue(final PrioritizedTaskQueue queue, boolean allowReenqueue) { | |
+ if (this.queue.getAndSet(queue) != null && !allowReenqueue) { | |
throw new IllegalStateException("Already queued!"); | |
} | |
} | |
diff --git a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
index 715a2dd8d2..aaef3c426d 100644 | |
--- a/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
+++ b/src/main/java/com/destroystokyo/paper/io/chunk/ChunkTaskManager.java | |
@@ -3,10 +3,12 @@ package com.destroystokyo.paper.io.chunk; | |
import com.destroystokyo.paper.io.PaperFileIOThread; | |
import com.destroystokyo.paper.io.IOUtil; | |
import com.destroystokyo.paper.io.PrioritizedTaskQueue; | |
+import com.destroystokyo.paper.io.PrioritizedTaskQueue.PrioritizedTask; | |
import com.destroystokyo.paper.io.QueueExecutorThread; | |
import net.minecraft.server.ChunkRegionLoader; | |
import net.minecraft.server.IAsyncTaskHandler; | |
import net.minecraft.server.IChunkAccess; | |
+import net.minecraft.server.MCUtil; | |
import net.minecraft.server.MinecraftServer; | |
import net.minecraft.server.NBTTagCompound; | |
import net.minecraft.server.WorldServer; | |
@@ -34,7 +36,9 @@ public final class ChunkTaskManager { | |
private final PrioritizedTaskQueue<ChunkTask> chunkTasks = new PrioritizedTaskQueue<>(); // used if async chunks are disabled in config | |
protected static QueueExecutorThread<ChunkTask>[] globalWorkers; | |
+ protected static QueueExecutorThread<ChunkTask> globalUrgentWorker; | |
protected static PrioritizedTaskQueue<ChunkTask> globalQueue; | |
+ protected static PrioritizedTaskQueue<ChunkTask> globalUrgentQueue; | |
protected static final ConcurrentLinkedQueue<Runnable> CHUNK_WAIT_QUEUE = new ConcurrentLinkedQueue<>(); | |
@@ -116,6 +120,7 @@ public final class ChunkTaskManager { | |
globalWorkers = new QueueExecutorThread[threads]; | |
globalQueue = new PrioritizedTaskQueue<>(); | |
+ globalUrgentQueue = new PrioritizedTaskQueue<>(); | |
for (int i = 0; i < threads; ++i) { | |
globalWorkers[i] = new QueueExecutorThread<>(globalQueue, (long)0.10e6); //0.1ms | |
@@ -127,6 +132,15 @@ public final class ChunkTaskManager { | |
globalWorkers[i].start(); | |
} | |
+ | |
+ globalUrgentWorker = new QueueExecutorThread<>(globalUrgentQueue, (long)0.10e6); //0.1ms | |
+ globalUrgentWorker.setName("Paper Async Chunk Urgent Task Thread"); | |
+ globalUrgentWorker.setPriority(Thread.NORM_PRIORITY+1); | |
+ globalUrgentWorker.setUncaughtExceptionHandler((final Thread thread, final Throwable throwable) -> { | |
+ PaperFileIOThread.LOGGER.fatal("Thread '" + thread.getName() + "' threw an uncaught exception!", throwable); | |
+ }); | |
+ | |
+ globalUrgentWorker.start(); | |
} | |
/** | |
@@ -234,7 +248,12 @@ public final class ChunkTaskManager { | |
if (!failed) { | |
chunkData.chunkData = data; | |
} | |
- ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here | |
+ if (false && priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ System.out.println("exec load on main " + world.getWorld().getName() + ":" + chunkX + "," + chunkZ); | |
+ MinecraftServer.getServer().execute(ret); | |
+ } else { | |
+ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here | |
+ } | |
}, true, failed, intendingToBlock); // read data off disk if the future fails | |
}); | |
@@ -290,7 +309,12 @@ public final class ChunkTaskManager { | |
PaperFileIOThread.Holder.INSTANCE.loadChunkDataAsync(world, chunkX, chunkZ, priority, (final PaperFileIOThread.ChunkData chunkData) -> { | |
ret.chunkData = chunkData; | |
- ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here | |
+ if (false && priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ System.out.println("exec load on main " + world.getWorld().getName() + ":" + chunkX + "," + chunkZ); | |
+ MinecraftServer.getServer().execute(ret); | |
+ } else { | |
+ ChunkTaskManager.this.internalSchedule(ret); // only schedule to the worker threads here | |
+ } | |
}, true, true, intendingToBlock); | |
return ret; | |
@@ -377,6 +401,7 @@ public final class ChunkTaskManager { | |
worker.flush(); | |
} | |
} | |
+ globalUrgentWorker.flush(); | |
// flush again since tasks we execute async saves | |
drainChunkWaitQueue(); | |
@@ -409,20 +434,30 @@ public final class ChunkTaskManager { | |
public void raisePriority(final int chunkX, final int chunkZ, final int priority) { | |
final Long chunkKey = Long.valueOf(IOUtil.getCoordinateKey(chunkX, chunkZ)); | |
- ChunkSaveTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey); | |
+ ChunkTask chunkSaveTask = this.chunkSaveTasks.get(chunkKey); | |
if (chunkSaveTask != null) { | |
- final boolean raised = chunkSaveTask.raisePriority(priority); | |
- if (chunkSaveTask.isScheduled() && raised) { | |
- // only notify if we're in queue to be executed | |
- this.internalScheduleNotify(); | |
- } | |
+ // don't bump save into urgent queue | |
+ raiseTaskPriority(chunkSaveTask, priority != PrioritizedTaskQueue.HIGHEST_PRIORITY ? priority : PrioritizedTaskQueue.HIGH_PRIORITY); | |
} | |
ChunkLoadTask chunkLoadTask = this.chunkLoadTasks.get(chunkKey); | |
if (chunkLoadTask != null) { | |
- final boolean raised = chunkLoadTask.raisePriority(priority); | |
- if (chunkLoadTask.isScheduled() && raised) { | |
- // only notify if we're in queue to be executed | |
+ raiseTaskPriority(chunkLoadTask, priority); | |
+ } | |
+ } | |
+ | |
+ private void raiseTaskPriority(ChunkTask task, int priority) { | |
+ final boolean raised = task.raisePriority(priority); | |
+ if (task.isScheduled() && raised) { | |
+ // only notify if we're in queue to be executed | |
+ if (priority == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ // The task was already in another queue but became urgent later, so also add it to the urgent queue. | |
+ // The previous queue will simply have to ignore this task if it has already been started. | |
+ // Two queues can now potentially pull this task, and whichever gets to it first runs it; | |
+ // the urgent queue has dedicated thread(s), so it is likely to win. | |
+ globalUrgentQueue.add(task, true); | |
+ this.internalScheduleNotifyUrgent(); | |
+ } else { | |
this.internalScheduleNotify(); | |
} | |
} | |
@@ -436,8 +471,14 @@ public final class ChunkTaskManager { | |
// It's important we order the task to be executed before notifying. Avoid a race condition where the worker thread | |
// wakes up and goes to sleep before we actually schedule (or it's just about to sleep) | |
- this.queue.add(task); | |
- this.internalScheduleNotify(); | |
+ if (task.getPriority() == PrioritizedTaskQueue.HIGHEST_PRIORITY) { | |
+ globalUrgentQueue.add(task, true); | |
+ this.internalScheduleNotifyUrgent(); | |
+ } else { | |
+ this.queue.add(task); | |
+ this.internalScheduleNotify(); | |
+ } | |
+ | |
} | |
protected void internalScheduleNotify() { | |
@@ -452,4 +493,12 @@ public final class ChunkTaskManager { | |
} | |
} | |
+ | |
+ protected void internalScheduleNotifyUrgent() { | |
+ if (globalUrgentWorker == null) { | |
+ return; | |
+ } | |
+ globalUrgentWorker.notifyTasks(); | |
+ } | |
+ | |
} | |
diff --git a/src/main/java/net/minecraft/server/ChunkProviderServer.java b/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
index 1dcd0980ec..afc255f794 100644 | |
--- a/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
+++ b/src/main/java/net/minecraft/server/ChunkProviderServer.java | |
@@ -227,6 +227,7 @@ public class ChunkProviderServer extends IChunkProvider { | |
} | |
private long asyncLoadSeqCounter; | |
+ public static boolean IS_CHUNK_LOAD_BLOCKING_MAIN = false; | |
public void getChunkAtAsynchronously(int x, int z, boolean gen, java.util.function.Consumer<Chunk> onComplete) { | |
if (Thread.currentThread() != this.serverThread) { | |
@@ -384,7 +385,11 @@ public class ChunkProviderServer extends IChunkProvider { | |
} | |
gameprofilerfiller.c("getChunkCacheMiss"); | |
+ // Paper start - Async chunks | |
+ boolean prevBlocking = IS_CHUNK_LOAD_BLOCKING_MAIN; | |
+ IS_CHUNK_LOAD_BLOCKING_MAIN = true; | |
CompletableFuture<Either<IChunkAccess, PlayerChunk.Failure>> completablefuture = this.getChunkFutureMainThread(i, j, chunkstatus, flag); | |
+ // Paper end | |
if (!completablefuture.isDone()) { // Paper | |
// Paper start - async chunk io/loading | |
@@ -393,10 +398,13 @@ public class ChunkProviderServer extends IChunkProvider { | |
// Paper end | |
com.destroystokyo.paper.io.SyncLoadFinder.logSyncLoad(this.world, x, z); // Paper - sync load info | |
this.world.timings.chunkAwait.startTiming(); // Paper | |
+ System.out.println("waiting for chunk" + this.getWorld().getWorld().getName() + ":" + x + "," + z); | |
this.serverThreadQueue.awaitTasks(completablefuture::isDone); | |
+ System.out.println("chunk ready" + this.getWorld().getWorld().getName() + ":" + x + "," + z); | |
com.destroystokyo.paper.io.chunk.ChunkTaskManager.popChunkWait(); // Paper - async chunk debug | |
this.world.timings.chunkAwait.stopTiming(); // Paper | |
} // Paper | |
+ IS_CHUNK_LOAD_BLOCKING_MAIN = prevBlocking;// Paper | |
ichunkaccess = (IChunkAccess) ((Either) completablefuture.join()).map((ichunkaccess1) -> { | |
return ichunkaccess1; | |
}, (playerchunk_failure) -> { | |
diff --git a/src/main/java/net/minecraft/server/PlayerChunkMap.java b/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
index f9e843288a..d064784a44 100644 | |
--- a/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
+++ b/src/main/java/net/minecraft/server/PlayerChunkMap.java | |
@@ -2,6 +2,7 @@ package net.minecraft.server; | |
import co.aikar.timings.Timing; // Paper | |
import com.destroystokyo.paper.PaperWorldConfig; // Paper | |
+import com.destroystokyo.paper.io.chunk.ChunkTaskManager; | |
import com.google.common.collect.ImmutableList; | |
import com.google.common.collect.Iterables; | |
import com.google.common.collect.ComparisonChain; // Paper | |
@@ -705,13 +706,16 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { | |
}; | |
CompletableFuture<NBTTagCompound> chunkSaveFuture = this.world.asyncChunkTaskManager.getChunkSaveFuture(chunkcoordintpair.x, chunkcoordintpair.z); | |
+ boolean isBlockingMain = MCUtil.isMainThread() && ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN; | |
+ int priority = isBlockingMain ? com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGHEST_PRIORITY : com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY; | |
+ if (isBlockingMain) { | |
+ System.out.println("blocking main load " + chunkcoordintpair); | |
+ } | |
if (chunkSaveFuture != null) { | |
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, | |
- com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY, chunkHolderConsumer, false, chunkSaveFuture); | |
- this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, com.destroystokyo.paper.io.PrioritizedTaskQueue.HIGH_PRIORITY); | |
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain, chunkSaveFuture); | |
+ this.world.asyncChunkTaskManager.raisePriority(chunkcoordintpair.x, chunkcoordintpair.z, priority); | |
} else { | |
- this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, | |
- com.destroystokyo.paper.io.PrioritizedTaskQueue.NORMAL_PRIORITY, chunkHolderConsumer, false); | |
+ this.world.asyncChunkTaskManager.scheduleChunkLoad(chunkcoordintpair.x, chunkcoordintpair.z, priority, chunkHolderConsumer, isBlockingMain); | |
} | |
return ret; | |
// Paper end | |
@@ -749,7 +753,14 @@ public class PlayerChunkMap extends IChunkLoader implements PlayerChunk.d { | |
return CompletableFuture.completedFuture(Either.right(playerchunk_failure)); | |
}); | |
}, (runnable) -> { | |
- this.mailboxWorldGen.a(ChunkTaskQueueSorter.a(playerchunk, runnable)); // CraftBukkit - decompile error | |
+ // Paper start | |
+ if (ChunkProviderServer.IS_CHUNK_LOAD_BLOCKING_MAIN) { | |
+ System.out.println("exec main " + world.getWorld().getName() + ":" + chunkcoordintpair); | |
+ this.executor.execute(runnable); | |
+ } else { | |
+ // Paper end | |
+ this.mailboxWorldGen.a(ChunkTaskQueueSorter.a(playerchunk, runnable)); // CraftBukkit - decompile error | |
+ } | |
}); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment