Folia/folia-server/minecraft-patches/features/0009-fixup-Region-Threading-Base.patch
Spottedleaf 23ed0843b0 Update ScheduledTaskThreadPool
1. Adjust insertion last task process time to be the current time.
   This makes fresh inserts to be scheduled fairly with current
   inserts.
2. Always attempt to watch global first task
   This should try to prevent multiple scheduler threads from
   waiting on the same task. If the global first task is our task,
   then we can avoid retrying for the next task.
3. Correctly set runner state to TASKS when parsing intermediate
   tasks. This will prevent the runner from being interrupted
   by notifyTasks.
2025-03-21 08:53:19 -07:00

2154 lines
91 KiB
Diff

From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Spottedleaf <Spottedleaf@users.noreply.github.com>
Date: Thu, 20 Mar 2025 11:07:45 -0700
Subject: [PATCH] fixup! Region Threading Base
diff --git a/io/papermc/paper/threadedregions/COWArrayList.java b/io/papermc/paper/threadedregions/COWArrayList.java
new file mode 100644
index 0000000000000000000000000000000000000000..65393f18011aed8023ede95382145abf9cbb3c5e
--- /dev/null
+++ b/io/papermc/paper/threadedregions/COWArrayList.java
@@ -0,0 +1,64 @@
+package io.papermc.paper.threadedregions;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+
+public final class COWArrayList<E> {
+
+ private volatile E[] array;
+
+ public COWArrayList(final Class<E> clazz) {
+ this.array = (E[])Array.newInstance(clazz, 0);
+ }
+
+ public E[] getArray() {
+ return this.array;
+ }
+
+ public boolean contains(final E test) {
+ for (final E elem : this.array) {
+ if (elem == test) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public void add(final E element) {
+ synchronized (this) {
+ final E[] array = this.array;
+
+ final E[] copy = Arrays.copyOf(array, array.length + 1);
+ copy[array.length] = element;
+
+ this.array = copy;
+ }
+ }
+
+ public boolean remove(final E element) {
+ synchronized (this) {
+ final E[] array = this.array;
+ int index = -1;
+ for (int i = 0, len = array.length; i < len; ++i) {
+ if (array[i] == element) {
+ index = i;
+ break;
+ }
+ }
+
+ if (index == -1) {
+ return false;
+ }
+
+ final E[] copy = (E[])Array.newInstance(array.getClass().getComponentType(), array.length - 1);
+
+ System.arraycopy(array, 0, copy, 0, index);
+ System.arraycopy(array, index + 1, copy, index, (array.length - 1) - index);
+
+ this.array = copy;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/io/papermc/paper/threadedregions/RegionizedData.java b/io/papermc/paper/threadedregions/RegionizedData.java
index a1043c426d031755b57b77a9b2eec685e9861b13..5e66fcbee18ea0889ebe652228ac3e61cac7c872 100644
--- a/io/papermc/paper/threadedregions/RegionizedData.java
+++ b/io/papermc/paper/threadedregions/RegionizedData.java
@@ -1,10 +1,12 @@
package io.papermc.paper.threadedregions;
import ca.spottedleaf.concurrentutil.util.Validate;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
import net.minecraft.server.level.ServerLevel;
import javax.annotation.Nullable;
+import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -42,7 +44,7 @@ import java.util.function.Supplier;
* // callback is left out of this example
* // note: world != null here
* public final RegionizedData<EntityTickList> entityTickLists =
- * new RegionizedData<>(this, () -> new EntityTickList(), ...);
+ * new RegionizedData<>(this, (data) -> new EntityTickList(), ...);
*
* public void addTickingEntity(Entity e) {
* // What we expect here is that this world is the
@@ -86,7 +88,7 @@ import java.util.function.Supplier;
* // note: world == null here, because this RegionizedData object
* // is not instantiated per world, but rather globally.
* public final RegionizedData<TickTimes> tickTimes =
- * new RegionizedData<>(null, () -> new TickTimes(), ...);
+ * new RegionizedData<>(null, (data) -> new TickTimes(), ...);
* }
* }
* </pre>
@@ -97,7 +99,7 @@ import java.util.function.Supplier;
public final class RegionizedData<T> {
private final ServerLevel world;
- private final Supplier<T> initialValueSupplier;
+ private final Function<TickRegionData ,T> initialValueSupplier;
private final RegioniserCallback<T> callback;
/**
@@ -119,14 +121,14 @@ public final class RegionizedData<T> {
* @param supplier Initial value supplier used to lazy initialise region data.
* @param callback Region callback to manage this regionised data.
*/
- public RegionizedData(final ServerLevel world, final Supplier<T> supplier, final RegioniserCallback<T> callback) {
+ public RegionizedData(final ServerLevel world, final Function<TickRegionData,T> supplier, final RegioniserCallback<T> callback) {
this.world = world;
this.initialValueSupplier = Validate.notNull(supplier, "Supplier may not be null.");
this.callback = Validate.notNull(callback, "Regioniser callback may not be null.");
}
- T createNewValue() {
- return Validate.notNull(this.initialValueSupplier.get(), "Initial value supplier may not return null");
+ T createNewValue(final TickRegionData regionData) {
+ return Validate.notNull(this.initialValueSupplier.apply(regionData), "Initial value supplier may not return null");
}
RegioniserCallback<T> getCallback() {
@@ -141,7 +143,7 @@ public final class RegionizedData<T> {
* and the current ticking region's world does not match this {@code RegionizedData}'s world.
*/
public @Nullable T get() {
- final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region =
+ final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegions.TickRegionSectionData> region =
TickRegionScheduler.getCurrentRegion();
if (region == null) {
diff --git a/io/papermc/paper/threadedregions/RegionizedServer.java b/io/papermc/paper/threadedregions/RegionizedServer.java
index 1382c695c4991488b113401e231875ddc74f6b01..9008db1d594ffc371a7fa5bd36f6bb43a168d558 100644
--- a/io/papermc/paper/threadedregions/RegionizedServer.java
+++ b/io/papermc/paper/threadedregions/RegionizedServer.java
@@ -1,7 +1,6 @@
package io.papermc.paper.threadedregions;
import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.moonrise.common.util.TickThread;
import com.mojang.logging.LogUtils;
import io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler;
@@ -23,6 +22,7 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
@@ -64,7 +64,7 @@ public final class RegionizedServer {
// now we can schedule
this.tickHandle.setInitialStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
TickRegions.getScheduler().scheduleRegion(this.tickHandle);
- TickRegions.getScheduler().init();
+ TickRegions.start();
}
public void invalidateStatus() {
@@ -80,6 +80,15 @@ public final class RegionizedServer {
TickRegions.getScheduler().setHasTasks(this.tickHandle);
}
+ private boolean hasAnyGlobalChunkTasks() {
+ for (final ServerLevel world : this.worlds) {
+ if (world.taskQueueRegionData.hasGlobalChunkTasks()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Returns the current tick of the region ticking.
* @throws IllegalStateException If there is no current region.
@@ -118,7 +127,7 @@ public final class RegionizedServer {
private final AtomicBoolean ticking = new AtomicBoolean();
public GlobalTickTickHandle(final RegionizedServer server) {
- super(null, SchedulerThreadPool.DEADLINE_NOT_SET);
+ super(null, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
this.server = server;
}
@@ -158,26 +167,31 @@ public final class RegionizedServer {
return false;
}
- // TODO try catch?
run.run();
return true;
}
+ private boolean runGlobalTask() {
+ boolean ret = false;
+ for (final ServerLevel world : this.server.worlds) {
+ ret |= world.taskQueueRegionData.executeGlobalChunkTask();
+ }
+ return ret;
+ }
+
@Override
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
do {
- if (!this.runOneTask()) {
- return false;
+ if (!this.runOneTask() || !this.runGlobalTask()) {
+ return;
}
} while (canContinue.getAsBoolean());
-
- return true;
}
@Override
protected boolean hasIntermediateTasks() {
- return !this.server.globalTickQueue.isEmpty();
+ return !this.server.globalTickQueue.isEmpty() || this.server.hasAnyGlobalChunkTasks();
}
}
diff --git a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
index 745ab870310733b569681f5280895bb9798620a4..2794694ce57b21711e1ddac146647317b79336bf 100644
--- a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
+++ b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
@@ -79,7 +79,11 @@ public final class RegionizedTaskQueue {
this.world = world;
}
- private boolean executeGlobalChunkTask() {
+ public boolean hasGlobalChunkTasks() {
+ return !this.globalChunkTask.isEmpty();
+ }
+
+ public boolean executeGlobalChunkTask() {
final Runnable run = this.globalChunkTask.poll();
if (run != null) {
run.run();
@@ -94,6 +98,7 @@ public final class RegionizedTaskQueue {
public void pushGlobalChunkTask(final Runnable run) {
this.globalChunkTask.add(run);
+ TickRegions.getScheduler().setHasTasks(RegionizedServer.getGlobalTickData());
}
private PrioritisedQueue getQueue(final boolean synchronise, final int chunkX, final int chunkZ, final boolean isChunkTask) {
@@ -216,12 +221,14 @@ public final class RegionizedTaskQueue {
}
public static final class RegionTaskQueueData {
- private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue();
- private final PrioritisedQueue chunkQueue = new PrioritisedQueue();
+ private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue(this);
+ private final PrioritisedQueue chunkQueue = new PrioritisedQueue(this);
private final WorldRegionTaskData worldRegionTaskData;
+ private final TickRegions.TickRegionData regionData;
- public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData) {
+ public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData, final TickRegions.TickRegionData regionData) {
this.worldRegionTaskData = worldRegionTaskData;
+ this.regionData = regionData;
}
void mergeInto(final RegionTaskQueueData into) {
@@ -281,6 +288,13 @@ public final class RegionizedTaskQueue {
this.queues[i] = new ArrayDeque<>();
}
}
+
+ private final RegionTaskQueueData queue;
+
+ private PrioritisedQueue(final RegionTaskQueueData queue) {
+ this.queue = queue;
+ }
+
private boolean isDestroyed;
public int getScheduledTasks() {
@@ -609,6 +623,8 @@ public final class RegionizedTaskQueue {
continue;
}
+ queue.queue.regionData.setHasTasks();
+
// successfully queued
return true;
}
diff --git a/io/papermc/paper/threadedregions/RegionizedWorldData.java b/io/papermc/paper/threadedregions/RegionizedWorldData.java
index c6e487a4c14e6b82533881d01f32349b9ae28728..b8f1f042342d3fed5fa26df0de07e8e2b0937130 100644
--- a/io/papermc/paper/threadedregions/RegionizedWorldData.java
+++ b/io/papermc/paper/threadedregions/RegionizedWorldData.java
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.common.util.CoordinateUtils;
import ca.spottedleaf.moonrise.common.util.TickThread;
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
import com.mojang.logging.LogUtils;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ReferenceMap;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
@@ -33,7 +34,6 @@ import net.minecraft.world.entity.ai.village.VillageSiege;
import net.minecraft.world.entity.item.ItemEntity;
import net.minecraft.world.level.BlockEventData;
import net.minecraft.world.level.ChunkPos;
-import net.minecraft.world.level.Explosion;
import net.minecraft.world.level.Level;
import net.minecraft.world.level.NaturalSpawner;
import net.minecraft.world.level.ServerExplosion;
@@ -61,6 +61,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -82,6 +83,7 @@ public final class RegionizedWorldData {
// entities
for (final ServerPlayer player : from.localPlayers) {
into.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(into);
into.nearbyPlayers.addPlayer(player);
}
for (final Entity entity : from.allEntities) {
@@ -180,6 +182,7 @@ public final class RegionizedWorldData {
// the chunk holder must _exist_, and so the region section exists.
final RegionizedWorldData into = regionToData.get(CoordinateUtils.getChunkKey(pos.x >> chunkToRegionShift, pos.z >> chunkToRegionShift));
into.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(into);
into.nearbyPlayers.addPlayer(player);
}
for (final Entity entity : from.allEntities) {
@@ -346,7 +349,8 @@ public final class RegionizedWorldData {
}
// entities
- private final List<ServerPlayer> localPlayers = new ArrayList<>();
+ // this is copy on write to allow packet processing to iterate safely
+ private final CopyOnWriteArrayList<ServerPlayer> localPlayers = new CopyOnWriteArrayList<>();
private final NearbyPlayers nearbyPlayers;
private final ReferenceList<Entity> allEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
private final ReferenceList<Entity> loadedEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
@@ -446,7 +450,9 @@ public final class RegionizedWorldData {
public final alternate.current.wire.WireHandler wireHandler;
public final io.papermc.paper.redstone.RedstoneWireTurbo turbo;
- public RegionizedWorldData(final ServerLevel world) {
+ public final TickRegionData regionData;
+
+ public RegionizedWorldData(final ServerLevel world, final TickRegionData regionData) {
this.world = world;
this.blockLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, true);
this.fluidLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, false);
@@ -454,6 +460,7 @@ public final class RegionizedWorldData {
this.nearbyPlayers = new NearbyPlayers(world);
this.wireHandler = new alternate.current.wire.WireHandler(world);
this.turbo = new io.papermc.paper.redstone.RedstoneWireTurbo((RedStoneWireBlock)Blocks.REDSTONE_WIRE);
+ this.regionData = regionData;
// tasks may be drained before the region ticks, so we must set up the tick data early just in case
this.updateTickData();
@@ -611,6 +618,7 @@ public final class RegionizedWorldData {
if (this.allEntities.add(entity)) {
if (entity instanceof ServerPlayer player) {
this.localPlayers.add(player);
+ player.getBukkitEntity().updateRegion(this);
}
TickRegions.RegionStats.updateCurrentRegion();
}
@@ -627,6 +635,7 @@ public final class RegionizedWorldData {
if (this.allEntities.remove(entity)) {
if (entity instanceof ServerPlayer player) {
this.localPlayers.remove(player);
+ player.getBukkitEntity().updateRegion(null);
}
TickRegions.RegionStats.updateCurrentRegion();
}
diff --git a/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
new file mode 100644
index 0000000000000000000000000000000000000000..c7627ababadf72231682baa9c056a4082170410a
--- /dev/null
+++ b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
@@ -0,0 +1,1164 @@
+package io.papermc.paper.threadedregions;
+
+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
+import ca.spottedleaf.concurrentutil.util.TimeUtil;
+import java.lang.invoke.VarHandle;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
+
+public final class ScheduledTaskThreadPool {
+
+ public static final long DEADLINE_NOT_SET = Long.MIN_VALUE;
+
+ private final ThreadFactory threadFactory;
+ private final long stealThresholdNS;
+ private final long taskTimeSliceNS;
+
+ private final COWArrayList<TickThreadRunner> coreThreads = new COWArrayList<>(TickThreadRunner.class);
+ private final COWArrayList<TickThreadRunner> aliveThreads = new COWArrayList<>(TickThreadRunner.class);
+
+ private long runnerIdGenerator;
+ private boolean shutdown;
+
+ private final ConcurrentSkipListMap<WaitState, WaitState> waitingOrIdleRunners = new ConcurrentSkipListMap<>(WaitState.OLDEST_FIRST);
+
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> unwatchedScheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
+
+ public ScheduledTaskThreadPool(final ThreadFactory threadFactory, final long stealThresholdNS,
+ final long taskTimeSliceNS) {
+ this.threadFactory = threadFactory;
+ this.stealThresholdNS = stealThresholdNS;
+ this.taskTimeSliceNS = taskTimeSliceNS;
+
+ if (threadFactory == null) {
+ throw new IllegalStateException("Null thread factory");
+ }
+ if (stealThresholdNS < 0L) {
+ throw new IllegalArgumentException("Steal threshold must be >= 0");
+ }
+ if (taskTimeSliceNS <= 0L) {
+ throw new IllegalArgumentException("Task time slice must be > 0");
+ }
+ }
+
+ private static <K,V> K firstEntry(final ConcurrentSkipListMap<K, V> map) {
+ final Map.Entry<K,V> first = map.firstEntry();
+ return first == null ? null : first.getKey();
+ }
+
+ private static ScheduledTickTask findFirstNonTaken(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
+ ScheduledTickTask first;
+ while ((first = firstEntry(map)) != null && first.isTaken()) {
+ map.remove(first);
+ }
+
+ return first;
+ }
+
+ private static ScheduledTickTask findFirstNonTakenNonWatched(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
+ ScheduledTickTask first;
+ while ((first = firstEntry(map)) != null && (first.isTaken() || first.isWatched())) {
+ map.remove(first);
+ if (!first.isTaken() && !first.isWatched()) {
+ // handle race condition: unwatched after removal
+ map.put(first, first);
+ }
+ }
+
+ return first;
+ }
+
+ private static Thread[] getThreads(final COWArrayList<TickThreadRunner> list) {
+ final TickThreadRunner[] runners = list.getArray();
+ final Thread[] ret = new Thread[runners.length];
+
+ for (int i = 0; i < ret.length; ++i) {
+ ret[i] = runners[i].thread;
+ }
+
+ return ret;
+ }
+
+ /**
+ * Returns an array of the underlying scheduling threads.
+ */
+ public Thread[] getCoreThreads() {
+ return getThreads(this.coreThreads);
+ }
+
+ /**
+ * Returns an array of the underlying scheduling threads which are alive.
+ */
+ public Thread[] getAliveThreads() {
+ return getThreads(this.aliveThreads);
+ }
+
+ /**
+ * Adjusts the number of core threads to the specified threads. Has no effect if shutdown.
+ * Lowering the number of core threads will cause some scheduled tasks to fail to meet their scheduled start
+ * deadlines by up to the task steal time.
+ * @param threads New number of threads
+ * @return Returns this thread pool
+ */
+ public ScheduledTaskThreadPool setCoreThreads(final int threads) {
+ synchronized (this) {
+ if (this.shutdown) {
+ return this;
+ }
+
+ final TickThreadRunner[] currRunners = this.coreThreads.getArray();
+ if (currRunners.length == threads) {
+ return this;
+ }
+
+ if (threads < currRunners.length) {
+ // we need to trim threads
+ for (int i = 0, difference = currRunners.length - threads; i < difference; ++i) {
+ final TickThreadRunner remove = currRunners[currRunners.length - i - 1];
+
+ remove.halt();
+ this.coreThreads.remove(remove);
+ }
+
+ // force remaining runners to task steal
+ this.interruptAllRunners();
+
+ return this;
+ } else {
+ // we need to add threads
+ for (int i = 0, difference = threads - currRunners.length; i < difference; ++i) {
+ final TickThreadRunner runner = new TickThreadRunner(this, this.runnerIdGenerator++);
+ final Thread thread = runner.thread = this.threadFactory.newThread(runner);
+
+ this.coreThreads.add(runner);
+ this.aliveThreads.add(runner);
+
+ thread.start();
+ }
+
+ return this;
+ }
+ }
+ }
+
+ /**
+ * Attempts to prevent further execution of tasks.
+ */
+ public void halt() {
+ synchronized (this) {
+ this.shutdown = true;
+ }
+
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ runner.halt();
+ }
+ }
+
+ /**
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
+ * @param msToWait Maximum time to wait.
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
+ */
+ public boolean join(final long msToWait) {
+ try {
+ return this.join(msToWait, false);
+ } catch (final InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ /**
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
+ * @param msToWait Maximum time to wait.
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
+ * @throws InterruptedException If this thread is interrupted.
+ */
+ public boolean joinInterruptable(final long msToWait) throws InterruptedException {
+ return this.join(msToWait, true);
+ }
+
+ private boolean join(final long msToWait, final boolean interruptable) throws InterruptedException {
+ final long nsToWait = msToWait * (1000 * 1000);
+ final long start = System.nanoTime();
+ final long deadline = start + nsToWait;
+ boolean interrupted = false;
+ try {
+ for (final TickThreadRunner runner : this.aliveThreads.getArray()) {
+ final Thread thread = runner.thread;
+ for (;;) {
+ if (!thread.isAlive()) {
+ break;
+ }
+ final long current = System.nanoTime();
+ if (current >= deadline && msToWait > 0L) {
+ return false;
+ }
+
+ try {
+ thread.join(msToWait <= 0L ? 0L : Math.max(1L, (deadline - current) / (1000 * 1000)));
+ } catch (final InterruptedException ex) {
+ if (interruptable) {
+ throw ex;
+ }
+ interrupted = true;
+ }
+ }
+ }
+
+ return true;
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void interruptAllRunners() {
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ runner.interrupt();
+ }
+ }
+
+ private void interruptOneRunner() {
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
+ if (runner.interrupt()) {
+ return;
+ }
+ }
+ }
+
+ private void insert(final SchedulableTick tick, final boolean hasTasks) {
+ final long scheduleTime = tick.getScheduledStart();
+ final long timeNow = System.nanoTime();
+
+ for (;;) {
+ final Map.Entry<WaitState, WaitState> lastIdle = this.waitingOrIdleRunners.firstEntry();
+ final WaitState waitState;
+ if (lastIdle == null // no waiting threads or
+ // scheduled time is past latest waiting time
+ || ((waitState = lastIdle.getKey()).deadline != DEADLINE_NOT_SET && waitState.deadline - scheduleTime < 0L)) {
+ // insert hoping to be stolen
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(
+ tick,
+ // offset start by steal threshold so that it will hopefully start at its scheduled time
+ scheduleTime - this.stealThresholdNS,
+ timeNow,
+ null
+ );
+
+ this.unwatchedScheduledTicks.put(task, task);
+ if (hasTasks) {
+ this.scheduledTasks.put(task, task);
+ }
+
+ if (!this.waitingOrIdleRunners.isEmpty()) {
+ // handle race condition: all threads went to sleep since we checked
+ this.interruptOneRunner();
+ }
+ break;
+ } else {
+ // try to schedule to the runner
+ if (this.waitingOrIdleRunners.remove(waitState) == null) {
+ // failed, try again
+ continue;
+ }
+
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(tick, scheduleTime, timeNow, waitState.runner);
+
+ this.unwatchedScheduledTicks.put(task, task);
+ waitState.runner.scheduledTicks.put(task, task);
+ if (hasTasks) {
+ this.scheduledTasks.put(task, task);
+ waitState.runner.scheduledTasks.put(task, task);
+ }
+
+ if (!waitState.runner.interrupt() && waitState.runner.isHalted()) {
+ // handle race condition: runner we selected was halted
+ this.interruptOneRunner();
+ }
+ break;
+ }
+ }
+
+ if (!hasTasks && tick.hasTasks()) {
+ // handle race condition where tasks were added during scheduling
+ this.notifyTasks(tick);
+ }
+ }
+
+ /**
+ * Schedules the specified task to be executed on this thread pool.
+ * @param tick Specified task
+ * @throws IllegalStateException If the task is already scheduled
+ * @see SchedulableTick
+ */
+ public void schedule(final SchedulableTick tick) {
+ final boolean hasTasks = tick.hasTasks();
+ if ((!hasTasks && !tick.setScheduled()) || (hasTasks && !tick.setScheduledTasks())) {
+ throw new IllegalStateException("Task is already scheduled");
+ }
+
+ this.insert(tick, hasTasks);
+ }
+
+ /**
+ * Indicates that intermediate tasks are available to be executed by the task.
+ * @param tick The specified task
+ * @see SchedulableTick
+ */
+ public void notifyTasks(final SchedulableTick tick) {
+ if (!tick.isScheduled() || !tick.upgradeToScheduledTasks()) {
+ return;
+ }
+
+ final ScheduledTickTask task = tick.task;
+ if (task == null || task.isTaken()) {
+ // will be handled by scheduling code
+ return;
+ }
+
+ final TickThreadRunner runner = task.owner;
+ this.scheduledTasks.put(task, task);
+ if (runner != null) {
+ runner.scheduledTasks.put(task, task);
+ runner.interruptIfWaiting();
+ }
+ }
+
+ /**
+ * Returns {@code false} if the task is not scheduled or is cancelled, returns {@code true} if the task was
+ * cancelled by this thread
+ */
+ public boolean cancel(final SchedulableTick tick) {
+ final boolean ret = tick.cancel();
+
+ if (!ret) {
+ // nothing to do here
+ return ret;
+ }
+
+ // try to remove task from queues
+ final ScheduledTickTask task = tick.task;
+ if (task != null && task.take()) {
+ this.unwatchedScheduledTicks.remove(task);
+ this.scheduledTasks.remove(task);
+ final TickThreadRunner owner = task.owner;
+ if (owner != null) {
+ owner.scheduledTicks.remove(task);
+ owner.scheduledTasks.remove(task);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Represents a tickable task that can be scheduled into a {@link ScheduledTaskThreadPool}.
+ * <p>
+ * A tickable task is expected to run on a fixed interval, which is determined by
+ * the {@link ScheduledTaskThreadPool}.
+ * </p>
+ * <p>
+ * A tickable task can have intermediate tasks that can be executed before its tick method is ran. Instead of
+ * the {@link ScheduledTaskThreadPool} parking in-between ticks, the scheduler will instead drain
+ * intermediate tasks from scheduled tasks. The parsing of intermediate tasks allows the scheduler to take
+ * advantage of downtime to reduce the intermediate task load from tasks once they begin ticking.
+ * </p>
+ * <p>
+ * It is guaranteed that {@link #runTick()} and {@link #runTasks(BooleanSupplier)} are never
+ * invoked in parallel.
+ * It is required that when intermediate tasks are scheduled, that {@link ScheduledTaskThreadPool#notifyTasks(SchedulableTick)}
+ * is invoked for any scheduled task - otherwise, {@link #runTasks(BooleanSupplier)} may not be invoked to
+ * parse intermediate tasks.
+ * </p>
+ */
+ public static abstract class SchedulableTick {
+ private static final AtomicLong ID_GENERATOR = new AtomicLong();
+ public final long id = ID_GENERATOR.getAndIncrement();
+
+ private long scheduledStart = DEADLINE_NOT_SET;
+
+ private static final int STATE_UNSCHEDULED = 1 << 0;
+ private static final int STATE_SCHEDULED = 1 << 1;
+ private static final int STATE_SCHEDULED_TASKS = 1 << 2;
+ private static final int STATE_TICKING = 1 << 3;
+ private static final int STATE_TASKS = 1 << 4;
+ private static final int STATE_TICKING_CANCELLED = 1 << 5;
+ private static final int STATE_TASKS_CANCELLED = 1 << 6;
+ private static final int STATE_CANCELLED = 1 << 7;
+ private volatile int state = STATE_UNSCHEDULED;
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(SchedulableTick.class, "state", int.class);
+
+ private volatile ScheduledTickTask task;
+
+ private int getStateVolatile() {
+ return (int)STATE_HANDLE.getVolatile(this);
+ }
+
+ private void setStateVolatile(final int value) {
+ STATE_HANDLE.setVolatile(this, value);
+ }
+
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private boolean isScheduled() {
+ return this.getStateVolatile() == STATE_SCHEDULED;
+ }
+
+ private boolean upgradeToScheduledTasks() {
+ return STATE_SCHEDULED == this.compareAndExchangeStateVolatile(STATE_SCHEDULED, STATE_SCHEDULED_TASKS);
+ }
+
+ private boolean setScheduled() {
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED);
+ }
+
+ private boolean setScheduledTasks() {
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED_TASKS);
+ }
+
+ private boolean cancel() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ return false;
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS_CANCELLED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean markTicking() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean markTasks() {
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ private boolean canBeTicked() {
+ final int currState = this.getStateVolatile();
+ switch (currState) {
+ case STATE_UNSCHEDULED: {
+ throw new IllegalStateException();
+ }
+ case STATE_SCHEDULED:
+ case STATE_SCHEDULED_TASKS: {
+ return true;
+ }
+ case STATE_TICKING:
+ case STATE_TASKS:
+ case STATE_TICKING_CANCELLED:
+ case STATE_TASKS_CANCELLED:
+ case STATE_CANCELLED: {
+ return false;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+
+ protected final long getScheduledStart() {
+ return this.scheduledStart;
+ }
+
+ /**
+ * If this task is scheduled, then this may only be invoked during {@link #runTick()}
+ */
+ protected final void setScheduledStart(final long value) {
+ this.scheduledStart = value;
+ }
+
+ /**
+ * Executes the tick.
+ * <p>
+ * It is the callee's responsibility to invoke {@link #setScheduledStart(long)} to adjust the start of
+ * the next tick.
+ * </p>
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
+ */
+ public abstract boolean runTick();
+
+ private boolean tick() {
+ if (!this.markTicking()) {
+ return false;
+ }
+
+ final boolean tickRes = this.runTick();
+
+ if (!tickRes) {
+ this.setStateVolatile(STATE_CANCELLED);
+ return false;
+ }
+
+ // move to scheduled
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_TICKING: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_SCHEDULED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TICKING_CANCELLED: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING_CANCELLED, STATE_CANCELLED))) {
+ return false;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns whether this task has any intermediate tasks that can be executed.
+ */
+ public abstract boolean hasTasks();
+
+ /**
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
+ */
+ public abstract boolean runTasks(final BooleanSupplier canContinue);
+
+ private boolean tasks(final BooleanSupplier canContinue) {
+ if (!this.markTasks()) {
+ return false;
+ }
+
+ final boolean taskRes = this.runTasks(canContinue);
+
+ if (!taskRes) {
+ this.setStateVolatile(STATE_CANCELLED);
+ return false;
+ }
+
+ // move to scheduled
+ for (int currState = this.getStateVolatile();;) {
+ switch (currState) {
+ case STATE_TASKS: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_SCHEDULED))) {
+ return true;
+ }
+ continue;
+ }
+ case STATE_TASKS_CANCELLED: {
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS_CANCELLED, STATE_CANCELLED))) {
+ return false;
+ }
+ continue;
+ }
+ default: {
+ throw new IllegalStateException("Unknown state: " + currState);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "SchedulableTick:{" +
+ "class=" + this.getClass().getName() + "," +
+ "state=" + this.state + ","
+ + "}";
+ }
+ }
+
+ private static final class WaitState {
+
+ private static final Comparator<WaitState> OLDEST_FIRST = (final WaitState w1, final WaitState w2) -> {
+ final long d1 = w1.deadline;
+ final long d2 = w2.deadline;
+
+ if (d1 == DEADLINE_NOT_SET && d2 != DEADLINE_NOT_SET) {
+ return -1;
+ }
+ if (d1 != DEADLINE_NOT_SET && d2 == DEADLINE_NOT_SET) {
+ return 1;
+ }
+
+ final int timeCmp = TimeUtil.compareTimes(d2, d1);
+ if (timeCmp != 0) {
+ return timeCmp;
+ }
+
+ return Long.signum(w2.id - w1.id);
+ };
+
+ private final long id;
+ private final long deadline; // set to DEADLINE_NOT_SET if idle
+ private final TickThreadRunner runner;
+
+ private WaitState(final long id, final long deadline, final TickThreadRunner runner) {
+ this.id = id;
+ this.deadline = deadline;
+ this.runner = runner;
+ }
+ }
+
+ private static final class TickThreadRunner implements Runnable {
+
+ private final ScheduledTaskThreadPool scheduler;
+ private final long id;
+ private Thread thread;
+
+ // no scheduled ticks
+ private static final int STATE_IDLE = 1 << 0;
+ private static final int STATE_WAITING = 1 << 1;
+ private static final int STATE_TASKS = 1 << 2;
+ private static final int STATE_INTERRUPT = 1 << 3;
+ private static final int STATE_TICKING = 1 << 4;
+ private static final int STATE_HALTED = 1 << 5;
+ private volatile int state = STATE_INTERRUPT; // set to INTERRUPT initially so that tasks may be stolen on start
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(TickThreadRunner.class, "state", int.class);
+
+ private WaitState waitState;
+ private ScheduledTickTask watch;
+
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
+
+ public TickThreadRunner(final ScheduledTaskThreadPool scheduler, final long id) {
+ this.scheduler = scheduler;
+ this.id = id;
+ }
+
+ private int getStateVolatile() {
+ return (int)STATE_HANDLE.getVolatile(this);
+ }
+
+ private void setStateVolatile(final int value) {
+ STATE_HANDLE.setVolatile(this, value);
+ }
+
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private boolean interruptIfWaiting() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_INTERRUPT:
+ case STATE_TASKS:
+ case STATE_TICKING:
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_IDLE:
+ case STATE_WAITING: {
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
+ LockSupport.unpark(this.thread);
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private boolean interrupt() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_INTERRUPT:
+ case STATE_TICKING:
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_IDLE:
+ case STATE_WAITING:
+ case STATE_TASKS: {
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
+ if (curr == STATE_IDLE || curr == STATE_WAITING) {
+ LockSupport.unpark(this.thread);
+ }
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private void halt() {
+ this.setStateVolatile(STATE_HALTED);
+ }
+
+ private boolean isHalted() {
+ return STATE_HALTED == this.getStateVolatile();
+ }
+
+ private void setupWaitState(final long deadline) {
+ if (this.waitState != null) {
+ throw new IllegalStateException("Waitstate already set");
+ }
+ this.waitState = new WaitState(this.id, deadline, this);
+ this.scheduler.waitingOrIdleRunners.put(this.waitState, this.waitState);
+ }
+
+ private void cleanWaitState() {
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+
+ private ScheduledTickTask findTick() {
+ while (this.getStateVolatile() == STATE_WAITING) {
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
+
+ final ScheduledTickTask toWaitFor;
+ if (globalFirst == null) {
+ toWaitFor = ourFirst;
+ } else if (ourFirst == null) {
+ toWaitFor = globalFirst;
+ } else {
+ final long globalStart = globalFirst.tickStart + this.scheduler.stealThresholdNS;
+ final long ourStart = ourFirst.tickStart;
+
+ toWaitFor = ourStart - globalStart <= 0L ? ourFirst : globalFirst;
+ }
+
+ if (toWaitFor == null) {
+ // no tasks are scheduled
+
+ // move to idle state
+ this.setupWaitState(DEADLINE_NOT_SET);
+ this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_IDLE);
+ if (!this.scheduledTicks.isEmpty() || !this.scheduler.unwatchedScheduledTicks.isEmpty()) {
+ // handle race condition: task added before we moved to idle
+ this.interrupt();
+ }
+
+ return toWaitFor;
+ }
+
+ if (toWaitFor == globalFirst) {
+ if (toWaitFor.watch()) {
+ this.scheduler.unwatchedScheduledTicks.remove(toWaitFor);
+ this.watch = toWaitFor;
+ } else if (toWaitFor != ourFirst) {
+ continue;
+ } // else: failed to watch, but we are waiting for our task
+ }
+
+ return toWaitFor;
+ }
+
+ return null;
+ }
+
+ private void cleanupWatch(final boolean wakeThread) {
+ if (this.watch != null) {
+ this.watch.unwatch();
+
+ if (!this.watch.isTaken()) {
+ this.scheduler.unwatchedScheduledTicks.put(this.watch, this.watch);
+
+ if (wakeThread) {
+ for (;;) {
+ final WaitState latestWaiter = firstEntry(this.scheduler.waitingOrIdleRunners);
+ if (latestWaiter == null) {
+ break;
+ }
+
+ // note: if the task is owned by the waiter, then its deadline should already be <= watch tick start
+ if (latestWaiter.deadline == DEADLINE_NOT_SET || latestWaiter.deadline - (this.watch.tickStart + this.scheduler.stealThresholdNS) > 0L) {
+ if (this.scheduler.waitingOrIdleRunners.remove(latestWaiter) == null || !latestWaiter.runner.interrupt()) {
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ this.watch = null;
+ }
+ }
+
+ private boolean findEarlierTask(final ScheduledTickTask task) {
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
+
+ if (globalFirst != null && (globalFirst.tickStart + this.scheduler.stealThresholdNS) - task.tickStart < 0L) {
+ return true;
+ }
+
+ if (ourFirst != null && ourFirst.tickStart - task.tickStart < 0L) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private void reinsert(final ScheduledTickTask tick, final TickThreadRunner owner) {
+ final ScheduledTickTask newTask = tick.tick.task = new ScheduledTickTask(
+ tick.tick, tick.tick.getScheduledStart(), System.nanoTime(), owner
+ );
+
+ this.scheduler.unwatchedScheduledTicks.put(newTask, newTask);
+ if (owner != null) {
+ owner.scheduledTicks.put(newTask, newTask);
+ }
+
+ if (newTask.tick.hasTasks()) {
+ this.scheduler.notifyTasks(newTask.tick);
+ }
+ }
+
+ private void runTasks(final ScheduledTickTask tick, final long deadline) {
+ if (STATE_WAITING != this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_TASKS)) {
+ // interrupted or halted
+ return;
+ }
+
+ if (!tick.take()) {
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
+ return;
+ }
+
+ // remove from queues
+ this.scheduler.unwatchedScheduledTicks.remove(tick);
+ this.scheduler.scheduledTasks.remove(tick);
+ if (tick.owner != null) {
+ tick.owner.scheduledTicks.remove(tick);
+ tick.owner.scheduledTasks.remove(tick);
+ }
+
+ final BooleanSupplier canContinue = () -> {
+ return TickThreadRunner.this.getStateVolatile() == STATE_TASKS && (System.nanoTime() - deadline < 0L);
+ };
+
+ if (tick.tick.tasks(canContinue)) {
+ this.reinsert(tick, tick.owner == null ? this : tick.owner);
+ }
+
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
+ }
+
+ private ScheduledTickTask waitForTick() {
+ final ScheduledTickTask tick = this.findTick();
+
+ if (tick == null) {
+ return tick;
+ }
+
+ final long tickDeadline = tick.owner == this ? tick.tickStart : tick.tickStart + this.scheduler.stealThresholdNS;
+
+ this.setupWaitState(tickDeadline);
+ // should already be in STATE_WAITING (unless interrupted)
+
+ for (;;) {
+ if (this.getStateVolatile() != STATE_WAITING || tick.isTaken() || this.findEarlierTask(tick)) {
+ this.cleanupWatch(false);
+ this.cleanWaitState();
+ // start of loop may only be idle or interrupt
+ this.interrupt();
+ return null;
+ }
+
+ final ScheduledTickTask ourTask = findFirstNonTaken(this.scheduledTasks);
+ final ScheduledTickTask globalTask = findFirstNonTaken(this.scheduler.scheduledTasks);
+
+ final long timeNow = System.nanoTime();
+
+ if (timeNow - tickDeadline >= 0L) {
+ if (!tick.take()) {
+ continue;
+ }
+ this.cleanWaitState();
+ return tick;
+ }
+
+ if (ourTask == null && globalTask == null) {
+ // nothing to do, so just park
+ Thread.interrupted();
+ LockSupport.parkNanos("waiting", tickDeadline - timeNow);
+ continue;
+ }
+
+ final ScheduledTickTask toTask;
+ if (ourTask == null) {
+ toTask = globalTask;
+ } else if (globalTask == null) {
+ toTask = ourTask;
+ } else {
+ // try to use global task
+ if (globalTask.lastTaskParse + this.scheduler.taskTimeSliceNS - ourTask.lastTaskParse < 0L) {
+ toTask = globalTask;
+ } else {
+ toTask = ourTask;
+ }
+ }
+
+ long deadline = tickDeadline;
+ deadline = Math.min(deadline, timeNow + this.scheduler.taskTimeSliceNS);
+ deadline = Math.min(deadline, toTask.tickStart);
+
+ this.runTasks(toTask, deadline);
+ }
+ }
+
+ private boolean moveToTickingState() {
+ for (int curr = this.getStateVolatile();;) {
+ switch (curr) {
+ case STATE_HALTED: {
+ return false;
+ }
+ case STATE_WAITING:
+ case STATE_INTERRUPT: { // interrupted too late!
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_TICKING))) {
+ if (curr == STATE_INTERRUPT) {
+ // pass interrupt to another thread
+ this.scheduler.interruptOneRunner();
+ }
+ return true;
+ }
+ continue;
+ }
+
+ default: {
+ throw new IllegalStateException("Unknown state: " + curr);
+ }
+ }
+ }
+ }
+
+ private void doTick(final ScheduledTickTask tick) {
+ if (tick.tick.tick()) {
+ this.reinsert(tick, this);
+ }
+ }
+
+ private void doRun() {
+ for (;;) {
+ if (this.waitState != null) {
+ if (this.waitState.deadline != DEADLINE_NOT_SET) {
+ throw new IllegalStateException();
+ }
+
+ while (this.getStateVolatile() == STATE_IDLE) {
+ Thread.interrupted();
+ LockSupport.park("idling");
+ }
+
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+
+ final int currState = this.compareAndExchangeStateVolatile(STATE_INTERRUPT, STATE_WAITING);
+ if (currState == STATE_HALTED) {
+ return;
+ } else if (currState != STATE_INTERRUPT) {
+ throw new IllegalStateException("State must be HALTED or INTERRUPT at beginning of run loop");
+ }
+
+ while (this.getStateVolatile() == STATE_WAITING) {
+ // try to find a tick
+ final ScheduledTickTask toTick = this.waitForTick();
+
+ if (toTick == null) {
+ // no ticks -> go idle
+ // waitstate should already be setup
+ // OR we were interrupted and need to clear state
+ // OR we were halted and need to exit
+ break;
+ }
+
+ if (!this.moveToTickingState()) {
+ // halted
+ // re-insert task, since we took it
+ this.scheduler.insert(toTick.tick, toTick.tick.hasTasks());
+ break;
+ }
+
+ this.doTick(toTick);
+
+ if (STATE_TICKING != this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_WAITING)) {
+ // halted
+ break;
+ }
+
+ continue;
+ }
+ }
+ }
+
+ private void begin() {
+ this.setupWaitState(DEADLINE_NOT_SET);
+ }
+
+ private void die() {
+ // note: this should also handle unexpected shutdowns gracefully
+
+ this.cleanupWatch(false);
+ if (this.waitState != null) {
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
+ this.waitState = null;
+ }
+ this.scheduler.aliveThreads.remove(this);
+ if (this.getStateVolatile() == STATE_HALTED) {
+ // start task stealing for our tasks
+ this.scheduler.interruptAllRunners();
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ this.begin();
+ this.doRun();
+ } finally {
+ this.die();
+ }
+ }
+ }
+
+ private static final class ScheduledTickTask {
+ private static final Comparator<ScheduledTickTask> TICK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
+ final int timeCmp = TimeUtil.compareTimes(t1.tickStart, t2.tickStart);
+ if (timeCmp != 0L) {
+ return timeCmp;
+ }
+
+ return Long.signum(t1.tick.id - t2.tick.id);
+ };
+ private static final Comparator<ScheduledTickTask> TASK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
+ final int timeCmp = TimeUtil.compareTimes(t1.lastTaskParse, t2.lastTaskParse);
+ if (timeCmp != 0L) {
+ return timeCmp;
+ }
+
+ return Long.signum(t1.tick.id - t2.tick.id);
+ };
+
+ private final SchedulableTick tick;
+ private final long tickStart;
+ private final long lastTaskParse;
+ private final TickThreadRunner owner;
+
+ private volatile boolean taken;
+ private static final VarHandle TAKEN_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "taken", boolean.class);
+ private volatile boolean watched;
+ private static final VarHandle WATCHED_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "watched", boolean.class);
+
+ private ScheduledTickTask(final SchedulableTick tick, final long tickStart, final long lastTaskParse,
+ final TickThreadRunner owner) {
+ this.tick = tick;
+ this.tickStart = tickStart;
+ this.lastTaskParse = lastTaskParse;
+ this.owner = owner;
+ }
+
+ public boolean take() {
+ return !(boolean)TAKEN_HANDLE.getVolatile(this) && !(boolean)TAKEN_HANDLE.compareAndExchange(this, false, true);
+ }
+
+ public boolean isTaken() {
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
+ }
+
+ public boolean watch() {
+ return !(boolean)WATCHED_HANDLE.getVolatile(this) && !(boolean)WATCHED_HANDLE.compareAndExchange(this, false, true);
+ }
+
+ public boolean unwatch() {
+ return (boolean)WATCHED_HANDLE.compareAndExchange(this, true, false);
+ }
+
+ public boolean isWatched() {
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
+ }
+ }
+}
diff --git a/io/papermc/paper/threadedregions/TickData.java b/io/papermc/paper/threadedregions/TickData.java
index d4d80a69488f57704f1b3dc74cb379de36e80ec0..0383e4dcd611a7568597f46308060f3d7288a564 100644
--- a/io/papermc/paper/threadedregions/TickData.java
+++ b/io/papermc/paper/threadedregions/TickData.java
@@ -20,6 +20,10 @@ public final class TickData {
}
public void addDataFrom(final TickRegionScheduler.TickTime time) {
+ if (!time.isTickExecution()) {
+ // TODO fix later
+ return;
+ }
final long start = time.tickStart();
TickRegionScheduler.TickTime first;
diff --git a/io/papermc/paper/threadedregions/TickRegionScheduler.java b/io/papermc/paper/threadedregions/TickRegionScheduler.java
index 7123b3eb2f2e52946b8ef9de993a6828eb0bb6f7..a8608e8bed64a4da4ed340ab3837b082d6715437 100644
--- a/io/papermc/paper/threadedregions/TickRegionScheduler.java
+++ b/io/papermc/paper/threadedregions/TickRegionScheduler.java
@@ -1,6 +1,5 @@
package io.papermc.paper.threadedregions;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.concurrentutil.util.TimeUtil;
import ca.spottedleaf.moonrise.common.util.TickThread;
import com.mojang.logging.LogUtils;
@@ -41,10 +40,10 @@ public final class TickRegionScheduler {
}
// Folia end - watchdog
- private final SchedulerThreadPool scheduler;
+ private final ScheduledTaskThreadPool scheduler;
- public TickRegionScheduler(final int threads) {
- this.scheduler = new SchedulerThreadPool(threads, new ThreadFactory() {
+ public TickRegionScheduler() {
+ this.scheduler = new ScheduledTaskThreadPool(new ThreadFactory() {
private final AtomicInteger idGenerator = new AtomicInteger();
@Override
@@ -53,11 +52,15 @@ public final class TickRegionScheduler {
ret.setUncaughtExceptionHandler(TickRegionScheduler.this::uncaughtException);
return ret;
}
- });
+ }, TimeUnit.MILLISECONDS.toNanos(3L), TimeUnit.MILLISECONDS.toNanos(2L));
+ }
+
+ public void setThreads(final int threads) {
+ this.scheduler.setCoreThreads(threads);
}
public int getTotalThreadCount() {
- return this.scheduler.getThreads().length;
+ return this.scheduler.getAliveThreads().length;
}
private static void setTickingRegion(final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region) {
@@ -84,7 +87,7 @@ public final class TickRegionScheduler {
}
}
- private static void setTickTask(final SchedulerThreadPool.SchedulableTick task) {
+ private static void setTickTask(final ScheduledTaskThreadPool.SchedulableTick task) {
final Thread currThread = Thread.currentThread();
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
throw new IllegalStateException("Must be tick thread runner");
@@ -127,7 +130,7 @@ public final class TickRegionScheduler {
* Returns the current ticking task, or {@code null} if there is no ticking region.
* If this thread is not a TickThread, then returns {@code null}.
*/
- public static SchedulerThreadPool.SchedulableTick getCurrentTickingTask() {
+ public static ScheduledTaskThreadPool.SchedulableTick getCurrentTickingTask() {
final Thread currThread = Thread.currentThread();
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
return null;
@@ -165,22 +168,17 @@ public final class TickRegionScheduler {
region.markNonSchedulable();
}
- /**
- * Updates the tick start to the farthest into the future of its current scheduled time and the
- * provided time.
- * @return {@code false} if the region was not scheduled or is currently ticking or the specified time is less-than its
- * current start time, {@code true} if the next tick start was adjusted.
- */
- public boolean updateTickStartToMax(final RegionScheduleHandle region, final long newStart) {
- return this.scheduler.updateTickStartToMax(region, newStart);
- }
-
public boolean halt(final boolean sync, final long maxWaitNS) {
- return this.scheduler.halt(sync, maxWaitNS);
+ this.scheduler.halt();
+ if (!sync) {
+ return this.scheduler.getAliveThreads().length == 0;
+ }
+
+ return this.scheduler.join(maxWaitNS == 0L ? 0L : Math.max(1L, TimeUnit.NANOSECONDS.toMillis(maxWaitNS)));
}
void dumpAliveThreadTraces(final String reason) {
- for (final Thread thread : this.scheduler.getThreads()) {
+ for (final Thread thread : this.scheduler.getAliveThreads()) {
if (thread.isAlive()) {
TraceUtil.dumpTraceForThread(thread, reason);
}
@@ -191,16 +189,12 @@ public final class TickRegionScheduler {
this.scheduler.notifyTasks(region);
}
- public void init() {
- this.scheduler.start();
- }
-
private void uncaughtException(final Thread thread, final Throwable thr) {
LOGGER.error("Uncaught exception in tick thread \"" + thread.getName() + "\"", thr);
// prevent further ticks from occurring
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
- this.scheduler.halt(false, 0L);
+ this.scheduler.halt();
MinecraftServer.getServer().stopServer();
}
@@ -210,7 +204,7 @@ public final class TickRegionScheduler {
// prevent further ticks from occurring
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
- this.scheduler.halt(false, 0L);
+ this.scheduler.halt();
final ChunkPos center = handle.region == null ? null : handle.region.region.getCenterChunk();
final ServerLevel world = handle.region == null ? null : handle.region.world;
@@ -226,7 +220,7 @@ public final class TickRegionScheduler {
private ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> currentTickingRegion;
private RegionizedWorldData currentTickingWorldRegionizedData;
- private SchedulerThreadPool.SchedulableTick currentTickingTask;
+ private ScheduledTaskThreadPool.SchedulableTick currentTickingTask;
// Folia start - profiler
private ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle.NO_OP_HANDLE;
// Folia end - profiler
@@ -236,7 +230,7 @@ public final class TickRegionScheduler {
}
}
- public static abstract class RegionScheduleHandle extends SchedulerThreadPool.SchedulableTick {
+ public static abstract class RegionScheduleHandle extends ScheduledTaskThreadPool.SchedulableTick {
protected long currentTick;
protected long lastTickStart;
@@ -258,7 +252,7 @@ public final class TickRegionScheduler {
public RegionScheduleHandle(final TickRegions.TickRegionData region, final long firstStart) {
this.currentTick = 0L;
- this.lastTickStart = SchedulerThreadPool.DEADLINE_NOT_SET;
+ this.lastTickStart = ScheduledTaskThreadPool.DEADLINE_NOT_SET;
this.tickTimes5s = new TickData(TimeUnit.SECONDS.toNanos(5L));
this.tickTimes15s = new TickData(TimeUnit.SECONDS.toNanos(15L));
this.tickTimes1m = new TickData(TimeUnit.MINUTES.toNanos(1L));
@@ -267,16 +261,16 @@ public final class TickRegionScheduler {
this.region = region;
this.setScheduledStart(firstStart);
- this.tickSchedule = new Schedule(firstStart == SchedulerThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
+ this.tickSchedule = new Schedule(firstStart == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
}
/**
- * Subclasses should call this instead of {@link ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool.SchedulableTick#setScheduledStart(long)}
+ * Subclasses should call this instead of {@link #setScheduledStart(long)}
* so that the tick schedule and scheduled start remain synchronised
*/
protected final void updateScheduledStart(final long to) {
this.setScheduledStart(to);
- this.tickSchedule.setLastPeriod(to == SchedulerThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
+ this.tickSchedule.setLastPeriod(to == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
}
public final void markNonSchedulable() {
@@ -293,7 +287,7 @@ public final class TickRegionScheduler {
protected abstract void tickRegion(final int tickCount, final long startTime, final long scheduledEnd);
- protected abstract boolean runRegionTasks(final BooleanSupplier canContinue);
+ protected abstract void runRegionTasks(final BooleanSupplier canContinue);
protected abstract boolean hasIntermediateTasks();
@@ -303,9 +297,9 @@ public final class TickRegionScheduler {
}
@Override
- public final Boolean runTasks(final BooleanSupplier canContinue) {
+ public final boolean runTasks(final BooleanSupplier canContinue) {
if (this.cancelled.get()) {
- return null;
+ return false;
}
final long cpuStart = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
@@ -316,7 +310,7 @@ public final class TickRegionScheduler {
throw new IllegalStateException("Scheduled region should be acquirable");
}
// region was killed
- return null;
+ return false;
}
TickRegionScheduler.setTickTask(this);
@@ -326,31 +320,30 @@ public final class TickRegionScheduler {
synchronized (this) {
this.currentTickData = new TickTime(
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
false
);
this.currentTickingThread = Thread.currentThread();
}
- final boolean ret;
final FoliaWatchdogThread.RunningTick runningTick = new FoliaWatchdogThread.RunningTick(tickStart, this, Thread.currentThread()); // Folia - watchdog
WATCHDOG_THREAD.addTick(runningTick); // Folia - watchdog
try {
- ret = this.runRegionTasks(() -> {
+ this.runRegionTasks(() -> {
return !RegionScheduleHandle.this.cancelled.get() && canContinue.getAsBoolean();
});
} catch (final Throwable thr) {
this.scheduler.regionFailed(this, true, thr);
// don't release region for another tick
- return null;
+ return false;
} finally {
WATCHDOG_THREAD.removeTick(runningTick); // Folia - watchdog
final long tickEnd = System.nanoTime();
final long cpuEnd = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
final TickTime time = new TickTime(
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET,
tickStart, cpuStart, tickEnd, cpuEnd, MEASURE_CPU_TIME, false
);
@@ -361,7 +354,7 @@ public final class TickRegionScheduler {
}
}
- return !this.markNotTicking() || this.cancelled.get() ? null : Boolean.valueOf(ret);
+ return this.markNotTicking() && !this.cancelled.get();
}
@Override
@@ -405,7 +398,7 @@ public final class TickRegionScheduler {
synchronized (this) {
this.currentTickData = new TickTime(
lastTickStart, scheduledStart, tickStart, cpuStart,
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
true
);
this.currentTickingThread = Thread.currentThread();
@@ -582,7 +575,7 @@ public final class TickRegionScheduler {
* Only valid when {@link #isTickExecution()} is {@code true}.
*/
public boolean hasLastTick() {
- return this.previousTickStart != SchedulerThreadPool.DEADLINE_NOT_SET;
+ return this.previousTickStart != ScheduledTaskThreadPool.DEADLINE_NOT_SET;
}
/*
diff --git a/io/papermc/paper/threadedregions/TickRegions.java b/io/papermc/paper/threadedregions/TickRegions.java
index 988fe74578065c9464f5639e5cc6af79619edef5..1fe996286e048cb8a8d5ede45f27792bc04fec38 100644
--- a/io/papermc/paper/threadedregions/TickRegions.java
+++ b/io/papermc/paper/threadedregions/TickRegions.java
@@ -1,6 +1,5 @@
package io.papermc.paper.threadedregions;
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
import ca.spottedleaf.concurrentutil.util.TimeUtil;
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
import com.mojang.logging.LogUtils;
@@ -12,6 +11,7 @@ import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
import net.minecraft.server.MinecraftServer;
import net.minecraft.server.level.ServerLevel;
+import net.minecraft.server.level.ServerPlayer;
import org.slf4j.Logger;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
@@ -29,6 +29,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private static boolean initialised;
+
private static TickRegionScheduler scheduler;
public static TickRegionScheduler getScheduler() {
@@ -45,6 +46,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
gridExponent = Math.min(31, gridExponent);
regionShift = gridExponent;
+ scheduler = new TickRegionScheduler();
+ }
+
+ public static void start() {
+ final GlobalConfiguration.ThreadedRegions config = GlobalConfiguration.get().threadedRegions;
int tickThreads;
if (config.threads <= 0) {
tickThreads = Runtime.getRuntime().availableProcessors() / 2;
@@ -57,7 +63,8 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
tickThreads = config.threads;
}
- scheduler = new TickRegionScheduler(tickThreads);
+ scheduler.setThreads(tickThreads);
+
LOGGER.info("Regionised ticking is enabled with " + tickThreads + " tick threads");
}
@@ -171,7 +178,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
private final Reference2ReferenceOpenHashMap<RegionizedData<?>, Object> regionizedData = new Reference2ReferenceOpenHashMap<>();
// tick data
- private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, SchedulerThreadPool.DEADLINE_NOT_SET);
+ private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
// queue data
private final RegionizedTaskQueue.RegionTaskQueueData taskQueueData;
@@ -182,15 +189,64 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
// async-safe read-only region data
private final RegionStats regionStats;
+ //
+ private final java.util.concurrent.atomic.AtomicBoolean hasPackets = new java.util.concurrent.atomic.AtomicBoolean(false);
+
public volatile ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler; // Folia - profiler
private TickRegionData(final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region) {
this.region = region;
this.world = region.regioniser.world;
- this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData);
+ this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData, this);
this.regionStats = new RegionStats();
}
+ public void setHasTasks() {
+ TickRegions.getScheduler().setHasTasks(this.tickHandle);
+ }
+
+ public void setHasPackets() {
+ if (!this.hasPackets.get() && !this.hasPackets.compareAndExchange(false, true)) {
+ this.setHasTasks();
+ }
+ }
+
+ public boolean drainOnePacket() {
+ if (!this.hasPackets.get()) {
+ return false;
+ }
+
+ final RegionizedWorldData worldData = this.world.getCurrentWorldData();
+ boolean hasPacketsNew = false;
+
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
+ if (!ca.spottedleaf.moonrise.common.util.TickThread.isTickThreadFor(player)) {
+ continue;
+ }
+ if (player.getBukkitEntity().executeOnePacket()) {
+ hasPacketsNew |= player.getBukkitEntity().hasPackets();
+ }
+ }
+
+ if (!hasPacketsNew) {
+ this.hasPackets.set(false);
+
+ // handle race condition: packet added during packet processing
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
+ if (player.getBukkitEntity().hasPackets()) {
+ this.hasPackets.set(true);
+ break;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public void drainPackets() {
+ while (this.drainOnePacket());
+ }
+
public RegionStats getRegionStats() {
return this.regionStats;
}
@@ -224,7 +280,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
return ret;
}
- ret = regionizedData.createNewValue();
+ ret = regionizedData.createNewValue(this);
this.regionizedData.put(regionizedData, ret);
return ret;
@@ -242,6 +298,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
for (final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region : regions) {
final TickRegionData data = region.getData();
data.tickHandle.copyDeadlineAndTickCount(this.tickHandle);
+ // just be lazy about this one, it's not very important
+ if (this.hasPackets.getOpaque()) {
+ data.hasPackets.setOpaque(true);
+ }
}
// generic regionised data
@@ -309,6 +369,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
// there's not really a great solution to the tick problem, no matter what it'll be messed up
// we will pick the greatest time delay so that tps will not exceed TICK_RATE
data.tickHandle.updateSchedulingToMax(this.tickHandle);
+ // just be lazy about this one, it's not very important
+ if (this.hasPackets.getOpaque()) {
+ data.hasPackets.setOpaque(true);
+ }
// generic regionised data
final long fromTickOffset = currentTickTo - currentTickFrom; // see merge jd
@@ -350,11 +414,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private void updateSchedulingToMax(final ConcreteRegionTickHandle from) {
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
return;
}
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
this.updateScheduledStart(from.getScheduledStart());
return;
}
@@ -365,7 +429,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
private void copyDeadlineAndTickCount(final ConcreteRegionTickHandle from) {
this.currentTick = from.currentTick;
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
return;
}
@@ -374,7 +438,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
private void checkInitialSchedule() {
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
this.updateScheduledStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
}
}
@@ -409,35 +473,34 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
}
@Override
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
final ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = io.papermc.paper.threadedregions.TickRegionScheduler.getProfiler(); // Folia start - profiler
profiler.startInBetweenTick(); try { // Folia - profiler
final RegionizedTaskQueue.RegionTaskQueueData queue = this.region.taskQueueData;
boolean processedChunkTask = false;
- boolean executeChunkTask = true;
- boolean executeTickTask = true;
+ boolean executeChunkTask;
+ boolean executeTickTask;
+ boolean executePacketTask;
do {
- if (executeTickTask) {
- executeTickTask = queue.executeTickTask();
- }
- if (executeChunkTask) {
- processedChunkTask |= (executeChunkTask = queue.executeChunkTask());
- }
- } while ((executeChunkTask | executeTickTask) && canContinue.getAsBoolean());
+ executeTickTask = queue.executeTickTask();
+ executeChunkTask = queue.executeChunkTask();
+ executePacketTask = this.region.drainOnePacket();
+
+ processedChunkTask |= executeChunkTask;
+ } while ((executeChunkTask | executeTickTask | executePacketTask) && canContinue.getAsBoolean());
if (processedChunkTask) {
// if we processed any chunk tasks, try to process ticket level updates for full status changes
this.region.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates();
}
- return true;
} finally { profiler.stopInBetweenTick(); } // Folia - profiler
}
@Override
protected boolean hasIntermediateTasks() {
- return this.region.taskQueueData.hasTasks();
+ return this.region.taskQueueData.hasTasks() || this.region.hasPackets.get();
}
}
diff --git a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
index bc8aa525b3488dc71e7ca0529c6a8c57eaa99e1e..498125a093bb323b34860ca723e89eb8720eb71f 100644
--- a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
+++ b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManage
import io.papermc.paper.threadedregions.RegionizedData;
import io.papermc.paper.threadedregions.RegionizedServer;
import io.papermc.paper.threadedregions.TickRegionScheduler;
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
@@ -143,6 +144,9 @@ public final class FoliaRegionScheduler implements RegionScheduler {
}
private static final class Scheduler {
+
+ private Scheduler(final TickRegionData regionData) {}
+
private static final RegionizedData.RegioniserCallback<Scheduler> REGIONISER_CALLBACK = new RegionizedData.RegioniserCallback<>() {
@Override
public void merge(final Scheduler from, final Scheduler into, final long fromTickOffset) {
diff --git a/net/minecraft/network/protocol/PacketUtils.java b/net/minecraft/network/protocol/PacketUtils.java
index c7214f0457641b5550b98dbd2863c1d637faeaa5..549b2bb8bd12b631ff023ff3b31bd392b51e41fd 100644
--- a/net/minecraft/network/protocol/PacketUtils.java
+++ b/net/minecraft/network/protocol/PacketUtils.java
@@ -48,14 +48,8 @@ public class PacketUtils {
// Paper end - detailed watchdog information
// Folia start - region threading
};
- // ignore retired state, if removed then we don't want the packet to be handled
if (processor instanceof net.minecraft.server.network.ServerGamePacketListenerImpl gamePacketListener) {
- gamePacketListener.player.getBukkitEntity().taskScheduler.schedule(
- (net.minecraft.server.level.ServerPlayer player) -> {
- run.run();
- },
- null, 1L
- );
+ gamePacketListener.player.getBukkitEntity().addPacket(run);
} else if (processor instanceof net.minecraft.server.network.ServerConfigurationPacketListenerImpl configurationPacketListener) {
io.papermc.paper.threadedregions.RegionizedServer.getInstance().addTask(run);
} else if (processor instanceof net.minecraft.server.network.ServerLoginPacketListenerImpl loginPacketListener) {
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
index faf72dd6dff74296c73cb058aaabd1f9f475a072..be82a6b43b1f0c644c53d08a6e16bc2876c8c1e0 100644
--- a/net/minecraft/server/MinecraftServer.java
+++ b/net/minecraft/server/MinecraftServer.java
@@ -1647,6 +1647,7 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
} finally { foliaProfiler.stopTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.PLUGIN_TICK_TASKS); } // Folia - profiler
// now run all the entity schedulers
// TODO there has got to be a more efficient variant of this crap
+ region.drainPackets();
long tickedEntitySchedulers = 0L; // Folia - profiler
foliaProfiler.startTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.ENTITY_SCHEDULER_TICK); try { // Folia - profiler
for (net.minecraft.world.entity.Entity entity : region.world.getCurrentWorldData().getLocalEntitiesCopy()) {
diff --git a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
index 6eca15223b92aedac74233db886e2c1248750e2c..c739f2cf3590d5aab2391d95f6ea9a5cc4a2e599 100644
--- a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
+++ b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
@@ -120,6 +120,7 @@ public abstract class ServerCommonPacketListenerImpl implements ServerCommonPack
this.server.halt(false);
}
this.player.getBukkitEntity().taskScheduler.retire(); // Folia - region threading
+ this.player.getBukkitEntity().stopAcceptingPackets(); // Folia - region threading
}
@Override
diff --git a/net/minecraft/world/level/Level.java b/net/minecraft/world/level/Level.java
index dafd90502937019b616ac0a79465e1dbc578cf66..78f4dd7032d18b8e020a4576e4ac012c1d472e67 100644
--- a/net/minecraft/world/level/Level.java
+++ b/net/minecraft/world/level/Level.java
@@ -820,7 +820,7 @@ public abstract class Level implements LevelAccessor, AutoCloseable, ca.spottedl
// Folia start - region ticking
public final io.papermc.paper.threadedregions.RegionizedData<io.papermc.paper.threadedregions.RegionizedWorldData> worldRegionData
= new io.papermc.paper.threadedregions.RegionizedData<>(
- (ServerLevel)this, () -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this),
+ (ServerLevel)this, (regionData) -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this, regionData),
io.papermc.paper.threadedregions.RegionizedWorldData.REGION_CALLBACK
);
public volatile io.papermc.paper.threadedregions.RegionizedServer.WorldLevelData tickData;