Last active
July 1, 2022 23:23
-
-
Save intech/4c6d064bf4afa38a817df9af69da62f1 to your computer and use it in GitHub Desktop.
Moleculer implementation of the redlock algorithm for distributed Redis locks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = { | |
name: "lock", | |
settings: { | |
lock: { | |
ttl: 10e3 | |
} | |
}, | |
methods: { | |
/** | |
* Try lock key | |
* @param {String} key | |
* @param {Object} opts | |
* @return {Promise<void>} | |
*/ | |
async tryLock(key, opts = {}) { | |
key = | |
this.broker.cacher.prefix + [this.fullName, key, "lock"].filter(v => !!v).join("-"); | |
this.logger.debug("Check locked...", key); | |
if (this.locks.has(key)) return this.locks.get(key); | |
this.logger.debug("Await locking...", key); | |
const client = | |
"retryCount" in opts && opts.retryCount === 0 ? "redlockNonBlocking" : "redlock"; | |
const lock = await this[client].lockWithOptions(key, this.settings.lock.ttl, opts); | |
lock.key = key; | |
this.locks.set(key, lock); | |
this.logger.debug(key, "Locked!"); | |
return lock; | |
}, | |
/** | |
* Unlock | |
* @param key | |
* @return {Promise<*>} | |
*/ | |
async unlock(key) { | |
const lock = this.locks.get(key); | |
if (!lock) throw new Error(`Unlock key not found ${key}`); | |
return lock.unlock().then(() => { | |
this.locks.delete(key); | |
return key; | |
}); | |
}, | |
/** | |
* Retry | |
* @param {Function} fn | |
* @param {Number} timeout | |
* @return {Promise<*>} | |
*/ | |
async retry(fn, timeout = 0) { | |
try { | |
return fn(); | |
} catch (e) { | |
await new Promise(resolve => setTimeout(() => resolve(), timeout)); | |
return this.retry(fn); | |
} | |
}, | |
/** | |
* Get locks | |
* @return {*} | |
*/ | |
getLocked() { | |
return this.locks; | |
} | |
}, | |
created() { | |
if (!this.broker.cacher.redlock) this.broker.fatal("Redlock is not available"); | |
this.redlock = this.broker.cacher.redlock; | |
this.redlockNonBlocking = this.broker.cacher.redlockNonBlocking; | |
this.locks = new Map(); | |
}, | |
async started() { | |
const shift = this.settings.lock.ttl - this.settings.lock.ttl * 0.1; | |
this.intervalLock = setInterval(async () => { | |
const results = await Promise.all( | |
[...this.locks.values()].map(lock => lock.extend(this.settings.lock.ttl)) | |
).catch(e => { | |
this.broker.fatal(e); | |
}); | |
if (results.length) this.logger.debug("Lock extended", this.settings.lock.ttl); | |
}, shift); | |
}, | |
async stopped() { | |
this.logger.debug("stopped..."); | |
if (this.intervalLock) { | |
clearInterval(this.intervalLock); | |
this.logger.debug("clear interval lock"); | |
} | |
return this.locks.size | |
? Promise.allSettled([...this.locks.values()].map(lock => lock.unlock())) | |
: Promise.resolve(); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment