Last active
October 29, 2020 23:05
-
-
Save ochaton/f524d2da345d8aede67cd36920589847 to your computer and use it in GitHub Desktop.
Online parser of Tarantool 1.5 replication (working draft)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| --- | |
-- TODO:
--   * Support every update operation (splice, opcode 5, is still rejected)
--   * Extract this parser into a separate module
| local ffi = require 'ffi' | |
| local fiber = require 'fiber' | |
-- C declarations needed to decode the replication stream:
--   * struct header_v11 - packed fixed-size header that precedes every row;
--     on_read overlays it on the receive buffer and reads its fields.
--   * enum log_format   - magic markers; on_read compares a 16-bit value read
--     right after the header against XLOG.
ffi.cdef([[
struct header_v11 {
uint32_t header_crc32c;
int64_t lsn;
double tm;
uint32_t len;
uint32_t data_crc32c;
} __attribute__((packed));
enum log_format { XLOG = 65534, SNAP = 65535 };
]])
-- Replication client connection (host and port are hard-coded for now).
local cnn = require('connection'):new("127.0.0.1", "3316")

-- LSN of the last row already processed. on_connected requests the stream
-- starting at confirmed_lsn + 1 and on_read advances it after every row.
cnn.confirmed_lsn = 1
--- Hook run when the connection is established.
-- Forgets the previously received protocol version (on_read reads it again
-- from the new stream) and queues an 8-byte unsigned integer holding the
-- first LSN that is still needed: the last confirmed one plus one.
function cnn.on_connected(self)
    self:log('D', 'connected')
    self.initial_version = nil
    self:log('D', 'confirming lsn', self.confirmed_lsn)

    local wanted_lsn = ffi.new('uint64_t[1]', self.confirmed_lsn + 1)
    self:push_write(wanted_lsn, 8)
end
--- Hook run when the connection is lost.
-- Drops the cached protocol version so that on_read expects the version
-- prefix again on the next stream.
cnn.on_disconnected = function(self)
    self.initial_version = nil
end
-- Size in bytes of the packed per-row header; on_read uses it to decide
-- whether a header is buffered and to step over it.
local hsize = ffi.sizeof("struct header_v11")
-- Request types found in replication rows, usable in both directions:
-- iproto_ops.replace == 13 and iproto_ops[13] == 'replace'.
local iproto_ops = {
    replace = 13,
    update = 19,
    delete = 21,
}
do
    -- Build the code -> name direction in a scratch table first: assigning
    -- new keys to a table while it is being traversed with pairs() has
    -- undefined behaviour in Lua.
    local name_by_code = {}
    for name, code in pairs(iproto_ops) do
        name_by_code[code] = name
    end
    for code, name in pairs(name_by_code) do
        iproto_ops[code] = name
    end
end
-- Symbol reported for each numeric opcode of an update operation.
-- Opcode 5 is deliberately absent: parse.update rejects it with
-- "perl splice not implemented" before this table is consulted.
-- NOTE(review): the meaning of each symbol is not spelled out in this file;
-- confirm against the Tarantool 1.5 protocol description.
local update_op_map = {
    [0] = '=',
    [1] = '+',
    [2] = '&',
    [3] = '^',
    [4] = '|',
    -- [5] intentionally missing
    [6] = 'x',
    [7] = '>',
}
--- Decode one tuple from a buffer reader.
-- Reads a 32-bit unsigned field count, then for every field a length obtained
-- from rb:ber() followed by that many bytes taken with rb:str().
-- @param rb reader object providing u32(), ber() and str(len)
-- @return array with the field values in the order they were read
local function parse_tuple(rb)
    local field_count = rb:u32()
    -- the size is known up front, so allocate the array part once
    local fields = table.new(field_count, 0)
    for idx = 1, field_count do
        local len = rb:ber()
        fields[idx] = rb:str(len)
    end
    return fields
end
-- Reads the two 32-bit integers (space, flags) that open every request body.
-- The reads are sequenced explicitly because Lua does not specify the
-- evaluation order inside an expression list.
local function read_space_and_flags(rb)
    local space = rb:i32()
    local flags = rb:i32()
    return space, flags
end

-- Raises if the reader still holds bytes after a request has been decoded.
local function assert_fully_consumed(rb)
    assert(rb:avail() == 0, "some extra data left in buffer. Parsing failed")
end

--- Request body decoders, keyed by operation name (see iproto_ops).
-- Every decoder takes a reader limited to exactly one request body and
-- returns a table { op = <string>, space = <number>, tuple = <array> };
-- 'update' additionally returns `operations`, an array of
-- { symbol, field_number, argument } triples.
-- Every decoder raises when bytes are left over after decoding.
local parse = {
    replace = function(rb)
        local space, flags = read_space_and_flags(rb)
        local tuple = parse_tuple(rb)
        assert_fully_consumed(rb)

        -- flag bit 0x02 turns the request into an insert
        local op = 'replace'
        if bit.band(flags, 0x02) == 0x02 then
            op = 'insert'
        end
        return {
            op = op,
            space = space,
            tuple = tuple,
        }
    end,

    update = function(rb)
        local space = read_space_and_flags(rb) -- flags are read but not used
        local tuple = parse_tuple(rb)
        local op_count = rb:i32()
        local ops = {}
        for i = 1, op_count do
            local fno = rb:i32()
            local opcode = rb:i8()
            assert(opcode ~= 5, "perl splice not implemented")
            local symbol = update_op_map[opcode]
            if not symbol then
                -- without this check the entry would silently become
                -- { nil, fno, field }
                error("unknown update opcode: " .. tostring(opcode))
            end
            local field = rb:str(rb:ber())
            ops[i] = { symbol, fno, field }
        end
        assert_fully_consumed(rb)
        return {
            op = 'update',
            space = space,
            tuple = tuple,
            operations = ops,
        }
    end,

    delete = function(rb)
        local space = read_space_and_flags(rb) -- flags are read but not used
        local tuple = parse_tuple(rb)
        assert_fully_consumed(rb)
        return {
            op = 'delete',
            space = space,
            tuple = tuple,
        }
    end,
}
| local saferbuf = require 'bin.saferbuf'.new | |
--- Wrap a read handler so that a failure resets the connection.
-- The returned function calls `func(self, ...)` under pcall. On failure it
-- logs the error at level 'E' and calls `self:on_connect_reset(0)`.
-- Return values of `func` are discarded.
-- @param func handler with the signature func(self, ...)
-- @return function(self, ...) that never propagates errors raised by `func`
local safe_on_read = function(func)
    return function(self, ...)
        local ok, err = pcall(func, self, ...)
        if not ok then
            -- the error value may be a table, cdata or nil; concatenating it
            -- directly would raise here and skip the reset below
            self:log('E', "replication failed: " .. tostring(err))
            self:on_connect_reset(0)
        end
    end
end
--- Read handler: decodes every complete replication row currently buffered.
-- The stream starts with a 32-bit version, followed by rows that each consist
-- of a header_v11, a 16-bit XLOG marker, an 8-byte cookie, a 16-bit request
-- type and the request body. After each decoded row `self.confirmed_lsn` is
-- advanced and `self.avail` is set to the number of bytes not yet consumed.
-- NOTE(review): presumably the connection library reads `self.avail` after
-- this handler returns to learn how much of the buffer was consumed; confirm
-- against the 'connection' module.
-- Any error raised here is caught by safe_on_read, which resets the connection.
-- @param last passed through from the connection library; only logged here
cnn.on_read = safe_on_read(function(self, last)
    local rb = saferbuf(self.rbuf, self.avail)
    self:log('D', 'on_read (last: ', last, ')', rb:avail())

    if not self.initial_version then
        -- A short first read must not be treated as a failure: leave the
        -- bytes in place and wait until the whole version field has arrived.
        if rb:avail() < 4 then
            return
        end
        self.initial_version = rb:u32()
        self:log('D', 'initial version: ', self.initial_version)
        self.avail = self.avail - 4
    end

    while hsize < rb:avail() do
        -- overlay the header on the buffer without consuming it yet
        local hdr = ffi.cast('struct header_v11 *', rb.p.c)
        self:log('D', 'header crc32c: ', hdr.header_crc32c)
        self:log('D', 'header lsn: ', hdr.lsn)
        self:log('D', 'header tm: ', hdr.tm)
        self:log('D', 'header len: ', hdr.len)
        self:log('D', 'header data_crc32c: ', hdr.data_crc32c)

        assert(self.confirmed_lsn + 1 == hdr.lsn, "lsn missmatch")

        -- not enough buffer: keep the partial row for the next call
        if not pcall(rb.have, rb, hdr.len + hsize) then break end

        rb:skip(hsize) -- read replication header
        local row_start = rb.p.c

        -- parse magic
        assert(rb:u16() == ffi.C.XLOG)
        self:log('D', 'xlog: ok')

        -- drop cookie
        rb:skip(8)

        -- parse request_type
        local op_code = rb:u16()
        local op = iproto_ops[op_code]
        if not op then
            -- skip packet?
            error("Unknown op: "..op_code)
        end
        self:log('D', 'req.type: ', op)

        -- hdr.len covers everything after the header, so the body is what
        -- remains once the fixed fields read above are subtracted
        local body_len = hdr.len - (rb.p.c - row_start)
        -- a negative length would move the reader backwards on skip()
        assert(body_len >= 0, "row length is smaller than its fixed fields")
        rb:have(body_len)

        local trans = parse[op](saferbuf(rb.p.c, body_len))
        self:log('I', trans.op, trans.space, trans.tuple)
        rb:skip(body_len)

        self:log('D', 'Updating lsn %s -> %s', self.confirmed_lsn, hdr.lsn)
        self.confirmed_lsn = hdr.lsn
        self:log('D', 'Replication lag: %s', fiber.time() - hdr.tm)

        self.avail = rb:avail()
    end
end)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment