Created
October 27, 2020 20:05
-
-
Save ochaton/d582e19c6f77c8aebd174ae320e1f9e6 to your computer and use it in GitHub Desktop.
self made S3 (Simple Stupid Scheduler)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| local Coro = {} | |
| Coro.__index = Coro | |
| local S = { | |
| queue = require 'q'(), | |
| } | |
| function S:push(o) | |
| self.queue:push(o) | |
| end | |
| function S:pushfirst(o) | |
| self.queue:pushfirst(o) | |
| end | |
| function S:pop() | |
| local c = self.queue:pop() | |
| return c | |
| end | |
| local function schedule(self, r, ...) | |
| if not r then | |
| print("Coro failed", ...) | |
| return | |
| elseif self.running:status() == 'dead' then | |
| self.running.result = { n = select('#', ...), ... } | |
| for _, waiter in ipairs(self.running.waiters) do | |
| waiter:wakeup() | |
| end | |
| end | |
| end | |
| function S:schedule() | |
| if self.schedulling then | |
| error("Already running", 2) | |
| end | |
| while self.queue:len() > 0 do | |
| self.scheduling = true | |
| local c = assert(self:pop()) | |
| self.running = c; | |
| schedule(self, c:execute()) | |
| self.running = nil | |
| end | |
| self.scheduling = false | |
| end | |
| local C = {} | |
| C.__index = C | |
| local function newchannel(n) | |
| return setmetatable({ | |
| n = n or 0, | |
| q = require 'q'(), | |
| producers = {}, | |
| consumers = {}, | |
| }, C) | |
| end | |
| local function testcoro() | |
| if not S.running then | |
| error("Not inside coroutine", 3) | |
| end | |
| end | |
| function C:put(o) | |
| testcoro() | |
| if self.q:len() < self.n then | |
| self.q:push(o) | |
| return | |
| end | |
| if self.q:len() == self.n and #self.producers == 0 and self.consumers[1] then | |
| table.remove(self.consumers, 1):transfer(o) | |
| return | |
| end | |
| if self.q:len() == self.n then | |
| table.insert(self.producers, S.running) | |
| coroutine.yield() | |
| if self.closed then return nil end | |
| local consumer = assert(table.remove(self.consumers, 1)) | |
| consumer:transfer(o) | |
| return | |
| end | |
| return | |
| end | |
| function C:get() | |
| testcoro() | |
| if self.q:len() > 0 then | |
| return self.q:pop() | |
| end | |
| if self.closed then return nil end | |
| table.insert(self.consumers, S.running) | |
| if self.producers[1] then | |
| table.remove(self.producers, 1):wakeup() | |
| end | |
| local o = coroutine.yield() | |
| return o | |
| end | |
| function C:close() | |
| if self.closed then | |
| return | |
| end | |
| self.closed = true | |
| for n = #self.consumers, 1, -1 do | |
| self.consumers[n]:wakeup() | |
| end | |
| for n = #self.producers, 1, -1 do | |
| self.producers[n]:wakeup() | |
| end | |
| self.consumers = {} | |
| self.producers = {} | |
| end | |
| function Coro.channel(n) | |
| return newchannel(n) | |
| end | |
| local id = 0 | |
| function Coro.spawn(func, ...) | |
| id = id + 1 | |
| local coro = setmetatable({ | |
| id = id, | |
| c = coroutine.create(function(...) return func(...) end), | |
| args = { n = select('#', ...), ...}, | |
| waiters = {}, | |
| scheduled = false | |
| }, Coro) | |
| coro:push() | |
| return coro | |
| end | |
| function Coro:__tostring() | |
| return ("[%d] %s"):format(self.id, self.c) | |
| end | |
| function Coro:transfer(...) | |
| self.args = { ..., n = select('#', ...) } | |
| self:wakeup() | |
| Coro.yield() | |
| end | |
| function Coro:execute() | |
| self.scheduled = false | |
| return coroutine.resume(self.c, unpack(self.args, 1, self.args.n)) | |
| end | |
| function Coro:status() | |
| return coroutine.status(self.c) | |
| end | |
| function Coro:push() | |
| if not self.scheduled then | |
| self.scheduled = true | |
| S:push(self) | |
| end | |
| end | |
| function Coro.yield() | |
| testcoro() | |
| local self = S.running | |
| if not self.scheduled then | |
| self.scheduled = true | |
| S:push(self) | |
| end | |
| coroutine.yield() | |
| end | |
| function Coro.schedule() | |
| S:schedule() | |
| end | |
| function Coro:wakeup() | |
| if not self.scheduled then | |
| self.scheduled = true | |
| S:pushfirst(self) | |
| end | |
| end | |
| function Coro:join() | |
| if S.running then | |
| table.insert(self.waiters, S.running) | |
| coroutine.yield() | |
| else | |
| S:schedule() | |
| end | |
| return unpack(self.result, 1, self.result.n) | |
| end | |
| return Coro |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment