Skip to content

Instantly share code, notes, and snippets.

@yongjun21
Last active July 21, 2020 12:14
Show Gist options
  • Save yongjun21/23a93f3027136108c7f7f4e2da0c7e34 to your computer and use it in GitHub Desktop.
Save yongjun21/23a93f3027136108c7f7f4e2da0c7e34 to your computer and use it in GitHub Desktop.
Helper function to read file line by line with small memory footprint. Returns a thenable that can be chained with `.filter` and `.map` methods
const readline = require('readline')
const fs = require('fs')
module.exports = function (...args) {
const transforms = []
let state = 'pending'
let error = null
const rl = readline.createInterface({
input: fs.createReadStream(...args)
})
rl.on('line', line => {
for (const t of transforms) {
const { value, done } = t.input || { value: line, done: false }
if (done) {
t.output.done = true
} else if (t.type === 'filter') {
t.output.value = value
t.output.done = !t.fn(value, t.index++)
} else if (t.type === 'map') {
t.output.value = t.fn(value, t.index++)
t.output.done = false
} else if (t.type === 'reduce') {
if (t.index === 0 && t.output.value === undefined) t.output.value = value
else t.output.value = t.fn(t.output.value, value, t.index++)
}
}
})
rl.on('close', () => {
transforms.forEach(t => {
if (t.type === 'reduce') t.resolve(t.output.value)
})
state = 'fulfilled'
})
rl.on('error', err => {
transforms.forEach(t => {
if (t.type === 'reduce') t.reject(err)
})
state = 'rejected'
error = err
})
function createPromise (context) {
const p = {}
const methods = ['filter', 'map']
methods.forEach(method => {
p[method] = fn => {
const output = { done: false }
transforms.push({
type: method,
fn,
index: 0,
input: context,
output
})
return createPromise(output)
}
})
p.reduce = (fn, init) => {
if (state === 'fulfilled') return Promise.resolve()
if (state === 'rejected') return Promise.reject(error)
return new Promise((resolve, reject) => {
transforms.push({
type: 'reduce',
fn,
index: 0,
input: context,
output: { value: init, done: false },
resolve,
reject
})
})
}
p.then = (...args) => {
return p.reduce((a, v) => {
a.push(v)
return a
}, []).then(...args)
}
p.forEach = fn => {
return p.reduce((a, v, i) => {
fn(v, i)
return undefined
}, undefined)
}
p.some = fn => p.reduce((a, v, i) => a || fn(v, i), false)
p.every = fn => p.reduce((a, v, i) => a && !fn(v, i), true)
p.find = fn => {
let found = false
return p.reduce((a, v, i) => {
if (found) return a
if (fn(v, i)) {
found = true
return v
}
return a
}, undefined)
}
p.findIndex = fn => (
p.reduce((a, v, i) => (a < 0 && fn(v, i)) ? i : a, -1)
)
p.findLastIndex = fn => (
p.reduce((a, v, i) => fn(v, i) ? i : a, -1)
)
p.slice = (start, end) => (
p.filter((v, i) => (start == null || i >= start) && (end == null || i < end))
)
p.includes = value => p.some(v => v === value)
p.indexOf = value => p.findIndex(v => v === value)
p.lastIndexOf = value => p.findLastIndex(v => v === value)
return p
}
return createPromise()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment