Created
March 10, 2023 19:22
-
-
Save ochaton/e227ad3be8173f7e00e7d7bc80ec1699 to your computer and use it in GitHub Desktop.
Tarantool graphite test helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local socket = require 'socket' | |
local fiber = require 'fiber' | |
local url = require 'uri' | |
local log = require 'log' | |
local M = {} | |
M.__index = M | |
---Creates new graphite object (by default space is temporary) | |
---@param space_name? string name of metrics space | |
---@param space_opts SpaceCreateOptions? options for space | |
---@return table | |
function M.new(space_name, space_opts) | |
space_name = space_name or M.space_name or 'graphite_data' | |
space_opts = space_opts or {} | |
if type(space_opts) ~= 'table' then | |
error("graphite.new([space_name],[space_opts]): second argument must be a table", 2) | |
end | |
if space_opts.temporary == nil then | |
space_opts.temporary = true | |
end | |
if space_opts.if_not_exists == nil then | |
space_opts.if_not_exists = true | |
end | |
local space = box.schema.space.create(space_name, space_opts) | |
space:format({ | |
{ name = 'name', type = 'string' }, | |
{ name = 'time', type = 'number' }, | |
{ name = 'value', type = 'number' }, | |
}) | |
local fmt = assert(space:format()) | |
space:create_index('primary', { | |
if_not_exists = true, | |
parts = { | |
{ 2, fmt[2].type }, -- time | |
{ 1, fmt[1].type }, -- name | |
} | |
}) | |
space:create_index('name', { | |
if_not_exists = true, | |
parts = { | |
{ 1, fmt[1].type }, -- name | |
{ 2, fmt[2].type }, -- time | |
}, | |
}) | |
return setmetatable({ space = space }, M) | |
end | |
---@class GraphitePoint | |
---@field name string | |
---@field value number | |
---@field time number | |
---Parses given text | |
---@param text string | |
---@return GraphitePoint[], string[]? errors | |
function M:parse(text) | |
local points = {} | |
local errors = {} | |
for line in text:gmatch("[^\n]+") do | |
-- https://github.com/go-graphite/go-carbon/blob/230055528fb0a8abd95531b138d114ce7ea89f11/points/points.go#L123 | |
local row = line:gsub("[\n\t\r]", ""):split(" ") | |
if #row ~= 3 then | |
-- malformed row | |
table.insert(errors, ("not enough substrings (expected 3, got %d): %q"):format( | |
#row, | |
line | |
)) | |
goto next_line | |
end | |
local name = row[1] | |
local value = tonumber(row[2]) | |
if value == nil or value ~= value then | |
-- malformed row | |
table.insert(errors, ("metrics value is not parsed (expected number): %q"):format( | |
line | |
)) | |
goto next_line | |
end | |
local tsf = tonumber(row[3]) | |
if tsf == nil or tsf ~= tsf then | |
-- malformed row | |
table.insert(errors, ("metrics time is not parsed (expected number): %q"):format( | |
line | |
)) | |
goto next_line | |
end | |
table.insert(points, { name = name, value = value, time = tsf }) | |
::next_line:: | |
end | |
if errors[1] then | |
return points, errors | |
end | |
return points | |
end | |
---Saves points to space | |
---@param points GraphitePoint[] | |
function M:persist(points) | |
for _, point in ipairs(points) do | |
self.space:replace({ | |
point.name, | |
point.time, | |
point.value, | |
}) | |
end | |
end | |
---Binds graphite listenner on socket | |
---@param bind_uri? string uri to bind to (defaults to :2003) | |
---@param opts { tcp: boolean?, udp: boolean? } options to bind (defaults to tcp and udp true) | |
function M:listen(bind_uri, opts) | |
assert(self.space, "space is not defined. Did you call graphite.new()?") | |
opts = opts or { tcp = true, udp = true } | |
bind_uri = bind_uri or "0.0.0.0:2003" | |
if type(bind_uri) ~= 'string' then | |
error("g:listen([bind_uri], [opts]): first argument bind_uri must be a string", 2) | |
end | |
if type(opts) ~= 'table' then | |
error("g:listen([bind_uri], [opts]): second argument opts must be a table", 2) | |
end | |
if opts.tcp == nil then | |
opts.tcp = true | |
end | |
if opts.udp == nil then | |
opts.udp = true | |
end | |
local uri = assert(url.parse(bind_uri)) | |
-- if uri.scheme is given then disable other options | |
if uri.scheme then | |
for _, scheme in ipairs{"tcp","udp"} do | |
if scheme ~= uri.scheme then | |
opts[scheme] = false | |
end | |
end | |
end | |
if not opts.udp and not opts.tcp then | |
error("both tcp and udp disabled", 2) | |
end | |
local bind_addr = url.format({host=uri.host or "0.0.0.0", service = tostring(uri.service or 2003)}) | |
local bind_host = assert(url.parse(bind_addr).host) | |
local bind_port = assert(tonumber(url.parse(bind_addr).service)) | |
if opts.udp ~= false then | |
local udp = assert(socket('AF_INET', 'SOCK_DGRAM', 'udp')) | |
local addrinfo = assert(socket.getaddrinfo(bind_host, bind_port, { | |
type = 'SOCK_DGRAM', family = 'AF_INET', protocol = 'udp' })) | |
if #addrinfo == 0 then | |
error(("getaddrinfo for %s returned empty result"):format(bind_addr)) | |
end | |
if not udp:bind(addrinfo[1].host, addrinfo[1].port) then | |
error(("bind to %s:%s (%s) failed %s:%s"):format( | |
addrinfo[1].host, addrinfo[1].port, bind_uri, | |
udp:errno(), udp:error() | |
)) | |
end | |
if udp:name().port ~= addrinfo[1].port then | |
error(("socket was binded to another port O_o? %s not %s"):format(udp:name().port, addrinfo[1].port)) | |
end | |
if udp:name().host ~= addrinfo[1].host then | |
error(("socket was binded to another host O_o? %s not %s"):format(udp:name().host, addrinfo[1].host)) | |
end | |
self.udp = udp | |
self.udp_reader_f = fiber.create(function() | |
fiber.name(("udp:/%s"):format(bind_addr), { truncate = true }) | |
log.info("started") | |
while self.udp do | |
self.udp:readable(1) | |
local msg, source = self.udp:recvfrom() | |
if not msg then goto continue end | |
local points, errs = self:parse(msg) | |
if errs then | |
for _, err in ipairs(errs) do | |
log.error(err) | |
end | |
end | |
self:persist(points) | |
log.info("received %d points from %s:%s", #points, source.host, source.port) | |
::continue:: | |
end | |
end) | |
end | |
if opts.tcp ~= false then | |
local addrinfo = assert(socket.getaddrinfo(bind_host, bind_port, { | |
type = 'SOCK_STREAM', family = 'AF_INET', protocol = 'tcp' })) | |
if #addrinfo == 0 then | |
error(("getaddrinfo for %s returned empty result"):format(bind_addr)) | |
end | |
self.tcp = assert(socket.tcp_server(addrinfo[1].host, addrinfo[1].port, | |
function (client) | |
client:nonblock(true) | |
log.info("incoming data from %s:%s", client:name().host, client:name().port) | |
while true do | |
client:readable(1) | |
local text = assert(client:read("\n")) | |
if text == "" or text == nil then | |
break | |
end | |
local points, errs = self:parse(text) | |
if errs then | |
for _, err in ipairs(errs) do | |
log.error(err) | |
end | |
end | |
self:persist(points) | |
end | |
end)) | |
end | |
end | |
return M |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment