Last active
September 29, 2020 01:38
-
-
Save Anaminus/e4296c6dbd13cf97b2bfe3335c4afef3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| --[[ Scheduler | |
| A custom scheduler for managing threads. | |
| SYNOPSIS | |
| New(): Scheduler | |
| Drivers: Dictionary[string]Driver | |
| Drivers.Heartbeat: Driver | |
| Drivers.ContinuousHeartbeat: Driver | |
| Drivers.RenderStepped: Driver | |
| Drivers.Stepped: Driver | |
| Drivers.Loop: Driver | |
| Scheduler.MinimumWaitTime: number | |
| Scheduler.HandleError: function(thread: thread, message: string) | |
| Scheduler:SetDriver(driver: Driver) | |
| Scheduler:Delay(duration: number, func: function) | |
| Scheduler:DelayCancel(duration: number, func: function): function() | |
| Scheduler:Spawn(func: function) | |
| Scheduler:Wait(duration: number) | |
| Scheduler:Yield() | |
| Driver: function(cycle: Cycle): Disconnect | |
| Cycle: function() boolean | |
| Disconnect: function() | |
| ]] | |
| -------------------------------------------------------------------------------- | |
| -------------------------------------------------------------------------------- | |
| -- OPTIMIZE: Multiple queues: If the duration of a thread is above a threshold, | |
| -- then the thread is added to a "cold queue" that is checked less frequently, | |
| -- off the hot path. When the duration of a thread in the cold queue drops below | |
| -- a threshold, it is moved to the "hot queue", which is iterated every cycle. A | |
| -- thread in the cold queue must have a duration greater than the frequency at | |
| -- which the cold queue is checked. | |
| -- OPTIMIZE: Avoid allocating tables. Create two queues, one for the 'thread' | |
| -- field of each entry, and another for the 'time' field. | |
| -- OPTIMIZE: Use table.find to pop threads. Depends on the above | |
| -- structs-of-arrays optimization. | |
| -- OPTIMIZE?: Use cycle to pop canceled threads: if time field is less than 0, | |
| -- thread is canceled. With arrays-of-structs, we can retain a reference to the | |
| -- entry in the cancel function, which sets the time field to a negative value | |
| -- directly. With structs-of-arrays, we must either resort to finding the | |
| -- thread, or tracking its location in the queue by using a dictionary. | |
| -------------------------------------------------------------------------------- | |
| -------------------------------------------------------------------------------- | |
| -- Drivers contains several driver functions. A driver function receives a | |
| -- 'cycle' function that, when called, cycles once through the scheduler's | |
| -- queue. The cycle function returns whether any threads were resumed in the | |
| -- last cycle. | |
| local Drivers = {} | |
| -- Driven by RunService.Heartbeat. | |
| function Drivers.Heartbeat(cycle) | |
| -- Connect runs efficiently; when possible, events recycle threads for | |
| -- listeners. | |
| local conn = game:GetService("RunService").Heartbeat:Connect(cycle) | |
| return function() | |
| conn:Disconnect() | |
| end | |
| -- -- Implementation that uses Wait. | |
| -- local running = true | |
| -- local signal = game:GetService("RunService").Heartbeat | |
| -- -- Use desired thread-spawning pattern. | |
| -- coroutine.wrap(function() | |
| -- while running do | |
| -- cycle() | |
| -- signal:Wait() | |
| -- end | |
| -- end)() | |
| -- return function() | |
| -- running = false | |
| -- end | |
| end | |
| -- Driven by RunService.Heartbeat. Repeats cycles until no threads are active. | |
| function Drivers.ContinuousHeartbeat(cycle) | |
| local conn = game:GetService("RunService").Heartbeat:Connect(function() | |
| while cycle() do end | |
| end) | |
| return function() | |
| conn:Disconnect() | |
| end | |
| end | |
| -- Driven by RunService.RenderStepped. | |
| function Drivers.RenderStepped(cycle) | |
| local conn = game:GetService("RunService").RenderStepped:Connect(cycle) | |
| return function() | |
| conn:Disconnect() | |
| end | |
| end | |
| -- Driven by RunService.Stepped. | |
| function Drivers.Stepped(cycle) | |
| local conn = game:GetService("RunService").Stepped:Connect(cycle) | |
| return function() | |
| conn:Disconnect() | |
| end | |
| end | |
| -- Driven by naked loop. Does not yield back to the real scheduler. Dangerous. | |
| function Drivers.Loop(cycle) | |
| local running = true | |
| coroutine.wrap(function() | |
| while running do | |
| cycle() | |
| end | |
| end)() | |
| return function() | |
| running = false | |
| end | |
| end | |
| local Scheduler = {__index={}} | |
| -- New returns a new Scheduler driven by Heartbeat. | |
| function New() | |
| return setmetatable({ | |
| queue = {}, | |
| resume = {}, | |
| connection = nil, | |
| driver = Drivers.Heartbeat, | |
| inactiveDuration = 2, | |
| -- HandleError is an optional function that is called when a coroutine | |
| -- returns an error. The first argument is the thread, which may be used | |
| -- with debug.traceback to acquire a stack trace. The second argument is | |
| -- the error message. | |
| HandleError = nil, | |
| -- MinimumWaitTime specifies the minimum duration that threads are | |
| -- allowed to yield. | |
| MinimumWaitTime = 0, | |
| }, Scheduler) | |
| end | |
| -------------------------------------------------------------------------------- | |
| -- PRIVATE API ----------------------------------------------------------------- | |
| -- teardown disconnects the driver. | |
| function Scheduler.__index:teardown() | |
| if self.connection ~= nil then | |
| self.connection() | |
| self.connection = nil | |
| end | |
| end | |
| -- setup connects the driver. | |
| function Scheduler.__index:setup() | |
| if not self.connection then | |
| self.lastActive = tick() | |
| self.connection = self.driver(function() | |
| return self:cycle() | |
| end) | |
| end | |
| end | |
| -- cycle executes one scheduler cycle. | |
| function Scheduler.__index:cycle() | |
| -- The order of threads in the queue is undefined, so that the fast-remove | |
| -- optimization can be used. This sort of thing might explain why the | |
| -- "instant" behavior of Roblox's spawn and coroutine.yield were removed: | |
| -- users were relying on queue order when it was meant to be unreliable. If | |
| -- the scheduler were required to maintain queue order, then it would miss | |
| -- out on certain optimizations. | |
| local t = tick() | |
| local queue = self.queue | |
| local resume = self.resume | |
| local i = 1 | |
| while i <= #queue do | |
| local entry = queue[i] | |
| if t >= entry.time then | |
| -- Entry time has expired, remove it from the queue. | |
| if #queue == 1 then | |
| queue[1] = nil | |
| else | |
| -- Fast remove. | |
| queue[#queue], queue[i] = nil, queue[#queue] | |
| end | |
| table.insert(resume, entry.thread) | |
| -- Skip increment; because of fast remove, the current index | |
| -- hasn't been checked yet. | |
| else | |
| i = i + 1 | |
| end | |
| end | |
| local active = #resume > 0 | |
| if active then | |
| self.lastActive = t | |
| end | |
| -- Resume threads separately to ensure that the queue isn't modified while | |
| -- iterated. | |
| while #resume > 0 do | |
| local thread = table.remove(resume) | |
| local ok, result = coroutine.resume(thread) | |
| if not ok and self.HandleError then | |
| self.HandleError(thread, result) | |
| end | |
| end | |
| -- Automatically teardown the driver when there are no threads. To eliminate | |
| -- thrashing (high-frequency setup and teardown), this occurs only after a | |
| -- duration of inactivity. | |
| if #queue == 0 and t-self.lastActive >= self.inactiveDuration then | |
| self:teardown() | |
| end | |
| return active | |
| end | |
| -- pushThread pushes a thread into the queue, with an optional timestamp | |
| -- indicating when the thread should be resumed. | |
| function Scheduler.__index:pushThread(thread, time) | |
| table.insert(self.queue, { | |
| thread = thread, | |
| time = time or 0, | |
| }) | |
| self:setup() | |
| end | |
| -- popThread locates and force-removes a thread from the queue. Must not be used | |
| -- with threads that are referenced externally. | |
| function Scheduler.__index:popThread(thread) | |
| local queue = self.queue | |
| for i = 1, #queue do | |
| if queue[i].thread == thread then | |
| if #queue == 1 then | |
| queue[1] = nil | |
| else | |
| queue[#queue], queue[i] = nil, queue[#queue] | |
| end | |
| break | |
| end | |
| end | |
| if #self.queue == 0 then | |
| self:teardown() | |
| end | |
| end | |
| -------------------------------------------------------------------------------- | |
| -- PUBLIC API ------------------------------------------------------------------ | |
| -- SetDriver sets the driver that runs the scheduler cycle. | |
| function Scheduler.__index:SetDriver(driver) | |
| self:teardown() | |
| self.driver = driver | |
| if #self.queue > 0 then | |
| self:setup() | |
| end | |
| end | |
| -- Delay queues `func` to be called after waiting for `duration` seconds. | |
| function Scheduler.__index:Delay(duration, func) | |
| local t = tick() | |
| if duration < self.MinimumWaitTime then | |
| duration = self.MinimumWaitTime | |
| end | |
| self:pushThread(coroutine.create(func), t + duration) | |
| end | |
| -- DelayCancel queues `func` to be called after waiting for `duration` seconds. | |
| -- Returns a function that, when called, cancels the delayed call. | |
| function Scheduler.__index:DelayCancel(duration, func) | |
| local t = tick() | |
| if duration < self.MinimumWaitTime then | |
| duration = self.MinimumWaitTime | |
| end | |
| local thread = coroutine.create(func) | |
| self:pushThread(thread, t + duration) | |
| local canceled = false | |
| return function() | |
| if canceled then | |
| return | |
| end | |
| canceled = true | |
| self:popThread(thread) | |
| end | |
| end | |
| -- Spawn queues `func` to be called as soon as possible. | |
| function Scheduler.__index:Spawn(func) | |
| self:pushThread(coroutine.create(func)) | |
| end | |
| -- Wait queues the running thread to be resumed after waiting for `duration` | |
| -- seconds. | |
| function Scheduler.__index:Wait(duration) | |
| local t = tick() | |
| if duration < self.MinimumWaitTime then | |
| duration = self.MinimumWaitTime | |
| end | |
| self:pushThread(coroutine.running(), t + duration) | |
| coroutine.yield() | |
| end | |
| -- Yield queues the running thread to be resumed as soon as possible. | |
| function Scheduler.__index:Yield() | |
| self:pushThread(coroutine.running()) | |
| coroutine.yield() | |
| end | |
| return { | |
| New = New, | |
| Drivers = Drivers, | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment