Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Last active August 29, 2015 14:20
Show Gist options
  • Save ishiduca/cac3d49449deea0d9f4f to your computer and use it in GitHub Desktop.
Save ishiduca/cac3d49449deea0d9f4f to your computer and use it in GitHub Desktop.
semaphoreで複雑な遅延ストリームを実装
'use strict'
var semaphore = require('./semaphore')
module.exports = function guard (n) {
var sem = semaphore(n)
return function timer (f, timeout) {
var id
sem.wait(function () {
id = setTimeout(cancel, timeout)
f()
})
return cancel
function cancel () {
if (id) {
clearTimeout(id)
id = undefined
sem.signal()
}
}
}
}
'use strict'
module.exports = function semaphore (n) {
var capacity = n || 1
var current = 0
var queue = []
return {
wait: wait
, signal: signal
}
function wait (f) {
;(current < capacity) ? f() : queue.push(f)
return (current += 1)
}
function signal () {
if (current === 0) return
current -= 1
;(typeof queue[0] === 'function') && queue.shift()()
}
}
'use strict'
var es = require('event-stream')
var timer = require('./lib/timer-stream')
// Demo: stream the 26 letters one element at a time through two timer
// stages (loaded from './lib/timer-stream'), append a newline to each
// letter, and print the result to stdout.
var letters = ('AbcDefGhiJklMnoPqrStuVwxYz').split('')
var source = es.readArray(letters)
var firstStage = source.pipe(timer(3, 3000))
var secondStage = firstStage.pipe(timer(1, 500))
var lines = secondStage.pipe(es.map(function (letter, done) {
  done(null, letter + '\n')
}))
lines.pipe(process.stdout)
'use strict'
var es = require('event-stream')
var guard = require('./guard-timer')
module.exports = function timerStream (n, timeout) {
var g = guard(n)
return es.map(function (value, cb) {
g(function () { cb(null, value) }, timeout)
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment