Last active
April 26, 2016 13:25
-
-
Save homam/56165354e41592bfcb8c8c353009f4d4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const trace = (msg, x) => { | |
console.log(msg) | |
return x | |
} | |
const serviceCall = x => | |
Math.random() > 0.5 | |
? Promise.resolve(x) | |
: Promise.reject(`not lucky ${x}`) | |
const Semaphore = (capacity) => (function() { | |
capacity++ | |
let callbacks = [] | |
let size = 0 | |
let sem = { | |
getSize: _ => size | |
, getCallbacks: _ => callbacks | |
, waitOne: cb => { | |
size++ | |
if(size >= capacity) { | |
callbacks.push(cb) | |
} else { | |
cb() | |
} | |
} | |
, release: _ => { | |
size-- | |
let cb = callbacks.shift() | |
if(!!cb) { | |
cb() | |
} | |
} | |
} | |
return sem | |
})() | |
const lock = Semaphore(5) | |
const source = Rx.Observable.create(o => { | |
i = 0 | |
let next = _ => { | |
lock.waitOne(_ => { | |
o.onNext(++i) | |
setTimeout(next, Math.random() * 10 + 10) | |
}) | |
} | |
next() | |
}).take(100) | |
const consumer = x => new Promise(resolve => | |
setTimeout(_ => resolve(x), (Math.random() * 1000 )) | |
) | |
source | |
.subscribe( | |
x => { | |
console.log(`rp, ${lock.getSize()}, ${lock.getCallbacks().length}`) | |
consumer(x).then(_ => { | |
console.log(x) | |
lock.release() | |
}) | |
} | |
, x => console.error(x) | |
, _ => console.info("completed") | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const trace = (msg, x) => { | |
console.log(msg) | |
return x | |
} | |
const serviceCall = x => | |
Math.random() > 0.5 | |
? Promise.resolve(x) | |
: Promise.reject(`not lucky ${x}`) | |
// var source = Rx.Observable.range(1, 10).controlled() | |
Lock = _ => (function() { | |
let callback = null | |
let free = true | |
return { | |
whenFree: cb => { | |
free ? cb() : callback = cb | |
} | |
, lock: _ => free = false | |
, unlock: _ => { | |
if(!!callback) { | |
callback() | |
callback = null | |
} | |
free = true | |
} | |
} | |
})() | |
Semaphore = (capacity) => (function() { | |
capacity++ | |
let callback = null | |
let size = 0 | |
return { | |
getSize: _ => size | |
, waitOne: cb => { | |
size++ | |
if(size >= capacity) { | |
callback = cb | |
} else { | |
cb() | |
} | |
} | |
, release: _ => { | |
size-- | |
if(!!callback) { | |
callback() | |
callback = null | |
} | |
} | |
} | |
})() | |
lock = Semaphore(5) | |
var source = Rx.Observable.create(o => { | |
i = 0 | |
let next = _ => { | |
lock.waitOne(_ => { | |
o.onNext(++i) | |
setTimeout(next, Math.random() * 10 + 10) | |
}) | |
} | |
next() | |
}).take(100) | |
consumer = x => new Promise(resolve => | |
setTimeout(_ => resolve(x), (Math.random() * 1000 + 100)) | |
) | |
source | |
.subscribe( | |
x => { | |
console.log(`rp, ${lock.getSize()}`) | |
consumer(x).then(_ => { | |
console.log(x) | |
lock.release() | |
}) | |
} | |
, x => console.error(x) | |
, _ => console.info("completed") | |
) | |
//source.request(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment