mirror of
https://github.com/PaperMC/Folia.git
synced 2025-04-19 02:29:21 +08:00
1. Make halt() unpark waiting threads Fixes https://github.com/PaperMC/Folia/issues/338 2. Make intermediate task parsing steal tasks less aggressively
2233 lines
94 KiB
Diff
2233 lines
94 KiB
Diff
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
|
|
From: Spottedleaf <Spottedleaf@users.noreply.github.com>
|
|
Date: Thu, 20 Mar 2025 11:07:45 -0700
|
|
Subject: [PATCH] fixup! Region Threading Base
|
|
|
|
|
|
diff --git a/io/papermc/paper/threadedregions/COWArrayList.java b/io/papermc/paper/threadedregions/COWArrayList.java
|
|
new file mode 100644
|
|
index 0000000000000000000000000000000000000000..65393f18011aed8023ede95382145abf9cbb3c5e
|
|
--- /dev/null
|
|
+++ b/io/papermc/paper/threadedregions/COWArrayList.java
|
|
@@ -0,0 +1,64 @@
|
|
+package io.papermc.paper.threadedregions;
|
|
+
|
|
+import java.lang.reflect.Array;
|
|
+import java.util.Arrays;
|
|
+
|
|
+public final class COWArrayList<E> {
|
|
+
|
|
+ private volatile E[] array;
|
|
+
|
|
+ public COWArrayList(final Class<E> clazz) {
|
|
+ this.array = (E[])Array.newInstance(clazz, 0);
|
|
+ }
|
|
+
|
|
+ public E[] getArray() {
|
|
+ return this.array;
|
|
+ }
|
|
+
|
|
+ public boolean contains(final E test) {
|
|
+ for (final E elem : this.array) {
|
|
+ if (elem == test) {
|
|
+ return true;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ public void add(final E element) {
|
|
+ synchronized (this) {
|
|
+ final E[] array = this.array;
|
|
+
|
|
+ final E[] copy = Arrays.copyOf(array, array.length + 1);
|
|
+ copy[array.length] = element;
|
|
+
|
|
+ this.array = copy;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public boolean remove(final E element) {
|
|
+ synchronized (this) {
|
|
+ final E[] array = this.array;
|
|
+ int index = -1;
|
|
+ for (int i = 0, len = array.length; i < len; ++i) {
|
|
+ if (array[i] == element) {
|
|
+ index = i;
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (index == -1) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ final E[] copy = (E[])Array.newInstance(array.getClass().getComponentType(), array.length - 1);
|
|
+
|
|
+ System.arraycopy(array, 0, copy, 0, index);
|
|
+ System.arraycopy(array, index + 1, copy, index, (array.length - 1) - index);
|
|
+
|
|
+ this.array = copy;
|
|
+ }
|
|
+
|
|
+ return true;
|
|
+ }
|
|
+}
|
|
\ No newline at end of file
|
|
diff --git a/io/papermc/paper/threadedregions/RegionizedData.java b/io/papermc/paper/threadedregions/RegionizedData.java
|
|
index a1043c426d031755b57b77a9b2eec685e9861b13..5e66fcbee18ea0889ebe652228ac3e61cac7c872 100644
|
|
--- a/io/papermc/paper/threadedregions/RegionizedData.java
|
|
+++ b/io/papermc/paper/threadedregions/RegionizedData.java
|
|
@@ -1,10 +1,12 @@
|
|
package io.papermc.paper.threadedregions;
|
|
|
|
import ca.spottedleaf.concurrentutil.util.Validate;
|
|
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
|
|
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
|
|
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
|
|
import net.minecraft.server.level.ServerLevel;
|
|
import javax.annotation.Nullable;
|
|
+import java.util.function.Function;
|
|
import java.util.function.Supplier;
|
|
|
|
/**
|
|
@@ -42,7 +44,7 @@ import java.util.function.Supplier;
|
|
* // callback is left out of this example
|
|
* // note: world != null here
|
|
* public final RegionizedData<EntityTickList> entityTickLists =
|
|
- * new RegionizedData<>(this, () -> new EntityTickList(), ...);
|
|
+ * new RegionizedData<>(this, (data) -> new EntityTickList(), ...);
|
|
*
|
|
* public void addTickingEntity(Entity e) {
|
|
* // What we expect here is that this world is the
|
|
@@ -86,7 +88,7 @@ import java.util.function.Supplier;
|
|
* // note: world == null here, because this RegionizedData object
|
|
* // is not instantiated per world, but rather globally.
|
|
* public final RegionizedData<TickTimes> tickTimes =
|
|
- * new RegionizedData<>(null, () -> new TickTimes(), ...);
|
|
+ * new RegionizedData<>(null, (data) -> new TickTimes(), ...);
|
|
* }
|
|
* }
|
|
* </pre>
|
|
@@ -97,7 +99,7 @@ import java.util.function.Supplier;
|
|
public final class RegionizedData<T> {
|
|
|
|
private final ServerLevel world;
|
|
- private final Supplier<T> initialValueSupplier;
|
|
+ private final Function<TickRegionData ,T> initialValueSupplier;
|
|
private final RegioniserCallback<T> callback;
|
|
|
|
/**
|
|
@@ -119,14 +121,14 @@ public final class RegionizedData<T> {
|
|
* @param supplier Initial value supplier used to lazy initialise region data.
|
|
* @param callback Region callback to manage this regionised data.
|
|
*/
|
|
- public RegionizedData(final ServerLevel world, final Supplier<T> supplier, final RegioniserCallback<T> callback) {
|
|
+ public RegionizedData(final ServerLevel world, final Function<TickRegionData,T> supplier, final RegioniserCallback<T> callback) {
|
|
this.world = world;
|
|
this.initialValueSupplier = Validate.notNull(supplier, "Supplier may not be null.");
|
|
this.callback = Validate.notNull(callback, "Regioniser callback may not be null.");
|
|
}
|
|
|
|
- T createNewValue() {
|
|
- return Validate.notNull(this.initialValueSupplier.get(), "Initial value supplier may not return null");
|
|
+ T createNewValue(final TickRegionData regionData) {
|
|
+ return Validate.notNull(this.initialValueSupplier.apply(regionData), "Initial value supplier may not return null");
|
|
}
|
|
|
|
RegioniserCallback<T> getCallback() {
|
|
@@ -141,7 +143,7 @@ public final class RegionizedData<T> {
|
|
* and the current ticking region's world does not match this {@code RegionizedData}'s world.
|
|
*/
|
|
public @Nullable T get() {
|
|
- final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region =
|
|
+ final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegions.TickRegionSectionData> region =
|
|
TickRegionScheduler.getCurrentRegion();
|
|
|
|
if (region == null) {
|
|
diff --git a/io/papermc/paper/threadedregions/RegionizedServer.java b/io/papermc/paper/threadedregions/RegionizedServer.java
|
|
index 1382c695c4991488b113401e231875ddc74f6b01..9008db1d594ffc371a7fa5bd36f6bb43a168d558 100644
|
|
--- a/io/papermc/paper/threadedregions/RegionizedServer.java
|
|
+++ b/io/papermc/paper/threadedregions/RegionizedServer.java
|
|
@@ -1,7 +1,6 @@
|
|
package io.papermc.paper.threadedregions;
|
|
|
|
import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
|
|
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
|
|
import ca.spottedleaf.moonrise.common.util.TickThread;
|
|
import com.mojang.logging.LogUtils;
|
|
import io.papermc.paper.threadedregions.scheduler.FoliaGlobalRegionScheduler;
|
|
@@ -23,6 +22,7 @@ import org.slf4j.Logger;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
+import java.util.Set;
|
|
import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.function.BooleanSupplier;
|
|
@@ -64,7 +64,7 @@ public final class RegionizedServer {
|
|
// now we can schedule
|
|
this.tickHandle.setInitialStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
|
|
TickRegions.getScheduler().scheduleRegion(this.tickHandle);
|
|
- TickRegions.getScheduler().init();
|
|
+ TickRegions.start();
|
|
}
|
|
|
|
public void invalidateStatus() {
|
|
@@ -80,6 +80,15 @@ public final class RegionizedServer {
|
|
TickRegions.getScheduler().setHasTasks(this.tickHandle);
|
|
}
|
|
|
|
+ private boolean hasAnyGlobalChunkTasks() {
|
|
+ for (final ServerLevel world : this.worlds) {
|
|
+ if (world.taskQueueRegionData.hasGlobalChunkTasks()) {
|
|
+ return true;
|
|
+ }
|
|
+ }
|
|
+ return false;
|
|
+ }
|
|
+
|
|
/**
|
|
* Returns the current tick of the region ticking.
|
|
* @throws IllegalStateException If there is no current region.
|
|
@@ -118,7 +127,7 @@ public final class RegionizedServer {
|
|
private final AtomicBoolean ticking = new AtomicBoolean();
|
|
|
|
public GlobalTickTickHandle(final RegionizedServer server) {
|
|
- super(null, SchedulerThreadPool.DEADLINE_NOT_SET);
|
|
+ super(null, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
|
|
this.server = server;
|
|
}
|
|
|
|
@@ -158,26 +167,31 @@ public final class RegionizedServer {
|
|
return false;
|
|
}
|
|
|
|
- // TODO try catch?
|
|
run.run();
|
|
|
|
return true;
|
|
}
|
|
|
|
+ private boolean runGlobalTask() {
|
|
+ boolean ret = false;
|
|
+ for (final ServerLevel world : this.server.worlds) {
|
|
+ ret |= world.taskQueueRegionData.executeGlobalChunkTask();
|
|
+ }
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
@Override
|
|
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
|
|
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
|
|
do {
|
|
- if (!this.runOneTask()) {
|
|
- return false;
|
|
+ if (!this.runOneTask() || !this.runGlobalTask()) {
|
|
+ return;
|
|
}
|
|
} while (canContinue.getAsBoolean());
|
|
-
|
|
- return true;
|
|
}
|
|
|
|
@Override
|
|
protected boolean hasIntermediateTasks() {
|
|
- return !this.server.globalTickQueue.isEmpty();
|
|
+ return !this.server.globalTickQueue.isEmpty() || this.server.hasAnyGlobalChunkTasks();
|
|
}
|
|
}
|
|
|
|
diff --git a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
|
index 745ab870310733b569681f5280895bb9798620a4..2794694ce57b21711e1ddac146647317b79336bf 100644
|
|
--- a/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
|
+++ b/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
|
|
@@ -79,7 +79,11 @@ public final class RegionizedTaskQueue {
|
|
this.world = world;
|
|
}
|
|
|
|
- private boolean executeGlobalChunkTask() {
|
|
+ public boolean hasGlobalChunkTasks() {
|
|
+ return !this.globalChunkTask.isEmpty();
|
|
+ }
|
|
+
|
|
+ public boolean executeGlobalChunkTask() {
|
|
final Runnable run = this.globalChunkTask.poll();
|
|
if (run != null) {
|
|
run.run();
|
|
@@ -94,6 +98,7 @@ public final class RegionizedTaskQueue {
|
|
|
|
public void pushGlobalChunkTask(final Runnable run) {
|
|
this.globalChunkTask.add(run);
|
|
+ TickRegions.getScheduler().setHasTasks(RegionizedServer.getGlobalTickData());
|
|
}
|
|
|
|
private PrioritisedQueue getQueue(final boolean synchronise, final int chunkX, final int chunkZ, final boolean isChunkTask) {
|
|
@@ -216,12 +221,14 @@ public final class RegionizedTaskQueue {
|
|
}
|
|
|
|
public static final class RegionTaskQueueData {
|
|
- private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue();
|
|
- private final PrioritisedQueue chunkQueue = new PrioritisedQueue();
|
|
+ private final PrioritisedQueue tickTaskQueue = new PrioritisedQueue(this);
|
|
+ private final PrioritisedQueue chunkQueue = new PrioritisedQueue(this);
|
|
private final WorldRegionTaskData worldRegionTaskData;
|
|
+ private final TickRegions.TickRegionData regionData;
|
|
|
|
- public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData) {
|
|
+ public RegionTaskQueueData(final WorldRegionTaskData worldRegionTaskData, final TickRegions.TickRegionData regionData) {
|
|
this.worldRegionTaskData = worldRegionTaskData;
|
|
+ this.regionData = regionData;
|
|
}
|
|
|
|
void mergeInto(final RegionTaskQueueData into) {
|
|
@@ -281,6 +288,13 @@ public final class RegionizedTaskQueue {
|
|
this.queues[i] = new ArrayDeque<>();
|
|
}
|
|
}
|
|
+
|
|
+ private final RegionTaskQueueData queue;
|
|
+
|
|
+ private PrioritisedQueue(final RegionTaskQueueData queue) {
|
|
+ this.queue = queue;
|
|
+ }
|
|
+
|
|
private boolean isDestroyed;
|
|
|
|
public int getScheduledTasks() {
|
|
@@ -609,6 +623,8 @@ public final class RegionizedTaskQueue {
|
|
continue;
|
|
}
|
|
|
|
+ queue.queue.regionData.setHasTasks();
|
|
+
|
|
// successfully queued
|
|
return true;
|
|
}
|
|
diff --git a/io/papermc/paper/threadedregions/RegionizedWorldData.java b/io/papermc/paper/threadedregions/RegionizedWorldData.java
|
|
index c6e487a4c14e6b82533881d01f32349b9ae28728..b8f1f042342d3fed5fa26df0de07e8e2b0937130 100644
|
|
--- a/io/papermc/paper/threadedregions/RegionizedWorldData.java
|
|
+++ b/io/papermc/paper/threadedregions/RegionizedWorldData.java
|
|
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.common.util.CoordinateUtils;
|
|
import ca.spottedleaf.moonrise.common.util.TickThread;
|
|
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
|
|
import com.mojang.logging.LogUtils;
|
|
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
|
|
import it.unimi.dsi.fastutil.longs.Long2ReferenceMap;
|
|
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
|
|
import it.unimi.dsi.fastutil.objects.ObjectLinkedOpenHashSet;
|
|
@@ -33,7 +34,6 @@ import net.minecraft.world.entity.ai.village.VillageSiege;
|
|
import net.minecraft.world.entity.item.ItemEntity;
|
|
import net.minecraft.world.level.BlockEventData;
|
|
import net.minecraft.world.level.ChunkPos;
|
|
-import net.minecraft.world.level.Explosion;
|
|
import net.minecraft.world.level.Level;
|
|
import net.minecraft.world.level.NaturalSpawner;
|
|
import net.minecraft.world.level.ServerExplosion;
|
|
@@ -61,6 +61,7 @@ import java.util.Iterator;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
+import java.util.concurrent.CopyOnWriteArrayList;
|
|
import java.util.function.Consumer;
|
|
import java.util.function.Predicate;
|
|
|
|
@@ -82,6 +83,7 @@ public final class RegionizedWorldData {
|
|
// entities
|
|
for (final ServerPlayer player : from.localPlayers) {
|
|
into.localPlayers.add(player);
|
|
+ player.getBukkitEntity().updateRegion(into);
|
|
into.nearbyPlayers.addPlayer(player);
|
|
}
|
|
for (final Entity entity : from.allEntities) {
|
|
@@ -180,6 +182,7 @@ public final class RegionizedWorldData {
|
|
// the chunk holder must _exist_, and so the region section exists.
|
|
final RegionizedWorldData into = regionToData.get(CoordinateUtils.getChunkKey(pos.x >> chunkToRegionShift, pos.z >> chunkToRegionShift));
|
|
into.localPlayers.add(player);
|
|
+ player.getBukkitEntity().updateRegion(into);
|
|
into.nearbyPlayers.addPlayer(player);
|
|
}
|
|
for (final Entity entity : from.allEntities) {
|
|
@@ -346,7 +349,8 @@ public final class RegionizedWorldData {
|
|
}
|
|
|
|
// entities
|
|
- private final List<ServerPlayer> localPlayers = new ArrayList<>();
|
|
+ // this is copy on write to allow packet processing to iterate safely
|
|
+ private final CopyOnWriteArrayList<ServerPlayer> localPlayers = new CopyOnWriteArrayList<>();
|
|
private final NearbyPlayers nearbyPlayers;
|
|
private final ReferenceList<Entity> allEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
|
|
private final ReferenceList<Entity> loadedEntities = new ReferenceList<>(EMPTY_ENTITY_ARRAY);
|
|
@@ -446,7 +450,9 @@ public final class RegionizedWorldData {
|
|
public final alternate.current.wire.WireHandler wireHandler;
|
|
public final io.papermc.paper.redstone.RedstoneWireTurbo turbo;
|
|
|
|
- public RegionizedWorldData(final ServerLevel world) {
|
|
+ public final TickRegionData regionData;
|
|
+
|
|
+ public RegionizedWorldData(final ServerLevel world, final TickRegionData regionData) {
|
|
this.world = world;
|
|
this.blockLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, true);
|
|
this.fluidLevelTicks = new LevelTicks<>(world::isPositionTickingWithEntitiesLoaded, world, false);
|
|
@@ -454,6 +460,7 @@ public final class RegionizedWorldData {
|
|
this.nearbyPlayers = new NearbyPlayers(world);
|
|
this.wireHandler = new alternate.current.wire.WireHandler(world);
|
|
this.turbo = new io.papermc.paper.redstone.RedstoneWireTurbo((RedStoneWireBlock)Blocks.REDSTONE_WIRE);
|
|
+ this.regionData = regionData;
|
|
|
|
// tasks may be drained before the region ticks, so we must set up the tick data early just in case
|
|
this.updateTickData();
|
|
@@ -611,6 +618,7 @@ public final class RegionizedWorldData {
|
|
if (this.allEntities.add(entity)) {
|
|
if (entity instanceof ServerPlayer player) {
|
|
this.localPlayers.add(player);
|
|
+ player.getBukkitEntity().updateRegion(this);
|
|
}
|
|
TickRegions.RegionStats.updateCurrentRegion();
|
|
}
|
|
@@ -627,6 +635,7 @@ public final class RegionizedWorldData {
|
|
if (this.allEntities.remove(entity)) {
|
|
if (entity instanceof ServerPlayer player) {
|
|
this.localPlayers.remove(player);
|
|
+ player.getBukkitEntity().updateRegion(null);
|
|
}
|
|
TickRegions.RegionStats.updateCurrentRegion();
|
|
}
|
|
diff --git a/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
|
|
new file mode 100644
|
|
index 0000000000000000000000000000000000000000..5c591b0d6eac45d6094ce44bf62ad976bf995e66
|
|
--- /dev/null
|
|
+++ b/io/papermc/paper/threadedregions/ScheduledTaskThreadPool.java
|
|
@@ -0,0 +1,1243 @@
|
|
+package io.papermc.paper.threadedregions;
|
|
+
|
|
+import ca.spottedleaf.concurrentutil.util.ConcurrentUtil;
|
|
+import ca.spottedleaf.concurrentutil.util.TimeUtil;
|
|
+import java.lang.invoke.VarHandle;
|
|
+import java.util.Comparator;
|
|
+import java.util.Iterator;
|
|
+import java.util.Map;
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
+import java.util.concurrent.ThreadFactory;
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
+import java.util.concurrent.locks.LockSupport;
|
|
+import java.util.function.BooleanSupplier;
|
|
+
|
|
+public final class ScheduledTaskThreadPool {
|
|
+
|
|
+ public static final long DEADLINE_NOT_SET = Long.MIN_VALUE;
|
|
+
|
|
+ private final ThreadFactory threadFactory;
|
|
+ private final long stealThresholdNS;
|
|
+ private final long taskTimeSliceNS;
|
|
+
|
|
+ private final COWArrayList<TickThreadRunner> coreThreads = new COWArrayList<>(TickThreadRunner.class);
|
|
+ private final COWArrayList<TickThreadRunner> aliveThreads = new COWArrayList<>(TickThreadRunner.class);
|
|
+
|
|
+ private long runnerIdGenerator;
|
|
+ private boolean shutdown;
|
|
+
|
|
+ private final ConcurrentSkipListMap<WaitState, WaitState> waitingOrIdleRunners = new ConcurrentSkipListMap<>(WaitState.OLDEST_FIRST);
|
|
+
|
|
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> unwatchedScheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
|
|
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
|
|
+
|
|
+ public ScheduledTaskThreadPool(final ThreadFactory threadFactory, final long stealThresholdNS,
|
|
+ final long taskTimeSliceNS) {
|
|
+ this.threadFactory = threadFactory;
|
|
+ this.stealThresholdNS = stealThresholdNS;
|
|
+ this.taskTimeSliceNS = taskTimeSliceNS;
|
|
+
|
|
+ if (threadFactory == null) {
|
|
+ throw new NullPointerException("Null thread factory");
|
|
+ }
|
|
+ if (stealThresholdNS < 0L) {
|
|
+ throw new IllegalArgumentException("Steal threshold must be >= 0");
|
|
+ }
|
|
+ if (taskTimeSliceNS <= 0L) {
|
|
+ throw new IllegalArgumentException("Task time slice must be > 0");
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static <K,V> K firstEntry(final ConcurrentSkipListMap<K, V> map) {
|
|
+ final Map.Entry<K,V> first = map.firstEntry();
|
|
+ return first == null ? null : first.getKey();
|
|
+ }
|
|
+
|
|
+ private static ScheduledTickTask findFirstNonTaken(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
|
|
+ ScheduledTickTask first;
|
|
+ while ((first = firstEntry(map)) != null && first.isTaken()) {
|
|
+ map.remove(first);
|
|
+ }
|
|
+
|
|
+ return first;
|
|
+ }
|
|
+
|
|
+ private static ScheduledTickTask findFirstNonTakenNonWatched(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map) {
|
|
+ ScheduledTickTask first;
|
|
+ while ((first = firstEntry(map)) != null && (first.isTaken() || first.isWatched())) {
|
|
+ map.remove(first);
|
|
+ if (!first.isTaken() && !first.isWatched()) {
|
|
+ // handle race condition: unwatched after removal
|
|
+ map.put(first, first);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return first;
|
|
+ }
|
|
+
|
|
+ private static Thread[] getThreads(final COWArrayList<TickThreadRunner> list) {
|
|
+ final TickThreadRunner[] runners = list.getArray();
|
|
+ final Thread[] ret = new Thread[runners.length];
|
|
+
|
|
+ for (int i = 0; i < ret.length; ++i) {
|
|
+ ret[i] = runners[i].thread;
|
|
+ }
|
|
+
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns an array of the underlying scheduling threads.
|
|
+ */
|
|
+ public Thread[] getCoreThreads() {
|
|
+ return getThreads(this.coreThreads);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns an array of the underlying scheduling threads which are alive.
|
|
+ */
|
|
+ public Thread[] getAliveThreads() {
|
|
+ return getThreads(this.aliveThreads);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Adjusts the number of core threads to the specified threads. Has no effect if shutdown.
|
|
+ * Lowering the number of core threads will cause some scheduled tasks to fail to meet their scheduled start
|
|
+ * deadlines by up to the task steal time.
|
|
+ * @param threads New number of threads
|
|
+ * @return Returns this thread pool
|
|
+ */
|
|
+ public ScheduledTaskThreadPool setCoreThreads(final int threads) {
|
|
+ synchronized (this) {
|
|
+ if (this.shutdown) {
|
|
+ return this;
|
|
+ }
|
|
+
|
|
+ final TickThreadRunner[] currRunners = this.coreThreads.getArray();
|
|
+ if (currRunners.length == threads) {
|
|
+ return this;
|
|
+ }
|
|
+
|
|
+ if (threads < currRunners.length) {
|
|
+ // we need to trim threads
|
|
+ for (int i = 0, difference = currRunners.length - threads; i < difference; ++i) {
|
|
+ final TickThreadRunner remove = currRunners[currRunners.length - i - 1];
|
|
+
|
|
+ remove.halt();
|
|
+ this.coreThreads.remove(remove);
|
|
+ }
|
|
+
|
|
+ // force remaining runners to task steal
|
|
+ this.interruptAllRunners();
|
|
+
|
|
+ return this;
|
|
+ } else {
|
|
+ // we need to add threads
|
|
+ for (int i = 0, difference = threads - currRunners.length; i < difference; ++i) {
|
|
+ final TickThreadRunner runner = new TickThreadRunner(this, this.runnerIdGenerator++);
|
|
+ final Thread thread = runner.thread = this.threadFactory.newThread(runner);
|
|
+
|
|
+ this.coreThreads.add(runner);
|
|
+ this.aliveThreads.add(runner);
|
|
+
|
|
+ thread.start();
|
|
+ }
|
|
+
|
|
+ return this;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Attempts to prevent further execution of tasks.
|
|
+ */
|
|
+ public void halt() {
|
|
+ synchronized (this) {
|
|
+ this.shutdown = true;
|
|
+ }
|
|
+
|
|
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
|
|
+ runner.halt();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
|
|
+ * @param msToWait Maximum time to wait.
|
|
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
|
|
+ */
|
|
+ public boolean join(final long msToWait) {
|
|
+ try {
|
|
+ return this.join(msToWait, false);
|
|
+ } catch (final InterruptedException ex) {
|
|
+ throw new IllegalStateException(ex);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Waits until all threads in this pool have shutdown, or until the specified time has passed.
|
|
+ * @param msToWait Maximum time to wait.
|
|
+ * @return {@code false} if the maximum time passed, {@code true} otherwise.
|
|
+ * @throws InterruptedException If this thread is interrupted.
|
|
+ */
|
|
+ public boolean joinInterruptable(final long msToWait) throws InterruptedException {
|
|
+ return this.join(msToWait, true);
|
|
+ }
|
|
+
|
|
+ private boolean join(final long msToWait, final boolean interruptable) throws InterruptedException {
|
|
+ final long nsToWait = msToWait * (1000 * 1000);
|
|
+ final long start = System.nanoTime();
|
|
+ final long deadline = start + nsToWait;
|
|
+ boolean interrupted = false;
|
|
+ try {
|
|
+ for (final TickThreadRunner runner : this.aliveThreads.getArray()) {
|
|
+ final Thread thread = runner.thread;
|
|
+ for (;;) {
|
|
+ if (!thread.isAlive()) {
|
|
+ break;
|
|
+ }
|
|
+ final long current = System.nanoTime();
|
|
+ if (current >= deadline && msToWait > 0L) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ try {
|
|
+ thread.join(msToWait <= 0L ? 0L : Math.max(1L, (deadline - current) / (1000 * 1000)));
|
|
+ } catch (final InterruptedException ex) {
|
|
+ if (interruptable) {
|
|
+ throw ex;
|
|
+ }
|
|
+ interrupted = true;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return true;
|
|
+ } finally {
|
|
+ if (interrupted) {
|
|
+ Thread.currentThread().interrupt();
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void interruptAllRunners() {
|
|
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
|
|
+ runner.interrupt();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void interruptOneRunner() {
|
|
+ for (final TickThreadRunner runner : this.coreThreads.getArray()) {
|
|
+ if (runner.interrupt()) {
|
|
+ return;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void insert(final SchedulableTick tick, final boolean hasTasks) {
|
|
+ final long scheduleTime = tick.getScheduledStart();
|
|
+ final long timeNow = System.nanoTime();
|
|
+
|
|
+ for (;;) {
|
|
+ final Map.Entry<WaitState, WaitState> lastIdle = this.waitingOrIdleRunners.firstEntry();
|
|
+ final WaitState waitState;
|
|
+ if (lastIdle == null // no waiting threads or
|
|
+ // scheduled time is past latest waiting time
|
|
+ || ((waitState = lastIdle.getKey()).deadline != DEADLINE_NOT_SET && waitState.deadline - scheduleTime < 0L)) {
|
|
+ // insert hoping to be stolen
|
|
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(
|
|
+ tick,
|
|
+ // offset start by steal threshold so that it will hopefully start at its scheduled time
|
|
+ scheduleTime - this.stealThresholdNS,
|
|
+ hasTasks ? timeNow : DEADLINE_NOT_SET,
|
|
+ null
|
|
+ );
|
|
+
|
|
+ this.unwatchedScheduledTicks.put(task, task);
|
|
+ if (hasTasks) {
|
|
+ this.scheduledTasks.put(task, task);
|
|
+ }
|
|
+
|
|
+ if (!this.waitingOrIdleRunners.isEmpty()) {
|
|
+ // handle race condition: all threads went to sleep since we checked
|
|
+ this.interruptOneRunner();
|
|
+ }
|
|
+ break;
|
|
+ } else {
|
|
+ // try to schedule to the runner
|
|
+ if (this.waitingOrIdleRunners.remove(waitState) == null) {
|
|
+ // failed, try again
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ final ScheduledTickTask task = tick.task = new ScheduledTickTask(
|
|
+ tick, scheduleTime, hasTasks ? timeNow : DEADLINE_NOT_SET, waitState.runner
|
|
+ );
|
|
+
|
|
+ this.unwatchedScheduledTicks.put(task, task);
|
|
+ waitState.runner.scheduledTicks.put(task, task);
|
|
+ if (hasTasks) {
|
|
+ this.scheduledTasks.put(task, task);
|
|
+ waitState.runner.scheduledTasks.put(task, task);
|
|
+ }
|
|
+
|
|
+ if (!waitState.runner.interrupt() && waitState.runner.isHalted()) {
|
|
+ // handle race condition: runner we selected was halted
|
|
+ this.interruptOneRunner();
|
|
+ }
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (!hasTasks && tick.hasTasks()) {
|
|
+ // handle race condition where tasks were added during scheduling
|
|
+ this.notifyTasks(tick);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Schedules the specified task to be executed on this thread pool.
|
|
+ * @param tick Specified task
|
|
+ * @throws IllegalStateException If the task is already scheduled
|
|
+ * @see SchedulableTick
|
|
+ */
|
|
+ public void schedule(final SchedulableTick tick) {
|
|
+ final boolean hasTasks = tick.hasTasks();
|
|
+ if ((!hasTasks && !tick.setScheduled()) || (hasTasks && !tick.setScheduledTasks())) {
|
|
+ throw new IllegalStateException("Task is already scheduled");
|
|
+ }
|
|
+
|
|
+ this.insert(tick, hasTasks);
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Indicates that intermediate tasks are available to be executed by the task.
|
|
+ * @param tick The specified task
|
|
+ * @see SchedulableTick
|
|
+ */
|
|
+ public void notifyTasks(final SchedulableTick tick) {
|
|
+ if (!tick.isScheduled() || !tick.upgradeToScheduledTasks()) {
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ final ScheduledTickTask task = tick.task;
|
|
+ if (task == null || task.isTaken()) {
|
|
+ // will be handled by scheduling code
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ task.setLastTaskNotify(System.nanoTime());
|
|
+
|
|
+ final TickThreadRunner runner = task.owner;
|
|
+ this.scheduledTasks.put(task, task);
|
|
+ if (runner != null) {
|
|
+ runner.scheduledTasks.put(task, task);
|
|
+ runner.interruptIfWaiting();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns {@code false} if the task is not scheduled or is cancelled, returns {@code true} if the task was
|
|
+ * cancelled by this thread
|
|
+ */
|
|
+ public boolean cancel(final SchedulableTick tick) {
|
|
+ final boolean ret = tick.cancel();
|
|
+
|
|
+ if (!ret) {
|
|
+ // nothing to do here
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ // try to remove task from queues
|
|
+ final ScheduledTickTask task = tick.task;
|
|
+ if (task != null && task.take()) {
|
|
+ this.unwatchedScheduledTicks.remove(task);
|
|
+ this.scheduledTasks.remove(task);
|
|
+ final TickThreadRunner owner = task.owner;
|
|
+ if (owner != null) {
|
|
+ owner.scheduledTicks.remove(task);
|
|
+ owner.scheduledTasks.remove(task);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return ret;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Represents a tickable task that can be scheduled into a {@link ScheduledTaskThreadPool}.
|
|
+ * <p>
|
|
+ * A tickable task is expected to run on a fixed interval, which is determined by
|
|
+ * the {@link ScheduledTaskThreadPool}.
|
|
+ * </p>
|
|
+ * <p>
|
|
+ * A tickable task can have intermediate tasks that can be executed before its tick method is ran. Instead of
|
|
+ * the {@link ScheduledTaskThreadPool} parking in-between ticks, the scheduler will instead drain
|
|
+ * intermediate tasks from scheduled tasks. The parsing of intermediate tasks allows the scheduler to take
|
|
+ * advantage of downtime to reduce the intermediate task load from tasks once they begin ticking.
|
|
+ * </p>
|
|
+ * <p>
|
|
+ * It is guaranteed that {@link #runTick()} and {@link #runTasks(BooleanSupplier)} are never
|
|
+ * invoked in parallel.
|
|
+ * It is required that when intermediate tasks are scheduled, that {@link ScheduledTaskThreadPool#notifyTasks(SchedulableTick)}
|
|
+ * is invoked for any scheduled task - otherwise, {@link #runTasks(BooleanSupplier)} may not be invoked to
|
|
+ * parse intermediate tasks.
|
|
+ * </p>
|
|
+ */
|
|
+ public static abstract class SchedulableTick {
|
|
+ private static final AtomicLong ID_GENERATOR = new AtomicLong();
|
|
+ public final long id = ID_GENERATOR.getAndIncrement();
|
|
+
|
|
+ private long scheduledStart = DEADLINE_NOT_SET;
|
|
+
|
|
+ private static final int STATE_UNSCHEDULED = 1 << 0;
|
|
+ private static final int STATE_SCHEDULED = 1 << 1;
|
|
+ private static final int STATE_SCHEDULED_TASKS = 1 << 2;
|
|
+ private static final int STATE_TICKING = 1 << 3;
|
|
+ private static final int STATE_TASKS = 1 << 4;
|
|
+ private static final int STATE_TICKING_CANCELLED = 1 << 5;
|
|
+ private static final int STATE_TASKS_CANCELLED = 1 << 6;
|
|
+ private static final int STATE_CANCELLED = 1 << 7;
|
|
+ private volatile int state = STATE_UNSCHEDULED;
|
|
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(SchedulableTick.class, "state", int.class);
|
|
+
|
|
+ private volatile ScheduledTickTask task;
|
|
+
|
|
+ private int getStateVolatile() {
|
|
+ return (int)STATE_HANDLE.getVolatile(this);
|
|
+ }
|
|
+
|
|
+ private void setStateVolatile(final int value) {
|
|
+ STATE_HANDLE.setVolatile(this, value);
|
|
+ }
|
|
+
|
|
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
|
|
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
|
|
+ }
|
|
+
|
|
+ private boolean isScheduled() {
|
|
+ return this.getStateVolatile() == STATE_SCHEDULED;
|
|
+ }
|
|
+
|
|
+ private boolean upgradeToScheduledTasks() {
|
|
+ return STATE_SCHEDULED == this.compareAndExchangeStateVolatile(STATE_SCHEDULED, STATE_SCHEDULED_TASKS);
|
|
+ }
|
|
+
|
|
+ private boolean setScheduled() {
|
|
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED);
|
|
+ }
|
|
+
|
|
+ private boolean setScheduledTasks() {
|
|
+ return STATE_UNSCHEDULED == this.compareAndExchangeStateVolatile(STATE_UNSCHEDULED, STATE_SCHEDULED_TASKS);
|
|
+ }
|
|
+
|
|
+ private boolean cancel() {
|
|
+ for (int currState = this.getStateVolatile();;) {
|
|
+ switch (currState) {
|
|
+ case STATE_UNSCHEDULED: {
|
|
+ return false;
|
|
+ }
|
|
+ case STATE_SCHEDULED:
|
|
+ case STATE_SCHEDULED_TASKS: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_CANCELLED))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TICKING: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING_CANCELLED))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TASKS: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS_CANCELLED))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TICKING_CANCELLED:
|
|
+ case STATE_TASKS_CANCELLED:
|
|
+ case STATE_CANCELLED: {
|
|
+ return false;
|
|
+ }
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean markTicking() {
|
|
+ for (int currState = this.getStateVolatile();;) {
|
|
+ switch (currState) {
|
|
+ case STATE_UNSCHEDULED: {
|
|
+ throw new IllegalStateException();
|
|
+ }
|
|
+ case STATE_SCHEDULED:
|
|
+ case STATE_SCHEDULED_TASKS: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TICKING))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TICKING:
|
|
+ case STATE_TASKS:
|
|
+ case STATE_TICKING_CANCELLED:
|
|
+ case STATE_TASKS_CANCELLED:
|
|
+ case STATE_CANCELLED: {
|
|
+ return false;
|
|
+ }
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean markTasks() {
|
|
+ for (int currState = this.getStateVolatile();;) {
|
|
+ switch (currState) {
|
|
+ case STATE_UNSCHEDULED: {
|
|
+ throw new IllegalStateException();
|
|
+ }
|
|
+ case STATE_SCHEDULED:
|
|
+ case STATE_SCHEDULED_TASKS: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(currState, STATE_TASKS))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TICKING:
|
|
+ case STATE_TASKS:
|
|
+ case STATE_TICKING_CANCELLED:
|
|
+ case STATE_TASKS_CANCELLED:
|
|
+ case STATE_CANCELLED: {
|
|
+ return false;
|
|
+ }
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean canBeTicked() {
|
|
+ final int currState = this.getStateVolatile();
|
|
+ switch (currState) {
|
|
+ case STATE_UNSCHEDULED: {
|
|
+ throw new IllegalStateException();
|
|
+ }
|
|
+ case STATE_SCHEDULED:
|
|
+ case STATE_SCHEDULED_TASKS: {
|
|
+ return true;
|
|
+ }
|
|
+ case STATE_TICKING:
|
|
+ case STATE_TASKS:
|
|
+ case STATE_TICKING_CANCELLED:
|
|
+ case STATE_TASKS_CANCELLED:
|
|
+ case STATE_CANCELLED: {
|
|
+ return false;
|
|
+ }
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ protected final long getScheduledStart() {
|
|
+ return this.scheduledStart;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * If this task is scheduled, then this may only be invoked during {@link #runTick()}
|
|
+ */
|
|
+ protected final void setScheduledStart(final long value) {
|
|
+ this.scheduledStart = value;
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Executes the tick.
|
|
+ * <p>
|
|
+ * It is the callee's responsibility to invoke {@link #setScheduledStart(long)} to adjust the start of
|
|
+ * the next tick.
|
|
+ * </p>
|
|
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
|
|
+ */
|
|
+ public abstract boolean runTick();
|
|
+
|
|
+ private boolean tick() {
|
|
+ if (!this.markTicking()) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ final boolean tickRes = this.runTick();
|
|
+
|
|
+ if (!tickRes) {
|
|
+ this.setStateVolatile(STATE_CANCELLED);
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ // move to scheduled
|
|
+ for (int currState = this.getStateVolatile();;) {
|
|
+ switch (currState) {
|
|
+ case STATE_TICKING: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_SCHEDULED))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TICKING_CANCELLED: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TICKING_CANCELLED, STATE_CANCELLED))) {
|
|
+ return false;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ /**
|
|
+ * Returns whether this task has any intermediate tasks that can be executed.
|
|
+ */
|
|
+ public abstract boolean hasTasks();
|
|
+
|
|
+ /**
|
|
+ * @return {@code true} if the task should continue to be scheduled, {@code false} otherwise.
|
|
+ */
|
|
+ public abstract boolean runTasks(final BooleanSupplier canContinue);
|
|
+
|
|
+ private boolean tasks(final BooleanSupplier canContinue) {
|
|
+ if (!this.markTasks()) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ final boolean taskRes = this.runTasks(canContinue);
|
|
+
|
|
+ if (!taskRes) {
|
|
+ this.setStateVolatile(STATE_CANCELLED);
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ // move to scheduled
|
|
+ for (int currState = this.getStateVolatile();;) {
|
|
+ switch (currState) {
|
|
+ case STATE_TASKS: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_SCHEDULED))) {
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ case STATE_TASKS_CANCELLED: {
|
|
+ if (currState == (currState = this.compareAndExchangeStateVolatile(STATE_TASKS_CANCELLED, STATE_CANCELLED))) {
|
|
+ return false;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + currState);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public String toString() {
|
|
+ return "SchedulableTick:{" +
|
|
+ "class=" + this.getClass().getName() + "," +
|
|
+ "state=" + this.state + ","
|
|
+ + "}";
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class WaitState {
|
|
+
|
|
+ private static final Comparator<WaitState> OLDEST_FIRST = (final WaitState w1, final WaitState w2) -> {
|
|
+ final long d1 = w1.deadline;
|
|
+ final long d2 = w2.deadline;
|
|
+
|
|
+ if (d1 == DEADLINE_NOT_SET && d2 != DEADLINE_NOT_SET) {
|
|
+ return -1;
|
|
+ }
|
|
+ if (d1 != DEADLINE_NOT_SET && d2 == DEADLINE_NOT_SET) {
|
|
+ return 1;
|
|
+ }
|
|
+
|
|
+ final int timeCmp = TimeUtil.compareTimes(d2, d1);
|
|
+ if (timeCmp != 0) {
|
|
+ return timeCmp;
|
|
+ }
|
|
+
|
|
+ return Long.signum(w2.id - w1.id);
|
|
+ };
|
|
+
|
|
+ private final long id;
|
|
+ private final long deadline; // set to DEADLINE_NOT_SET if idle
|
|
+ private final TickThreadRunner runner;
|
|
+
|
|
+ private WaitState(final long id, final long deadline, final TickThreadRunner runner) {
|
|
+ this.id = id;
|
|
+ this.deadline = deadline;
|
|
+ this.runner = runner;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class TickThreadRunner implements Runnable {
|
|
+
|
|
+ private final ScheduledTaskThreadPool scheduler;
|
|
+ private final long id;
|
|
+ private Thread thread;
|
|
+
|
|
+ // no scheduled ticks
|
|
+ private static final int STATE_IDLE = 1 << 0;
|
|
+ private static final int STATE_WAITING = 1 << 1;
|
|
+ private static final int STATE_TASKS = 1 << 2;
|
|
+ private static final int STATE_INTERRUPT = 1 << 3;
|
|
+ private static final int STATE_TICKING = 1 << 4;
|
|
+ private static final int STATE_HALTED = 1 << 5;
|
|
+ private volatile int state = STATE_INTERRUPT; // set to INTERRUPT initially so that tasks may be stolen on start
|
|
+ private static final VarHandle STATE_HANDLE = ConcurrentUtil.getVarHandle(TickThreadRunner.class, "state", int.class);
|
|
+
|
|
+ private WaitState waitState;
|
|
+ private ScheduledTickTask watch;
|
|
+
|
|
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTicks = new ConcurrentSkipListMap<>(ScheduledTickTask.TICK_COMPARATOR);
|
|
+ private final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> scheduledTasks = new ConcurrentSkipListMap<>(ScheduledTickTask.TASK_COMPARATOR);
|
|
+
|
|
+ public TickThreadRunner(final ScheduledTaskThreadPool scheduler, final long id) {
|
|
+ this.scheduler = scheduler;
|
|
+ this.id = id;
|
|
+ }
|
|
+
|
|
+ private int getStateVolatile() {
|
|
+ return (int)STATE_HANDLE.getVolatile(this);
|
|
+ }
|
|
+
|
|
+ private void setStateVolatile(final int value) {
|
|
+ STATE_HANDLE.setVolatile(this, value);
|
|
+ }
|
|
+
|
|
+ private int compareAndExchangeStateVolatile(final int expect, final int update) {
|
|
+ return (int)STATE_HANDLE.compareAndExchange(this, expect, update);
|
|
+ }
|
|
+
|
|
+ private boolean interruptIfWaiting() {
|
|
+ for (int curr = this.getStateVolatile();;) {
|
|
+ switch (curr) {
|
|
+ case STATE_INTERRUPT:
|
|
+ case STATE_TASKS:
|
|
+ case STATE_TICKING:
|
|
+ case STATE_HALTED: {
|
|
+ return false;
|
|
+ }
|
|
+ case STATE_IDLE:
|
|
+ case STATE_WAITING: {
|
|
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
|
|
+ LockSupport.unpark(this.thread);
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + curr);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean interrupt() {
|
|
+ for (int curr = this.getStateVolatile();;) {
|
|
+ switch (curr) {
|
|
+ case STATE_INTERRUPT:
|
|
+ case STATE_TICKING:
|
|
+ case STATE_HALTED: {
|
|
+ return false;
|
|
+ }
|
|
+ case STATE_IDLE:
|
|
+ case STATE_WAITING:
|
|
+ case STATE_TASKS: {
|
|
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_INTERRUPT))) {
|
|
+ if (curr == STATE_IDLE || curr == STATE_WAITING) {
|
|
+ LockSupport.unpark(this.thread);
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + curr);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void halt() {
|
|
+ for (int curr = this.getStateVolatile();;) {
|
|
+ switch (curr) {
|
|
+ case STATE_HALTED: {
|
|
+ return;
|
|
+ }
|
|
+ case STATE_IDLE:
|
|
+ case STATE_WAITING:
|
|
+ case STATE_TASKS:
|
|
+ case STATE_INTERRUPT:
|
|
+ case STATE_TICKING: {
|
|
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_HALTED))) {
|
|
+ if (curr == STATE_IDLE || curr == STATE_WAITING) {
|
|
+ LockSupport.unpark(this.thread);
|
|
+ }
|
|
+ return;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + curr);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean isHalted() {
|
|
+ return STATE_HALTED == this.getStateVolatile();
|
|
+ }
|
|
+
|
|
+ private void setupWaitState(final long deadline) {
|
|
+ if (this.waitState != null) {
|
|
+ throw new IllegalStateException("Waitstate already set");
|
|
+ }
|
|
+ this.waitState = new WaitState(this.id, deadline, this);
|
|
+ this.scheduler.waitingOrIdleRunners.put(this.waitState, this.waitState);
|
|
+ }
|
|
+
|
|
+ private void cleanWaitState() {
|
|
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
|
|
+ this.waitState = null;
|
|
+ }
|
|
+
|
|
+ private ScheduledTickTask findTick() {
|
|
+ while (this.getStateVolatile() == STATE_WAITING) {
|
|
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
|
|
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
|
|
+
|
|
+ final ScheduledTickTask toWaitFor;
|
|
+ if (globalFirst == null) {
|
|
+ toWaitFor = ourFirst;
|
|
+ } else if (ourFirst == null) {
|
|
+ toWaitFor = globalFirst;
|
|
+ } else {
|
|
+ final long globalStart = globalFirst.tickStart + this.scheduler.stealThresholdNS;
|
|
+ final long ourStart = ourFirst.tickStart;
|
|
+
|
|
+ toWaitFor = ourStart - globalStart <= 0L ? ourFirst : globalFirst;
|
|
+ }
|
|
+
|
|
+ if (toWaitFor == null) {
|
|
+ // no tasks are scheduled
|
|
+
|
|
+ // move to idle state
|
|
+ this.setupWaitState(DEADLINE_NOT_SET);
|
|
+ this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_IDLE);
|
|
+ if (!this.scheduledTicks.isEmpty() || !this.scheduler.unwatchedScheduledTicks.isEmpty()) {
|
|
+ // handle race condition: task added before we moved to idle
|
|
+ this.interrupt();
|
|
+ }
|
|
+
|
|
+ return toWaitFor;
|
|
+ }
|
|
+
|
|
+ if (toWaitFor == globalFirst) {
|
|
+ if (toWaitFor.watch()) {
|
|
+ this.scheduler.unwatchedScheduledTicks.remove(toWaitFor);
|
|
+ this.watch = toWaitFor;
|
|
+ } else if (toWaitFor != ourFirst) {
|
|
+ continue;
|
|
+ } // else: failed to watch, but we are waiting for our task
|
|
+ }
|
|
+
|
|
+ return toWaitFor;
|
|
+ }
|
|
+
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ private void cleanupWatch(final boolean wakeThread) {
|
|
+ if (this.watch != null) {
|
|
+ this.watch.unwatch();
|
|
+
|
|
+ if (!this.watch.isTaken()) {
|
|
+ this.scheduler.unwatchedScheduledTicks.put(this.watch, this.watch);
|
|
+
|
|
+ if (wakeThread) {
|
|
+ for (;;) {
|
|
+ final WaitState latestWaiter = firstEntry(this.scheduler.waitingOrIdleRunners);
|
|
+ if (latestWaiter == null) {
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ // note: if the task is owned by the waiter, then its deadline should already be <= watch tick start
|
|
+ if (latestWaiter.deadline == DEADLINE_NOT_SET || latestWaiter.deadline - (this.watch.tickStart + this.scheduler.stealThresholdNS) > 0L) {
|
|
+ if (this.scheduler.waitingOrIdleRunners.remove(latestWaiter) == null || !latestWaiter.runner.interrupt()) {
|
|
+ continue;
|
|
+ }
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ this.watch = null;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean findEarlierTask(final ScheduledTickTask task) {
|
|
+ final ScheduledTickTask globalFirst = findFirstNonTakenNonWatched(this.scheduler.unwatchedScheduledTicks);
|
|
+ final ScheduledTickTask ourFirst = findFirstNonTaken(this.scheduledTicks);
|
|
+
|
|
+ if (globalFirst != null && (globalFirst.tickStart + this.scheduler.stealThresholdNS) - task.tickStart < 0L) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ if (ourFirst != null && ourFirst.tickStart - task.tickStart < 0L) {
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ private void reinsert(final ScheduledTickTask tick, final TickThreadRunner owner) {
|
|
+ final ScheduledTickTask newTask = tick.tick.task = new ScheduledTickTask(
|
|
+ tick.tick, tick.tick.getScheduledStart(), DEADLINE_NOT_SET, owner
|
|
+ );
|
|
+
|
|
+ this.scheduler.unwatchedScheduledTicks.put(newTask, newTask);
|
|
+ if (owner != null) {
|
|
+ owner.scheduledTicks.put(newTask, newTask);
|
|
+ }
|
|
+
|
|
+ if (newTask.tick.hasTasks()) {
|
|
+ this.scheduler.notifyTasks(newTask.tick);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void runTasks(final ScheduledTickTask tick, final long deadline) {
|
|
+ if (STATE_WAITING != this.compareAndExchangeStateVolatile(STATE_WAITING, STATE_TASKS)) {
|
|
+ // interrupted or halted
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ if (!tick.take()) {
|
|
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
|
|
+ return;
|
|
+ }
|
|
+
|
|
+ // remove from queues
|
|
+ this.scheduler.unwatchedScheduledTicks.remove(tick);
|
|
+ this.scheduler.scheduledTasks.remove(tick);
|
|
+ if (tick.owner != null) {
|
|
+ tick.owner.scheduledTicks.remove(tick);
|
|
+ tick.owner.scheduledTasks.remove(tick);
|
|
+ }
|
|
+
|
|
+ final BooleanSupplier canContinue = () -> {
|
|
+ return TickThreadRunner.this.getStateVolatile() == STATE_TASKS && (System.nanoTime() - deadline < 0L);
|
|
+ };
|
|
+
|
|
+ if (tick.tick.tasks(canContinue)) {
|
|
+ this.reinsert(tick, tick.owner == null ? this : tick.owner);
|
|
+ }
|
|
+
|
|
+ this.compareAndExchangeStateVolatile(STATE_TASKS, STATE_WAITING);
|
|
+ }
|
|
+
|
|
+ private ScheduledTickTask findTaskNotBehind(final ConcurrentSkipListMap<ScheduledTickTask, ScheduledTickTask> map,
|
|
+ final long timeNow) {
|
|
+ for (final Iterator<ScheduledTickTask> iterator = map.keySet().iterator(); iterator.hasNext();) {
|
|
+ final ScheduledTickTask task = iterator.next();
|
|
+ if (task.isTaken()) {
|
|
+ iterator.remove();
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ // try to avoid stealing tasks accidentally by subtracting the steal threshold instead
|
|
+ final long tickStart = task.owner == this ? task.tickStart : task.tickStart - this.scheduler.stealThresholdNS;
|
|
+
|
|
+ if (tickStart - timeNow <= 0L) {
|
|
+ // skip tasks already behind
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ return task;
|
|
+ }
|
|
+
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ private ScheduledTickTask waitForTick() {
|
|
+ final ScheduledTickTask tick = this.findTick();
|
|
+
|
|
+ if (tick == null) {
|
|
+ return tick;
|
|
+ }
|
|
+
|
|
+ final long tickDeadline = tick.owner == this ? tick.tickStart : tick.tickStart + this.scheduler.stealThresholdNS;
|
|
+
|
|
+ this.setupWaitState(tickDeadline);
|
|
+ // should already be in STATE_WAITING (unless interrupted)
|
|
+
|
|
+ for (;;) {
|
|
+ if (this.getStateVolatile() != STATE_WAITING || tick.isTaken() || this.findEarlierTask(tick)) {
|
|
+ this.cleanupWatch(false);
|
|
+ this.cleanWaitState();
|
|
+ // start of loop may only be idle or interrupt
|
|
+ this.interrupt();
|
|
+ return null;
|
|
+ }
|
|
+
|
|
+ final long timeNow = System.nanoTime();
|
|
+
|
|
+ final ScheduledTickTask ourTask = findFirstNonTaken(this.scheduledTasks);
|
|
+ // avoid stealing global tasks that are behind schedule
|
|
+ final ScheduledTickTask globalTask = this.findTaskNotBehind(this.scheduler.scheduledTasks, timeNow);
|
|
+
|
|
+ if (timeNow - tickDeadline >= 0L) {
|
|
+ if (!tick.take()) {
|
|
+ continue;
|
|
+ }
|
|
+ this.cleanWaitState();
|
|
+ return tick;
|
|
+ }
|
|
+
|
|
+ if (ourTask == null && globalTask == null) {
|
|
+ // nothing to do, so just park
|
|
+ Thread.interrupted();
|
|
+ LockSupport.parkNanos("waiting", tickDeadline - timeNow);
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ final ScheduledTickTask toTask;
|
|
+ if (ourTask == null) {
|
|
+ toTask = globalTask;
|
|
+ } else if (globalTask == null) {
|
|
+ toTask = ourTask;
|
|
+ } else {
|
|
+ // try to use global task only if it is behind
|
|
+ if (globalTask.getLastTaskNotify() + this.scheduler.stealThresholdNS - ourTask.getLastTaskNotify() < 0L) {
|
|
+ final int ownerState = globalTask.owner == null ? STATE_HALTED : globalTask.owner.getStateVolatile();
|
|
+ // steal if not idle, waiting, interrupt
|
|
+ switch (ownerState) {
|
|
+ case STATE_IDLE:
|
|
+ case STATE_WAITING:
|
|
+ case STATE_INTERRUPT: {
|
|
+ // owner will probably get to it
|
|
+ toTask = ourTask;
|
|
+ break;
|
|
+ }
|
|
+ default: {
|
|
+ toTask = globalTask;
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ } else {
|
|
+ toTask = ourTask;
|
|
+ }
|
|
+ }
|
|
+
|
|
+ long deadline = tickDeadline;
|
|
+ deadline = Math.min(deadline, timeNow + this.scheduler.taskTimeSliceNS);
|
|
+ deadline = Math.min(deadline, toTask.tickStart);
|
|
+
|
|
+ // before parsing tasks: were we interrupted?
|
|
+ if (this.getStateVolatile() != STATE_WAITING) {
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ this.runTasks(toTask, deadline);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private boolean moveToTickingState() {
|
|
+ for (int curr = this.getStateVolatile();;) {
|
|
+ switch (curr) {
|
|
+ case STATE_HALTED: {
|
|
+ return false;
|
|
+ }
|
|
+ case STATE_WAITING:
|
|
+ case STATE_INTERRUPT: { // interrupted too late!
|
|
+ if (curr == (curr = this.compareAndExchangeStateVolatile(curr, STATE_TICKING))) {
|
|
+ if (curr == STATE_INTERRUPT) {
|
|
+ // pass interrupt to another thread
|
|
+ this.scheduler.interruptOneRunner();
|
|
+ }
|
|
+ return true;
|
|
+ }
|
|
+ continue;
|
|
+ }
|
|
+
|
|
+ default: {
|
|
+ throw new IllegalStateException("Unknown state: " + curr);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void doTick(final ScheduledTickTask tick) {
|
|
+ if (tick.tick.tick()) {
|
|
+ this.reinsert(tick, this);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void doRun() {
|
|
+ for (;;) {
|
|
+ if (this.waitState != null) {
|
|
+ if (this.waitState.deadline != DEADLINE_NOT_SET) {
|
|
+ throw new IllegalStateException();
|
|
+ }
|
|
+
|
|
+ while (this.getStateVolatile() == STATE_IDLE) {
|
|
+ Thread.interrupted();
|
|
+ LockSupport.park("idling");
|
|
+ }
|
|
+
|
|
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
|
|
+ this.waitState = null;
|
|
+ }
|
|
+
|
|
+ final int currState = this.compareAndExchangeStateVolatile(STATE_INTERRUPT, STATE_WAITING);
|
|
+ if (currState == STATE_HALTED) {
|
|
+ return;
|
|
+ } else if (currState != STATE_INTERRUPT) {
|
|
+ throw new IllegalStateException("State must be HALTED or INTERRUPT at beginning of run loop");
|
|
+ }
|
|
+
|
|
+ while (this.getStateVolatile() == STATE_WAITING) {
|
|
+ // try to find a tick
|
|
+ final ScheduledTickTask toTick = this.waitForTick();
|
|
+
|
|
+ if (toTick == null) {
|
|
+ // no ticks -> go idle
|
|
+ // waitstate should already be setup
|
|
+ // OR we were interrupted and need to clear state
|
|
+ // OR we were halted and need to exit
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ if (!this.moveToTickingState()) {
|
|
+ // halted
|
|
+ // re-insert task, since we took it
|
|
+ this.scheduler.insert(toTick.tick, toTick.tick.hasTasks());
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ this.doTick(toTick);
|
|
+
|
|
+ if (STATE_TICKING != this.compareAndExchangeStateVolatile(STATE_TICKING, STATE_WAITING)) {
|
|
+ // halted
|
|
+ break;
|
|
+ }
|
|
+
|
|
+ continue;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private void begin() {
|
|
+ this.setupWaitState(DEADLINE_NOT_SET);
|
|
+ }
|
|
+
|
|
+ private void die() {
|
|
+ // note: this should also handle unexpected shutdowns gracefully
|
|
+
|
|
+ this.cleanupWatch(false);
|
|
+ if (this.waitState != null) {
|
|
+ this.scheduler.waitingOrIdleRunners.remove(this.waitState);
|
|
+ this.waitState = null;
|
|
+ }
|
|
+ this.scheduler.aliveThreads.remove(this);
|
|
+ if (this.getStateVolatile() == STATE_HALTED) {
|
|
+ // start task stealing for our tasks
|
|
+ this.scheduler.interruptAllRunners();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ @Override
|
|
+ public void run() {
|
|
+ try {
|
|
+ this.begin();
|
|
+ this.doRun();
|
|
+ } finally {
|
|
+ this.die();
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ private static final class ScheduledTickTask {
|
|
+ private static final Comparator<ScheduledTickTask> TICK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
|
|
+ final int timeCmp = TimeUtil.compareTimes(t1.tickStart, t2.tickStart);
|
|
+ if (timeCmp != 0L) {
|
|
+ return timeCmp;
|
|
+ }
|
|
+
|
|
+ return Long.signum(t1.tick.id - t2.tick.id);
|
|
+ };
|
|
+ private static final Comparator<ScheduledTickTask> TASK_COMPARATOR = (final ScheduledTickTask t1, final ScheduledTickTask t2) -> {
|
|
+ final int timeCmp = TimeUtil.compareTimes(t1.lastTaskNotify, t2.lastTaskNotify);
|
|
+ if (timeCmp != 0L) {
|
|
+ return timeCmp;
|
|
+ }
|
|
+
|
|
+ return Long.signum(t1.tick.id - t2.tick.id);
|
|
+ };
|
|
+
|
|
+ private final SchedulableTick tick;
|
|
+ private final long tickStart;
|
|
+ private long lastTaskNotify;
|
|
+ private final TickThreadRunner owner;
|
|
+
|
|
+ private volatile boolean taken;
|
|
+ private static final VarHandle TAKEN_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "taken", boolean.class);
|
|
+ private volatile boolean watched;
|
|
+ private static final VarHandle WATCHED_HANDLE = ConcurrentUtil.getVarHandle(ScheduledTickTask.class, "watched", boolean.class);
|
|
+
|
|
+ private ScheduledTickTask(final SchedulableTick tick, final long tickStart, final long lastTaskNotify,
|
|
+ final TickThreadRunner owner) {
|
|
+ this.tick = tick;
|
|
+ this.tickStart = tickStart;
|
|
+ this.lastTaskNotify = lastTaskNotify;
|
|
+ this.owner = owner;
|
|
+ }
|
|
+
|
|
+ public boolean take() {
|
|
+ return !(boolean)TAKEN_HANDLE.getVolatile(this) && !(boolean)TAKEN_HANDLE.compareAndExchange(this, false, true);
|
|
+ }
|
|
+
|
|
+ public boolean isTaken() {
|
|
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
|
|
+ }
|
|
+
|
|
+ public boolean watch() {
|
|
+ return !(boolean)WATCHED_HANDLE.getVolatile(this) && !(boolean)WATCHED_HANDLE.compareAndExchange(this, false, true);
|
|
+ }
|
|
+
|
|
+ public boolean unwatch() {
|
|
+ return (boolean)WATCHED_HANDLE.compareAndExchange(this, true, false);
|
|
+ }
|
|
+
|
|
+ public boolean isWatched() {
|
|
+ return (boolean)TAKEN_HANDLE.getVolatile(this);
|
|
+ }
|
|
+
|
|
+ public long getLastTaskNotify() {
|
|
+ return this.lastTaskNotify;
|
|
+ }
|
|
+
|
|
+ public void setLastTaskNotify(final long value) {
|
|
+ this.lastTaskNotify = value;
|
|
+ }
|
|
+ }
|
|
+}
|
|
diff --git a/io/papermc/paper/threadedregions/TickData.java b/io/papermc/paper/threadedregions/TickData.java
|
|
index d4d80a69488f57704f1b3dc74cb379de36e80ec0..0383e4dcd611a7568597f46308060f3d7288a564 100644
|
|
--- a/io/papermc/paper/threadedregions/TickData.java
|
|
+++ b/io/papermc/paper/threadedregions/TickData.java
|
|
@@ -20,6 +20,10 @@ public final class TickData {
|
|
}
|
|
|
|
public void addDataFrom(final TickRegionScheduler.TickTime time) {
|
|
+ if (!time.isTickExecution()) {
|
|
+ // TODO fix later
|
|
+ return;
|
|
+ }
|
|
final long start = time.tickStart();
|
|
|
|
TickRegionScheduler.TickTime first;
|
|
diff --git a/io/papermc/paper/threadedregions/TickRegionScheduler.java b/io/papermc/paper/threadedregions/TickRegionScheduler.java
|
|
index 7123b3eb2f2e52946b8ef9de993a6828eb0bb6f7..a8608e8bed64a4da4ed340ab3837b082d6715437 100644
|
|
--- a/io/papermc/paper/threadedregions/TickRegionScheduler.java
|
|
+++ b/io/papermc/paper/threadedregions/TickRegionScheduler.java
|
|
@@ -1,6 +1,5 @@
|
|
package io.papermc.paper.threadedregions;
|
|
|
|
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
|
|
import ca.spottedleaf.concurrentutil.util.TimeUtil;
|
|
import ca.spottedleaf.moonrise.common.util.TickThread;
|
|
import com.mojang.logging.LogUtils;
|
|
@@ -41,10 +40,10 @@ public final class TickRegionScheduler {
|
|
}
|
|
// Folia end - watchdog
|
|
|
|
- private final SchedulerThreadPool scheduler;
|
|
+ private final ScheduledTaskThreadPool scheduler;
|
|
|
|
- public TickRegionScheduler(final int threads) {
|
|
- this.scheduler = new SchedulerThreadPool(threads, new ThreadFactory() {
|
|
+ public TickRegionScheduler() {
|
|
+ this.scheduler = new ScheduledTaskThreadPool(new ThreadFactory() {
|
|
private final AtomicInteger idGenerator = new AtomicInteger();
|
|
|
|
@Override
|
|
@@ -53,11 +52,15 @@ public final class TickRegionScheduler {
|
|
ret.setUncaughtExceptionHandler(TickRegionScheduler.this::uncaughtException);
|
|
return ret;
|
|
}
|
|
- });
|
|
+ }, TimeUnit.MILLISECONDS.toNanos(3L), TimeUnit.MILLISECONDS.toNanos(2L));
|
|
+ }
|
|
+
|
|
+ public void setThreads(final int threads) {
|
|
+ this.scheduler.setCoreThreads(threads);
|
|
}
|
|
|
|
public int getTotalThreadCount() {
|
|
- return this.scheduler.getThreads().length;
|
|
+ return this.scheduler.getAliveThreads().length;
|
|
}
|
|
|
|
private static void setTickingRegion(final ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> region) {
|
|
@@ -84,7 +87,7 @@ public final class TickRegionScheduler {
|
|
}
|
|
}
|
|
|
|
- private static void setTickTask(final SchedulerThreadPool.SchedulableTick task) {
|
|
+ private static void setTickTask(final ScheduledTaskThreadPool.SchedulableTick task) {
|
|
final Thread currThread = Thread.currentThread();
|
|
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
|
|
throw new IllegalStateException("Must be tick thread runner");
|
|
@@ -127,7 +130,7 @@ public final class TickRegionScheduler {
|
|
* Returns the current ticking task, or {@code null} if there is no ticking region.
|
|
* If this thread is not a TickThread, then returns {@code null}.
|
|
*/
|
|
- public static SchedulerThreadPool.SchedulableTick getCurrentTickingTask() {
|
|
+ public static ScheduledTaskThreadPool.SchedulableTick getCurrentTickingTask() {
|
|
final Thread currThread = Thread.currentThread();
|
|
if (!(currThread instanceof TickThreadRunner tickThreadRunner)) {
|
|
return null;
|
|
@@ -165,22 +168,17 @@ public final class TickRegionScheduler {
|
|
region.markNonSchedulable();
|
|
}
|
|
|
|
- /**
|
|
- * Updates the tick start to the farthest into the future of its current scheduled time and the
|
|
- * provided time.
|
|
- * @return {@code false} if the region was not scheduled or is currently ticking or the specified time is less-than its
|
|
- * current start time, {@code true} if the next tick start was adjusted.
|
|
- */
|
|
- public boolean updateTickStartToMax(final RegionScheduleHandle region, final long newStart) {
|
|
- return this.scheduler.updateTickStartToMax(region, newStart);
|
|
- }
|
|
-
|
|
public boolean halt(final boolean sync, final long maxWaitNS) {
|
|
- return this.scheduler.halt(sync, maxWaitNS);
|
|
+ this.scheduler.halt();
|
|
+ if (!sync) {
|
|
+ return this.scheduler.getAliveThreads().length == 0;
|
|
+ }
|
|
+
|
|
+ return this.scheduler.join(maxWaitNS == 0L ? 0L : Math.max(1L, TimeUnit.NANOSECONDS.toMillis(maxWaitNS)));
|
|
}
|
|
|
|
void dumpAliveThreadTraces(final String reason) {
|
|
- for (final Thread thread : this.scheduler.getThreads()) {
|
|
+ for (final Thread thread : this.scheduler.getAliveThreads()) {
|
|
if (thread.isAlive()) {
|
|
TraceUtil.dumpTraceForThread(thread, reason);
|
|
}
|
|
@@ -191,16 +189,12 @@ public final class TickRegionScheduler {
|
|
this.scheduler.notifyTasks(region);
|
|
}
|
|
|
|
- public void init() {
|
|
- this.scheduler.start();
|
|
- }
|
|
-
|
|
private void uncaughtException(final Thread thread, final Throwable thr) {
|
|
LOGGER.error("Uncaught exception in tick thread \"" + thread.getName() + "\"", thr);
|
|
|
|
// prevent further ticks from occurring
|
|
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
|
|
- this.scheduler.halt(false, 0L);
|
|
+ this.scheduler.halt();
|
|
|
|
MinecraftServer.getServer().stopServer();
|
|
}
|
|
@@ -210,7 +204,7 @@ public final class TickRegionScheduler {
|
|
|
|
// prevent further ticks from occurring
|
|
// we CANNOT sync, because WE ARE ON A SCHEDULER THREAD
|
|
- this.scheduler.halt(false, 0L);
|
|
+ this.scheduler.halt();
|
|
|
|
final ChunkPos center = handle.region == null ? null : handle.region.region.getCenterChunk();
|
|
final ServerLevel world = handle.region == null ? null : handle.region.world;
|
|
@@ -226,7 +220,7 @@ public final class TickRegionScheduler {
|
|
|
|
private ThreadedRegionizer.ThreadedRegion<TickRegions.TickRegionData, TickRegions.TickRegionSectionData> currentTickingRegion;
|
|
private RegionizedWorldData currentTickingWorldRegionizedData;
|
|
- private SchedulerThreadPool.SchedulableTick currentTickingTask;
|
|
+ private ScheduledTaskThreadPool.SchedulableTick currentTickingTask;
|
|
// Folia start - profiler
|
|
private ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle.NO_OP_HANDLE;
|
|
// Folia end - profiler
|
|
@@ -236,7 +230,7 @@ public final class TickRegionScheduler {
|
|
}
|
|
}
|
|
|
|
- public static abstract class RegionScheduleHandle extends SchedulerThreadPool.SchedulableTick {
|
|
+ public static abstract class RegionScheduleHandle extends ScheduledTaskThreadPool.SchedulableTick {
|
|
|
|
protected long currentTick;
|
|
protected long lastTickStart;
|
|
@@ -258,7 +252,7 @@ public final class TickRegionScheduler {
|
|
|
|
public RegionScheduleHandle(final TickRegions.TickRegionData region, final long firstStart) {
|
|
this.currentTick = 0L;
|
|
- this.lastTickStart = SchedulerThreadPool.DEADLINE_NOT_SET;
|
|
+ this.lastTickStart = ScheduledTaskThreadPool.DEADLINE_NOT_SET;
|
|
this.tickTimes5s = new TickData(TimeUnit.SECONDS.toNanos(5L));
|
|
this.tickTimes15s = new TickData(TimeUnit.SECONDS.toNanos(15L));
|
|
this.tickTimes1m = new TickData(TimeUnit.MINUTES.toNanos(1L));
|
|
@@ -267,16 +261,16 @@ public final class TickRegionScheduler {
|
|
this.region = region;
|
|
|
|
this.setScheduledStart(firstStart);
|
|
- this.tickSchedule = new Schedule(firstStart == SchedulerThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
|
|
+ this.tickSchedule = new Schedule(firstStart == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? firstStart : firstStart - TIME_BETWEEN_TICKS);
|
|
}
|
|
|
|
/**
|
|
- * Subclasses should call this instead of {@link ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool.SchedulableTick#setScheduledStart(long)}
|
|
+ * Subclasses should call this instead of {@link #setScheduledStart(long)}
|
|
* so that the tick schedule and scheduled start remain synchronised
|
|
*/
|
|
protected final void updateScheduledStart(final long to) {
|
|
this.setScheduledStart(to);
|
|
- this.tickSchedule.setLastPeriod(to == SchedulerThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
|
|
+ this.tickSchedule.setLastPeriod(to == ScheduledTaskThreadPool.DEADLINE_NOT_SET ? to : to - TIME_BETWEEN_TICKS);
|
|
}
|
|
|
|
public final void markNonSchedulable() {
|
|
@@ -293,7 +287,7 @@ public final class TickRegionScheduler {
|
|
|
|
protected abstract void tickRegion(final int tickCount, final long startTime, final long scheduledEnd);
|
|
|
|
- protected abstract boolean runRegionTasks(final BooleanSupplier canContinue);
|
|
+ protected abstract void runRegionTasks(final BooleanSupplier canContinue);
|
|
|
|
protected abstract boolean hasIntermediateTasks();
|
|
|
|
@@ -303,9 +297,9 @@ public final class TickRegionScheduler {
|
|
}
|
|
|
|
@Override
|
|
- public final Boolean runTasks(final BooleanSupplier canContinue) {
|
|
+ public final boolean runTasks(final BooleanSupplier canContinue) {
|
|
if (this.cancelled.get()) {
|
|
- return null;
|
|
+ return false;
|
|
}
|
|
|
|
final long cpuStart = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
|
|
@@ -316,7 +310,7 @@ public final class TickRegionScheduler {
|
|
throw new IllegalStateException("Scheduled region should be acquirable");
|
|
}
|
|
// region was killed
|
|
- return null;
|
|
+ return false;
|
|
}
|
|
|
|
TickRegionScheduler.setTickTask(this);
|
|
@@ -326,31 +320,30 @@ public final class TickRegionScheduler {
|
|
|
|
synchronized (this) {
|
|
this.currentTickData = new TickTime(
|
|
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
|
|
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
|
|
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, tickStart, cpuStart,
|
|
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
|
|
false
|
|
);
|
|
this.currentTickingThread = Thread.currentThread();
|
|
}
|
|
|
|
- final boolean ret;
|
|
final FoliaWatchdogThread.RunningTick runningTick = new FoliaWatchdogThread.RunningTick(tickStart, this, Thread.currentThread()); // Folia - watchdog
|
|
WATCHDOG_THREAD.addTick(runningTick); // Folia - watchdog
|
|
try {
|
|
- ret = this.runRegionTasks(() -> {
|
|
+ this.runRegionTasks(() -> {
|
|
return !RegionScheduleHandle.this.cancelled.get() && canContinue.getAsBoolean();
|
|
});
|
|
} catch (final Throwable thr) {
|
|
this.scheduler.regionFailed(this, true, thr);
|
|
// don't release region for another tick
|
|
- return null;
|
|
+ return false;
|
|
} finally {
|
|
WATCHDOG_THREAD.removeTick(runningTick); // Folia - watchdog
|
|
final long tickEnd = System.nanoTime();
|
|
final long cpuEnd = MEASURE_CPU_TIME ? THREAD_MX_BEAN.getCurrentThreadCpuTime() : 0L;
|
|
|
|
final TickTime time = new TickTime(
|
|
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET,
|
|
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET,
|
|
tickStart, cpuStart, tickEnd, cpuEnd, MEASURE_CPU_TIME, false
|
|
);
|
|
|
|
@@ -361,7 +354,7 @@ public final class TickRegionScheduler {
|
|
}
|
|
}
|
|
|
|
- return !this.markNotTicking() || this.cancelled.get() ? null : Boolean.valueOf(ret);
|
|
+ return this.markNotTicking() && !this.cancelled.get();
|
|
}
|
|
|
|
@Override
|
|
@@ -405,7 +398,7 @@ public final class TickRegionScheduler {
|
|
synchronized (this) {
|
|
this.currentTickData = new TickTime(
|
|
lastTickStart, scheduledStart, tickStart, cpuStart,
|
|
- SchedulerThreadPool.DEADLINE_NOT_SET, SchedulerThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
|
|
+ ScheduledTaskThreadPool.DEADLINE_NOT_SET, ScheduledTaskThreadPool.DEADLINE_NOT_SET, MEASURE_CPU_TIME,
|
|
true
|
|
);
|
|
this.currentTickingThread = Thread.currentThread();
|
|
@@ -582,7 +575,7 @@ public final class TickRegionScheduler {
|
|
* Only valid when {@link #isTickExecution()} is {@code true}.
|
|
*/
|
|
public boolean hasLastTick() {
|
|
- return this.previousTickStart != SchedulerThreadPool.DEADLINE_NOT_SET;
|
|
+ return this.previousTickStart != ScheduledTaskThreadPool.DEADLINE_NOT_SET;
|
|
}
|
|
|
|
/*
|
|
diff --git a/io/papermc/paper/threadedregions/TickRegions.java b/io/papermc/paper/threadedregions/TickRegions.java
|
|
index 988fe74578065c9464f5639e5cc6af79619edef5..1fe996286e048cb8a8d5ede45f27792bc04fec38 100644
|
|
--- a/io/papermc/paper/threadedregions/TickRegions.java
|
|
+++ b/io/papermc/paper/threadedregions/TickRegions.java
|
|
@@ -1,6 +1,5 @@
|
|
package io.papermc.paper.threadedregions;
|
|
|
|
-import ca.spottedleaf.concurrentutil.scheduler.SchedulerThreadPool;
|
|
import ca.spottedleaf.concurrentutil.util.TimeUtil;
|
|
import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManager;
|
|
import com.mojang.logging.LogUtils;
|
|
@@ -12,6 +11,7 @@ import it.unimi.dsi.fastutil.objects.Reference2ReferenceOpenHashMap;
|
|
import it.unimi.dsi.fastutil.objects.ReferenceOpenHashSet;
|
|
import net.minecraft.server.MinecraftServer;
|
|
import net.minecraft.server.level.ServerLevel;
|
|
+import net.minecraft.server.level.ServerPlayer;
|
|
import org.slf4j.Logger;
|
|
import java.util.Iterator;
|
|
import java.util.concurrent.TimeUnit;
|
|
@@ -29,6 +29,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
}
|
|
|
|
private static boolean initialised;
|
|
+
|
|
private static TickRegionScheduler scheduler;
|
|
|
|
public static TickRegionScheduler getScheduler() {
|
|
@@ -45,6 +46,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
gridExponent = Math.min(31, gridExponent);
|
|
regionShift = gridExponent;
|
|
|
|
+ scheduler = new TickRegionScheduler();
|
|
+ }
|
|
+
|
|
+ public static void start() {
|
|
+ final GlobalConfiguration.ThreadedRegions config = GlobalConfiguration.get().threadedRegions;
|
|
int tickThreads;
|
|
if (config.threads <= 0) {
|
|
tickThreads = Runtime.getRuntime().availableProcessors() / 2;
|
|
@@ -57,7 +63,8 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
tickThreads = config.threads;
|
|
}
|
|
|
|
- scheduler = new TickRegionScheduler(tickThreads);
|
|
+ scheduler.setThreads(tickThreads);
|
|
+
|
|
LOGGER.info("Regionised ticking is enabled with " + tickThreads + " tick threads");
|
|
}
|
|
|
|
@@ -171,7 +178,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
private final Reference2ReferenceOpenHashMap<RegionizedData<?>, Object> regionizedData = new Reference2ReferenceOpenHashMap<>();
|
|
|
|
// tick data
|
|
- private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, SchedulerThreadPool.DEADLINE_NOT_SET);
|
|
+ private ConcreteRegionTickHandle tickHandle = new ConcreteRegionTickHandle(this, ScheduledTaskThreadPool.DEADLINE_NOT_SET);
|
|
|
|
// queue data
|
|
private final RegionizedTaskQueue.RegionTaskQueueData taskQueueData;
|
|
@@ -182,15 +189,64 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
// async-safe read-only region data
|
|
private final RegionStats regionStats;
|
|
|
|
+ //
|
|
+ private final java.util.concurrent.atomic.AtomicBoolean hasPackets = new java.util.concurrent.atomic.AtomicBoolean(false);
|
|
+
|
|
public volatile ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler; // Folia - profiler
|
|
|
|
private TickRegionData(final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region) {
|
|
this.region = region;
|
|
this.world = region.regioniser.world;
|
|
- this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData);
|
|
+ this.taskQueueData = new RegionizedTaskQueue.RegionTaskQueueData(this.world.taskQueueRegionData, this);
|
|
this.regionStats = new RegionStats();
|
|
}
|
|
|
|
+ public void setHasTasks() {
|
|
+ TickRegions.getScheduler().setHasTasks(this.tickHandle);
|
|
+ }
|
|
+
|
|
+ public void setHasPackets() {
|
|
+ if (!this.hasPackets.get() && !this.hasPackets.compareAndExchange(false, true)) {
|
|
+ this.setHasTasks();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ public boolean drainOnePacket() {
|
|
+ if (!this.hasPackets.get()) {
|
|
+ return false;
|
|
+ }
|
|
+
|
|
+ final RegionizedWorldData worldData = this.world.getCurrentWorldData();
|
|
+ boolean hasPacketsNew = false;
|
|
+
|
|
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
|
|
+ if (!ca.spottedleaf.moonrise.common.util.TickThread.isTickThreadFor(player)) {
|
|
+ continue;
|
|
+ }
|
|
+ if (player.getBukkitEntity().executeOnePacket()) {
|
|
+ hasPacketsNew |= player.getBukkitEntity().hasPackets();
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if (!hasPacketsNew) {
|
|
+ this.hasPackets.set(false);
|
|
+
|
|
+ // handle race condition: packet added during packet processing
|
|
+ for (final ServerPlayer player : worldData.getLocalPlayers()) {
|
|
+ if (player.getBukkitEntity().hasPackets()) {
|
|
+ this.hasPackets.set(true);
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+
|
|
+ return true;
|
|
+ }
|
|
+
|
|
+ public void drainPackets() {
|
|
+ while (this.drainOnePacket());
|
|
+ }
|
|
+
|
|
public RegionStats getRegionStats() {
|
|
return this.regionStats;
|
|
}
|
|
@@ -224,7 +280,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
return ret;
|
|
}
|
|
|
|
- ret = regionizedData.createNewValue();
|
|
+ ret = regionizedData.createNewValue(this);
|
|
this.regionizedData.put(regionizedData, ret);
|
|
|
|
return ret;
|
|
@@ -242,6 +298,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
for (final ThreadedRegionizer.ThreadedRegion<TickRegionData, TickRegionSectionData> region : regions) {
|
|
final TickRegionData data = region.getData();
|
|
data.tickHandle.copyDeadlineAndTickCount(this.tickHandle);
|
|
+ // just be lazy about this one, it's not very important
|
|
+ if (this.hasPackets.getOpaque()) {
|
|
+ data.hasPackets.setOpaque(true);
|
|
+ }
|
|
}
|
|
|
|
// generic regionised data
|
|
@@ -309,6 +369,10 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
// there's not really a great solution to the tick problem, no matter what it'll be messed up
|
|
// we will pick the greatest time delay so that tps will not exceed TICK_RATE
|
|
data.tickHandle.updateSchedulingToMax(this.tickHandle);
|
|
+ // just be lazy about this one, it's not very important
|
|
+ if (this.hasPackets.getOpaque()) {
|
|
+ data.hasPackets.setOpaque(true);
|
|
+ }
|
|
|
|
// generic regionised data
|
|
final long fromTickOffset = currentTickTo - currentTickFrom; // see merge jd
|
|
@@ -350,11 +414,11 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
}
|
|
|
|
private void updateSchedulingToMax(final ConcreteRegionTickHandle from) {
|
|
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
|
|
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
|
|
return;
|
|
}
|
|
|
|
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
|
|
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
|
|
this.updateScheduledStart(from.getScheduledStart());
|
|
return;
|
|
}
|
|
@@ -365,7 +429,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
private void copyDeadlineAndTickCount(final ConcreteRegionTickHandle from) {
|
|
this.currentTick = from.currentTick;
|
|
|
|
- if (from.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
|
|
+ if (from.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
|
|
return;
|
|
}
|
|
|
|
@@ -374,7 +438,7 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
}
|
|
|
|
private void checkInitialSchedule() {
|
|
- if (this.getScheduledStart() == SchedulerThreadPool.DEADLINE_NOT_SET) {
|
|
+ if (this.getScheduledStart() == ScheduledTaskThreadPool.DEADLINE_NOT_SET) {
|
|
this.updateScheduledStart(System.nanoTime() + TickRegionScheduler.TIME_BETWEEN_TICKS);
|
|
}
|
|
}
|
|
@@ -409,35 +473,34 @@ public final class TickRegions implements ThreadedRegionizer.RegionCallbacks<Tic
|
|
}
|
|
|
|
@Override
|
|
- protected boolean runRegionTasks(final BooleanSupplier canContinue) {
|
|
+ protected void runRegionTasks(final BooleanSupplier canContinue) {
|
|
final ca.spottedleaf.leafprofiler.RegionizedProfiler.Handle profiler = io.papermc.paper.threadedregions.TickRegionScheduler.getProfiler(); // Folia start - profiler
|
|
profiler.startInBetweenTick(); try { // Folia - profiler
|
|
final RegionizedTaskQueue.RegionTaskQueueData queue = this.region.taskQueueData;
|
|
|
|
boolean processedChunkTask = false;
|
|
|
|
- boolean executeChunkTask = true;
|
|
- boolean executeTickTask = true;
|
|
+ boolean executeChunkTask;
|
|
+ boolean executeTickTask;
|
|
+ boolean executePacketTask;
|
|
do {
|
|
- if (executeTickTask) {
|
|
- executeTickTask = queue.executeTickTask();
|
|
- }
|
|
- if (executeChunkTask) {
|
|
- processedChunkTask |= (executeChunkTask = queue.executeChunkTask());
|
|
- }
|
|
- } while ((executeChunkTask | executeTickTask) && canContinue.getAsBoolean());
|
|
+ executeTickTask = queue.executeTickTask();
|
|
+ executeChunkTask = queue.executeChunkTask();
|
|
+ executePacketTask = this.region.drainOnePacket();
|
|
+
|
|
+ processedChunkTask |= executeChunkTask;
|
|
+ } while ((executeChunkTask | executeTickTask | executePacketTask) && canContinue.getAsBoolean());
|
|
|
|
if (processedChunkTask) {
|
|
// if we processed any chunk tasks, try to process ticket level updates for full status changes
|
|
this.region.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates();
|
|
}
|
|
- return true;
|
|
} finally { profiler.stopInBetweenTick(); } // Folia - profiler
|
|
}
|
|
|
|
@Override
|
|
protected boolean hasIntermediateTasks() {
|
|
- return this.region.taskQueueData.hasTasks();
|
|
+ return this.region.taskQueueData.hasTasks() || this.region.hasPackets.get();
|
|
}
|
|
}
|
|
|
|
diff --git a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
|
|
index bc8aa525b3488dc71e7ca0529c6a8c57eaa99e1e..498125a093bb323b34860ca723e89eb8720eb71f 100644
|
|
--- a/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
|
|
+++ b/io/papermc/paper/threadedregions/scheduler/FoliaRegionScheduler.java
|
|
@@ -7,6 +7,7 @@ import ca.spottedleaf.moonrise.patches.chunk_system.scheduling.ChunkHolderManage
|
|
import io.papermc.paper.threadedregions.RegionizedData;
|
|
import io.papermc.paper.threadedregions.RegionizedServer;
|
|
import io.papermc.paper.threadedregions.TickRegionScheduler;
|
|
+import io.papermc.paper.threadedregions.TickRegions.TickRegionData;
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
|
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
|
import it.unimi.dsi.fastutil.longs.Long2ReferenceOpenHashMap;
|
|
@@ -143,6 +144,9 @@ public final class FoliaRegionScheduler implements RegionScheduler {
|
|
}
|
|
|
|
private static final class Scheduler {
|
|
+
|
|
+ private Scheduler(final TickRegionData regionData) {}
|
|
+
|
|
private static final RegionizedData.RegioniserCallback<Scheduler> REGIONISER_CALLBACK = new RegionizedData.RegioniserCallback<>() {
|
|
@Override
|
|
public void merge(final Scheduler from, final Scheduler into, final long fromTickOffset) {
|
|
diff --git a/net/minecraft/network/protocol/PacketUtils.java b/net/minecraft/network/protocol/PacketUtils.java
|
|
index c7214f0457641b5550b98dbd2863c1d637faeaa5..549b2bb8bd12b631ff023ff3b31bd392b51e41fd 100644
|
|
--- a/net/minecraft/network/protocol/PacketUtils.java
|
|
+++ b/net/minecraft/network/protocol/PacketUtils.java
|
|
@@ -48,14 +48,8 @@ public class PacketUtils {
|
|
// Paper end - detailed watchdog information
|
|
// Folia start - region threading
|
|
};
|
|
- // ignore retired state, if removed then we don't want the packet to be handled
|
|
if (processor instanceof net.minecraft.server.network.ServerGamePacketListenerImpl gamePacketListener) {
|
|
- gamePacketListener.player.getBukkitEntity().taskScheduler.schedule(
|
|
- (net.minecraft.server.level.ServerPlayer player) -> {
|
|
- run.run();
|
|
- },
|
|
- null, 1L
|
|
- );
|
|
+ gamePacketListener.player.getBukkitEntity().addPacket(run);
|
|
} else if (processor instanceof net.minecraft.server.network.ServerConfigurationPacketListenerImpl configurationPacketListener) {
|
|
io.papermc.paper.threadedregions.RegionizedServer.getInstance().addTask(run);
|
|
} else if (processor instanceof net.minecraft.server.network.ServerLoginPacketListenerImpl loginPacketListener) {
|
|
diff --git a/net/minecraft/server/MinecraftServer.java b/net/minecraft/server/MinecraftServer.java
|
|
index faf72dd6dff74296c73cb058aaabd1f9f475a072..be82a6b43b1f0c644c53d08a6e16bc2876c8c1e0 100644
|
|
--- a/net/minecraft/server/MinecraftServer.java
|
|
+++ b/net/minecraft/server/MinecraftServer.java
|
|
@@ -1647,6 +1647,7 @@ public abstract class MinecraftServer extends ReentrantBlockableEventLoop<TickTa
|
|
} finally { foliaProfiler.stopTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.PLUGIN_TICK_TASKS); } // Folia - profiler
|
|
// now run all the entity schedulers
|
|
// TODO there has got to be a more efficient variant of this crap
|
|
+ region.drainPackets();
|
|
long tickedEntitySchedulers = 0L; // Folia - profiler
|
|
foliaProfiler.startTimer(ca.spottedleaf.leafprofiler.LProfilerRegistry.ENTITY_SCHEDULER_TICK); try { // Folia - profiler
|
|
for (net.minecraft.world.entity.Entity entity : region.world.getCurrentWorldData().getLocalEntitiesCopy()) {
|
|
diff --git a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
|
|
index 6eca15223b92aedac74233db886e2c1248750e2c..c739f2cf3590d5aab2391d95f6ea9a5cc4a2e599 100644
|
|
--- a/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
|
|
+++ b/net/minecraft/server/network/ServerCommonPacketListenerImpl.java
|
|
@@ -120,6 +120,7 @@ public abstract class ServerCommonPacketListenerImpl implements ServerCommonPack
|
|
this.server.halt(false);
|
|
}
|
|
this.player.getBukkitEntity().taskScheduler.retire(); // Folia - region threading
|
|
+ this.player.getBukkitEntity().stopAcceptingPackets(); // Folia - region threading
|
|
}
|
|
|
|
@Override
|
|
diff --git a/net/minecraft/world/level/Level.java b/net/minecraft/world/level/Level.java
|
|
index dafd90502937019b616ac0a79465e1dbc578cf66..78f4dd7032d18b8e020a4576e4ac012c1d472e67 100644
|
|
--- a/net/minecraft/world/level/Level.java
|
|
+++ b/net/minecraft/world/level/Level.java
|
|
@@ -820,7 +820,7 @@ public abstract class Level implements LevelAccessor, AutoCloseable, ca.spottedl
|
|
// Folia start - region ticking
|
|
public final io.papermc.paper.threadedregions.RegionizedData<io.papermc.paper.threadedregions.RegionizedWorldData> worldRegionData
|
|
= new io.papermc.paper.threadedregions.RegionizedData<>(
|
|
- (ServerLevel)this, () -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this),
|
|
+ (ServerLevel)this, (regionData) -> new io.papermc.paper.threadedregions.RegionizedWorldData((ServerLevel)Level.this, regionData),
|
|
io.papermc.paper.threadedregions.RegionizedWorldData.REGION_CALLBACK
|
|
);
|
|
public volatile io.papermc.paper.threadedregions.RegionizedServer.WorldLevelData tickData;
|