Skip to content

Instantly share code, notes, and snippets.

@Anaminus
Last active September 29, 2020 01:38
Show Gist options
  • Select an option

  • Save Anaminus/e4296c6dbd13cf97b2bfe3335c4afef3 to your computer and use it in GitHub Desktop.

Select an option

Save Anaminus/e4296c6dbd13cf97b2bfe3335c4afef3 to your computer and use it in GitHub Desktop.
--[[ Scheduler
A custom scheduler for managing threads.
SYNOPSIS
New(): Scheduler
Drivers: Dictionary[string]Driver
Drivers.Heartbeat: Driver
Drivers.ContinuousHeartbeat: Driver
Drivers.RenderStepped: Driver
Drivers.Stepped: Driver
Drivers.Loop: Driver
Scheduler.MinimumWaitTime: number
Scheduler.HandleError: function(thread: thread, message: string)
Scheduler:SetDriver(driver: Driver)
Scheduler:Delay(duration: number, func: function)
Scheduler:DelayCancel(duration: number, func: function): function()
Scheduler:Spawn(func: function)
Scheduler:Wait(duration: number)
Scheduler:Yield()
Driver: function(cycle: Cycle): Disconnect
Cycle: function(): boolean
Disconnect: function()
]]
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
-- OPTIMIZE: Multiple queues: If the duration of a thread is above a threshold,
-- then the thread is added to a "cold queue" that is checked less frequently,
-- off the hot path. When the duration of a thread in the cold queue drops below
-- a threshold, it is moved to the "hot queue", which is iterated every cycle. A
-- thread in the cold queue must have a duration greater than the frequency at
-- which the cold queue is checked.
-- OPTIMIZE: Avoid allocating tables. Create two queues, one for the 'thread'
-- field of each entry, and another for the 'time' field.
-- OPTIMIZE: Use table.find to pop threads. Depends on the above
-- structs-of-arrays optimization.
-- OPTIMIZE?: Use cycle to pop canceled threads: if time field is less than 0,
-- thread is canceled. With arrays-of-structs, we can retain a reference to the
-- entry in the cancel function, which sets the time field to a negative value
-- directly. With structs-of-arrays, we must either resort to finding the
-- thread, or tracking its location in the queue by using a dictionary.
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
-- Drivers is a table of driver functions. A driver function receives a
-- 'cycle' function that, when called, cycles once through the scheduler's
-- queue. The cycle function returns whether any threads were resumed in the
-- last cycle. Each driver returns a function that, when called, disconnects
-- (stops) that driver.
local Drivers = {}
-- Heartbeat drives the scheduler from the RunService.Heartbeat event: `cycle`
-- is connected directly as the event listener.
--
-- Returns a function that disconnects the listener.
function Drivers.Heartbeat(cycle)
	-- Connect runs efficiently; when possible, events recycle threads for
	-- listeners.
	local heartbeat = game:GetService("RunService").Heartbeat
	local connection = heartbeat:Connect(cycle)
	local function disconnect()
		connection:Disconnect()
	end
	return disconnect
	-- -- Implementation that uses Wait.
	-- local running = true
	-- local signal = game:GetService("RunService").Heartbeat
	-- -- Use desired thread-spawning pattern.
	-- coroutine.wrap(function()
	-- while running do
	-- cycle()
	-- signal:Wait()
	-- end
	-- end)()
	-- return function()
	-- running = false
	-- end
end
-- ContinuousHeartbeat drives the scheduler from the RunService.Heartbeat
-- event. On every event it keeps calling `cycle` until `cycle` reports that no
-- threads were resumed.
--
-- Returns a function that disconnects the listener.
function Drivers.ContinuousHeartbeat(cycle)
	-- drain runs cycles back to back until one of them resumes nothing.
	local function drain()
		repeat
			local resumed = cycle()
		until not resumed
	end
	local connection = game:GetService("RunService").Heartbeat:Connect(drain)
	local function disconnect()
		connection:Disconnect()
	end
	return disconnect
end
-- RenderStepped drives the scheduler from the RunService.RenderStepped event:
-- `cycle` is connected directly as the event listener.
--
-- Returns a function that disconnects the listener.
function Drivers.RenderStepped(cycle)
	local renderStepped = game:GetService("RunService").RenderStepped
	local connection = renderStepped:Connect(cycle)
	local function disconnect()
		connection:Disconnect()
	end
	return disconnect
