Skip to content

Instantly share code, notes, and snippets.

@ochaton
Created July 1, 2023 17:57
Show Gist options
  • Save ochaton/f3907afe372dc4649d8a7b8a00e39dd5 to your computer and use it in GitHub Desktop.
Save ochaton/f3907afe372dc4649d8a7b8a00e39dd5 to your computer and use it in GitHub Desktop.
Plain Lua implementation of Tarantool's rmean_collect
---Class rmean is plain-Lua implementation of Tarantool's rmean collector
---
---rmean provides window function mean with specified window size (default=5s)
---rmean well tested on 10K parallel running collectors
---
---rmean:collect(value) is lightning fast ≈ 1B calls per second with jit.on
---and ≈ 15M with jit.off
---
---rmean:mean() makes 10M calls per second with jit.off() and ≈50M calls with jit.on
---
---rmean creates sigle fiber for all collectors which rerolls timings
---it takes 4µs per created counter.
---@class rmean
---@field window_size number default window_size of rmean.collector
---@field roller_tm rmean.collector
---@field private _prev_ts number last timestamp
---@field private _running boolean
---@field private _collectors table<rmean.collector, rmean.collector>
local rmean = {}
rmean.__index = rmean
rmean._collectors = setmetatable({ n = 0 }, { __mode = 'kv' })
setmetatable(rmean, {__call = function(class, ...) return class.new(...) end})
---Default window_size
rmean.window_size = 5
local clock = require 'clock'
local fiber = require 'fiber'
---rmean.collector is named separate counter
---@class rmean.collector
---@field name string? name of collector
---@field window_size number window size of collector (by default=5s)
---@field value number[] list of accumulated values per second
---@field total number sum of all values
---@field count number monotonic counter of all incoming collects
local collector = {}
collector.__index = collector
collector.__tostring = function(self)
return ("rmean<%s> [%.2f/%.2f#%d]"):format(
self.name or 'anon',
self:mean(),
self.total,
self.count
)
end
---Creates new list with zero values
---@param n number
---@private
---@return number[]
local function new_zero_list(n)
local t = table.new(n, 0)
-- we start iteration from 0
-- we abuse lua store mechanism of arrays
for i = 0, n do
t[i] = 0
end
return t
end
---fiber roller of registered collectors
---@private
function rmean:rmean_roller_f()
fiber.name("rmean/roller_f")
self._prev_ts = fiber.time()
if not self.roller_tm then
self.roller_tm = rmean("rmean_roller_time")
end
while self._running do
fiber.sleep(1)
local dt = self._prev_ts
self._prev_ts = fiber.time()
dt = self._prev_ts - dt
local s = clock.time()
local garbage = 0
for i = 1, self._collectors.n do
local counter = self._collectors[i]
if counter then
counter:roll(dt)
else
garbage = garbage + 1
end
end
if garbage > 2/3*self._collectors.n then
-- clear garbage
local j = 1
for i = 1, self._collectors.n-1 do
if self._collectors[i] then
if j < i then
self._collectors[j] = self._collectors[i]
end
j = j + 1
end
end
for i = j, self._collectors.n do
self._collectors[i] = nil
end
self._collectors.n = j
end
self.roller_tm:collect(clock.time()-s)
end
end
---Creates new rmean collector
---@param name string?
---@param window_size integer? default=5 seconds
---@return rmean.collector
function rmean.new(name, window_size)
if name == rmean then
error("Usage: rmean.new([name],[window_size]) or rmean([name],[window_size]) (not rmean:new())", 2)
end
if not name then
name = 'anon'
end
window_size = tonumber(window_size) or rmean.window_size
local obj = setmetatable({
name = name,
window_size = window_size,
value = new_zero_list(window_size),
total = 0,
count = 0,
}, collector)
rmean._collectors.n = rmean._collectors.n+1
rmean._collectors[rmean._collectors.n] = obj
if not rmean._roller_f then
rmean._running = true
rmean._roller_f = fiber.new(rmean.rmean_roller_f, rmean)
end
return obj
end
---Calculates and returns mean value
---@return number
function collector:mean()
local sum = 0
for i = 1, self.window_size do
sum = sum + self.value[i]
end
return sum / self.window_size
end
---Increments current time bucket with given value
---@param value number
function collector:collect(value)
value = tonumber(value)
if not value then return end
self.value[0] = self.value[0] + value
self.total = self.total + value
self.count = self.count + 1
end
-- just alias
collector.inc = collector.collect
---Rerolls statistics
---@param dt number
function collector:roll(dt)
if dt < 0 then return end
local value = self.value
local tmp = value[0] / dt
local j = self.window_size
while j > dt+0.1 do
value[j] = j > 0 and value[j-1] or tmp
j = j - 1
end
while j > 0 do
value[j] = tmp
j = j - 1
end
value[0] = 0
end
return rmean
@ochaton
Copy link
Author

ochaton commented Jul 1, 2023

TODO: provide mehanism of destruction of rmean and rmean/collector to survive reload.

It would be nice to provide mehanism to reset all collectors

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment