Folia/folia-server/minecraft-patches/features/0009-fixup-Region-Threading-Base.patch
Spottedleaf bb12eee2bd Update ScheduledTaskThreadPool
1. Make halt() unpark waiting threads
   Fixes https://github.com/PaperMC/Folia/issues/338
2. Make intermediate task parsing steal tasks less aggressively
2025-03-22 19:26:07 -07:00

2233 lines
94 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Spottedleaf <Spottedleaf@users.noreply.github.com>
Date: Thu, 20 Mar 2025 11:07:45 -0700
Subject: [PATCH] fixup! Region Threading Base
diff --git a/io/papermc/paper/threadedregions/COWArrayList.java b/io/papermc/paper/threadedregions/COWArrayList.java
new file mode 100644
index 0000000000000000000000000000000000000000..65393f18011aed8023ede95382145abf9cbb3c5e
--- /dev/null
+++ b/io/papermc/paper/threadedregions/COWArrayList.java
@@ -0,0 +1,64 @@
+package io.papermc.paper.threadedregions;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+public final class COWArrayList<E> {
+
+ private volatile E[] array;
+
+ public COWArrayList(final Class<E> clazz) {
+ this.array = (E[])Array.newInstance(clazz, 0);
+ }
+
+ public E[] getArray() {
+ return this.array;
+ }
+
+ public boolean contains(final E test) {
+ for (final E elem : this.array) {
+ if (elem == test) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void add(final E element) {
+ synchronized (this) {
+ final E[] array = this.array;
+
+ final E[] copy = Arrays.copyOf(array, array.length + 1);
+ copy[array.length] = element;
+
+ this.array = copy;
+ }
+ }
+
+ public boolean remove(final E element) {
+ synchronized (this) {
+ final E[] array = this.array;
+ int index = -1;
+ for (int i = 0, len = array.length; i < len; ++i) {
+ if (array[i] == element) {
+ index = i;
+ break;
+ }
+ }
+
+ if (index == -1) {
+ return false;
+ }
+
+ final E[] copy = (E[])Array.newInstance(array.getClass().getComponentType(), array.length - 1);
+
+ System.arraycopy(array, 0, copy, 0, index);
+ System.arraycopy(array, index + 1, copy, index, (array.length - 1) - index);
+
+ this.array = copy;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/io/papermc/paper/threadedregions/RegionizedData.java b/io/papermc/paper/threadedregions/RegionizedData.java
index a1043c426d031755b57b77a9b2eec685e9861b13..5e66fcbee18ea0889ebe652228ac3e61cac7c872 100644
--- a/io/papermc/paper/threadedregions/RegionizedData.java
+++ b/io/papermc/paper/threadedregions/RegionizedData.java
@@ -1,10 +1,12 @@
package io.papermc.paper.threadedregions;
import ca.spottedleaf.concurrentutil.util.Validate;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
import net.minecraft.server.level.ServerLevel;
import javax.annotation.Nullable;
+import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -42,7 +44,7 @@ import java.util.function.Supplier;
* // callback is left out of this example
* // note: world != null here
* public final RegionizedData<EntityTickList> entityTickLists =
- * new RegionizedData<>(this, () -> new EntityTickList(), ...);
+ * new RegionizedData<>(this, (data) -> new EntityTickList(), ...);
*
* public void addTickingEntity(Entity e) {
* // What we expect here is that this world is the
@@ -86,7 +88,7 @@ import java.util.function.Supplier;
* // note: world == null here, because this RegionizedData object
* // is not instantiated per world, but rather globally.
* public final RegionizedData<TickTimes> tickTimes =
- * new RegionizedData<>(null, () -> new TickTimes(), ...);
+ * new RegionizedData<>(null, (data) -> new TickTimes(), ...);
* }
* }
* </pre>
@@ -97,7 +99,7 @@ import java.util.function.Supplier;
public final class RegionizedData<T> {
private final ServerLevel world;
- private final Supplier<T> initialValueSupplier;
+ private final Function<TickRegionData ,T> initialValueSupplier;
private final RegioniserCallback<T> callback;
/**
@@ -119,14 +121,14 @@ public final class RegionizedData<T> {
* @param supplier Initial value supplier used to lazy initialise region data.
* @param callback Region callback to manage this regionised data.
*/
- public RegionizedData(final ServerLevel world, final Supplier<T> supplier, final RegioniserCallback<T> callback) {
+ public RegionizedData(final ServerLevel world, final Function<TickRegionData,T> supplier, final RegioniserCallback<T> callback) {
this.world = world;
this.initialValueSupplier = Validate.notNull(supplier, "Supplier may not be null.");
this.callback = Validate.notNull(callback, "Regioniser callback may not be null.");
}
- T createNewValue() {
- return Validate.notNull(this.initialValueSupplier.get(), "Initial value supplier may not return null");
+ T createNewValue(final TickRegionData regionData) {
+ return Validate.notNull(this.initialValueSupplier.apply(regionData), "Initial value supplier may not return null");
}
RegioniserCallback<T> getCallback() {
@@ -141,7 +143,7 @@ public final class RegionizedData<T> {
* and the current ticking region's world does not match this {@code RegionizedData}'s world.
*/
public @Nullable T get() {
- final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region =
+ final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegions.TickRegionSectionData> region =
TickRegionScheduler.getCurrentRegion();
if (region == null) {
diff --git a/io/papermc/paper/threadedregions/RegionizedServer.java b/io/papermc/paper/threadedregions/RegionizedServer.java
index 1382c695c4991488b113401e231875ddc74f6b01..9008db1d594ffc371a7fa5bd36f6bb43a168d558 100644
--- a/io/papermc/paper/threadedregions/RegionizedServer.java
+++ b/io/papermc/paper/threadedregions/RegionizedServer.java
@@ -1,7 +1,6 @@
package io.papermc.paper.threadedregions;
import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.moonrise.common.util.TickThread;
import com.mojang.logging.LogUtils;
import io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler;
@@ -23,6 +22,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
@@ -64,7 +64,7 @@ public final class RegionizedServer {
// now we can schedule
this.tickHandle.setInitialStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
TickRegions.getScheduler().scheduleRegion(this.tickHandle);
- TickRegions.getScheduler().init();
+ TickRegions.start();
}
public void invalidateStatus() {
@@ -80,6 +80,15 @@ public final class RegionizedServer {
TickRegions.getScheduler().setHasTasks(this.tickHandle);
}
+ private boolean hasAnyGlobalChunkTasks() {
+ for (final ServerLevel world : this.worlds) {
+ if (world.taskQueueRegionData.hasGlobalChunkTasks()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Returns the current tick of the region ticking.
* @throws IllegalStateException If there is no current region.
@@ -118,7 +127,7 @@ public final class RegionizedServer {
private final AtomicBoolean ticking = new AtomicBoolean();
public GlobalTickTickHandle(final RegionizedServer server) {
- super(null, SchedulerThreadPool.DEADLINE_NOT_SET);
+ super(null, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
this.server = server;
}
@@ -158,26 +167,31 @@ public final class RegionizedServer {
return false;
}
- // TODO try catch?
run.run();
return true;
}
+ private boolean runGlobalTask() {
+ boolean ret = false;
+ for (final ServerLevel world : this.server.worlds) {
+ ret |= world.taskQueueRegionData.executeGlobalChunkTask();
+ }
+ return ret;
+ }
+
@Override
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
do {
- if (!this.runOneTask()) {
- return false;
+ if (!this.runOneTask() || !this.runGlobalTask()) {
+ return;
}
} while (canContinue.getAsBoolean());
-
- return true;
}
@Override
protected boolean hasIntermediateTasks() {
- return !this.server.globalTickQueue.isEmpty();
+ return !this.server.globalTickQueue.isEmpty() || this.server.hasAnyGlobalChunkTasks();
}
}
diff --git a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
index 745ab870310733b569681f5280895bb9798620a4..2794694ce57b21711e1ddac146647317b79336bf 100644
--- a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
+++ b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
@@ -79,7 +79,11 @@ public final class RegionizedTaskQueue {
this.world = world;
}
- private boolean executeGlobalChunkTask() {
+ public boolean hasGlobalChunkTasks() {
+ return !this.globalChunkTask.isEmpty();
+ }
+
+ public boolean executeGlobalChunkTask() {
final Runnable run = this.globalChunkTask.poll();
if (run != null) {
run.run();
@@ -94,6 +98,7 @@ public final class RegionizedTaskQueue {
public void pushGlobalChunkTask(final Runnable run) {
this.globalChunkTask.add(run);
+ TickRegions.getScheduler().setHasTasks(RegionizedServer.getGlobalTickData());
}
private PrioritisedQueue getQueue(final boolean synchronise, final int chunkX, final int chunkZ, final boolean isChunkTask) {
@@ -216,12 +221,14 @@ public final class RegionizedTaskQueue {
}
public static final class RegionTaskQueueData {
- private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue();
- private final PrioritisedQueue chunkQueue = new PrioritisedQueue();
+ private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue(this);
+ private final PrioritisedQueue chunkQueue = new PrioritisedQueue(this);
private final WorldRegionTaskData worldRegionTaskData;
+ private final TickRegions.TickRegionData regionData;
- public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData) {
+ public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData, final TickRegions.TickRegionData regionData) {
this.worldRegionTaskData = worldRegionTaskData;
+ this.regionData = regionData;
}
void mergeInto(final RegionTaskQueueData into) {
@@ -281,6 +288,13 @@ public final class RegionizedTaskQueue {
this.queues[i] = new ArrayDeque<>();
}
}
+
+ private final RegionTaskQueueData queue;
+
+ private PrioritisedQueue(final RegionTaskQueueData queue) {
+ this.queue = queue;
+ }
+
private boolean isDestroyed;
public int getScheduledTasks() {
@@ -609,6 +623,8 @@ public final class RegionizedTaskQueue {
continue;
}
+ queue.queue.regionData.setHasTasks();
+
// successfully queued
return true;
}
diff --git a/io/papermc/paper/threadedregions/RegionizedWorldData.java b/io/papermc/paper/threadedregions/RegionizedWorldData.java
index c6e487a4c14e6b82533881d01f32349b9ae28728..b8f1f042342d3fed5fa26df0de07e8e2b0937130 100644
--- a/io/papermc/paper/threadedregions/RegionizedWorldData.java
+++ b/io/papermc/paper/threadedregions/RegionizedWorldData.java
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.common.util.CoordinateUtils;
import ca.spottedleaf.moonrise.common.util.TickThread;
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
import com.mojang.logging.LogUtils;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ReferenceMap;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
@@ -33,7 +34,6 @@ import net.minecraft.world.entity.ai.village.VillageSiege;
import net.minecraft.world.entity.item.ItemEntity;
import net.minecraft.world.level.BlockEventData;
import net.minecraft.world.level.ChunkPos;
-import net.minecraft.world.level.Explosion;
import net.minecraft.world.level.Level;
import net.minecraft.world.level.NaturalSpawner;
import net.minecraft.world.level.ServerExplosion;
@@ -61,6 +61,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -82,6 +83,7 @@ public final class RegionizedWorldData {
// entities
for (final ServerPlayer player : from.localPlayers) {
into.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(into);
into.nearbyPlayers.addPlayer(player);
}
for (final Entity entity : from.allEntities) {
@@ -180,6 +182,7 @@ public final class RegionizedWorldData {
// the chunk holder must _exist_, and so the region section exists.
final RegionizedWorldData into = regionToData.get(CoordinateUtils.getChunkKey(pos.x >> chunkToRegionShift, pos.z >> chunkToRegionShift));
into.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(into);
into.nearbyPlayers.addPlayer(player);
}
for (final Entity entity : from.allEntities) {
@@ -346,7 +349,8 @@ public final class RegionizedWorldData {
}
// entities
- private final List<ServerPlayer> localPlayers = new ArrayList<>();
+ // this is copy on write to allow packet processing to iterate safely
+ private final CopyOnWriteArrayList<ServerPlayer> localPlayers = new CopyOnWriteArrayList<>();
private final NearbyPlayers nearbyPlayers;
private final ReferenceList<Entity> allEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
private final ReferenceList<Entity> loadedEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
@@ -446,7 +450,9 @@ public final class RegionizedWorldData {
public final alternate.current.wire.WireHandler wireHandler;
public final io.papermc.paper.redstone.RedstoneWireTurbo turbo;
- public RegionizedWorldData(final ServerLevel world) {
+ public final TickRegionData regionData;
+
+ public RegionizedWorldData(final ServerLevel world, final TickRegionData regionData) {
this.world = world;
this.blockLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, true);
this.fluidLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, false);
@@ -454,6 +460,7 @@ public final class RegionizedWorldData {
this.nearbyPlayers = new NearbyPlayers(world);
this.wireHandler = new alternate.current.wire.WireHandler(world);
this.turbo = new io.papermc.paper.redstone.RedstoneWireTurbo((RedStoneWireBlock)Blocks.REDSTONE_WIRE);
+ this.regionData = regionData;
// tasks may be drained before the region ticks, so we must set up the tick data early just in case
this.updateTickData();
@@ -611,6 +618,7 @@ public final class RegionizedWorldData {
if (this.allEntities.add(entity)) {
if (entity instanceof ServerPlayer player) {
this.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(this);
}
TickRegions.RegionStats.updateCurrentRegion();
}
@@ -627,6 +635,7 @@ public final class RegionizedWorldData {
if (this.allEntities.remove(entity)) {
if (entity instanceof ServerPlayer player) {
this.localPlayers.remove(player);
+ player.getBukkitEntity().updateRegion(null);
}
TickRegions.RegionStats.updateCurrentRegion();
}
diff --git a/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
new file mode 100644
index 0000000000000000000000000000000000000000..5c591b0d6eac45d6094ce44bf62ad976bf995e66
--- /dev/null
+++ b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
@@ -0,0 +1,1243 @@
+package io.papermc.paper.threadedregions;
+
+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
+import ca.spottedleaf.concurrentutil.util.TimeUtil;
+import java.lang.invoke.VarHandle;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
+
+public final class ScheduledTaskThreadPool {
+
+ public static final long DEADLINE_NOT_SET = Long.MIN_VALUE;
+
+ private final ThreadFactory threadFactory;
+ private final long stealThresholdNS;
+ private final long taskTimeSliceNS;
+
+ private final COWArrayList<TickThreadRunner> coreThreads = new COWArrayList<>(TickThreadRunner.class);
+ private final COWArrayList<TickThreadRunner> aliveThreads = new COWArrayList<>(TickThreadRunner.class);
+
+ private long runnerIdGenerator;
+ private boolean shutdown;
+
+ private final ConcurrentSkipListMap<WaitState, WaitState> waitingOrIdleRunners = new ConcurrentSkipListMap<>(WaitState.OLDEST_FIRST);
+
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> unwatchedScheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
+
+ public ScheduledTaskThreadPool(final ThreadFactory threadFactory, final long stealThresholdNS,
+ final long taskTimeSliceNS) {
+ this.threadFactory = threadFactory;
+ this.stealThresholdNS = stealThresholdNS;
+ this.taskTimeSliceNS = taskTimeSliceNS;
+
+ if (threadFactory == null) {
+ throw new NullPointerException("Null thread factory");
+ }
+ if (stealThresholdNS < 0L) {
+ throw new IllegalArgumentException("Steal threshold must be >= 0");
+ }
+ if (taskTimeSliceNS <= 0L) {
+ throw new IllegalArgumentException("Task time slice must be > 0");
+ }
+ }
+
+ private static <K,V> K firstEntry(final ConcurrentSkipListMap<K, V> map) {
+ final Map.Entry<K,V> first = map.firstEntry();
+ return first == null ? null : first.getKey();
+ }
+
+ private static ScheduledTickTask findFirstNonTaken(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
+ ScheduledTickTask first;
+ while ((first = firstEntry(map)) != null && first.isTaken()) {
+ map.remove(first);
+ }
+
+ return first;
+ }
+
+ private static ScheduledTickTask findFirstNonTakenNonWatched(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
+ ScheduledTickTask first;
+ while ((first = firstEntry(map)) != null && (first.isTaken() || first.isWatched())) {
+ map.remove(first);
+ if (!first.isTaken() && !first.isWatched()) {
+ // handle race condition: unwatched after removal
+ map.put(first, first);
+ }
+ }
+
+ return first;
+ }
+
+ private static Thread[] getThreads(final COWArrayList<TickThreadRunner> list) {
+ final TickThreadRunner[] runners = list.getArray();
+ final Thread[] ret = new Thread[runners.length];
+
+ for (int i = 0; i < ret.length; ++i) {
+ ret[i] = runners[i].thread;
+ }
+
+ return ret;
+ }
+
+ /**
+ * Returns an array of the underlying scheduling threads.
+ */
+ public Thread[] getCoreThreads() {
+ return getThreads(this.coreThreads);
+ }
+
+ /**
+ * Returns an array of the underlying scheduling threads which are alive.
+ */
+ public Thread[] getAliveThreads() {
+ return getThreads(this.aliveThreads);
+ }
+
+ /**
+ * Adjusts the number of core threads to the specified threads. Has no effect if shutdown.
+ * Lowering the number of core threads will cause some scheduled tasks to fail to meet their scheduled start
+ * deadlines by up to the task steal time.
+ * @param threads New number of threads
+ * @return Returns this thread pool
+ */
+ public ScheduledTaskThreadPool setCoreThreads(final int threads) {
+ synchronized (this) {
+ if (this.shutdown) {
+ return this;
+ }
+
+ final TickThreadRunner[] currRunners = this.coreThreads.getArray();
+ if (currRunners.length == threads) {
+ return this;
+ }
+
+ if (threads < currRunners.length) {
+ // we need to trim threads
+ for (int i = 0, difference = currRunners.length - threads; i < difference; ++i) {
+ final TickThreadRunner remove = currRunners[currRunners.length - i - 1];
+
+ remove.halt();
+ this.coreThreads.remove(remove);
+ }
+
+ // force remaining runners to task steal
+ this.interruptAllRunners();
+
+ return this;
+ } else {
+ // we need to add threads
+ for (int i = 0, difference = threads - currRunners.length; i < difference; ++i) {
+ final TickThreadRunner runner = new TickThreadRunner(this, this.runnerIdGenerator++);
+ final Thread thread = runner.thread = this.threadFactory.newThread(runner);
+
+ this.coreThreads.add(runner);
+ this.aliveThreads.add(runner);
+
+ thread.start();
+ }
+
+ return this;
+ }
+ }
+ }
+
+ /**
+ * Attempts to prevent further execution of tasks.
+ */
+ public void halt() {
+ synchronized (this) {
+ this.shutdown = true;
+ }
+
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ runner.halt();
+ }
+ }
+
+ /**
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
+ * @param msToWait Maximum time to wait.
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
+ */
+ public boolean join(final long msToWait) {
+ try {
+ return this.join(msToWait, false);
+ } catch (final InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ /**
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
+ * @param msToWait Maximum time to wait.
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
+ * @throws InterruptedException If this thread is interrupted.
+ */
+ public boolean joinInterruptable(final long msToWait) throws InterruptedException {
+ return this.join(msToWait, true);
+ }
+
+ private boolean join(final long msToWait, final boolean interruptable) throws InterruptedException {
+ final long nsToWait = msToWait * (1000 * 1000);
+ final long start = System.nanoTime();
+ final long deadline = start + nsToWait;
+ boolean interrupted = false;
+ try {
+ for (final TickThreadRunner runner : this.aliveThreads.getArray()) {
+ final Thread thread = runner.thread;
+ for (;;) {
+ if (!thread.isAlive()) {
+ break;
+ }
+ final long current = System.nanoTime();
+ if (current >= deadline && msToWait > 0L) {
+ return false;
+ }
+
+ try {
+ thread.join(msToWait <= 0L ? 0L : Math.max(1L, (deadline - current) / (1000 * 1000)));
+ } catch (final InterruptedException ex) {
+ if (interruptable) {
+ throw ex;
+ }
+ interrupted = true;
+ }
+ }
+ }
+
+ return true;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void interruptAllRunners() {
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ runner.interrupt();
+ }
+ }
+
+ private void interruptOneRunner() {
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ if (runner.interrupt()) {
+ return;
+ }
+ }
+ }
+
+ private void insert(final SchedulableTick tick, final boolean hasTasks) {
+ final long scheduleTime = tick.getScheduledStart();
+ final long timeNow = System.nanoTime();
+
+ for (;;) {
+ final Map.Entry<WaitState, WaitState> lastIdle = this.waitingOrIdleRunners.firstEntry();
+ final WaitState waitState;
+ if (lastIdle == null // no waiting threads or
+ // scheduled time is past latest waiting time
+ || ((waitState = lastIdle.getKey()).deadline != DEADLINE_NOT_SET && waitState.deadline - scheduleTime < 0L)) {
+ // insert hoping to be stolen
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(
+ tick,
+ // offset start by steal threshold so that it will hopefully start at its scheduled time
+ scheduleTime - this.stealThresholdNS,
+ hasTasks ? timeNow : DEADLINE_NOT_SET,
+ null
+ );
+
+ this.unwatchedScheduledTicks.put(task, task);
+ if (hasTasks) {
+ this.scheduledTasks.put(task, task);
+ }
+
+ if (!this.waitingOrIdleRunners.isEmpty()) {
+ // handle race condition: all threads went to sleep since we checked
+ this.interruptOneRunner();
+ }
+ break;
+ } else {
+ // try to schedule to the runner
+ if (this.waitingOrIdleRunners.remove(waitState) == null) {
+ // failed, try again
+ continue;
+ }
+
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(
+ tick, scheduleTime, hasTasks ? timeNow : DEADLINE_NOT_SET, waitState.runner
+ );
+
+ this.unwatchedScheduledTicks.put(task, task);
+ waitState.runner.scheduledTicks.put(task, task);
+ if (hasTasks) {
+ this.scheduledTasks.put(task, task);
+ waitState.runner.scheduledTasks.put(task, task);
+ }
+
+ if (!waitState.runner.interrupt() && waitState.runner.isHalted()) {
+ // handle race condition: runner we selected was halted
+ this.interruptOneRunner();
+ }
+ break;
+ }
+ }
+
+ if (!hasTasks && tick.hasTasks()) {
+ // handle race condition where tasks were added during scheduling
+ this.notifyTasks(tick);
+ }
+ }
+
+ /**
+ * Schedules the specified task to be executed on this thread pool.
+ * @param tick Specified task
+ * @throws IllegalStateException If the task is already scheduled
+ * @see SchedulableTick
+ */
+ public void schedule(final SchedulableTick tick) {
+ final boolean hasTasks = tick.hasTasks();
+ if ((!hasTasks && !tick.setScheduled()) || (hasTasks && !tick.setScheduledTasks())) {
+ throw new IllegalStateException("Task is already scheduled");
+ }
+
+ this.insert(tick, hasTasks);
+ }
+
+ /**
+ * Indicates that intermediate tasks are available to be executed by the task.
+ * @param tick The specified task
+ * @see SchedulableTick
+ */
+ public void notifyTasks(final SchedulableTick tick) {
+ if (!tick.isScheduled() || !tick.upgradeToScheduledTasks()) {
+ return;
+ }
+
+ final ScheduledTickTask task = tick.task;
+ if (task == null || task.isTaken()) {
+ // will be handled by scheduling code
+ return;
+ }
+
+ task.setLastTaskNotify(System.nanoTime());
+
+ final TickThreadRunner runner = task.owner;
+ this.scheduledTasks.put(task, task);
+ if (runner != null) {
+ runner.scheduledTasks.put(task, task);
+ runner.interruptIfWaiting();
+ }
+ }
+
+ /**
+ * Returns {@code false} if the task is not scheduled or is cancelled, returns {@code true} if the task was
+ * cancelled by this thread
+ */
+ public boolean cancel(final SchedulableTick tick) {
+ final boolean ret = tick.cancel();
+
+ if (!ret) {
+ // nothing to do here
+ return ret;
+ }
+
+ // try to remove task from queues
+ final ScheduledTickTask task = tick.task;
+ if (task != null && task.take()) {
+ this.unwatchedScheduledTicks.remove(task);
+ this.scheduledTasks.remove(task);
+ final TickThreadRunner owner = task.owner;
+ if (owner != null) {
+ owner.scheduledTicks.remove(task);
+ owner.scheduledTasks.remove(task);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Represents a tickable task that can be scheduled into a {@link ScheduledTaskThreadPool}.
+ * <p>
+ * A tickable task is expected to run on a fixed interval, which is determined by
+ * the {@link ScheduledTaskThreadPool}.
+ * </p>
+ * <p>
+ * A tickable task can have intermediate tasks that can be executed before its tick method is ran. Instead of
+ * the {@link ScheduledTaskThreadPool} parking in-between ticks, the scheduler will instead drain
+ * intermediate tasks from scheduled tasks. The parsing of intermediate tasks allows the scheduler to take
+ * advantage of downtime to reduce the intermediate task load from tasks once they begin ticking.
+ * </p>
+ * <p>
+ * It is guaranteed that {@link #runTick()} and {@link #runTasks(BooleanSupplier)} are never
+ * invoked in parallel.
+ * It is required that when intermediate tasks are scheduled, that {@link ScheduledTaskThreadPool#notifyTasks(SchedulableTick)}
+ * is invoked for any scheduled task - otherwise, {@link #runTasks(BooleanSupplier)} may not be invoked to
+ * parse intermediate tasks.
+ * </p>
+ */
+ public static abstract class SchedulableTick {
+ private static final AtomicLong ID_GENERATOR = new AtomicLong();
+ public final long id = ID_GENERATOR.getAndIncrement();
+
+ private long scheduledStart = DEADLINE_NOT_SET;
+
+ private static final int STATE_UNSCHEDULED = 1 << 0;
+ private static final int STATE_SCHEDULED = 1 << 1;
+ private static final int STATE_SCHEDULED_TASKS = 1 << 2;
+ private static final int STATE_TICKING = 1 << 3;
+ private static final int STATE_TASKS = 1 << 4;
+ private static final int STATE_TICKING_CANCELLED = 1 << 5;
+ private static final int STATE_TASKS_CANCELLED = 1 << 6;
+ private static final int STATE_CANCELLED = 1 << 7;
+ private volatile int state = STATE_UNSCHEDULED;
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(SchedulableTick.class, "state", int.class);
+
+ private volatile ScheduledTickTask task;
+
+ private int getStateVolatile() {
+ return (int)STATE_HANDLE.getVolatile(this);
+ }
+
+ private void setStateVolatile(final int value) {
+ STATE_HANDLE.setVolatile(this, value);
+ }
+
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private boolean isScheduled() {
+ return this.getStateVolatile() == STATE_SCHEDULED;
+ }
+
+ private boolean upgradeToScheduledTasks() {
+ return STATE_SCHEDULED == this.compareAndExchangeStateVolatile(STATE_SCHEDULED, STATE_SCHEDULED_TASKS);
+ }
+
+ private boolean setScheduled() {
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED);
+ }
+
+ private boolean setScheduledTasks() {
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED_TASKS);
+ }
+
+ private boolean cancel() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ return false;
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean markTicking() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean markTasks() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean canBeTicked() {
+ final int currState = this.getStateVolatile();
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ return true;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+
+ protected final long getScheduledStart() {
+ return this.scheduledStart;
+ }
+
+ /**
+ * If this task is scheduled, then this may only be invoked during {@link #runTick()}
+ */
+ protected final void setScheduledStart(final long value) {
+ this.scheduledStart = value;
+ }
+
+ /**
+ * Executes the tick.
+ * <p>
+ * It is the callee's responsibility to invoke {@link #setScheduledStart(long)} to adjust the start of
+ * the next tick.
+ * </p>
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
+ */
+ public abstract boolean runTick();
+
+ private boolean tick() {
+ if (!this.markTicking()) {
+ return false;
+ }
+
+ final boolean tickRes = this.runTick();
+
+ if (!tickRes) {
+ this.setStateVolatile(STATE_CANCELLED);
+ return false;
+ }
+
+ // move to scheduled
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_TICKING: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_SCHEDULED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING_CANCELLED: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING_CANCELLED, STATE_CANCELLED))) {
+ return false;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns whether this task has any intermediate tasks that can be executed.
+ */
+ public abstract boolean hasTasks();
+
+ /**
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
+ */
+ public abstract boolean runTasks(final BooleanSupplier canContinue);
+
+ private boolean tasks(final BooleanSupplier canContinue) {
+ if (!this.markTasks()) {
+ return false;
+ }
+
+ final boolean taskRes = this.runTasks(canContinue);
+
+ if (!taskRes) {
+ this.setStateVolatile(STATE_CANCELLED);
+ return false;
+ }
+
+ // move to scheduled
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_SCHEDULED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TASKS_CANCELLED: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS_CANCELLED, STATE_CANCELLED))) {
+ return false;
+ }
+ continue;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SchedulableTick:{" +
+ "class=" + this.getClass().getName() + "," +
+ "state=" + this.state + ","
+ + "}";
+ }
+ }
+
+ private static final class WaitState {
+
+ private static final Comparator<WaitState> OLDEST_FIRST = (final WaitState w1, final WaitState w2) -> {
+ final long d1 = w1.deadline;
+ final long d2 = w2.deadline;
+
+ if (d1 == DEADLINE_NOT_SET && d2 != DEADLINE_NOT_SET) {
+ return -1;
+ }
+ if (d1 != DEADLINE_NOT_SET && d2 == DEADLINE_NOT_SET) {
+ return 1;
+ }
+
+ final int timeCmp = TimeUtil.compareTimes(d2, d1);
+ if (timeCmp != 0) {
+ return timeCmp;
+ }
+
+ return Long.signum(w2.id - w1.id);
+ };
+
+ private final long id;
+ private final long deadline; // set to DEADLINE_NOT_SET if idle
+ private final TickThreadRunner runner;
+
+ private WaitState(final long id, final long deadline, final TickThreadRunner runner) {
+ this.id = id;
+ this.deadline = deadline;
+ this.runner = runner;
+ }
+ }
+
+ private static final class TickThreadRunner implements Runnable {
+
+ private final ScheduledTaskThreadPool scheduler;
+ private final long id;
+ private Thread thread;
+
+ // no scheduled ticks
+ private static final int STATE_IDLE = 1 << 0;
+ private static final int STATE_WAITING = 1 << 1;
+ private static final int STATE_TASKS = 1 << 2;
+ private static final int STATE_INTERRUPT = 1 << 3;
+ private static final int STATE_TICKING = 1 << 4;
+ private static final int STATE_HALTED = 1 << 5;
+ private volatile int state = STATE_INTERRUPT; // set to INTERRUPT initially so that tasks may be stolen on start
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(TickThreadRunner.class, "state", int.class);
+
+ private WaitState waitState;
+ private ScheduledTickTask watch;
+
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
+
+ public TickThreadRunner(final ScheduledTaskThreadPool scheduler, final long id) {
+ this.scheduler = scheduler;
+ this.id = id;
+ }
+
+ private int getStateVolatile() {
+ return (int)STATE_HANDLE.getVolatile(this);
+ }
+
+ private void setStateVolatile(final int value) {
+ STATE_HANDLE.setVolatile(this, value);
+ }
+
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private boolean interruptIfWaiting() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_INTERRUPT:
+ case STATE_TASKS:
+ case STATE_TICKING:
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_IDLE:
+ case STATE_WAITING: {
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
+ LockSupport.unpark(this.thread);
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private boolean interrupt() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_INTERRUPT:
+ case STATE_TICKING:
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_IDLE:
+ case STATE_WAITING:
+ case STATE_TASKS: {
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
+ if (curr == STATE_IDLE || curr == STATE_WAITING) {
+ LockSupport.unpark(this.thread);
+ }
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private void halt() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_HALTED: {
+ return;
+ }
+ case STATE_IDLE:
+ case STATE_WAITING:
+ case STATE_TASKS:
+ case STATE_INTERRUPT:
+ case STATE_TICKING: {
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_HALTED))) {
+ if (curr == STATE_IDLE || curr == STATE_WAITING) {
+ LockSupport.unpark(this.thread);
+ }
+ return;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private boolean isHalted() {
+ return STATE_HALTED == this.getStateVolatile();
+ }
+
+ private void setupWaitState(final long deadline) {
+ if (this.waitState != null) {
+ throw new IllegalStateException("Waitstate already set");
+ }
+ this.waitState = new WaitState(this.id, deadline, this);
+ this.scheduler.waitingOrIdleRunners.put(this.waitState, this.waitState);
+ }
+
+ private void cleanWaitState() {
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+
+ private ScheduledTickTask findTick() {
+ while (this.getStateVolatile() == STATE_WAITING) {
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
+
+ final ScheduledTickTask toWaitFor;
+ if (globalFirst == null) {
+ toWaitFor = ourFirst;
+ } else if (ourFirst == null) {
+ toWaitFor = globalFirst;
+ } else {
+ final long globalStart = globalFirst.tickStart + this.scheduler.stealThresholdNS;
+ final long ourStart = ourFirst.tickStart;
+
+ toWaitFor = ourStart - globalStart <= 0L ? ourFirst : globalFirst;
+ }
+
+ if (toWaitFor == null) {
+ // no tasks are scheduled
+
+ // move to idle state
+ this.setupWaitState(DEADLINE_NOT_SET);
+ this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_IDLE);
+ if (!this.scheduledTicks.isEmpty() || !this.scheduler.unwatchedScheduledTicks.isEmpty()) {
+ // handle race condition: task added before we moved to idle
+ this.interrupt();
+ }
+
+ return toWaitFor;
+ }
+
+ if (toWaitFor == globalFirst) {
+ if (toWaitFor.watch()) {
+ this.scheduler.unwatchedScheduledTicks.remove(toWaitFor);
+ this.watch = toWaitFor;
+ } else if (toWaitFor != ourFirst) {
+ continue;
+ } // else: failed to watch, but we are waiting for our task
+ }
+
+ return toWaitFor;
+ }
+
+ return null;
+ }
+
+ private void cleanupWatch(final boolean wakeThread) {
+ if (this.watch != null) {
+ this.watch.unwatch();
+
+ if (!this.watch.isTaken()) {
+ this.scheduler.unwatchedScheduledTicks.put(this.watch, this.watch);
+
+ if (wakeThread) {
+ for (;;) {
+ final WaitState latestWaiter = firstEntry(this.scheduler.waitingOrIdleRunners);
+ if (latestWaiter == null) {
+ break;
+ }
+
+ // note: if the task is owned by the waiter, then its deadline should already be <= watch tick start
+ if (latestWaiter.deadline == DEADLINE_NOT_SET || latestWaiter.deadline - (this.watch.tickStart + this.scheduler.stealThresholdNS) > 0L) {
+ if (this.scheduler.waitingOrIdleRunners.remove(latestWaiter) == null || !latestWaiter.runner.interrupt()) {
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ this.watch = null;
+ }
+ }
+
+ private boolean findEarlierTask(final ScheduledTickTask task) {
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
+
+ if (globalFirst != null && (globalFirst.tickStart + this.scheduler.stealThresholdNS) - task.tickStart < 0L) {
+ return true;
+ }
+
+ if (ourFirst != null && ourFirst.tickStart - task.tickStart < 0L) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private void reinsert(final ScheduledTickTask tick, final TickThreadRunner owner) {
+ final ScheduledTickTask newTask = tick.tick.task = new ScheduledTickTask(
+ tick.tick, tick.tick.getScheduledStart(), DEADLINE_NOT_SET, owner
+ );
+
+ this.scheduler.unwatchedScheduledTicks.put(newTask, newTask);
+ if (owner != null) {
+ owner.scheduledTicks.put(newTask, newTask);
+ }
+
+ if (newTask.tick.hasTasks()) {
+ this.scheduler.notifyTasks(newTask.tick);
+ }
+ }
+
+ private void runTasks(final ScheduledTickTask tick, final long deadline) {
+ if (STATE_WAITING != this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_TASKS)) {
+ // interrupted or halted
+ return;
+ }
+
+ if (!tick.take()) {
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
+ return;
+ }
+
+ // remove from queues
+ this.scheduler.unwatchedScheduledTicks.remove(tick);
+ this.scheduler.scheduledTasks.remove(tick);
+ if (tick.owner != null) {
+ tick.owner.scheduledTicks.remove(tick);
+ tick.owner.scheduledTasks.remove(tick);
+ }
+
+ final BooleanSupplier canContinue = () -> {
+ return TickThreadRunner.this.getStateVolatile() == STATE_TASKS && (System.nanoTime() - deadline < 0L);
+ };
+
+ if (tick.tick.tasks(canContinue)) {
+ this.reinsert(tick, tick.owner == null ? this : tick.owner);
+ }
+
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
+ }
+
+ private ScheduledTickTask findTaskNotBehind(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map,
+ final long timeNow) {
+ for (final Iterator<ScheduledTickTask> iterator = map.keySet().iterator(); iterator.hasNext();) {
+ final ScheduledTickTask task = iterator.next();
+ if (task.isTaken()) {
+ iterator.remove();
+ continue;
+ }
+
+ // try to avoid stealing tasks accidentally by subtracting the steal threshold instead
+ final long tickStart = task.owner == this ? task.tickStart : task.tickStart - this.scheduler.stealThresholdNS;
+
+ if (tickStart - timeNow <= 0L) {
+ // skip tasks already behind
+ continue;
+ }
+
+ return task;
+ }
+
+ return null;
+ }
+
+ private ScheduledTickTask waitForTick() {
+ final ScheduledTickTask tick = this.findTick();
+
+ if (tick == null) {
+ return tick;
+ }
+
+ final long tickDeadline = tick.owner == this ? tick.tickStart : tick.tickStart + this.scheduler.stealThresholdNS;
+
+ this.setupWaitState(tickDeadline);
+ // should already be in STATE_WAITING (unless interrupted)
+
+ for (;;) {
+ if (this.getStateVolatile() != STATE_WAITING || tick.isTaken() || this.findEarlierTask(tick)) {
+ this.cleanupWatch(false);
+ this.cleanWaitState();
+ // start of loop may only be idle or interrupt
+ this.interrupt();
+ return null;
+ }
+
+ final long timeNow = System.nanoTime();
+
+ final ScheduledTickTask ourTask = findFirstNonTaken(this.scheduledTasks);
+ // avoid stealing global tasks that are behind schedule
+ final ScheduledTickTask globalTask = this.findTaskNotBehind(this.scheduler.scheduledTasks, timeNow);
+
+ if (timeNow - tickDeadline >= 0L) {
+ if (!tick.take()) {
+ continue;
+ }
+ this.cleanWaitState();
+ return tick;
+ }
+
+ if (ourTask == null && globalTask == null) {
+ // nothing to do, so just park
+ Thread.interrupted();
+ LockSupport.parkNanos("waiting", tickDeadline - timeNow);
+ continue;
+ }
+
+ final ScheduledTickTask toTask;
+ if (ourTask == null) {
+ toTask = globalTask;
+ } else if (globalTask == null) {
+ toTask = ourTask;
+ } else {
+ // try to use global task only if it is behind
+ if (globalTask.getLastTaskNotify() + this.scheduler.stealThresholdNS - ourTask.getLastTaskNotify() < 0L) {
+ final int ownerState = globalTask.owner == null ? STATE_HALTED : globalTask.owner.getStateVolatile();
+ // steal if not idle, waiting, interrupt
+ switch (ownerState) {
+ case STATE_IDLE:
+ case STATE_WAITING:
+ case STATE_INTERRUPT: {
+ // owner will probably get to it
+ toTask = ourTask;
+ break;
+ }
+ default: {
+ toTask = globalTask;
+ break;
+ }
+ }
+ } else {
+ toTask = ourTask;
+ }
+ }
+
+ long deadline = tickDeadline;
+ deadline = Math.min(deadline, timeNow + this.scheduler.taskTimeSliceNS);
+ deadline = Math.min(deadline, toTask.tickStart);
+
+ // before parsing tasks: were we interrupted?
+ if (this.getStateVolatile() != STATE_WAITING) {
+ continue;
+ }
+
+ this.runTasks(toTask, deadline);
+ }
+ }
+
+ private boolean moveToTickingState() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_WAITING:
+ case STATE_INTERRUPT: { // interrupted too late!
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_TICKING))) {
+ if (curr == STATE_INTERRUPT) {
+ // pass interrupt to another thread
+ this.scheduler.interruptOneRunner();
+ }
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private void doTick(final ScheduledTickTask tick) {
+ if (tick.tick.tick()) {
+ this.reinsert(tick, this);
+ }
+ }
+
+ private void doRun() {
+ for (;;) {
+ if (this.waitState != null) {
+ if (this.waitState.deadline != DEADLINE_NOT_SET) {
+ throw new IllegalStateException();
+ }
+
+ while (this.getStateVolatile() == STATE_IDLE) {
+ Thread.interrupted();
+ LockSupport.park("idling");
+ }
+
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+
+ final int currState = this.compareAndExchangeStateVolatile(STATE_INTERRUPT, STATE_WAITING);
+ if (currState == STATE_HALTED) {
+ return;
+ } else if (currState != STATE_INTERRUPT) {
+ throw new IllegalStateException("State must be HALTED or INTERRUPT at beginning of run loop");
+ }
+
+ while (this.getStateVolatile() == STATE_WAITING) {
+ // try to find a tick
+ final ScheduledTickTask toTick = this.waitForTick();
+
+ if (toTick == null) {
+ // no ticks -> go idle
+ // waitstate should already be setup
+ // OR we were interrupted and need to clear state
+ // OR we were halted and need to exit
+ break;
+ }
+
+ if (!this.moveToTickingState()) {
+ // halted
+ // re-insert task, since we took it
+ this.scheduler.insert(toTick.tick, toTick.tick.hasTasks());
+ break;
+ }
+
+ this.doTick(toTick);
+
+ if (STATE_TICKING != this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_WAITING)) {
+ // halted
+ break;
+ }
+
+ continue;
+ }
+ }
+ }
+
+ private void begin() {
+ this.setupWaitState(DEADLINE_NOT_SET);
+ }
+
+ private void die() {
+ // note: this should also handle unexpected shutdowns gracefully
+
+ this.cleanupWatch(false);
+ if (this.waitState != null) {
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+ this.scheduler.aliveThreads.remove(this);
+ if (this.getStateVolatile() == STATE_HALTED) {
+ // start task stealing for our tasks
+ this.scheduler.interruptAllRunners();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.begin();
+ this.doRun();
+ } finally {
+ this.die();
+ }
+ }
+ }
+
+ private static final class ScheduledTickTask {
+ private static final Comparator<ScheduledTickTask> TICK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
+ final int timeCmp = TimeUtil.compareTimes(t1.tickStart, t2.tickStart);
+ if (timeCmp != 0L) {
+ return timeCmp;
+ }
+
+ return Long.signum(t1.tick.id - t2.tick.id);
+ };
+ private static final Comparator<ScheduledTickTask> TASK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
+ final int timeCmp = TimeUtil.compareTimes(t1.lastTaskNotify, t2.lastTaskNotify);
+ if (timeCmp != 0L) {
+ return timeCmp;
+ }
+
+ return Long.signum(t1.tick.id - t2.tick.id);
+ };
+
+ private final SchedulableTick tick;
+ private final long tickStart;
+ private long lastTaskNotify;
+ private final TickThreadRunner owner;
+
+ private volatile boolean taken;
+ private static final VarHandle TAKEN_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "taken", boolean.class);
+ private volatile boolean watched;
+ private static final VarHandle WATCHED_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "watched", boolean.class);
+
+ private ScheduledTickTask(final SchedulableTick tick, final long tickStart, final long lastTaskNotify,
+ final TickThreadRunner owner) {
+ this.tick = tick;
+ this.tickStart = tickStart;
+ this.lastTaskNotify = lastTaskNotify;
+ this.owner = owner;
+ }
+
+ public boolean take() {
+ return !(boolean)TAKEN_HANDLE.getVolatile(this) && !(boolean)TAKEN_HANDLE.compareAndExchange(this, false, true);
+ }
+
+ public boolean isTaken() {
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
+ }
+
+ public boolean watch() {
+ return !(boolean)WATCHED_HANDLE.getVolatile(this) && !(boolean)WATCHED_HANDLE.compareAndExchange(this, false, true);
+ }
+
+ public boolean unwatch() {
+ return (boolean)WATCHED_HANDLE.compareAndExchange(this, true, false);
+ }
+
+ public boolean isWatched() {
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
+ }
+
+ public long getLastTaskNotify() {
+ return this.lastTaskNotify;
+ }
+
+ public void setLastTaskNotify(final long value) {
+ this.lastTaskNotify = value;
+ }
+ }
+}
diff --git a/io/papermc/paper/threadedregions/TickData.java b/io/papermc/paper/threadedregions/TickData.java
index d4d80a69488f57704f1b3dc74cb379de36e80ec0..0383e4dcd611a7568597f46308060f3d7288a564 100644
--- a/io/papermc/paper/threadedregions/TickData.java
+++ b/io/papermc/paper/threadedregions/TickData.java
@@ -20,6 +20,10 @@ public final class TickData {
}
public void addDataFrom(final TickRegionScheduler.TickTime time) {
+ if (!time.isTickExecution()) {
+ // TODO fix later
+ return;
+ }
final long start = time.tickStart();
TickRegionScheduler.TickTime first;
diff --git a/io/papermc/paper/threadedregions/TickRegionScheduler.java b/io/papermc/paper/threadedregions/TickRegionScheduler.java
index 7123b3eb2f2e52946b8ef9de993a6828eb0bb6f7..a8608e8bed64a4da4ed340ab3837b082d6715437 100644
--- a/io/papermc/paper/threadedregions/TickRegionScheduler.java
+++ b/io/papermc/paper/threadedregions/TickRegionScheduler.java
@@ -1,6 +1,5 @@
package io.papermc.paper.threadedregions;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.concurrentutil.util.TimeUtil;
import ca.spottedleaf.moonrise.common.util.TickThread;
import com.mojang.logging.LogUtils;
@@ -41,10 +40,10 @@ public final class TickRegionScheduler {
}
// Folia end - watchdog
- private final SchedulerThreadPool scheduler;
+ private final ScheduledTaskThreadPool scheduler;
- public TickRegionScheduler(final int threads) {
- this.scheduler = new SchedulerThreadPool(threads, new ThreadFactory() {
+ public TickRegionScheduler() {
+ this.scheduler = new ScheduledTaskThreadPool(new ThreadFactory() {
private final AtomicInteger idGenerator = new AtomicInteger();
@Override
@@ -53,11 +52,15 @@ public final class TickRegionScheduler {
ret.setUncaughtExceptionHandler(TickRegionScheduler.this::uncaughtException);
return ret;
}
- });
+ }, TimeUnit.MILLISECONDS.toNanos(3L), TimeUnit.MILLISECONDS.toNanos(2L));
+ }
+
+ public void setThreads(final int threads) {
+ this.scheduler.setCoreThreads(threads);
}
public int getTotalThreadCount() {
- return this.scheduler.getThreads().length;
+ return this.scheduler.getAliveThreads().length;
}
private static void setTickingRegion(final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region) {
@@ -84,7 +87,7 @@ public final class TickRegionScheduler {
}
}
- private static void setTickTask(final SchedulerThreadPool.SchedulableTick task) {
+ private static void setTickTask(final ScheduledTaskThreadPool.SchedulableTick task) {
final Thread currThread = Thread.currentThread();
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
throw new IllegalStateException("Must be tick thread runner");
@@ -127,7 +130,7 @@ public final class TickRegionScheduler {
* Returns the current ticking task, or {@code null} if there is no ticking region.
* If this thread is not a TickThread, then returns {@code null}.
*/
- public static SchedulerThreadPool.SchedulableTick getCurrentTickingTask() {
+ public static ScheduledTaskThreadPool.SchedulableTick getCurrentTickingTask() {
final Thread currThread = Thread.currentThread();
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
return null;
@@ -165,22 +168,17 @@ public final class TickRegionScheduler {
region.markNonSchedulable();
}
- /**
- * Updates the tick start to the farthest into the future of its current scheduled time and the
- * provided time.
- * @return {@code false} if the region was not scheduled or is currently ticking or the specified time is less-than its
- * current start time, {@code true} if the next tick start was adjusted.
- */
- public boolean updateTickStartToMax(final RegionScheduleHandle region, final long newStart) {
- return this.scheduler.updateTickStartToMax(region, newStart);
- }
-
public boolean halt(final boolean sync, final long maxWaitNS) {
- return this.scheduler.halt(sync, maxWaitNS);
+ this.scheduler.halt();
+ if (!sync) {
+ return this.scheduler.getAliveThreads().length == 0;
+ }
+
+ return this.scheduler.join(maxWaitNS == 0L ? 0L : Math.max(1L, TimeUnit.NANOSECONDS.toMillis(maxWaitNS)));
}
void dumpAliveThreadTraces(final String reason) {
- for (final Thread thread : this.scheduler.getThreads()) {
+ for (final Thread thread : this.scheduler.getAliveThreads()) {
if (thread.isAlive()) {
TraceUtil.dumpTraceForThread(thread, reason);
}
@@ -191,16 +189,12 @@ public final class TickRegionScheduler {
this.scheduler.notifyTasks(region);
}
- public void init() {
- this.scheduler.start();
- }
-
private void uncaughtException(final Thread thread, final Throwable thr) {
LOGGER.error("Uncaught exception in tick thread \"" + thread.getName() + "\"", thr);
// prevent further ticks from occurring
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
- this.scheduler.halt(false, 0L);
+ this.scheduler.halt();
MinecraftServer.getServer().stopServer();
}
@@ -210,7 +204,7 @@ public final class TickRegionScheduler {
// prevent further ticks from occurring
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
- this.scheduler.halt(false, 0L);
+ this.scheduler.halt();
final ChunkPos center = handle.region == null ? null : handle.region.region.getCenterChunk();
final ServerLevel world = handle.region == null ? null : handle.region.world;
@@ -226,7 +220,7 @@ public final class TickRegionScheduler {
private ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> currentTickingRegion;
private RegionizedWorldData currentTickingWorldRegionizedData;
- private SchedulerThreadPool.SchedulableTick currentTickingTask;
+ private ScheduledTaskThreadPool.SchedulableTick currentTickingTask;
// Folia start - profiler
private ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle.NO_OP_HANDLE;
// Folia end - profiler
@@ -236,7 +230,7 @@ public final class TickRegionScheduler {
}
}
- public static abstract class RegionScheduleHandle extends SchedulerThreadPool.SchedulableTick {
+ public static abstract class RegionScheduleHandle extends ScheduledTaskThreadPool.SchedulableTick {
protected long currentTick;
protected long lastTickStart;
@@ -258,7 +252,7 @@ public final class TickRegionScheduler {
public RegionScheduleHandle(final TickRegions.TickRegionData region, final long firstStart) {
this.currentTick = 0L;
- this.lastTickStart = SchedulerThreadPool.DEADLINE_NOT_SET;
+ this.lastTickStart = ScheduledTaskThreadPool.DEADLINE_NOT_SET;
this.tickTimes5s = new TickData(TimeUnit.SECONDS.toNanos(5L));
this.tickTimes15s = new TickData(TimeUnit.SECONDS.toNanos(15L));
this.tickTimes1m = new TickData(TimeUnit.MINUTES.toNanos(1L));
@@ -267,16 +261,16 @@ public final class TickRegionScheduler {
this.region = region;
this.setScheduledStart(firstStart);
- this.tickSchedule = new Schedule(firstStart == SchedulerThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
+ this.tickSchedule = new Schedule(firstStart == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
}
/**
- * Subclasses should call this instead of {@link ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool.SchedulableTick#setScheduledStart(long)}
+ * Subclasses should call this instead of {@link #setScheduledStart(long)}
* so that the tick schedule and scheduled start remain synchronised
*/
protected final void updateScheduledStart(final long to) {
this.setScheduledStart(to);
- this.tickSchedule.setLastPeriod(to == SchedulerThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
+ this.tickSchedule.setLastPeriod(to == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
}
public final void markNonSchedulable() {
@@ -293,7 +287,7 @@ public final class TickRegionScheduler {
protected abstract void tickRegion(final int tickCount, final long startTime, final long scheduledEnd);
- protected abstract boolean runRegionTasks(final BooleanSupplier canContinue);
+ protected abstract void runRegionTasks(final BooleanSupplier canContinue);
protected abstract boolean hasIntermediateTasks();
@@ -303,9 +297,9 @@ public final class TickRegionScheduler {
}
@Override
- public final Boolean runTasks(final BooleanSupplier canContinue) {
+ public final boolean runTasks(final BooleanSupplier canContinue) {
if (this.cancelled.get()) {
- return null;
+ return false;
}
final long cpuStart = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
@@ -316,7 +310,7 @@ public final class TickRegionScheduler {
throw new IllegalStateException("Scheduled region should be acquirable");
}
// region was killed
- return null;
+ return false;
}
TickRegionScheduler.setTickTask(this);
@@ -326,31 +320,30 @@ public final class TickRegionScheduler {
synchronized (this) {
this.currentTickData = new TickTime(
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
false
);
this.currentTickingThread = Thread.currentThread();
}
- final boolean ret;
final FoliaWatchdogThread.RunningTick runningTick = new FoliaWatchdogThread.RunningTick(tickStart, this, Thread.currentThread()); // Folia - watchdog
WATCHDOG_THREAD.addTick(runningTick); // Folia - watchdog
try {
- ret = this.runRegionTasks(() -> {
+ this.runRegionTasks(() -> {
return !RegionScheduleHandle.this.cancelled.get() && canContinue.getAsBoolean();
});
} catch (final Throwable thr) {
this.scheduler.regionFailed(this, true, thr);
// don't release region for another tick
- return null;
+ return false;
} finally {
WATCHDOG_THREAD.removeTick(runningTick); // Folia - watchdog
final long tickEnd = System.nanoTime();
final long cpuEnd = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
final TickTime time = new TickTime(
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET,
tickStart, cpuStart, tickEnd, cpuEnd, MEASURE_CPU_TIME, false
);
@@ -361,7 +354,7 @@ public final class TickRegionScheduler {
}
}
- return !this.markNotTicking() || this.cancelled.get() ? null : Boolean.valueOf(ret);
+ return this.markNotTicking() && !this.cancelled.get();
}
@Override
@@ -405,7 +398,7 @@ public final class TickRegionScheduler {
synchronized (this) {
this.currentTickData = new TickTime(
lastTickStart, scheduledStart, tickStart, cpuStart,
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
true
);
this.currentTickingThread = Thread.currentThread();
@@ -582,7 +575,7 @@ public final class TickRegionScheduler {
* Only valid when {@link #isTickExecution()} is {@code true}.
*/
public boolean hasLastTick() {
- return this.previousTickStart != SchedulerThreadPool.DEADLINE_NOT_SET;
+ return this.previousTickStart != ScheduledTaskThreadPool.DEADLINE_NOT_SET;
}
/*
diff --git a/io/papermc/paper/threadedregions/TickRegions.java b/io/papermc/paper/threadedregions/TickRegions.java
index 988fe74578065c9464f5639e5cc6af79619edef5..1fe996286e048cb8a8d5ede45f27792bc04fec38 100644
--- a/io/papermc/paper/threadedregions/TickRegions.java
+++ b/io/papermc/paper/threadedregions/TickRegions.java
@@ -1,6 +1,5 @@
package io.papermc.paper.threadedregions;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.concurrentutil.util.TimeUtil;
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
import com.mojang.logging.LogUtils;
@@ -12,6 +11,7 @@ import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.level.ServerLevel;
+import net.minecraft.server.level.ServerPlayer;
import org.slf4j.Logger;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
@@ -29,6 +29,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private static boolean initialised;
+
private static TickRegionScheduler scheduler;
public static TickRegionScheduler getScheduler() {
@@ -45,6 +46,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
gridExponent = Math.min(31, gridExponent);
regionShift = gridExponent;
+ scheduler = new TickRegionScheduler();
+ }
+
+ public static void start() {
+ final GlobalConfiguration.ThreadedRegions config = GlobalConfiguration.get().threadedRegions;
int tickThreads;
if (config.threads <= 0) {
tickThreads = Runtime.getRuntime().availableProcessors() / 2;
@@ -57,7 +63,8 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
tickThreads = config.threads;
}
- scheduler = new TickRegionScheduler(tickThreads);
+ scheduler.setThreads(tickThreads);
+
LOGGER.info("Regionised ticking is enabled with " + tickThreads + " tick threads");
}
@@ -171,7 +178,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
private final Reference2ReferenceOpenHashMap<RegionizedData<?>, Object> regionizedData = new Reference2ReferenceOpenHashMap<>();
// tick data
- private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, SchedulerThreadPool.DEADLINE_NOT_SET);
+ private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
// queue data
private final RegionizedTaskQueue.RegionTaskQueueData taskQueueData;
@@ -182,15 +189,64 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
// async-safe read-only region data
private final RegionStats regionStats;
+ //
+ private final java.util.concurrent.atomic.AtomicBoolean hasPackets = new java.util.concurrent.atomic.AtomicBoolean(false);
+
public volatile ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler; // Folia - profiler
private TickRegionData(final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region) {
this.region = region;
this.world = region.regioniser.world;
- this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData);
+ this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData, this);
this.regionStats = new RegionStats();
}
+ public void setHasTasks() {
+ TickRegions.getScheduler().setHasTasks(this.tickHandle);
+ }
+
+ public void setHasPackets() {
+ if (!this.hasPackets.get() && !this.hasPackets.compareAndExchange(false, true)) {
+ this.setHasTasks();
+ }
+ }
+
+ public boolean drainOnePacket() {
+ if (!this.hasPackets.get()) {
+ return false;
+ }
+
+ final RegionizedWorldData worldData = this.world.getCurrentWorldData();
+ boolean hasPacketsNew = false;
+
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
+ if (!ca.spottedleaf.moonrise.common.util.TickThread.isTickThreadFor(player)) {
+ continue;
+ }
+ if (player.getBukkitEntity().executeOnePacket()) {
+ hasPacketsNew |= player.getBukkitEntity().hasPackets();
+ }
+ }
+
+ if (!hasPacketsNew) {
+ this.hasPackets.set(false);
+
+ // handle race condition: packet added during packet processing
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
+ if (player.getBukkitEntity().hasPackets()) {
+ this.hasPackets.set(true);
+ break;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public void drainPackets() {
+ while (this.drainOnePacket());
+ }
+
public RegionStats getRegionStats() {
return this.regionStats;
}
@@ -224,7 +280,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
return ret;
}
- ret = regionizedData.createNewValue();
+ ret = regionizedData.createNewValue(this);
this.regionizedData.put(regionizedData, ret);
return ret;
@@ -242,6 +298,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
for (final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region : regions) {
final TickRegionData data = region.getData();
data.tickHandle.copyDeadlineAndTickCount(this.tickHandle);
+ // just be lazy about this one, it's not very important
+ if (this.hasPackets.getOpaque()) {
+ data.hasPackets.setOpaque(true);
+ }
}
// generic regionised data
@@ -309,6 +369,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
// there's not really a great solution to the tick problem, no matter what it'll be messed up
// we will pick the greatest time delay so that tps will not exceed TICK_RATE
data.tickHandle.updateSchedulingToMax(this.tickHandle);
+ // just be lazy about this one, it's not very important
+ if (this.hasPackets.getOpaque()) {
+ data.hasPackets.setOpaque(true);
+ }
// generic regionised data
final long fromTickOffset = currentTickTo - currentTickFrom; // see merge jd
@@ -350,11 +414,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private void updateSchedulingToMax(final ConcreteRegionTickHandle from) {
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
return;
}
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
this.updateScheduledStart(from.getScheduledStart());
return;
}
@@ -365,7 +429,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
private void copyDeadlineAndTickCount(final ConcreteRegionTickHandle from) {
this.currentTick = from.currentTick;
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
return;
}
@@ -374,7 +438,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private void checkInitialSchedule() {
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
this.updateScheduledStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
}
}
@@ -409,35 +473,34 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
@Override
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
final ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = io.papermc.paper.threadedregions.TickRegionScheduler.getProfiler(); // Folia start - profiler
profiler.startInBetweenTick(); try { // Folia - profiler
final RegionizedTaskQueue.RegionTaskQueueData queue = this.region.taskQueueData;
boolean processedChunkTask = false;
- boolean executeChunkTask = true;
- boolean executeTickTask = true;
+ boolean executeChunkTask;
+ boolean executeTickTask;
+ boolean executePacketTask;
do {
- if (executeTickTask) {
- executeTickTask = queue.executeTickTask();
- }
- if (executeChunkTask) {
- processedChunkTask |= (executeChunkTask = queue.executeChunkTask());
- }
- } while ((executeChunkTask | executeTickTask) && canContinue.getAsBoolean());
+ executeTickTask = queue.executeTickTask();
+ executeChunkTask = queue.executeChunkTask();
+ executePacketTask = this.region.drainOnePacket();
+
+ processedChunkTask |= executeChunkTask;
+ } while ((executeChunkTask | executeTickTask | executePacketTask) && canContinue.getAsBoolean());
if (processedChunkTask) {
// if we processed any chunk tasks, try to process ticket level updates for full status changes
this.region.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates();
}
- return true;
} finally { profiler.stopInBetweenTick(); } // Folia - profiler
}
@Override
protected boolean hasIntermediateTasks() {
- return this.region.taskQueueData.hasTasks();
+ return this.region.taskQueueData.hasTasks() || this.region.hasPackets.get();
}
}
diff --git a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
index bc8aa525b3488dc71e7ca0529c6a8c57eaa99e1e..498125a093bb323b34860ca723e89eb8720eb71f 100644
--- a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
+++ b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManage
import io.papermc.paper.threadedregions.RegionizedData;
import io.papermc.paper.threadedregions.RegionizedServer;
import io.papermc.paper.threadedregions.TickRegionScheduler;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
@@ -143,6 +144,9 @@ public final class FoliaRegionScheduler implements RegionScheduler {
}
private static final class Scheduler {
+
+ private Scheduler(final TickRegionData regionData) {}
+
private static final RegionizedData.RegioniserCallback<Scheduler> REGIONISER_CALLBACK = new RegionizedData.RegioniserCallback<>() {
@Override
public void merge(final Scheduler from, final Scheduler into, final long fromTickOffset) {
diff --git a/net/minecraft/network/protocol/PacketUtils.java b/net/minecraft/network/protocol/PacketUtils.java
index c7214f0457641b5550b98dbd2863c1d637faeaa5..549b2bb8bd12b631ff023ff3b31bd392b51e41fd 100644
--- a/net/minecraft/network/protocol/PacketUtils.java
+++ b/net/minecraft/network/protocol/PacketUtils.java
@@ -48,14 +48,8 @@ public class PacketUtils {
// Paper end - detailed watchdog information
// Folia start - region threading
};
- // ignore retired state, if removed then we don't want the packet to be handled
if (processor instanceof net.minecraft.server.network.ServerGamePacketListenerImpl gamePacketListener) {
- gamePacketListener.player.getBukkitEntity().taskScheduler.schedule(
- (net.minecraft.server.level.ServerPlayer player) -> {
- run.run();
- },
- null, 1L
- );
+ gamePacketListener.player.getBukkitEntity().addPacket(run);
} else if (processor instanceof net.minecraft.server.network.ServerConfigurationPacketListenerImpl configurationPacketListener) {
io.papermc.paper.threadedregions.RegionizedServer.getInstance().addTask(run);
} else if (processor instanceof net.minecraft.server.network.ServerLoginPacketListenerImpl loginPacketListener) {
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
index faf72dd6dff74296c73cb058aaabd1f9f475a072..be82a6b43b1f0c644c53d08a6e16bc2876c8c1e0 100644
--- a/net/minecraft/server/MinecraftServer.java
+++ b/net/minecraft/server/MinecraftServer.java
@@ -1647,6 +1647,7 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
} finally { foliaProfiler.stopTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.PLUGIN_TICK_TASKS); } // Folia - profiler
// now run all the entity schedulers
// TODO there has got to be a more efficient variant of this crap
+ region.drainPackets();
long tickedEntitySchedulers = 0L; // Folia - profiler
foliaProfiler.startTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.ENTITY_SCHEDULER_TICK); try { // Folia - profiler
for (net.minecraft.world.entity.Entity entity : region.world.getCurrentWorldData().getLocalEntitiesCopy()) {
diff --git a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
index 6eca15223b92aedac74233db886e2c1248750e2c..c739f2cf3590d5aab2391d95f6ea9a5cc4a2e599 100644
--- a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
+++ b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
@@ -120,6 +120,7 @@ public abstract class ServerCommonPacketListenerImpl implements ServerCommonPack
this.server.halt(false);
}
this.player.getBukkitEntity().taskScheduler.retire(); // Folia - region threading
+ this.player.getBukkitEntity().stopAcceptingPackets(); // Folia - region threading
}
@Override
diff --git a/net/minecraft/world/level/Level.java b/net/minecraft/world/level/Level.java
index dafd90502937019b616ac0a79465e1dbc578cf66..78f4dd7032d18b8e020a4576e4ac012c1d472e67 100644
--- a/net/minecraft/world/level/Level.java
+++ b/net/minecraft/world/level/Level.java
@@ -820,7 +820,7 @@ public abstract class Level implements LevelAccessor, AutoCloseable, ca.spottedl
// Folia start - region ticking
public final io.papermc.paper.threadedregions.RegionizedData<io.papermc.paper.threadedregions.RegionizedWorldData> worldRegionData
= new io.papermc.paper.threadedregions.RegionizedData<>(
- (ServerLevel)this, () -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this),
+ (ServerLevel)this, (regionData) -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this, regionData),
io.papermc.paper.threadedregions.RegionizedWorldData.REGION_CALLBACK
);
public volatile io.papermc.paper.threadedregions.RegionizedServer.WorldLevelData tickData;