end
-- Stepped drives the scheduler from the RunService.Stepped event: `cycle` is
-- connected directly as the event listener.
--
-- Returns a function that disconnects the listener.
function Drivers.Stepped(cycle)
	local stepped = game:GetService("RunService").Stepped
	local connection = stepped:Connect(cycle)
	local function disconnect()
		connection:Disconnect()
	end
	return disconnect
end
-- Loop drives the scheduler from a naked loop that calls `cycle` repeatedly
-- for as long as the driver is active. Does not yield back to the real
-- scheduler. Dangerous.
--
-- Returns a function that clears the flag the loop checks.
--
-- NOTE(review): the loop is started synchronously inside this call and has no
-- yield of its own, so the returned function only becomes available to the
-- caller if the wrapped coroutine yields -- confirm how this is meant to be
-- stopped.
function Drivers.Loop(cycle)
	local active = true
	local function spin()
		while active do
			cycle()
		end
	end
	coroutine.wrap(spin)()
	local function disconnect()
		active = false
	end
	return disconnect
end
-- Scheduler is the metatable shared by all scheduler objects. Methods are
-- defined on Scheduler.__index, so they are found through the metatable
-- rather than stored on each object.
local Scheduler = {__index={}}
-- New returns a new Scheduler driven by Heartbeat.
--
-- Declared local so that the constructor does not leak into the global
-- environment; it is exposed to callers through the module's return table.
local function New()
	return setmetatable({
		-- Pending entries, each of the form {thread=thread, time=number}.
		-- Order is undefined.
		queue = {},
		-- Scratch list of threads collected by a cycle that are due to be
		-- resumed.
		resume = {},
		-- Disconnect function returned by the active driver, or nil while no
		-- driver is connected.
		connection = nil,
		-- Driver function used to connect the scheduler cycle.
		driver = Drivers.Heartbeat,
		-- How long the queue must stay empty and inactive before a cycle
		-- tears down the driver. Compared against differences of tick().
		inactiveDuration = 2,
		-- HandleError is an optional function that is called when a coroutine
		-- returns an error. The first argument is the thread, which may be used
		-- with debug.traceback to acquire a stack trace. The second argument is
		-- the error message.
		HandleError = nil,
		-- MinimumWaitTime specifies the minimum duration that threads are
		-- allowed to yield.
		MinimumWaitTime = 0,
	}, Scheduler)
end
--------------------------------------------------------------------------------
-- PRIVATE API -----------------------------------------------------------------
-- teardown disconnects the driver, if one is connected, by calling the stored
-- disconnect function and then clearing it. Does nothing otherwise.
function Scheduler.__index:teardown()
	local disconnect = self.connection
	if disconnect == nil then
		return
	end
	disconnect()
	self.connection = nil
end
-- setup connects the driver, unless one is already connected. The activity
-- timestamp is reset first, then the driver is handed a function that runs
-- one scheduler cycle; whatever the driver returns is stored as the
-- connection.
function Scheduler.__index:setup()
	if self.connection then
		return
	end
	self.lastActive = tick()
	local function runCycle()
		return self:cycle()
	end
	self.connection = self.driver(runCycle)
end
-- cycle executes one scheduler cycle: every queued entry whose time has been
-- reached is removed from the queue, and then each of those threads is
-- resumed. Returns true if at least one thread was due in this cycle, and
-- false otherwise.
function Scheduler.__index:cycle()
	-- The order of threads in the queue is undefined, so that the fast-remove
	-- optimization can be used. This sort of thing might explain why the
	-- "instant" behavior of Roblox's spawn and coroutine.yield were removed:
	-- users were relying on queue order when it was meant to be unreliable. If
	-- the scheduler were required to maintain queue order, then it would miss
	-- out on certain optimizations.
	local t = tick()
	local queue = self.queue
	local resume = self.resume
	local i = 1
	while i <= #queue do
		local entry = queue[i]
		if t >= entry.time then
			-- Entry time has expired, remove it from the queue.
			--
			-- Fast remove: overwrite the current slot with the last entry,
			-- then clear the last slot. This is done as two sequential
			-- assignments because the order of a multiple assignment is
			-- undefined; when the current slot IS the last slot, a single
			-- multiple assignment could leave the entry in place.
			local last = #queue
			queue[i] = queue[last]
			queue[last] = nil
			table.insert(resume, entry.thread)
			-- Skip increment; because of fast remove, the current index
			-- hasn't been checked yet.
		else
			i = i + 1
		end
	end
	local active = #resume > 0
	if active then
		self.lastActive = t
	end
	-- Resume threads separately to ensure that the queue isn't modified while
	-- iterated.
	while #resume > 0 do
		local thread = table.remove(resume)
		local ok, result = coroutine.resume(thread)
		-- Errors are reported only when a handler has been set.
		if not ok and self.HandleError then
			self.HandleError(thread, result)
		end
	end
	-- Automatically teardown the driver when there are no threads. To eliminate
	-- thrashing (high-frequency setup and teardown), this occurs only after a
	-- duration of inactivity.
	if #queue == 0 and t-self.lastActive >= self.inactiveDuration then
		self:teardown()
	end
	return active
end
-- pushThread appends a thread to the queue and makes sure the driver is
-- connected. `time` is the optional timestamp at which the thread should be
-- resumed; when omitted it defaults to 0.
function Scheduler.__index:pushThread(thread, time)
	local entry = {
		thread = thread,
		time = time or 0,
	}
	local queue = self.queue
	queue[#queue + 1] = entry
	self:setup()
end
-- popThread locates and force-removes a thread from the scheduler. Must not be
-- used with threads that are referenced externally.
--
-- The thread is looked up in the queue first. If it is not there, the list of
-- threads that the current cycle has collected but not yet resumed is
-- searched as well, so that a thread can still be removed after its time has
-- expired but before it has actually been resumed.
--
-- The driver is torn down if the queue is empty afterwards.
function Scheduler.__index:popThread(thread)
	local queue = self.queue
	local found = false
	for i = 1, #queue do
		if queue[i].thread == thread then
			-- Fast remove, written as two sequential assignments because the
			-- order of a multiple assignment is undefined; this form is also
			-- correct when the match is the last entry.
			local last = #queue
			queue[i] = queue[last]
			queue[last] = nil
			found = true
			break
		end
	end
	if not found then
		-- table.remove keeps the list a sequence, which the cycle relies on
		-- when it pops threads off the end.
		local resume = self.resume
		for i = #resume, 1, -1 do
			if resume[i] == thread then
				table.remove(resume, i)
				break
			end
		end
	end
	if #queue == 0 then
		self:teardown()
	end
end
--------------------------------------------------------------------------------
-- PUBLIC API ------------------------------------------------------------------
-- SetDriver replaces the driver that runs the scheduler cycle. The current
-- driver is disconnected first; the new one is connected right away only if
-- there are threads in the queue.
function Scheduler.__index:SetDriver(driver)
	self:teardown()
	self.driver = driver
	if #self.queue == 0 then
		return
	end
	self:setup()
end
-- Delay schedules `func` to be called in a new coroutine once `duration`
-- seconds have passed. A `duration` below MinimumWaitTime is raised to
-- MinimumWaitTime.
function Scheduler.__index:Delay(duration, func)
	local now = tick()
	local minimum = self.MinimumWaitTime
	if duration < minimum then
		duration = minimum
	end
	local thread = coroutine.create(func)
	self:pushThread(thread, now + duration)
end
-- DelayCancel schedules `func` to be called in a new coroutine once
-- `duration` seconds have passed. A `duration` below MinimumWaitTime is
-- raised to MinimumWaitTime.
--
-- Returns a function that, when called, cancels the delayed call by removing
-- its thread from the scheduler. Calling it more than once has no further
-- effect.
function Scheduler.__index:DelayCancel(duration, func)
	local now = tick()
	local minimum = self.MinimumWaitTime
	if duration < minimum then
		duration = minimum
	end
	local thread = coroutine.create(func)
	self:pushThread(thread, now + duration)
	local pending = true
	local function cancel()
		if pending then
			pending = false
			self:popThread(thread)
		end
	end
	return cancel
end
-- Spawn schedules `func` to be called in a new coroutine as soon as possible:
-- the thread is queued without a timestamp.
function Scheduler.__index:Spawn(func)
	local thread = coroutine.create(func)
	self:pushThread(thread)
end
-- Wait suspends the running thread and queues it to be resumed once
-- `duration` seconds have passed. A `duration` below MinimumWaitTime is
-- raised to MinimumWaitTime.
function Scheduler.__index:Wait(duration)
	local now = tick()
	local minimum = self.MinimumWaitTime
	if duration < minimum then
		duration = minimum
	end
	local current = coroutine.running()
	self:pushThread(current, now + duration)
	coroutine.yield()
end
-- Yield queues the running thread to be resumed as soon as possible, then
-- suspends it. The thread is queued without an explicit timestamp.
function Scheduler.__index:Yield()
-- coroutine.running() is the final argument, so every value it returns is
-- forwarded to pushThread.
self:pushThread(coroutine.running())
coroutine.yield()
end
-- Module exports: the Scheduler constructor and the table of driver
-- functions.
return {
New = New,
Drivers = Drivers,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment