Created
July 5, 2023 18:46
-
-
Save ochaton/67a1778e289ef69b34b733f3ec8cdd8e to your computer and use it in GitHub Desktop.
Index scanner for Tarantool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local msgpack = require 'msgpack' | |
local base64_encode = require 'digest'.base64_encode | |
local base64_decode = require 'digest'.base64_decode | |
---@alias scan.cursor string | |
---Builds an opaque cursor string from the indexed fields of a tuple.
---The fields referenced by `index.parts` are collected in part order,
---packed with msgpack and base64-encoded without padding or line wrapping.
---@param index boxIndex
---@param tuple box.tuple
---@return string
local function encode_cursor(index, tuple)
    local key = {}
    for i, part in ipairs(index.parts) do
        key[i] = tuple[part.fieldno]
    end
    local packed = msgpack.encode(key)
    return base64_encode(packed, {nopad = true, nowrap = true})
end
---Turns a cursor string back into the value it was built from.
---The cursor is base64-decoded and then unpacked with msgpack; only the first
---value returned by `msgpack.decode` is passed on to the caller.
---Any error raised by the decoders propagates to the caller.
---@param cursor string
---@return box.tuple
local function decode_cursor(cursor)
    local packed = base64_decode(cursor)
    local key = msgpack.decode(packed)
    return key
end
-- Prefer the built-in key_def module; fall back to a minimal Lua
-- re-implementation when it cannot be required.
local req_ok, key_def = pcall(require, 'key_def')
if not req_ok then
    -- Cheap and nasty key_def.
    -- For backwards compatibility only the :compare_with_key method is supported.
    key_def = {}

    -- One metatable shared by every fallback key_def object.
    local key_def_mt = { __index = key_def }

    ---Compares the indexed fields of `tuple` with a (possibly partial) key.
    ---Parts are compared in order; the comparison stops at the first key part
    ---that is absent, so a shorter key acts as a prefix.
    ---@param tuple box.tuple
    ---@param key_tuple box.tuple
    ---@return integer -1, 0 or 1 when the tuple is less than, equal to or greater than the key
    function key_def:compare_with_key(tuple, key_tuple)
        for i = 1, #self.parts do
            -- TODO: nullable fields?
            local expected = key_tuple[i]
            -- Only a genuinely absent part ends the key: `false` is a legal
            -- key value and must not be mistaken for "no more parts".
            if type(expected) == 'nil' then
                break
            end
            local value = tuple[self.parts[i].fieldno]
            if type(value) == 'boolean' and type(expected) == 'boolean' then
                -- Lua cannot order booleans with < or >; use false < true.
                if value ~= expected then
                    return value and 1 or -1
                end
            elseif value < expected then
                return -1
            elseif value > expected then
                return 1
            end
        end
        return 0
    end

    ---Creates a fallback key_def object from index parts.
    ---Only `fieldno`, `type` and `is_nullable` of each part are copied.
    ---@param parts IndexPart[]
    ---@return key_def_object
    function key_def.new(parts)
        local self = { parts = {} }
        for i, part in ipairs(parts) do
            self.parts[i] = {
                fieldno = part.fieldno,
                type = part.type,
                is_nullable = part.is_nullable,
            }
        end
        return setmetatable(self, key_def_mt)
    end
end
---@class scan.args
---@field index boxIndex unique index to scan (required)
---@field limit integer? page size in [1, 100]; defaults to 100 when omitted or not numeric
---@field cursor scan.cursor? optional string
---@field prefix_eq any[]? optional prefix equality
---@field on_each fun(box.tuple): any? optional callback which is called on each resulting tuple

---@class scan.result
---@field count integer number of entries stored in `result`
---@field result box.tuple[] result
---@field truncated boolean true if result is truncated
---@field next_cursor scan.cursor? empty if not truncated

---Scans a unique index with the GE iterator and returns one page of tuples.
---
---Up to `limit` tuples are collected; one extra tuple is fetched only to learn
---whether more data follows. When `prefix_eq` is given, iteration stops at the
---first tuple whose key no longer compares equal to that prefix.
---
---Raises an error (reported at the caller's level) when `index` is missing or
---not unique, when `limit` is outside [1, 100], or when the cursor is malformed.
---@param args scan.args
---@return scan.result
return function(args)
    local index = args.index
    if not index then
        error("Scan: field 'index' not given", 2)
    end
    if not index.unique then
        error("Scan: non-unique indexes are not supported", 2)
    end

    local iter_opts = { iterator = 'GE' }
    local cursor = args.cursor
    local start_from
    if index.tuple_pos then
        -- Tarantool ≥ 2.11: the cursor is handed to the iterator as `after`
        start_from = args.prefix_eq
        iter_opts.after = cursor
    elseif type(cursor) == 'string' then
        -- Tarantool < 2.11: the cursor carries the key to restart from
        local ok, res = pcall(decode_cursor, cursor)
        -- encode_cursor always packs an array, so anything else is not ours
        if not ok or type(res) ~= 'table' then
            error("Scan: malformed cursor is given", 2)
        end
        start_from = res
    elseif cursor ~= nil then
        error(("Scan: malformed cursor is given (required string, got %s)"):format(type(cursor)), 2)
    else
        start_from = args.prefix_eq
    end

    local prefix_eq, kd
    if args.prefix_eq then
        prefix_eq = args.prefix_eq
        kd = key_def.new(index.parts)
    end

    local limit = math.floor(tonumber(args.limit) or 100)
    if limit > 100 then
        error("Scan: limit is too large (must be ≤100)", 2)
    elseif not (limit >= 1) then
        -- written as `not (>=)` so that NaN is rejected too: NaN fails every
        -- comparison and would otherwise disable the stop condition below
        error("Scan: limit is too low (must be ≥ 1)", 2)
    end
    -- fetch one tuple more than requested to detect truncation
    limit = limit + 1

    local on_each = args.on_each
    local result = table.new(limit, 0)
    local count = 0
    local next_cursor, last_tuple
    for _, tuple in index:pairs(start_from, iter_opts) do
        limit = limit - 1
        -- if prefix_eq is given then
        -- we break iteration when the tuple no longer matches prefix_eq
        if kd and kd:compare_with_key(tuple, prefix_eq) ~= 0 then
            break
        end
        if limit == 0 then
            -- this is the extra tuple: it is not returned, it only marks
            -- that the page is truncated
            next_cursor = tuple
            break
        end
        last_tuple = tuple
        count = count + 1
        if on_each then
            result[count] = on_each(tuple)
        else
            result[count] = tuple
        end
    end

    if next_cursor then
        if index.tuple_pos then
            -- Yes, tarantool's `after` is strict >
            -- so we need tuple_pos of the last tuple answered to the client
            next_cursor = index:tuple_pos(last_tuple)
        else
            next_cursor = encode_cursor(index, next_cursor)
        end
    end

    return {
        -- number of entries placed into `result` (previously one too many)
        count = count,
        result = result,
        truncated = next_cursor ~= nil,
        next_cursor = next_cursor,
    }
end
--[[ | |
Example: | |
local scanner = require 'scan' | |
-- Assume we have such space: | |
box.schema.space.create('payments', { | |
format = { | |
{ name = 'payment_id', type = 'uuid' }, | |
{ name = 'user', type = 'unsigned' }, | |
{ name = 'timestamp', type = 'unsigned' }, | |
{ name = 'amount', type = 'decimal' }, | |
}, | |
}) | |
-- And following indexes: | |
box.space.payments:create_index('primary', { parts = {'payment_id'} }) | |
box.space.payments:create_index('by_user', { parts = {'user', 'payment_id'} }) | |
box.space.payments:create_index('by_user_timestamp', { parts = {'user', 'timestamp', 'payment_id'} })
-- And we want to fetch all payments for specified user ordered by date in Ascending Order | |
-- Then we use scanner like this | |
local records = scanner { | |
index = box.space.payments.index.by_user_timestamp, | |
prefix_eq = { 12312313 }, -- `12312313` is user_id | |
} | |
-- records will have following fields: | |
records = { | |
result = { ... }, -- all fetched records with user == `12312313` (this is prefix of our index) | |
count = 100, -- number of records were scanned to build result | |
truncated = true|false, -- boolean flag which is true when there are some records left to fetch for this user | |
next_cursor = ".....", -- cursor which points to next valid record, client may pass it to get next portion of data | |
} | |
-- After that client may fetch next portion of data:
local next_records = scanner { | |
index = box.space.payments.index.by_user_timestamp, -- same as previous | |
prefix_eq = { 12312313 }, -- `12312313` is user_id -- same as previous | |
cursor = "...." -- cursor which was retrieved from previous request (field `next_cursor`) | |
} | |
-- Size of `result` is limited by variable limit (default=100, and cannot be increased) | |
-- User may decrease limit passing needed value: | |
local records = scanner { | |
index = box.space.payments.index.by_user_timestamp, | |
prefix_eq = { 12312313 }, -- `12312313` is user_id | |
limit = 50, | |
} | |
-- limit must be integer and in between interval [1, 100] | |
-- User may pass on_each callback to make some preprocessing | |
-- Then returned value from `on_each` callback will be returned | |
local records = scanner { | |
index = box.space.payments.index.by_user_timestamp, | |
prefix_eq = { 12312313 }, -- `12312313` is user_id | |
-- Casts box.tuple to kv-map (for java/golang and others) | |
on_each = function(tuple) | |
return tuple:tomap{ names_only = true } | |
end | |
} | |
]] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment