Last active
August 20, 2022 01:44
-
-
Save khun84/15c6e16558602801d28b31ff29caff0b to your computer and use it in GitHub Desktop.
Nodejs snippet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
- sleep | |
- pipeline | |
- parallel task |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TaskQueuePC { | |
constructor(maxConcurrency) { | |
this.maxConcurrency = maxConcurrency; | |
this.consumerQueue = []; | |
this.taskQueue = []; | |
for (let i = 0; i < maxConcurrency; i++) { | |
this.consumer() | |
} | |
} | |
async consumer() { | |
while (true) { | |
try { | |
const task = await this.getNextTask() | |
await task(); | |
} catch (e) { | |
console.error(e); | |
} | |
} | |
} | |
async getNextTask() { | |
// The purpose of introducing a Promise here is that we need a "resolve" callback to achieve a sleep effect where there is not task | |
// The resolve callback will be pushed into the consumerQueue and this method caller will keep waiting until the resolve callback is being "wake up" to perform a task | |
return new Promise((resolve, reject) => { | |
if (this.taskQueue.length === 0) { | |
this.consumerQueue.push(resolve); | |
} else { | |
return resolve(this.taskQueue.shift()); | |
} | |
}); | |
} | |
// When producer submit task, we should not block the event loop. | |
// In fact, we'll just return a promise and allow the caller to register callback on the promise. | |
// So from here we can see can the caller can create a clean task and express interest on the task result by registering callback on the promise returned by #runTask. | |
runTask(task) { | |
return new Promise((resolve, reject) => { | |
const taskWrapper = () => { | |
const taskPromise = task(); | |
taskPromise.then(resolve, reject); | |
return taskPromise; | |
} | |
if (this.consumerQueue.length !== 0) { | |
const consumer = this.consumerQueue.shift(); | |
consumer(taskWrapper); | |
} else { | |
this.taskQueue.push(taskWrapper); | |
} | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.paths.push('/Users/Daniel/.nvm/versions/node/v14.17.1/lib/node_modules'); | |
const { createReadStream, createWriteStream } = require('fs'); | |
const { parse } = require('csv-parse'); | |
const { Transform, pipeline } = require('stream'); | |
const { format: csvFormat, writeToString } = require('fast-csv'); | |
const myOutputStream = createWriteStream('my-free-credits-2.csv'); | |
const phOutputStream = createWriteStream('ph-free-credits-2.csv'); | |
const idOutputStream = createWriteStream('id-free-credits-2.csv'); | |
const sgOutputStream = createWriteStream('sg-free-credits-2.csv'); | |
const streamMapper = { | |
my: myOutputStream, | |
ph: phOutputStream, | |
id: idOutputStream, | |
sg: sgOutputStream | |
}; | |
// headers = ['country', 'vendor_id', 'name', 'paid_credit_balance', 'free', 'latest_quoted_at', 'latest_booked_at', 'inactive_since_dec']; | |
class CsvWriter extends Transform { | |
constructor(opts = {}) { | |
opts.objectMode = true; | |
super(opts); | |
this.headersWritten = { | |
my: false, | |
id: false, | |
ph: false, | |
sg: false | |
}; | |
} | |
async _transform(row, enc, done) { | |
let csvString = await writeToString([row]); | |
csvString = csvString + "\n"; | |
if (!this.headersWritten[row.country.toLowerCase()]) { | |
const headers = []; | |
Object.entries(row).forEach((pair) => { | |
const [colname, _] = pair; | |
headers.push(colname); | |
}); | |
csvString = [headers.join(','), csvString].join("\n"); | |
this.headersWritten[row.country.toLowerCase()] = true; | |
} | |
streamMapper[row.country.toLowerCase()].write(Buffer.from(csvString, enc)); | |
done(); | |
} | |
} | |
const inputStream = createReadStream('source-file-path'); | |
const csvParser = parse({ columns: true }); | |
pipeline( | |
inputStream, | |
csvParser, | |
(new CsvWriter()).on('end', () => { | |
for (const [_, streamer] of Object.entries(streamMapper)) { | |
streamer.close(); | |
} | |
}), | |
(err) => { | |
if (err) { | |
console.log(err); | |
for (const [_, streamer] of Object.entries(streamMapper)) { | |
streamer.close(); | |
} | |
} | |
} | |
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async function sleep(ms) { | |
return new Promise(resolve => setTimeout(resolve, ms)); | |
} | |
console.log('start'); | |
await sleep(1000).then(() => { | |
console.log('after sleep');}); | |
console.log('end'); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment