Skip to content

Instantly share code, notes, and snippets.

@jdx
Created November 1, 2017 15:15
Show Gist options
  • Save jdx/781ac5493ca46a3e6660ea406cf36035 to your computer and use it in GitHub Desktop.
Save jdx/781ac5493ca46a3e6660ea406cf36035 to your computer and use it in GitHub Desktop.
const fs = require('fs-extra');
const path = require('path');
const debug = require('debug')('rwlockfile');
const mkdir = require('mkdirp');
let locks = {};
let readers = {};
async function pidActive(pid) {
if (!pid || isNaN(pid)) return false;
return process.platform === 'win32' ? pidActiveWindows(pid) : pidActiveUnix(pid);
}
function pidActiveWindows(pid) {
return new Promise((resolve, reject) => {
const { spawn } = require('child_process');
const p = spawn('tasklist', ['/fi', `PID eq ${pid}`]);
p.on('close', code => {
if (code !== 0) reject(new Error(`tasklist exited with code ${code}`));
});
p.stdout.on('data', stdout => {
resolve(!stdout.includes('No tasks are running'));
});
});
}
function pidActiveUnix(pid) {
try {
// flow$ignore
return process.kill(pid, 0);
} catch (e) {
return e.code === 'EPERM';
}
}
async function lockActive(path) {
try {
let file = await readFile(path);
let pid = parseInt(file.trim());
let active = pidActive(pid);
if (!active) debug(`stale pid ${path} ${pid}`);
return active;
} catch (err) {
if (err.code !== 'ENOENT') throw err;
return false;
}
}
function unlock(path) {
return new Promise(resolve => fs.remove(path, resolve)).then(() => {
delete locks[path];
});
}
function wait(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
function unlockSync(path) {
try {
fs.removeSync(path);
} catch (err) {
debug(err);
}
delete locks[path];
}
function lock(p, timeout) {
let pidPath = path.join(p, 'pid');
if (!fs.existsSync(path.dirname(p))) mkdir.sync(path.dirname(p));
return new Promise((resolve, reject) => {
fs.mkdir(p, err => {
if (!err) {
locks[p] = 1;
fs.writeFile(pidPath, process.pid.toString(), resolve);
return;
}
if (err.code !== 'EEXIST') return reject(err);
lockActive(pidPath).then(active => {
if (!active) return unlock(p).then(resolve).catch(reject);
if (timeout <= 0) throw new Error(`${p} is locked`);
debug(`locking ${p} ${timeout / 1000}s...`);
wait(1000).then(() => lock(p, timeout - 1000).then(resolve).catch(reject));
}).catch(reject);
});
});
}
function readFile(path) {
return new Promise((resolve, reject) => {
fs.readFile(path, 'utf8', (err, body) => {
if (err) return reject(err);
resolve(body);
});
});
}
function writeFile(path, content) {
return new Promise((resolve, reject) => {
fs.writeFile(path, content, (err, body) => {
if (err) return reject(err);
resolve(body);
});
});
}
async function getReadersFile(path) {
try {
let f = await readFile(path + '.readers');
return f.split('\n').map(r => parseInt(r));
} catch (err) {
return [];
}
}
function getReadersFileSync(path) {
try {
let f = fs.readFileSync(path + '.readers', 'utf8');
return f.split('\n').map(r => parseInt(r));
} catch (err) {
return [];
}
}
const unlink = p => new Promise((resolve, reject) => fs.unlink(p, err => err ? reject(err) : resolve()));
function saveReaders(path, readers) {
path += '.readers';
if (readers.length === 0) {
return unlink(path).catch(() => {});
} else {
return writeFile(path, readers.join('\n'));
}
}
function saveReadersSync(path, readers) {
path += '.readers';
try {
if (readers.length === 0) {
fs.unlinkSync(path);
} else {
fs.writeFileSync(path, readers.join('\n'));
}
} catch (err) {}
}
async function getActiveReaders(path, timeout, skipOwnPid = false) {
await lock(path + '.readers.lock', timeout);
let readers = await getReadersFile(path);
let promises = readers.map(r => pidActive(r).then(active => active ? r : null));
let activeReaders = await Promise.all(promises);
activeReaders = activeReaders.filter(r => r !== null);
if (activeReaders.length !== readers.length) {
await saveReaders(path, activeReaders);
}
await unlock(path + '.readers.lock');
return skipOwnPid ? activeReaders.filter(r => r !== process.pid) : activeReaders;
}
async function waitForReaders(path, timeout, skipOwnPid) {
let readers = await getActiveReaders(path, timeout, skipOwnPid);
if (readers.length !== 0) {
if (timeout <= 0) throw new Error(`${path} is locked with ${readers.length === 1 ? 'a reader' : 'readers'} active: ${readers.join(' ')}`);
debug(`waiting for readers: ${readers.join(' ')} timeout=${timeout}`);
await wait(1000);
await waitForReaders(path, timeout - 1000, skipOwnPid);
}
}
function waitForWriter(path, timeout) {
return hasWriter(path).then(active => {
if (active) {
if (timeout <= 0) throw new Error(`${path} is locked with an active writer`);
debug(`waiting for writer: path=${path} timeout=${timeout}`);
return wait(1000).then(() => waitForWriter(path, timeout - 1000));
}
return unlock(path);
});
}
async function unread(path, timeout = 60000) {
await lock(path + '.readers.lock', timeout);
let readers = await getReadersFile(path);
if (readers.find(r => r === process.pid)) {
await saveReaders(path, readers.filter(r => r !== process.pid));
}
await unlock(path + '.readers.lock');
}
exports.unread = unread;
function unreadSync(path) {
// TODO: potential lock issue here since not using .readers.lock
let readers = getReadersFileSync(path);
saveReadersSync(path, readers.filter(r => r !== process.pid));
}
/**
* lock for writing
* @param path {string} - path of lockfile to use
* @param options {object}
* @param [options.timeout=60000] {number} - Max time to wait for lockfile to be open
* @param [options.skipOwnPid] {boolean} - Do not wait on own pid (to upgrade current process)
* @returns {Promise}
*/
exports.write = async function (path, options = {}) {
let skipOwnPid = !!options.skipOwnPid;
let timeout = options.timeout || 60000;
debug(`write ${path}`);
await waitForReaders(path, timeout, skipOwnPid);
await lock(path + '.writer', timeout);
return () => unlock(path + '.writer');
};
/**
* lock for reading
* @param path {string} - path of lockfile to use
* @param options {object}
* @param [options.timeout=60000] {number} - Max time to wait for lockfile to be open
* @returns {Promise}
*/
exports.read = async function (path, options = {}) {
let timeout = options.timeout || 60000;
debug(`read ${path}`);
await waitForWriter(path, timeout);
await lock(path + '.readers.lock', timeout);
let readersFile = await getReadersFile(path);
await saveReaders(path, readersFile.concat([process.pid]));
await unlock(path + '.readers.lock');
readers[path] = 1;
return () => unread(path, timeout);
};
/**
* check if active writer
* @param path {string} - path of lockfile to use
*/
async function hasWriter(p) {
let pid;
try {
pid = await readFile(path.join(p + '.writer', 'pid'));
} catch (err) {
if (err.code !== 'ENOENT') throw err;
}
if (!pid) return false;
let active = await pidActive(parseInt(pid));
return active;
}
exports.hasWriter = hasWriter;
async function hasReaders(p, options = {}) {
let timeout = options.timeout || 60000;
let skipOwnPid = !!options.skipOwnPid;
let readers = await getActiveReaders(p, timeout, skipOwnPid);
return readers.length !== 0;
}
exports.hasReaders = hasReaders;
exports.unreadSync = unreadSync;
exports.cleanup = function () {
Object.keys(locks).forEach(unlockSync);
Object.keys(readers).forEach(unreadSync);
};
process.once('exit', exports.cleanup);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment