-
-
Save FSX/fb86595c64751201497e2050aeb722e2 to your computer and use it in GitHub Desktop.
Lua script to delete/trim all processed messages from a Redis stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- The goal of this script is to trim messages that have been processed by | |
-- all extant groups from the a given Redis stream. It returns the number | |
-- of messages that were deleted from the stream, if any. I make no | |
-- guarantees about its performance, particularly if the stream is large | |
-- and not fully processed (so a simple XTRIM isn't possible). | |
-- First off, bail out early if the stream doesn't exist. | |
if redis.call("EXISTS", KEYS[1]) == 0 then | |
return false | |
end | |
-- To figure out what messages are deletable, we fetch the "last- | |
-- delivered-id" for each consumer group of the stream, and set the lowest | |
-- one of those ids as our upper bound. Next, we scan the pending lists | |
-- for each group, because we also don't want to delete any events that | |
-- are delivered but not acknowledged. The lowest unacknowledged id (if | |
-- any exists) then becomes our new upper bound. | |
-- "last-delivered-id" isn't mentioned in the Redis docs, for some reason, | |
-- but it's in there, as of 5.0.3. | |
-- In the (common?) case where there are no pending messages, and the | |
-- lowest last-delivered-id equals the most recent id on the stream, we | |
-- can just do a simpler (and much more efficient) XTRIM stream MAXLEN 0. | |
-- If we can't do that, we'll have to pull in all the message ids before | |
-- the lowest unacknowledged id, and XDEL them all. | |
-- First, use XINFO GROUPS to get all group names and the most recently | |
-- distributed ids. | |
local xinfo_groups = redis.call("XINFO", "GROUPS", KEYS[1]) | |
local last_delivered_ids = {} | |
local groups = {} | |
if #xinfo_groups == 0 then | |
-- When there's no groups, there's nothing to delete. | |
return 0 | |
end | |
for _, group_info_array in ipairs(xinfo_groups) do | |
-- Redis passes us a flattened array of key, value pairs, so before | |
-- anything else, convert it into a proper hash-style table so that it's | |
-- easier to use. | |
local group_info = {} | |
for i = 1, #group_info_array, 2 do | |
group_info[group_info_array[i]] = group_info_array[i+1] | |
end | |
table.insert(groups, group_info["name"]) | |
table.insert(last_delivered_ids, group_info["last-delivered-id"]) | |
end | |
local lowest_pending_ids = {} | |
for _, group_name in ipairs(groups) do | |
local pending = redis.call("XPENDING", KEYS[1], group_name) | |
local lowest_id = pending[2] | |
if not lowest_id == false then | |
table.insert(lowest_pending_ids, lowest_id) | |
end | |
end | |
local function string_id_to_table(s) | |
local t = {} | |
for k, v in string.gmatch(s, "(%d+)-(%d+)") do | |
table.insert(t, tonumber(k)) | |
table.insert(t, tonumber(v)) | |
end | |
return t | |
end | |
-- Returns true if a < b, or if a == b (which is important later). | |
local function compare_ids(a, b) | |
local a_t = string_id_to_table(a) | |
local b_t = string_id_to_table(b) | |
return ((a_t[1] <= b_t[1]) and (a_t[2] <= b_t[2])) | |
end | |
table.sort(last_delivered_ids, compare_ids) | |
table.sort(lowest_pending_ids, compare_ids) | |
-- Here's our XTRIM optimization. | |
if #lowest_pending_ids == 0 then | |
local stream_info_array = redis.call("XINFO", "STREAM", KEYS[1]) | |
local stream_info = {} | |
for i = 1, #stream_info_array, 2 do | |
stream_info[stream_info_array[i]] = stream_info_array[i+1] | |
end | |
if last_delivered_ids[1] == stream_info["last-generated-id"] then | |
-- Yay! | |
return redis.call("XTRIM", KEYS[1], "MAXLEN", 0) | |
end | |
end | |
-- If we've gotten here, looks like we need to do a big XDEL, so find our | |
-- lower bound. | |
local lowest_id = last_delivered_ids[1] | |
-- We can include the lowest delivered id in the deletion, so long as it | |
-- isn't pending, which we'll check for next. | |
local protect_lowest_id = false | |
if #lowest_pending_ids > 0 then | |
-- We rely here on compare_ids returning true if the ids are equal. | |
if compare_ids(lowest_pending_ids[1], lowest_id) then | |
lowest_id = lowest_pending_ids[1] | |
protect_lowest_id = true | |
end | |
end | |
local messages = redis.call("XRANGE", KEYS[1], "-", lowest_id) | |
if #messages == 0 then | |
-- Nothing to delete. | |
return 0 | |
end | |
local delete_command = {"XDEL", KEYS[1]} | |
for _,t in pairs(messages) do | |
local id = t[1] | |
if (lowest_id ~= id) or (not protect_lowest_id) then | |
table.insert(delete_command, id) | |
end | |
end | |
-- There's nothing to delete, because no message IDs have | |
-- been added to the delete command. | |
if #delete_command == 2 then | |
-- Nothing to delete. | |
return 0 | |
end | |
return redis.call(unpack(delete_command)) |
Here https://gist.github.com/acardinale/4f84c9698fb1297df4a6faa20a59ce88 my version that takes advantage of XTRIM MINID and avoids the use of XDEL
Thanks for sharing, noticed on line 80, i.e.
return ((a_t[1] <= b_t[1]) and (a_t[2] <= b_t[2]))
this may fail to handle when the lowest pending msg id has -1 and the last delivered id has -0, this results in all the pending msgs being deleted as the last deliver id is being taken in this case.
Some test evidences attached
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing this