Created
July 1, 2023 17:57
-
-
Save ochaton/f3907afe372dc4649d8a7b8a00e39dd5 to your computer and use it in GitHub Desktop.
Plain Lua implementation of Tarantool's rmean_collect
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
---rmean: a plain-Lua port of Tarantool's rmean collector.
---
---Each collector reports a windowed mean: the average per-second value over
---the last `window_size` seconds (5 seconds by default).
---Tested with 10K collectors running in parallel.
---
---Author's measurements:
---  * collector:collect(value): ≈ 1B calls/s with jit.on, ≈ 15M calls/s with jit.off
---  * collector:mean(): ≈ 50M calls/s with jit.on, ≈ 10M calls/s with jit.off
---
---A single fiber shared by all collectors rolls their windows once per second;
---this costs about 4µs per created counter.
---@class rmean
---@field window_size number default window_size of rmean.collector
---@field roller_tm rmean.collector collector measuring how long each roll takes
---@field private _prev_ts number timestamp of the previous roll
---@field private _running boolean the roller fiber keeps looping while true
---@field private _collectors table<integer, rmean.collector> weak registry of created collectors; highest used index in `n`
local rmean = {
    ---Default window_size (seconds) for collectors created without an explicit one
    window_size = 5,
    -- Weak ('kv') so that collectors no longer referenced elsewhere can be
    -- garbage collected; entries between 1 and `n` may therefore be nil.
    _collectors = setmetatable({ n = 0 }, { __mode = 'kv' }),
}
rmean.__index = rmean

-- Calling the module table itself is a shortcut for rmean.new(...)
setmetatable(rmean, {
    __call = function(class, ...)
        return class.new(...)
    end,
})
local clock = require 'clock' | |
local fiber = require 'fiber' | |
---A single named counter with its own sliding window.
---@class rmean.collector
---@field name string? name of the collector
---@field window_size number window size of the collector in seconds (default 5)
---@field value number[] per-second buckets: [0] is the bucket currently being filled, [1..window_size] are past seconds
---@field total number sum of all collected values
---@field count number monotonic number of collect() calls that were accepted
local collector = {}
collector.__index = collector

---Human-readable form: name, current mean, total and number of collects.
---@param self rmean.collector
---@return string
function collector.__tostring(self)
    local label = self.name or 'anon'
    local fmt = "rmean<%s> [%.2f/%.2f#%d]"
    return string.format(fmt, label, self:mean(), self.total, self.count)
end
---Builds a zero-filled list covering indices 0..n (inclusive).
---Index 0 is used as the "current" bucket, so the list holds n + 1 slots.
---@param n number
---@private
---@return number[]
local function new_zero_list(n)
    -- table.new preallocates the array part; slot 0 is filled as well
    local list = table.new(n, 0)
    local idx = 0
    while idx <= n do
        list[idx] = 0
        idx = idx + 1
    end
    return list
end
---Compacts the weak collectors registry in place: live entries are moved to
---the front (keeping their relative order), the remaining slots are cleared
---and `n` is set to the number of live entries.
---@param list table weak array of collectors with its highest used index in `n`
---@private
local function compact_collectors(list)
    local n = list.n
    local j = 0
    -- scan ALL slots 1..n (the last slot may hold a live collector too)
    for i = 1, n do
        local counter = list[i]
        if counter then
            j = j + 1
            if j < i then
                list[j] = counter
            end
        end
    end
    -- everything after the last live entry is now stale (moved or collected)
    for i = j + 1, n do
        list[i] = nil
    end
    -- j is the count of live entries, i.e. the new highest used index
    list.n = j
end

---Body of the roller fiber shared by all collectors.
---Once per second it rolls every registered collector by the elapsed time
---and records how long that took in `self.roller_tm`. When more than 2/3 of
---the registry slots are empty (their collectors were garbage collected),
---the registry is compacted. Loops while `self._running` is true.
---@private
function rmean:rmean_roller_f()
    fiber.name("rmean/roller_f")
    self._prev_ts = fiber.time()
    if not self.roller_tm then
        self.roller_tm = rmean("rmean_roller_time")
    end
    while self._running do
        fiber.sleep(1)
        local now = fiber.time()
        local dt = now - self._prev_ts
        self._prev_ts = now
        local started = clock.time()
        local collectors = self._collectors
        local garbage = 0
        for i = 1, collectors.n do
            local counter = collectors[i]
            if counter then
                counter:roll(dt)
            else
                garbage = garbage + 1
            end
        end
        if garbage > 2/3 * collectors.n then
            compact_collectors(collectors)
        end
        self.roller_tm:collect(clock.time() - started)
    end
end
---Creates a new rmean collector and registers it with the shared roller.
---
---The first call lazily starts the roller fiber. If that fiber has since died
---(for example it raised an error or was cancelled), a later call starts a new one.
---@param name string? name of the collector (default: 'anon')
---@param window_size integer? window size in seconds (default: rmean.window_size); floored, must be >= 1
---@return rmean.collector
function rmean.new(name, window_size)
    if name == rmean then
        error("Usage: rmean.new([name],[window_size]) or rmean([name],[window_size]) (not rmean:new())", 2)
    end
    if not name then
        name = 'anon'
    end
    window_size = tonumber(window_size) or rmean.window_size
    -- Buckets are indexed by whole seconds: a fractional size would create
    -- fractional keys, and a size < 1 makes mean() divide by zero.
    window_size = math.floor(window_size)
    -- `not (x >= 1)` also rejects NaN; inf would make new_zero_list loop forever
    if not (window_size >= 1 and window_size < math.huge) then
        error("rmean.new: window_size must be a finite number >= 1", 2)
    end
    local obj = setmetatable({
        name = name,
        window_size = window_size,
        value = new_zero_list(window_size),
        total = 0,
        count = 0,
    }, collector)
    local list = rmean._collectors
    list.n = list.n + 1
    list[list.n] = obj
    local roller = rmean._roller_f
    -- (re)start the roller if it was never started or its fiber is gone
    if not roller or roller:status() == 'dead' then
        rmean._running = true
        rmean._roller_f = fiber.new(rmean.rmean_roller_f, rmean)
    end
    return obj
end
---Returns the average per-second value over the last `window_size` seconds.
---Only the rolled buckets [1..window_size] are averaged; the bucket that is
---still being filled (`value[0]`) is not included.
---@return number
function collector:mean()
    local buckets, size = self.value, self.window_size
    local acc = 0
    local i = 1
    while i <= size do
        acc = acc + buckets[i]
        i = i + 1
    end
    return acc / size
end
---Adds `value` to the bucket currently being filled and to the running total.
---Arguments that `tonumber` cannot convert are silently ignored.
---@param value number
function collector:collect(value)
    local amount = tonumber(value)
    if amount == nil then
        return
    end
    local buckets = self.value
    buckets[0] = buckets[0] + amount
    self.total = self.total + amount
    self.count = self.count + 1
end

-- `inc` is an alias of `collect`
collector.inc = collector.collect
---Closes the bucket currently being filled and slides the window.
---
---`value[0]` holds everything collected during the last `dt` seconds. It is
---converted to a per-second rate, which is written into the `k` most recent
---buckets. Here `k` is the number of whole seconds elapsed: 0.1s of tolerance
---for timer jitter, clamped to the range 1..window_size. Older buckets shift
---by the same `k` positions, so the oldest ones fall out of the window.
---A non-positive (or NaN) `dt` leaves the window untouched, so the current
---bucket keeps accumulating until the next roll.
---@param dt number seconds elapsed since the previous roll
function collector:roll(dt)
    -- `not (dt > 0)` also rejects NaN; dt == 0 would otherwise yield inf/nan
    if not (dt > 0) then return end
    local value = self.value
    local size = self.window_size
    local rate = value[0] / dt
    local k = math.floor(dt + 0.1)
    if k < 1 then
        k = 1
    elseif k > size then
        k = size
    end
    -- shift older buckets by k slots (k == 1 is the usual one-second tick)
    for j = size, k + 1, -1 do
        value[j] = value[j - k]
    end
    -- every elapsed second gets the same average rate
    for j = k, 1, -1 do
        value[j] = rate
    end
    value[0] = 0
end

return rmean
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
TODO: provide a mechanism to destroy rmean and rmean.collector instances so that they survive a module reload.
It would be nice to provide a mechanism to reset all collectors.