Last active
July 21, 2020 12:14
-
-
Save yongjun21/23a93f3027136108c7f7f4e2da0c7e34 to your computer and use it in GitHub Desktop.
Helper function to read file line by line with small memory footprint. Returns a thenable that can be chained with `.filter` and `.map` methods
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const readline = require('readline') | |
const fs = require('fs') | |
module.exports = function (...args) { | |
const transforms = [] | |
let state = 'pending' | |
let error = null | |
const rl = readline.createInterface({ | |
input: fs.createReadStream(...args) | |
}) | |
rl.on('line', line => { | |
for (const t of transforms) { | |
const { value, done } = t.input || { value: line, done: false } | |
if (done) { | |
t.output.done = true | |
} else if (t.type === 'filter') { | |
t.output.value = value | |
t.output.done = !t.fn(value, t.index++) | |
} else if (t.type === 'map') { | |
t.output.value = t.fn(value, t.index++) | |
t.output.done = false | |
} else if (t.type === 'reduce') { | |
if (t.index === 0 && t.output.value === undefined) t.output.value = value | |
else t.output.value = t.fn(t.output.value, value, t.index++) | |
} | |
} | |
}) | |
rl.on('close', () => { | |
transforms.forEach(t => { | |
if (t.type === 'reduce') t.resolve(t.output.value) | |
}) | |
state = 'fulfilled' | |
}) | |
rl.on('error', err => { | |
transforms.forEach(t => { | |
if (t.type === 'reduce') t.reject(err) | |
}) | |
state = 'rejected' | |
error = err | |
}) | |
function createPromise (context) { | |
const p = {} | |
const methods = ['filter', 'map'] | |
methods.forEach(method => { | |
p[method] = fn => { | |
const output = { done: false } | |
transforms.push({ | |
type: method, | |
fn, | |
index: 0, | |
input: context, | |
output | |
}) | |
return createPromise(output) | |
} | |
}) | |
p.reduce = (fn, init) => { | |
if (state === 'fulfilled') return Promise.resolve() | |
if (state === 'rejected') return Promise.reject(error) | |
return new Promise((resolve, reject) => { | |
transforms.push({ | |
type: 'reduce', | |
fn, | |
index: 0, | |
input: context, | |
output: { value: init, done: false }, | |
resolve, | |
reject | |
}) | |
}) | |
} | |
p.then = (...args) => { | |
return p.reduce((a, v) => { | |
a.push(v) | |
return a | |
}, []).then(...args) | |
} | |
p.forEach = fn => { | |
return p.reduce((a, v, i) => { | |
fn(v, i) | |
return undefined | |
}, undefined) | |
} | |
p.some = fn => p.reduce((a, v, i) => a || fn(v, i), false) | |
p.every = fn => p.reduce((a, v, i) => a && !fn(v, i), true) | |
p.find = fn => { | |
let found = false | |
return p.reduce((a, v, i) => { | |
if (found) return a | |
if (fn(v, i)) { | |
found = true | |
return v | |
} | |
return a | |
}, undefined) | |
} | |
p.findIndex = fn => ( | |
p.reduce((a, v, i) => (a < 0 && fn(v, i)) ? i : a, -1) | |
) | |
p.findLastIndex = fn => ( | |
p.reduce((a, v, i) => fn(v, i) ? i : a, -1) | |
) | |
p.slice = (start, end) => ( | |
p.filter((v, i) => (start == null || i >= start) && (end == null || i < end)) | |
) | |
p.includes = value => p.some(v => v === value) | |
p.indexOf = value => p.findIndex(v => v === value) | |
p.lastIndexOf = value => p.findLastIndex(v => v === value) | |
return p | |
} | |
return createPromise() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment