Created
May 5, 2019 08:35
-
-
Save ishiduca/7b437c06b754034b88914a48540cd761 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { through, duplex } = require('mississippi') | |
module.exports = function semaphore ( | |
max = 1, | |
f = (data, index, done) => done(null, data) | |
) { | |
var current = 0 | |
var index = 0 | |
const buf = [] | |
const test = () => { | |
if (!(current < max)) return | |
// current += 1 | |
const data = buf.shift() | |
if (data == null) return | |
current += 1 | |
index += 1 | |
f(data, index, (err, data) => { | |
if (err) source.emit('error', err) | |
if (data != null) source.write(data) | |
current -= 1 | |
if (buf.length === 0 && current === 0) { | |
return source.end() | |
} | |
// if (buf.length === 0) { | |
// if (current === 0) { | |
// return source.end() | |
// } | |
// // current -= 1 | |
// } | |
test() | |
}) | |
} | |
const source = through.obj() | |
const sink = through.obj((data, _, done) => { | |
buf.push(data) | |
process.nextTick(test) | |
done() | |
}) | |
return duplex.obj(sink, source) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const test = require('tape') | |
const { pipe, through } = require('mississippi') | |
const semaphore = require('semaphore') | |
test('transf = semaphore(num, (data, index, done) => {...})', t => { | |
t.test('semaphore()', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
// const expected = [0, 2, 4, 6, 22] | |
const expected = [ ...datas ] | |
const spy = [] | |
pipe( | |
reader, | |
semaphore(), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.test('semaphore(3)', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
// const expected = [0, 2, 4, 6, 22] | |
const expected = [ ...datas ] | |
const spy = [] | |
pipe( | |
reader, | |
semaphore(3), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.test('semaphore(1, (data, index, done) => done(null, data*2)', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
const expected = [0, 2, 4, 6, 22] | |
const spy = [] | |
pipe( | |
reader, | |
semaphore(1, (data, index, done) => done(null, data * 2)), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.test('semaphore(1, (data, index, done) => { setTimeout(...) }', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
const expected = [0, 2, 4, 6, 22] | |
const spy = [] | |
const _max = 1 | |
pipe( | |
reader, | |
semaphore(_max, (data, index, done) => { | |
setTimeout(() => done(null, data * 2), 1000) | |
}), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.test('semaphore(3, (data, index, done) => { setTimeout(...) }', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
const expected = [0, 2, 4, 6, 22] | |
const spy = [] | |
const _max = 3 | |
pipe( | |
reader, | |
semaphore(_max, (data, index, done) => { | |
setTimeout(() => done(null, data * 2), 1000) | |
}), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.test('semaphore(10, (data, index, done) => { setTimeout(...) }', tt => { | |
const reader = through.obj() | |
const datas = [0, 1, 2, 3, 11] | |
const expected = [0, 2, 4, 6, 22] | |
const spy = [] | |
const _max = 10 | |
pipe( | |
reader, | |
semaphore(_max, (data, index, done) => { | |
setTimeout(() => done(null, data * 2), 1000) | |
}), | |
through.obj((data, _, done) => { | |
console.log(data) | |
spy.push(data) | |
done() | |
}), | |
err => { | |
tt.error(err, 'no error') | |
tt.deepEqual(spy, expected, expected) | |
tt.end() | |
} | |
) | |
datas.forEach(data => reader.write(data)) | |
reader.end() | |
}) | |
t.end() | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment