Skip to content

Instantly share code, notes, and snippets.

@ezzabuzaid
Last active November 19, 2024 12:00
Show Gist options
  • Save ezzabuzaid/b3d757cd2f1ea4f67dda8865297da858 to your computer and use it in GitHub Desktop.
Save ezzabuzaid/b3d757cd2f1ea4f67dda8865297da858 to your computer and use it in GitHub Desktop.
Observe file changes in Node.js
import { createInterface } from 'readline';
import { observeFile } from './observer.ts';
import { PassThrough } from 'stream';
// Usage example: stream the JSON-lines file at 'filePath' to stdout and keep
// following it as it grows.
const controller = new AbortController();
const signal = controller.signal;
const duplex = new PassThrough();

// Send the file's current content into `duplex` (and start watching) first.
// Nothing is consuming `duplex` yet, so these writes are simply buffered.
await observeFile('filePath', duplex, signal);

// readline starts consuming its input as soon as the interface is created,
// and lines emitted before the `for await` loop attaches are dropped. The
// interface is therefore created only now, with no `await` between its
// creation and the iteration, so the content buffered above is delivered.
const reader = createInterface({
  input: duplex,
  crlfDelay: Infinity,
  terminal: false,
});

for await (const chunk of reader) {
  console.log(chunk);
}
import {
createReadStream,
createWriteStream,
unwatchFile,
watchFile,
} from 'fs';
import { stat } from 'fs/promises';
import { tmpdir } from 'os';
import { join } from 'path';
import { createInterface } from 'readline';
import { type Writable } from 'stream';
/**
 * Streams a newline-delimited JSON file into `pass` and keeps following the
 * file as it grows, until an entry whose `type` is `'complete'` is seen.
 *
 * Behaviour:
 * - The current content is forwarded first; watching (polling every 500 ms)
 *   only starts once that content has been sent.
 * - Every entry is written to `pass` as `<line>\n`, so a line-based consumer
 *   receives each entry as soon as it is written.
 * - A `'complete'` entry is forwarded, then `pass` is ended and watching
 *   stops; nothing after it is written.
 * - Blank lines are skipped. A trailing line without a newline is forwarded
 *   only once it parses as JSON; otherwise it is held back and completed by
 *   the next read, so half-written entries are never parsed.
 * - If the file shrinks, reading restarts from the beginning of the file.
 * - Backpressure from `pass.write()` is not applied.
 *
 * @param filePath Path of the file to follow.
 * @param pass Destination stream; it is ended after the `'complete'` entry.
 * @param signal Aborting stops watching and cancels any read in progress.
 * @returns Resolves once the existing content has been forwarded and the
 *   watcher is installed (or once the `'complete'` entry has been sent).
 * @throws Rejects if the initial read fails, e.g. the file cannot be opened,
 *   a line is not valid JSON, or `signal` is aborted during the read. Failures
 *   after that point destroy `pass` with the error instead.
 */
export async function observeFile(
  filePath: string,
  pass: Writable,
  signal: AbortSignal
): Promise<void> {
  const NEWLINE = 0x0a;

  // Byte offset of the first byte that has not been read yet.
  let offset = 0;
  // Bytes of a trailing line that has not been terminated/forwarded yet.
  let tail: Buffer = Buffer.alloc(0);
  let stopped = false;
  // Tail of the serialised work queue; never rejects.
  let pending: Promise<void> = Promise.resolve();

  const onChange = (curr: { size: number }): void => {
    void enqueue(() => curr.size).catch(fail);
  };

  const stop = (): void => {
    if (stopped) {
      return;
    }
    stopped = true;
    // Remove only our own listener so other watchers of this file survive.
    unwatchFile(filePath, onChange);
    signal.removeEventListener('abort', stop);
    pass.off('close', stop);
  };

  signal.addEventListener('abort', stop, { once: true });
  pass.once('close', stop);
  pass.on('error', (error) => {
    console.error(error);
    stop();
  });

  // Keep it blocking: the watch must not start until the current content is sent.
  try {
    await readRange();
  } catch (error) {
    stop();
    throw error;
  }
  if (stopped) {
    // Completed, aborted or closed during the initial read: never start watching.
    return;
  }

  watchFile(filePath, { interval: 500 }, onChange);

  // Pick up anything appended between the end of the initial read and the
  // watcher being installed; the poller only reports later changes.
  try {
    await enqueue(async () => (await stat(filePath)).size);
  } catch (error) {
    stop();
    throw error;
  }

  /** Runs one catch-up step after all previously queued steps have finished. */
  function enqueue(resolveSize: () => number | Promise<number>): Promise<void> {
    const run = pending.then(async () => {
      if (stopped) {
        return;
      }
      const size = await resolveSize();
      if (stopped) {
        return;
      }
      if (size < offset) {
        // The file was truncated or replaced: start over from the beginning.
        offset = 0;
        tail = Buffer.alloc(0);
      }
      if (size > offset) {
        await readRange(size);
      }
    });
    // Keep the queue usable even when a step fails.
    pending = run.catch(() => undefined);
    return run;
  }

  /** Reports a failure that happened after `observeFile` has resolved. */
  function fail(error: unknown): void {
    const alreadyStopped = stopped;
    stop();
    if (alreadyStopped || signal.aborted) {
      // Errors caused by our own shutdown (e.g. an aborted read) are expected.
      return;
    }
    pass.destroy(error instanceof Error ? error : new Error(String(error)));
  }

  /**
   * Reads bytes from `offset` up to (not including) `endExclusive`, or to the
   * end of the file when omitted, forwarding every complete line.
   */
  async function readRange(endExclusive?: number): Promise<void> {
    const source = createReadStream(filePath, {
      signal,
      flags: 'r',
      start: offset,
      // The `end` option is inclusive, hence the `- 1`.
      ...(endExclusive === undefined ? {} : { end: endExclusive - 1 }),
    });
    for await (const piece of source) {
      const bytes: Buffer = Buffer.isBuffer(piece) ? piece : Buffer.from(piece);
      offset += bytes.length;
      // Splitting on the raw newline byte is safe for UTF-8: 0x0a never
      // occurs inside a multi-byte sequence.
      const data = tail.length > 0 ? Buffer.concat([tail, bytes]) : bytes;
      let lineStart = 0;
      let newlineAt = data.indexOf(NEWLINE, lineStart);
      while (newlineAt !== -1) {
        const line = data.toString('utf-8', lineStart, newlineAt);
        lineStart = newlineAt + 1;
        if (!forward(line)) {
          // Leaving the loop destroys the read stream.
          return;
        }
        newlineAt = data.indexOf(NEWLINE, lineStart);
      }
      tail = data.subarray(lineStart);
    }
    flushTail();
  }

  /** Forwards an unterminated trailing line if it is already a full JSON value. */
  function flushTail(): void {
    if (stopped || tail.length === 0) {
      return;
    }
    const text = stripCarriageReturn(tail.toString('utf-8'));
    if (text.trim() === '') {
      return;
    }
    try {
      JSON.parse(text);
    } catch {
      // Still incomplete: keep it so the next read can finish the line.
      return;
    }
    tail = Buffer.alloc(0);
    write(text);
  }

  /** Forwards one terminated line; returns false once nothing more may be written. */
  function forward(rawLine: string): boolean {
    if (stopped) {
      return false;
    }
    const chunk = stripCarriageReturn(rawLine);
    if (chunk.trim() === '') {
      return true;
    }
    return write(chunk);
  }

  function stripCarriageReturn(text: string): string {
    return text.endsWith('\r') ? text.slice(0, -1) : text;
  }

  /**
   * Validates and writes one entry. Returns false after the `'complete'`
   * entry, at which point `pass` has been ended and watching has stopped.
   */
  function write(chunk: string): boolean {
    const entry: unknown = JSON.parse(chunk);
    pass.write(`${chunk}\n`);
    const isComplete =
      typeof entry === 'object' &&
      entry !== null &&
      (entry as { type?: unknown }).type === 'complete';
    if (isComplete) {
      pass.end();
      stop();
      return false;
    }
    return true;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment