Skip to content

Instantly share code, notes, and snippets.

@kharandziuk
Created March 2, 2017 13:11
Show Gist options
  • Save kharandziuk/02fb2e2acd83fa703f70152552c30ba6 to your computer and use it in GitHub Desktop.
Save kharandziuk/02fb2e2acd83fa703f70152552c30ba6 to your computer and use it in GitHub Desktop.
const H = require('highland')
const Promise = require('bluebird')
const _ = require('lodash')
const Writable = require('stream').Writable;
// Number of entries produced by each simulated API read.
const SIZE = 10;

/**
 * Simulates reading page `i` from a slow remote API.
 *
 * Logs that the read has started, waits one second, then resolves with SIZE
 * objects of the form `{id}` whose ids run from `SIZE * i` up to (but not
 * including) `SIZE * (i + 1)`. The first and last id are logged once the
 * page is available.
 *
 * @param {number} i - Zero-based page index.
 * @returns {Promise<Array<{id: number}>>} Bluebird promise for the page.
 */
function grabFromApi(i) {
  console.log('start reading', i);

  // Builds the page only after the artificial latency has elapsed.
  const buildPage = () => {
    const ids = _.range(SIZE * i, SIZE * (i + 1));
    return _.map(ids, (id) => ({ id }));
  };

  const reportRead = (entries) => {
    console.log(`---> read from ${_.first(entries).id} till ${_.last(entries).id}`);
  };

  // `tap` runs the logger without changing the resolved value.
  return Promise.delay(1000).then(buildPage).tap(reportRead);
}
/**
 * Simulates a slow database write of a batch of entries.
 *
 * Logs the id span when the write starts and again when it finishes five
 * seconds later.
 *
 * @param {Array<{id: number}>} entries - Batch to "write"; the first and
 *   last elements are read for logging, so the batch must be non-empty.
 * @returns {Promise} Bluebird promise that settles once the delay has passed.
 */
function writeToDB(entries) {
  console.log(`start write from ${_.first(entries).id} till ${_.last(entries).id}`);

  const reportDone = () => {
    console.log(`<--- write from ${_.first(entries).id} till ${_.last(entries).id}`);
  };

  return Promise.delay(5000).tap(reportDone);
}
/**
 * Object-mode Writable stream that hands incoming batches to writeToDB.
 *
 * Each chunk written to the stream is expected to be an array of entries.
 * The stream signals completion of a write (or its failure) to Node through
 * the callback once the promise returned by writeToDB settles.
 */
class DBWritable extends Writable {
  /**
   * @param {object} [options] - Extra options forwarded to the Writable
   *   constructor. `highWaterMark` defaults to 10 when not supplied.
   *   `objectMode` is always forced to true because `_write` and `_writev`
   *   pass the chunks (arrays of entries) straight to writeToDB.
   */
  constructor(options) {
    // Previously `options` was accepted but silently ignored; merge it so
    // callers can tune the stream while keeping the original defaults.
    super(Object.assign({ highWaterMark: 10 }, options, { objectMode: true }));
  }

  /**
   * Writes a single batch.
   *
   * @param {Array<{id: number}>} chunk - Batch of entries to write.
   * @param {string} encoding - Unused; chunks are objects.
   * @param {Function} callback - Node-style callback invoked when the write settles.
   */
  _write(chunk, encoding, callback) {
    writeToDB(chunk).asCallback(callback);
  }

  /**
   * Writes several buffered batches as one combined write.
   *
   * @param {Array<{chunk: Array<{id: number}>, encoding: string}>} chunks -
   *   Buffered writes as supplied by the stream machinery.
   * @param {Function} callback - Node-style callback invoked when the write settles.
   */
  _writev(chunks, callback) {
    // Unwrap each buffered record to its batch, then merge the batches.
    const batches = _.map(chunks, 'chunk');
    writeToDB(_.flatten(batches)).asCallback(callback);
  }
}
// Index of the next page to request; advanced after every completed read.
let i = 0;

// Highland generator stream: each time it is asked for more data it reads
// one page, pushes the whole page downstream as a single value, and then
// asks to be called again via next().
// NOTE(review): a rejection from grabFromApi is not handled here, so the
// generator would never call next() again after a failed read — confirm
// whether errors should be forwarded with push(err).
const stream = H((push, next) => {
  grabFromApi(i).then((page) => {
    push(null, page);
    i += 1;
    next();
  });
});

// Pipe the pages into the (slower) database writer.
const dbWritable = new DBWritable();
stream.pipe(dbWritable);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment