Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Created May 5, 2019 08:35
Show Gist options
  • Save ishiduca/7b437c06b754034b88914a48540cd761 to your computer and use it in GitHub Desktop.
Save ishiduca/7b437c06b754034b88914a48540cd761 to your computer and use it in GitHub Desktop.
const { through, duplex } = require('mississippi')
module.exports = function semaphore (
max = 1,
f = (data, index, done) => done(null, data)
) {
var current = 0
var index = 0
const buf = []
const test = () => {
if (!(current < max)) return
// current += 1
const data = buf.shift()
if (data == null) return
current += 1
index += 1
f(data, index, (err, data) => {
if (err) source.emit('error', err)
if (data != null) source.write(data)
current -= 1
if (buf.length === 0 && current === 0) {
return source.end()
}
// if (buf.length === 0) {
// if (current === 0) {
// return source.end()
// }
// // current -= 1
// }
test()
})
}
const source = through.obj()
const sink = through.obj((data, _, done) => {
buf.push(data)
process.nextTick(test)
done()
})
return duplex.obj(sink, source)
}
'use strict'
const test = require('tape')
const { pipe, through } = require('mississippi')
const semaphore = require('semaphore')
// Chunks written to the stream under test in every subtest.
const INPUT = [0, 1, 2, 3, 11]
// INPUT after a worker that doubles each chunk.
const DOUBLED = [0, 2, 4, 6, 22]

/**
 * Registers one subtest that pipes INPUT through a freshly created semaphore
 * stream, collects everything it emits, and compares it with `expected`.
 *
 * @param {object} t - Parent tape test on which the subtest is registered.
 * @param {string} name - Subtest name.
 * @param {function(): object} createStream - Factory for the stream under test.
 * @param {Array} expected - Chunks the stream must emit, in order.
 */
function checkOutput (t, name, createStream, expected) {
  t.test(name, tt => {
    const reader = through.obj()
    const spy = []
    pipe(
      reader,
      createStream(),
      through.obj((data, _, done) => {
        spy.push(data)
        done()
      }),
      err => {
        tt.error(err, 'no error')
        tt.deepEqual(spy, expected, `emits ${JSON.stringify(expected)}`)
        tt.end()
      }
    )
    INPUT.forEach(data => reader.write(data))
    reader.end()
  })
}

test('transf = semaphore(num, (data, index, done) => {...})', t => {
  // Synchronous worker: answers within the same tick.
  const double = (data, index, done) => done(null, data * 2)
  // Asynchronous worker: answers after one second.
  const doubleLater = (data, index, done) => {
    setTimeout(() => done(null, data * 2), 1000)
  }

  checkOutput(t, 'semaphore()', () => semaphore(), [...INPUT])
  checkOutput(t, 'semaphore(3)', () => semaphore(3), [...INPUT])
  checkOutput(
    t,
    'semaphore(1, (data, index, done) => done(null, data*2)',
    () => semaphore(1, double),
    DOUBLED
  )
  checkOutput(
    t,
    'semaphore(1, (data, index, done) => { setTimeout(...) }',
    () => semaphore(1, doubleLater),
    DOUBLED
  )
  checkOutput(
    t,
    'semaphore(3, (data, index, done) => { setTimeout(...) }',
    () => semaphore(3, doubleLater),
    DOUBLED
  )
  checkOutput(
    t,
    'semaphore(10, (data, index, done) => { setTimeout(...) }',
    () => semaphore(10, doubleLater),
    DOUBLED
  )
  t.end()
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment