Skip to content

Instantly share code, notes, and snippets.

@ochaton
Created March 10, 2023 19:22
Show Gist options
  • Save ochaton/e227ad3be8173f7e00e7d7bc80ec1699 to your computer and use it in GitHub Desktop.
Save ochaton/e227ad3be8173f7e00e7d7bc80ec1699 to your computer and use it in GitHub Desktop.
Tarantool graphite test helper
local socket = require 'socket'
local fiber = require 'fiber'
local url = require 'uri'
local log = require 'log'
-- Module table. It doubles as the metatable of objects returned by M.new(),
-- so methods called on such an object are looked up on M through __index.
local M = {}
M.__index = M
---Creates new graphite object (by default space is temporary)
---
---The space is created through `box.schema.space.create` with
---`temporary = true` and `if_not_exists = true` unless the caller overrides
---them, gets the format `{name: string, time: number, value: number}` and two
---indexes: `primary` over (time, name) and `name` over (name, time).
---The table passed as `space_opts` is left untouched.
---@param space_name? string name of metrics space (defaults to M.space_name or 'graphite_data')
---@param space_opts SpaceCreateOptions? options for space
---@return table
function M.new(space_name, space_opts)
    space_name = space_name or M.space_name or 'graphite_data'
    space_opts = space_opts or {}
    if type(space_opts) ~= 'table' then
        error("graphite.new([space_name],[space_opts]): second argument must be a table", 2)
    end

    -- Fill defaults into a shallow copy so the caller's table is not mutated
    -- (it may be shared or reused for other spaces).
    local create_opts = {}
    for key, val in pairs(space_opts) do
        create_opts[key] = val
    end
    if create_opts.temporary == nil then
        create_opts.temporary = true
    end
    if create_opts.if_not_exists == nil then
        create_opts.if_not_exists = true
    end

    local space = box.schema.space.create(space_name, create_opts)
    space:format({
        { name = 'name', type = 'string' },
        { name = 'time', type = 'number' },
        { name = 'value', type = 'number' },
    })

    -- Index part types are taken from the format so both stay in sync.
    local fmt = assert(space:format())
    space:create_index('primary', {
        if_not_exists = true,
        parts = {
            { 2, fmt[2].type }, -- time
            { 1, fmt[1].type }, -- name
        }
    })
    space:create_index('name', {
        if_not_exists = true,
        parts = {
            { 1, fmt[1].type }, -- name
            { 2, fmt[2].type }, -- time
        },
    })

    return setmetatable({ space = space }, M)
end
---@class GraphitePoint
---@field name string
---@field value number
---@field time number

---Parses one line of the graphite plaintext protocol: `<name> <value> <time>`
---@param line string single line without the line feed
---@return GraphitePoint? point parsed point, nil when the line is malformed
---@return string? error description of the problem when point is nil
local function parse_line(line)
    -- https://github.com/go-graphite/go-carbon/blob/230055528fb0a8abd95531b138d114ce7ea89f11/points/points.go#L123
    -- Drop \n, \t and \r anywhere in the line, then trim surrounding spaces:
    -- otherwise "a 1 2 " splits into an extra empty field and gets rejected.
    -- Each gsub is used as a call prefix, so only its first result is kept.
    local cleaned = line:gsub("[\n\t\r]", ""):gsub("^ +", ""):gsub(" +$", "")
    local row = cleaned:split(" ")
    if #row ~= 3 then
        return nil, ("not enough substrings (expected 3, got %d): %q"):format(#row, line)
    end

    -- `x ~= x` is true only for NaN, which tonumber() may produce
    local value = tonumber(row[2])
    if value == nil or value ~= value then
        return nil, ("metrics value is not parsed (expected number): %q"):format(line)
    end

    local tsf = tonumber(row[3])
    if tsf == nil or tsf ~= tsf then
        return nil, ("metrics time is not parsed (expected number): %q"):format(line)
    end

    return { name = row[1], value = value, time = tsf }
end

---Parses given text
---
---The text is split on line feeds; empty lines are skipped. Malformed lines
---do not abort parsing: they are reported in the second return value while
---all well-formed lines are still returned as points.
---@param text string
---@return GraphitePoint[], string[]? errors
function M:parse(text)
    local points = {}
    local errors = {}
    for line in text:gmatch("[^\n]+") do
        local point, err = parse_line(line)
        if point then
            points[#points + 1] = point
        else
            errors[#errors + 1] = err
        end
    end
    -- errors are returned only when there is at least one
    if errors[1] then
        return points, errors
    end
    return points
end
---Stores every point in the metrics space as a `{name, time, value}` tuple.
---Each tuple is written with `replace`.
---@param points GraphitePoint[]
function M:persist(points)
    for _, p in ipairs(points) do
        local tuple = { p.name, p.time, p.value }
        self.space:replace(tuple)
    end
end
---Binds graphite listener on socket
---
---Depending on the options a UDP socket with a reader fiber (`self.udp`,
---`self.udp_reader_f`) and/or a TCP server (`self.tcp`) is started. Everything
---received is parsed with `self:parse` and stored with `self:persist`; parse
---errors are logged. A scheme in `bind_uri` ("tcp://..." or "udp://...")
---disables the other protocol. The table passed as `opts` is not modified.
---@param bind_uri? string uri to bind to (defaults to 0.0.0.0:2003)
---@param opts? { tcp: boolean?, udp: boolean? } options to bind (defaults to tcp and udp true)
function M:listen(bind_uri, opts)
    assert(self.space, "space is not defined. Did you call graphite.new()?")
    opts = opts or { tcp = true, udp = true }
    bind_uri = bind_uri or "0.0.0.0:2003"
    if type(bind_uri) ~= 'string' then
        error("g:listen([bind_uri], [opts]): first argument bind_uri must be a string", 2)
    end
    if type(opts) ~= 'table' then
        error("g:listen([bind_uri], [opts]): second argument opts must be a table", 2)
    end

    -- Effective protocol switches live in a private table so that the
    -- caller's opts is never written to.
    local enabled = { tcp = opts.tcp, udp = opts.udp }
    if enabled.tcp == nil then
        enabled.tcp = true
    end
    if enabled.udp == nil then
        enabled.udp = true
    end

    local uri = assert(url.parse(bind_uri))
    -- if uri.scheme is given then disable other options
    if uri.scheme then
        for _, scheme in ipairs{"tcp","udp"} do
            if scheme ~= uri.scheme then
                enabled[scheme] = false
            end
        end
    end
    if not enabled.udp and not enabled.tcp then
        error("both tcp and udp disabled", 2)
    end

    -- Normalise host/port through format+parse, applying the defaults
    local bind_addr = url.format({host=uri.host or "0.0.0.0", service = tostring(uri.service or 2003)})
    local bind_host = assert(url.parse(bind_addr).host)
    local bind_port = assert(tonumber(url.parse(bind_addr).service))

    if enabled.udp ~= false then
        -- Resolve the address before creating the socket: a resolution
        -- failure then has nothing to clean up.
        local addrinfo = assert(socket.getaddrinfo(bind_host, bind_port, {
            type = 'SOCK_DGRAM', family = 'AF_INET', protocol = 'udp' }))
        if #addrinfo == 0 then
            error(("getaddrinfo for %s returned empty result"):format(bind_addr))
        end
        local want_host, want_port = addrinfo[1].host, addrinfo[1].port

        local udp = assert(socket('AF_INET', 'SOCK_DGRAM', 'udp'))
        -- From here on every failure path closes the socket before raising,
        -- otherwise the descriptor would leak.
        if not udp:bind(want_host, want_port) then
            -- read the error state before close() invalidates the socket
            local errno, errmsg = udp:errno(), udp:error()
            udp:close()
            error(("bind to %s:%s (%s) failed %s:%s"):format(
                want_host, want_port, bind_uri,
                errno, errmsg
            ))
        end
        local bound = udp:name()
        if bound.port ~= want_port then
            udp:close()
            error(("socket was binded to another port O_o? %s not %s"):format(bound.port, want_port))
        end
        if bound.host ~= want_host then
            udp:close()
            error(("socket was binded to another host O_o? %s not %s"):format(bound.host, want_host))
        end

        self.udp = udp
        self.udp_reader_f = fiber.create(function()
            fiber.name(("udp:/%s"):format(bind_addr), { truncate = true })
            log.info("started")
            -- The loop ends once self.udp is cleared by the owner
            while self.udp do
                local sock = self.udp
                sock:readable(1)
                -- readable() yields: self.udp may have been cleared or
                -- replaced meanwhile, so re-check before touching it again.
                if self.udp == sock then
                    local msg, source = sock:recvfrom()
                    if msg then
                        local points, errs = self:parse(msg)
                        if errs then
                            for _, err in ipairs(errs) do
                                log.error(err)
                            end
                        end
                        self:persist(points)
                        log.info("received %d points from %s:%s", #points, source.host, source.port)
                    end
                end
            end
        end)
    end

    -- NOTE(review): if the TCP setup below raises, an already started UDP
    -- listener keeps running; confirm whether callers expect a rollback.
    if enabled.tcp ~= false then
        local addrinfo = assert(socket.getaddrinfo(bind_host, bind_port, {
            type = 'SOCK_STREAM', family = 'AF_INET', protocol = 'tcp' }))
        if #addrinfo == 0 then
            error(("getaddrinfo for %s returned empty result"):format(bind_addr))
        end
        self.tcp = assert(socket.tcp_server(addrinfo[1].host, addrinfo[1].port,
            function (client)
                client:nonblock(true)
                -- peer() is the remote side; name() would report the local
                -- (server) address instead of the sender.
                local peer = client:peer() or {}
                local peer_host, peer_port = tostring(peer.host), tostring(peer.port)
                log.info("incoming data from %s:%s", peer_host, peer_port)
                while true do
                    client:readable(1)
                    local text = client:read("\n")
                    if text == nil then
                        -- read error: report it and stop serving this client
                        log.error("read from %s:%s failed: %s",
                            peer_host, peer_port, tostring(client:error()))
                        break
                    end
                    if text == "" then
                        -- nothing more to read
                        break
                    end
                    local points, errs = self:parse(text)
                    if errs then
                        for _, err in ipairs(errs) do
                            log.error(err)
                        end
                    end
                    self:persist(points)
                end
            end))
    end
end
-- Export the module table to require() callers.
return M
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment