Last active
July 30, 2023 14:42
-
-
Save fabianeichinger/994be334ac91e63f5dd67c8d56734e07 to your computer and use it in GitHub Desktop.
Redis Tag Cache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { WatchError, createClient } from 'redis'; | |
import { readFileSync } from 'fs'; | |
// Lua sources of the two server-side scripts, read once when the module loads.
// The paths are resolved relative to this module file rather than the process
// working directory, so loading works no matter where the program is started.
const SET_SCRIPT = readFileSync(new URL('./SET.lua', import.meta.url), 'utf-8');
const VACUUM_SCRIPT = readFileSync(new URL('./VACUUM.lua', import.meta.url), 'utf-8');
/**
 * Build the Redis key under which a cached value is stored.
 *
 * @param {string} key - Cache key as used by callers.
 * @returns {string} The key with the `value:` prefix applied.
 */
function valueKey(key) {
  const prefix = 'value:';
  return prefix + key;
}
/**
 * Build the Redis key of the set that collects the value keys carrying a tag.
 *
 * @param {string} tag - Tag name.
 * @returns {string} The tag with the `tag:` prefix applied.
 */
function tagSetKey(tag) {
  const prefix = 'tag:';
  return prefix + tag;
}
/**
 * Build the Redis key that holds the timestamp of a tag's latest invalidation.
 *
 * @param {string} tag - Tag name.
 * @returns {string} The tag with the `tagits:` prefix applied.
 */
function tagItsKey(tag) {
  const prefix = 'tagits:';
  return prefix + tag;
}
/**
 * Build the Redis key used as the vacuum lock for a tag.
 *
 * @param {string} tag - Tag name.
 * @returns {string} The tag with the `vacuum:` prefix applied.
 */
function tagVacuumKey(tag) {
  const prefix = 'vacuum:';
  return prefix + tag;
}
/**
 * Convert a Date to whole seconds since the Unix epoch, rounding down.
 *
 * @param {Date} time - Point in time to convert.
 * @returns {number} Integer number of seconds.
 */
function timeSecs(time) {
  const millis = time.getTime();
  return Math.floor(millis / 1000);
}
/**
 * Tag-aware cache on top of a node-redis client.
 *
 * Values live under `value:<key>`. Each tag has a set (`tag:<tag>`) of the
 * value keys carrying it and an invalidation timestamp (`tagits:<tag>`).
 * An instance remembers the server time at which it was initialised (`t`);
 * writes are rejected by the SET script when one of their tags was invalidated
 * at or after that time.
 */
class C {
  /**
   * Per-client cache of the loaded script handles.
   * Maps a client to a promise of `{ set, vacuum }` (the values returned by
   * `scriptLoad`).
   * @type {WeakMap<object, Promise<{set: string, vacuum: string}>>}
   */
  static sh = new WeakMap()

  /**
   * Construct and initialise a cache instance.
   *
   * @param {import('redis').RedisClientType<?, ?, ?>} client
   * @returns {Promise<C>}
   */
  static async create(client) {
    return new C(client).init()
  }

  /**
   * @param {import('redis').RedisClientType<?, ?, ?>} client - Must already be open.
   * @throws {Error} When the client is not open.
   */
  constructor(client) {
    if (!client.isOpen) {
      throw new Error('Redis not open');
    }
    this.client = client;
    // Reference time in seconds; filled in from the server clock by init().
    this.t = 0;
  }

  /**
   * Capture the server time and make sure both Lua scripts are loaded for
   * this client (loading is shared between instances using the same client).
   *
   * @returns {Promise<this>}
   */
  async init() {
    const client = this.client;
    this.t = timeSecs(await client.time());

    let handles = C.sh.get(client);
    if (!handles) {
      handles = Promise.all([
        client.scriptLoad(SET_SCRIPT),
        client.scriptLoad(VACUUM_SCRIPT)
      ]).then(([set, vacuum]) => ({ set, vacuum }));
      C.sh.set(client, handles);
      // Do not keep a failed load cached forever: drop it so that a later
      // init() on the same client retries. The rejection itself still
      // surfaces to whoever awaits `this.sh`.
      handles.catch(() => {
        if (C.sh.get(client) === handles) {
          C.sh.delete(client);
        }
      });
    }
    this.sh = handles;
    return this;
  }

  /**
   * Read the raw cached string for a key.
   *
   * @param {string} key
   * @returns {Promise<string | null>} Whatever the client's GET yields for
   *   `value:<key>`.
   */
  async get(key) {
    // The original dropped this result, so callers always received undefined.
    return this.client.get(valueKey(key));
  }

  /**
   * Store a value together with its tags through the SET script.
   *
   * @param {string} key
   * @param {string} value
   * @param {Set<string>} tags
   * @param {number} ex - Lifetime in seconds, counted from this instance's `t`.
   * @param {boolean} vac - When truthy and the write succeeded, start a
   *   background vacuum for every tag.
   * @returns {Promise<boolean>} `true` when the script reported a successful
   *   write, `false` when it refused.
   */
  async set(key, value, tags, ex, vac) {
    const tagsArr = [...tags]
    // Key layout expected by the script: value key, all tag-set keys, then
    // all tag-timestamp keys in the same order.
    const rk = [valueKey(key), ...tagsArr.map(tagSetKey), ...tagsArr.map(tagItsKey)]
    const exat = this.t + ex;
    const { set: sha } = await this.sh;
    const ret = await this.client.evalSha(sha, {
      keys: rk,
      arguments: [value, String(this.t), String(exat)]
    });
    const stored = Boolean(ret);
    if (vac && stored) {
      for (const tag of tagsArr) {
        // Deliberately not awaited: vacuuming is background housekeeping.
        // A handler is attached so a failure cannot become an unhandled
        // rejection.
        vacuum(this.client, tag, this.t).catch((err) => {
          console.warn('vacuum failed', tag, err);
        });
      }
    }
    return stored;
  }
}
/** Number of vacuum() calls currently in flight (debug counter). */
let vacs = 0

/**
 * Run the VACUUM script for one tag, repeating for as long as the script
 * returns a truthy value.
 *
 * The first round is flagged 'yes' and later rounds 'no'; every round of one
 * call carries the same random token.
 *
 * @param {import('redis').RedisClientType<?, ?, ?>} client - A client whose
 *   scripts were loaded through `C#init`.
 * @param {string} tag
 * @param {number} t - Reference time in seconds passed to the script.
 * @returns {Promise<void>}
 * @throws {Error} When no scripts have been loaded for `client`.
 */
export async function vacuum(client, tag, t) {
  vacs++
  try {
    // Resolve the script handle once instead of on every round.
    const handles = await C.sh.get(client);
    if (!handles) {
      throw new Error('vacuum: scripts are not loaded for this client');
    }
    const keys = [tagSetKey(tag), tagItsKey(tag), tagVacuumKey(tag)];
    const vl = String(Math.random());
    let fr = true
    while (await client.evalSha(handles.vacuum, {
      keys,
      arguments: [String(t), vl, fr ? 'yes' : 'no']
    })) {
      fr = false
    }
  } finally {
    // Always release the counter, even when a round throws.
    vacs--
  }
}
// Debug aid: print the number of in-flight vacuum() calls once per second.
// unref() stops this timer from keeping the Node.js process alive by itself.
setInterval(() => console.log('vacs: ' + vacs), 1000).unref();
/**
 * Invalidate every cached value that carries any of the given tags.
 *
 * First, in one transaction, the tags' vacuum locks are deleted and their
 * invalidation timestamps are set to the current server time. Then, per tag
 * and on an isolated connection, the tag set is scanned and the value keys it
 * lists are deleted. Finally the tag set itself is deleted inside a
 * transaction guarded by WATCH on the set and the timestamp key.
 *
 * @param {import("redis").RedisClientType<any, any, any>} client
 * @param {Set<string>} tags
 * @returns {Promise<number>} Number of value keys that were deleted.
 */
async function invalidate(client, tags) {
  const tagList = [...tags];
  if (tagList.length === 0) {
    // Nothing to do; DEL without keys would be rejected by the server.
    return 0;
  }

  const t = String(timeSecs(await client.time()));
  await client.multi()
    .del(tagList.map(tag => tagVacuumKey(tag)))
    .mSet(Object.fromEntries(tagList.map(tag => ([tagItsKey(tag), t]))))
    .exec();

  const counts = await Promise.all(tagList.map((tag) =>
    client.executeIsolated(async (isolated) => {
      const tsk = tagSetKey(tag);
      const tik = tagItsKey(tag);
      await isolated.watch([tsk, tik]);

      let deleted = 0;
      let cursor = 0;
      do {
        const [page, tt] = await Promise.all([
          isolated.sScan(tsk, cursor, { COUNT: 100 }),
          isolated.get(tik)
        ]);
        cursor = page.cursor;
        if (page.members.length > 0) {
          // Awaited so the returned total is complete and failures propagate.
          deleted += await isolated.del(page.members);
        }
        if (t !== tt) {
          // Another invalidation replaced our timestamp; it takes over.
          console.warn('different inval started', tag, t, tt)
          // Leave the connection without a pending WATCH.
          await isolated.unwatch();
          return deleted;
        }
      } while (Number(cursor) !== 0)

      try {
        await isolated.multi().del(tsk).exec();
      } catch (e) {
        if (!(e instanceof WatchError)) {
          throw e;
        }
        // A watched key changed while scanning: the tag set is left in place.
        console.warn('this is fine')
      }
      return deleted;
    })
  ));

  return counts.reduce((sum, n) => sum + n, 0);
}
/**
 * JSON-serialising facade over C: values are stringified on write and parsed
 * on read, tags are given as arrays, and the timeout is in milliseconds.
 */
export class Compat extends C {
  /**
   * Construct and initialise a Compat instance.
   *
   * @param {import("redis").RedisClientType<any, any, any>} client
   * @returns {Promise<Compat>}
   */
  static async create(client) {
    return new Compat(client).init()
  }

  /**
   * Read and JSON-decode one or more entries.
   *
   * @param {string[]} keys
   * @returns {Promise<any | any[]>} The decoded value when exactly one key is
   *   given, otherwise an array of decoded values in key order. Entries
   *   without a stored value decode to `null`.
   */
  async get(...keys) {
    const read = async (key) => {
      // Await before parsing: JSON.parse must see the string, not a Promise.
      const raw = await super.get(key);
      return raw == null ? null : JSON.parse(raw);
    };
    if (keys.length === 1) {
      return read(keys[0]);
    }
    return Promise.all(keys.map(read));
  }

  /**
   * JSON-encode and store a value.
   *
   * @param {string} key
   * @param {any} data - Any JSON-serialisable value.
   * @param {string[]} tags
   * @param {{timeout: number, vacuum?: boolean}} [options]
   *   `timeout` is the lifetime in milliseconds (rounded down to seconds);
   *   `vacuum` defaults to `true` also when other options are supplied.
   * @throws {TypeError} When `options.timeout` is not a finite number.
   */
  async set(key, data, tags, options = {}) {
    const { timeout, vacuum: runVacuum = true } = options;
    if (!Number.isFinite(timeout)) {
      // Without this, NaN would be sent to the script as the expiry time.
      throw new TypeError('Compat.set: options.timeout must be a finite number of milliseconds');
    }
    return super.set(key, JSON.stringify(data), new Set(tags), Math.floor(timeout / 1000), runVacuum);
  }

  /**
   * Invalidate all entries carrying any of the given tags.
   *
   * @param {string[]} tags
   */
  async invalidate(...tags) {
    return invalidate(this.client, new Set(tags));
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- KEYS[nk = 1+2m]: vk, tsk_1, ..., tsk_m, tik_1, ..., tik_m
-- ARGV[]: v, t, exat
--
-- Stores v under vk with absolute expiry exat and registers vk in every tag
-- set tsk_i. Returns false without writing anything when any tag timestamp
-- tik_i holds a value >= t; returns true otherwise.
local nk = #KEYS
local m = math.floor(nk / 2)
local v = ARGV[1]
local t = tonumber(ARGV[2])
local exat = tonumber(ARGV[3])

-- With no tags there is nothing to check, and MGET without keys would raise
-- an error, so the lookup is skipped entirely.
if m > 0 then
  local tis = redis.call('mget', unpack(KEYS, 2 + m, nk))
  for i = 1, m do
    local ti = tonumber(tis[i])
    -- Check if any tag was invalidated while processing
    if ti and ti >= t then
      return false
    end
  end
end

for i = 2, 1 + m do
  redis.call('sadd', KEYS[i], KEYS[1])
  -- Only ever extend the tag set's lifetime: 'gt' raises an existing expiry,
  -- and when that did not apply, 'nx' sets one if the set has none yet.
  if redis.call('expireat', KEYS[i], exat, 'gt') ~= 1 then
    redis.call('expireat', KEYS[i], exat, 'nx')
  end
end

redis.call('set', KEYS[1], v, 'exat', exat)
return true
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- KEYS: tsk, tik, tvk
-- ARGV: t, vl, fr
--
-- Removes members of the tag set tsk whose key no longer exists.
-- tvk is a short-lived lock holding the caller's token vl: the first round
-- (fr == 'yes') tries to take it, later rounds only continue while they still
-- own it. Returns true when the caller should run another round, false
-- otherwise.
-- NOTE(review): the members read from tsk are accessed as keys without being
-- listed in KEYS; confirm this is acceptable for the target deployment.
local tsk, tik, tvk = KEYS[1], KEYS[2], KEYS[3]
local t = tonumber(ARGV[1])
local vl = ARGV[2]
local fr = ARGV[3] == 'yes'

local lock
if fr then
  -- Sets the lock only when absent and yields the previous holder, if any.
  lock = redis.call('set', tvk, vl, 'get', 'nx', 'ex', 1)
else
  lock = redis.call('get', tvk)
end

if lock then
  if lock ~= vl then
    -- Someone else holds the lock.
    return false
  end
  redis.call('expire', tvk, 1)
elseif not fr then
  -- Our lock expired between rounds.
  return false
end

if not fr then
  redis.log(redis.LOG_DEBUG, "not fr")
end

-- Stop when the tag was invalidated at or after the caller's reference time.
local ti = tonumber(redis.call('get', tik))
if ti and ti >= t then
  return false
end

local rounds = 0
local delcnt
repeat
  rounds = rounds + 1
  -- A negative count samples with repetition: up to 20 picks per batch.
  local membs = redis.call('srandmember', tsk, -20)
  -- Keep only members whose key is gone; iterate downwards while removing.
  for j = #membs, 1, -1 do
    if redis.call('exists', membs[j]) == 1 then
      table.remove(membs, j)
    end
  end
  if #membs > 0 then
    delcnt = redis.call('srem', tsk, unpack(membs))
  else
    delcnt = 0
  end
until delcnt < 5 or rounds == 5

redis.log(redis.LOG_DEBUG, tostring(delcnt))
return delcnt >= 5
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment