Skip to content

Instantly share code, notes, and snippets.

@ochaton
Last active October 29, 2020 23:05
Show Gist options
  • Select an option

  • Save ochaton/f524d2da345d8aede67cd36920589847 to your computer and use it in GitHub Desktop.

Select an option

Save ochaton/f524d2da345d8aede67cd36920589847 to your computer and use it in GitHub Desktop.
Online parser of Tarantool 1.5 replication (working draft)
---
-- TODO:
-- Support all ops for update
-- Create separate module
local ffi = require 'ffi'
local fiber = require 'fiber'
ffi.cdef[[
struct header_v11 {
uint32_t header_crc32c;
int64_t lsn;
double tm;
uint32_t len;
uint32_t data_crc32c;
} __attribute__((packed));
enum log_format { XLOG = 65534, SNAP = 65535 };
]]
local cnn = require 'connection':new("127.0.0.1", "3316")
cnn.confirmed_lsn = 1
function cnn:on_connected()
self:log('D', 'connected')
self.initial_version = nil
self:log('D', 'confirming lsn', self.confirmed_lsn)
self:push_write(
ffi.new('uint64_t[1]', self.confirmed_lsn+1), 8
)
end
function cnn:on_disconnected()
self.initial_version = nil
end
local hsize = ffi.sizeof('struct header_v11')
local iproto_ops = {
replace = 13,
update = 19,
delete = 21,
}
for k, v in pairs(iproto_ops) do
if type(v) == 'number' then iproto_ops[v] = k end
end
local update_op_map = {
[0] = '=',
[1] = '+',
[2] = '&',
[3] = '^',
[4] = '|',
[6] = 'x',
[7] = '>'
}
local function parse_tuple(rb)
local tuple_count = rb:u32()
local tuple = table.new(tuple_count, 0)
for i = 1, tuple_count do
tuple[i] = rb:str(rb:ber())
end
return tuple
end
local parse = {
replace = function(rb)
local space, flags = rb:i32(), rb:i32()
local op = 'replace'
if bit.band(flags, 0x02) == 0x02 then
op = 'insert'
end
local tuple = parse_tuple(rb)
assert(rb:avail() == 0, "some extra data left in buffer. Parsing failed")
return {
op = op,
space = space,
tuple = tuple,
}
end,
update = function(rb)
local space, flags = rb:i32(), rb:i32() -- luacheck: no unused
local tuple = parse_tuple(rb)
local op_count = rb:i32()
local ops = {}
for i = 1, op_count do
local fno = rb:i32()
local opcode = rb:i8()
assert(opcode ~= 5, "perl splice not implemented")
local field = rb:str(rb:ber())
ops[i] = { update_op_map[opcode], fno, field }
end
assert(rb:avail() == 0, "some extra data left in buffer. Parsing failed")
return {
op = 'update',
space = space,
tuple = tuple,
operations = ops,
}
end,
delete = function(rb)
local space, flags = rb:i32(), rb:i32() -- luacheck: no unused
local tuple = parse_tuple(rb)
assert(rb:avail() == 0, "some extra data left in buffer. Parsing failed")
return {
op = 'delete',
space = space,
tuple = tuple,
}
end,
}
local saferbuf = require 'bin.saferbuf'.new
local safe_on_read = function(func)
return function(self, ...)
local ok, err = pcall(func, self, ...)
if not ok then
self:log('E', "replication failed: "..err)
self:on_connect_reset(0)
end
end
end
cnn.on_read = safe_on_read(function (self, last)
local rb = saferbuf(self.rbuf, self.avail)
self:log('D', 'on_read (last: ', last, ')', rb:avail())
if not self.initial_version then
self.initial_version = rb:u32()
self:log('D', 'initial version: ', self.initial_version)
self.avail = self.avail - 4
end
while hsize < rb:avail() do
local hdr = ffi.cast('struct header_v11 *', rb.p.c)
self:log('D', 'header crc32c: ', hdr.header_crc32c)
self:log('D', 'header lsn: ', hdr.lsn)
self:log('D', 'header tm: ', hdr.tm)
self:log('D', 'header len: ', hdr.len)
self:log('D', 'header data_crc32c: ', hdr.data_crc32c)
assert(cnn.confirmed_lsn + 1 == hdr.lsn, "lsn missmatch")
-- not enough buffer
if not pcall(rb.have, rb, hdr.len+hsize) then break end
rb:skip(hsize) -- read replication header
local p = rb.p.c
-- parse magic
assert(rb:u16() == ffi.C.XLOG)
self:log('D', 'xlog: ok')
-- if math.random() < 0.5 then
-- error "Just Random error"
-- end
-- drop cookie
rb:skip(8)
-- parse request_type
local op = rb:u16()
if not iproto_ops[op] then
-- skip packet?
error("Unknown op: "..op)
else
op = iproto_ops[op]
end
self:log('D', 'req.type: ', op)
local body_len = hdr.len - (rb.p.c-p)
rb:have(body_len)
local trans = parse[op](saferbuf(rb.p.c, body_len))
cnn:log('I', trans.op, trans.space, trans.tuple)
rb:skip(body_len)
cnn:log('D', 'Updating lsn %s -> %s', cnn.confirmed_lsn, hdr.lsn)
cnn.confirmed_lsn = hdr.lsn
cnn:log('D', 'Replication lag: %s', fiber.time() - hdr.tm)
self.avail = rb:avail()
end
end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment