Skip to content

Instantly share code, notes, and snippets.

@mhart
Created January 24, 2014 04:47
Show Gist options
  • Save mhart/8592228 to your computer and use it in GitHub Desktop.
Save mhart/8592228 to your computer and use it in GitHub Desktop.
var util = require('util')
var Readable = require('stream').Readable
var request = require('request')
var async = require('async')
/**
 * Object-mode Readable stream that walks a paginated list of ids and fetches
 * the item for each id, emitting the parsed items.
 *
 * @param {Object} [opts]
 * @param {function(number): string} opts.getIdsUrl - URL of the id page for a
 *   0-based page index (incremented after each successfully parsed page).
 * @param {function(*): string} opts.getItemUrl - URL of the item for one id.
 * @param {function(string): {ids: Array, isLastPage: boolean}} [opts.parseIdsBody]
 *   Parses an id-page response body. Defaults to JSON.parse, treating an
 *   empty array as the last page.
 * @param {function(string): *} [opts.parseItemsBody] - Parses an item response
 *   body; an array result is appended element-wise (Array#concat semantics).
 *   Defaults to JSON.parse.
 * @param {number} [opts.concurrency=10] - Maximum tasks run at once; page and
 *   item requests share the same queue.
 */
function PagingFetchStream(opts) {
  Readable.call(this, {objectMode: true})
  opts = opts || {}
  this.getIdsUrl = opts.getIdsUrl
  this.getItemUrl = opts.getItemUrl
  this.parseIdsBody = opts.parseIdsBody || function(body) {
    var ids = JSON.parse(body)
    return {
      ids: ids,
      isLastPage: !ids.length,
    }
  }
  this.parseItemsBody = opts.parseItemsBody || function(body) {
    return JSON.parse(body)
  }
  this.concurrency = opts.concurrency || 10
  this.items = []                // parsed items waiting to be pushed downstream
  this.index = 0                 // index of the next id page to request
  this.isLastPage = false
  this.fetchingNextPage = false
  this.paused = false            // set when push() returned false (backpressure)
  this.drained = false           // set when the request queue has emptied
  this.queue = async.queue(function(task, cb) { task(cb) }, this.concurrency)
  this.queue.drain = function() {
    this.drained = true
    // Once _read() has been called it is not called again until push() is.
    // If the final page has been seen and nothing is buffered, no more
    // push() calls will ever happen, so end the stream here rather than
    // leaving the consumer waiting forever.
    if (this.isLastPage && this.items.length === 0) this.push(null)
  }.bind(this)
}
util.inherits(PagingFetchStream, Readable)
/**
 * Requests the next page of ids and queues item fetches for the ids it
 * returns. Does nothing if a page request is already in flight or the last
 * page has already been seen.
 *
 * A request error or a parseIdsBody failure is emitted as an 'error' event on
 * the stream. In that case the page index is not advanced, so a subsequent
 * call requests the same page again.
 */
PagingFetchStream.prototype.fetchNextPage = function() {
  if (this.fetchingNextPage || this.isLastPage) return
  var self = this
  this.fetchingNextPage = true
  // New work is being queued, so an earlier `drain` no longer means "all
  // done". Clearing the flag stops _read from ending the stream while
  // requests are still outstanding.
  this.drained = false
  this.queue.push(function(cb) {
    request(self.getIdsUrl(self.index), function(error, response, body) {
      self.fetchingNextPage = false
      if (error) {
        self.emit('error', error)
        return cb()
      }
      var page
      try {
        page = self.parseIdsBody(body)
      } catch (err) {
        // A malformed body would otherwise throw out of the request callback.
        self.emit('error', err)
        return cb()
      }
      self.isLastPage = page.isLastPage
      self.index++
      // Item tasks are queued before cb() so the queue cannot drain in between.
      self.fetchNextItems(page.ids)
      cb()
    })
  })
}
/**
 * Queues one request per id. Each parsed response is appended to the item
 * buffer and pushed downstream as soon as it arrives.
 *
 * @param {Array} ids - Ids from one page. `null`, `undefined` or an empty
 *   array is a no-op.
 *
 * A request error or a parseItemsBody failure is emitted as an 'error' event
 * on the stream. The failed item is skipped.
 */
PagingFetchStream.prototype.fetchNextItems = function(ids) {
  var self = this
  // Nothing to fetch (e.g. a terminating empty page).
  if (!ids || !ids.length) return
  // New work is being queued, so an earlier `drain` no longer means "all
  // done". Clearing the flag stops _read from ending the stream while these
  // requests are in flight.
  this.drained = false
  this.queue.push(ids.map(function(id) {
    return function(cb) {
      request(self.getItemUrl(id), function(error, response, body) {
        if (error) {
          self.emit('error', error)
          return cb()
        }
        var parsed
        try {
          parsed = self.parseItemsBody(body)
        } catch (err) {
          // A malformed body would otherwise throw out of the request callback.
          self.emit('error', err)
          return cb()
        }
        // concat appends a single value, or the elements of an array result.
        self.items = self.items.concat(parsed)
        self.pushBufferedItems()
        cb()
      })
    }
  }))
}
/**
 * Drains the local item buffer into the stream, oldest item first.
 *
 * Stops as soon as push() reports backpressure by returning false. It then
 * marks the stream paused, so later calls do nothing until _read() clears
 * the flag.
 */
PagingFetchStream.prototype.pushBufferedItems = function() {
  if (this.paused) return
  var accepted = true
  while (accepted && this.items.length > 0) {
    accepted = this.push(this.items.shift()) !== false
  }
  if (!accepted) this.paused = true
}
/**
 * stream.Readable hook, called when the consumer is ready for more data.
 *
 * First clears the backpressure flag. Then, in priority order, it does one of
 * three things:
 *   1. flushes any buffered items;
 *   2. ends the stream, if the last page has been seen and the request queue
 *      has drained;
 *   3. asks for the next page of ids.
 */
PagingFetchStream.prototype._read = function() {
  this.paused = false
  if (this.items.length) {
    this.pushBufferedItems()
    return
  }
  var finished = this.isLastPage && this.drained
  if (finished) {
    this.push(null)
    return
  }
  this.fetchNextPage()
}
// The constructor itself is the module's export.
module.exports = PagingFetchStream
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment