Skip to content

Instantly share code, notes, and snippets.

@homam
Last active April 26, 2016 13:25
Show Gist options
  • Save homam/56165354e41592bfcb8c8c353009f4d4 to your computer and use it in GitHub Desktop.
Save homam/56165354e41592bfcb8c8c353009f4d4 to your computer and use it in GitHub Desktop.
/**
 * Logs a message and passes a value through untouched, so it can be dropped
 * into the middle of a promise / observable chain for debugging.
 *
 * @param {*} msg - Value written to the console.
 * @param {*} x - Value handed back to the caller unchanged.
 * @returns {*} The same `x` that was passed in.
 */
const trace = (msg, x) => {
  console.log(msg);
  return x;
};
/**
 * Simulates an unreliable service: draws one random number and either
 * resolves with the input or rejects with a "not lucky" message string.
 *
 * @param {*} x - Payload echoed back on success and embedded in the failure message.
 * @returns {Promise<*>} Resolves with `x` when the draw is above 0.5,
 *   otherwise rejects with the string `not lucky ${x}`.
 */
const serviceCall = x => {
  const isLucky = Math.random() > 0.5;
  if (isLucky) {
    return Promise.resolve(x);
  }
  return Promise.reject(`not lucky ${x}`);
};
/**
 * Creates a counting semaphore driven by callbacks.
 *
 * Up to `capacity` callers are run immediately; later callers are queued and
 * run one at a time, in FIFO order, as slots are released.
 *
 * @param {number} capacity - Number of callers allowed to hold a slot at once.
 * @returns {{
 *   getSize: function(): number,
 *   getCallbacks: function(): Array<Function>,
 *   waitOne: function(Function): void,
 *   release: function(): void
 * }} The semaphore. `getSize()` counts holders plus queued waiters;
 *   `getCallbacks()` exposes the live queue of waiting callbacks.
 */
const Semaphore = (capacity) => {
  // First value of `size` at which a caller has to queue. Computed once
  // instead of incrementing the `capacity` parameter in place.
  const limit = Number(capacity) + 1;
  const callbacks = [];
  let size = 0;

  return {
    getSize: () => size,
    getCallbacks: () => callbacks,
    waitOne: cb => {
      size++;
      if (size >= limit) {
        callbacks.push(cb);
      } else {
        cb();
      }
    },
    release: () => {
      // Ignore a release with nothing outstanding; otherwise `size` would go
      // negative and later callers could exceed the configured capacity.
      if (size === 0) {
        return;
      }
      size--;
      const next = callbacks.shift();
      if (next) {
        next();
      }
    },
  };
};
// Shared semaphore, created with a capacity of 5: the producer below takes a
// slot (waitOne) before each emission and the subscriber releases it once the
// item has been consumed.
const lock = Semaphore(5)
/**
 * Observable that emits 1, 2, 3, ... up to 100 items.
 *
 * Each emission first acquires a slot from `lock`, so the producer stalls
 * while too many items are still being processed downstream. After an
 * emission the next attempt is scheduled 10-20 ms later.
 */
const source = Rx.Observable.create(o => {
  // Local counter; previously an implicit global (`i = 0`), which leaks onto
  // the global object and throws a ReferenceError in strict/module code.
  let i = 0;
  let disposed = false;

  const next = () => {
    if (disposed) {
      return;
    }
    lock.waitOne(() => {
      if (disposed) {
        return;
      }
      o.onNext(++i);
      setTimeout(next, Math.random() * 10 + 10);
    });
  };

  next();

  // Disposer: stops the emit loop once the subscription ends (for example
  // after take(100) completes) instead of letting it keep acquiring slots.
  return () => {
    disposed = true;
  };
}).take(100);
/**
 * Simulates slow downstream work: resolves with the input after a random
 * delay of up to one second.
 *
 * @param {*} x - Item being "processed".
 * @returns {Promise<*>} Resolves with `x` once the timer fires.
 */
const consumer = x => {
  return new Promise(resolve => {
    const delayMs = Math.random() * 1000;
    setTimeout(() => {
      resolve(x);
    }, delayMs);
  });
};
// Drive the pipeline: for every emitted item, log the semaphore state, hand
// the item to the consumer, and free the semaphore slot once it finishes.
source.subscribe(
  item => {
    // Prints the semaphore's size and the number of queued callbacks.
    console.log(`rp, ${lock.getSize()}, ${lock.getCallbacks().length}`);
    consumer(item).then(() => {
      console.log(item);
      lock.release();
    });
  },
  err => {
    console.error(err);
  },
  () => {
    console.info("completed");
  }
);
/**
 * Debug helper: writes `msg` to the console and returns `x` as-is.
 *
 * @param {*} msg - Value written to the console.
 * @param {*} x - Value returned to the caller unchanged.
 * @returns {*} `x`.
 */
const trace = (msg, x) => {
  console.log(msg);
  return x;
};
/**
 * Fake flaky service call. One random draw decides the outcome.
 *
 * @param {*} x - Payload echoed back on success and embedded in the failure message.
 * @returns {Promise<*>} Resolves with `x` when the draw is above 0.5,
 *   otherwise rejects with the string `not lucky ${x}`.
 */
const serviceCall = x => {
  if (Math.random() > 0.5) {
    return Promise.resolve(x);
  }
  return Promise.reject(`not lucky ${x}`);
};
// var source = Rx.Observable.range(1, 10).controlled()
/**
 * Creates a simple binary lock with a single waiter slot.
 *
 * The lock starts out free. `whenFree` runs its callback immediately while
 * the lock is free; otherwise it remembers the callback (replacing any
 * previously remembered one) and runs it on the next `unlock()`.
 *
 * @returns {{
 *   whenFree: function(Function): void,
 *   lock: function(): boolean,
 *   unlock: function(): void
 * }} The lock object.
 */
const Lock = () => {
  let callback = null;
  let free = true;

  return {
    whenFree: cb => {
      if (free) {
        cb();
      } else {
        callback = cb;
      }
    },
    // Returns the assigned value (false), as the original expression body did.
    lock: () => (free = false),
    unlock: () => {
      // Mark the lock free and clear the waiter slot BEFORE invoking the
      // waiter. That way a waiter that calls lock() or whenFree() itself is
      // not overridden once it returns.
      free = true;
      const pending = callback;
      callback = null;
      if (pending) {
        pending();
      }
    },
  };
};
/**
 * Creates a counting semaphore driven by callbacks.
 *
 * Up to `capacity` callers are run immediately; later callers wait in a FIFO
 * queue and are run one at a time as slots are released.
 *
 * @param {number} capacity - Number of callers allowed to hold a slot at once.
 * @returns {{
 *   getSize: function(): number,
 *   waitOne: function(Function): void,
 *   release: function(): void
 * }} The semaphore. `getSize()` counts holders plus queued waiters.
 */
const Semaphore = (capacity) => {
  // First value of `size` at which a caller has to wait. Computed once
  // instead of incrementing the `capacity` parameter in place.
  const limit = Number(capacity) + 1;
  // Queue of waiters. A single slot would silently drop every waiter except
  // the most recent one when several callers arrive while the semaphore is full.
  const waiting = [];
  let size = 0;

  return {
    getSize: () => size,
    waitOne: cb => {
      size++;
      if (size >= limit) {
        waiting.push(cb);
      } else {
        cb();
      }
    },
    release: () => {
      // Ignore a release with nothing outstanding; otherwise `size` would go
      // negative and later callers could exceed the configured capacity.
      if (size === 0) {
        return;
      }
      size--;
      // Dequeue before invoking so a waiter that re-enters waitOne() or
      // release() sees a consistent queue.
      const next = waiting.shift();
      if (next) {
        next();
      }
    },
  };
};
// Shared semaphore, created with a capacity of 5: the producer takes a slot
// (waitOne) before each emission and the subscriber releases it once the item
// has been consumed. Declared explicitly rather than assigned as an implicit
// global, which throws a ReferenceError in strict/module code.
const lock = Semaphore(5);
/**
 * Observable that emits 1, 2, 3, ... up to 100 items.
 *
 * Each emission first acquires a slot from `lock`, so the producer stalls
 * while too many items are still being processed downstream. After an
 * emission the next attempt is scheduled 10-20 ms later.
 */
const source = Rx.Observable.create(o => {
  // Local counter; previously an implicit global (`i = 0`), which leaks onto
  // the global object and throws a ReferenceError in strict/module code.
  let i = 0;
  let disposed = false;

  const next = () => {
    if (disposed) {
      return;
    }
    lock.waitOne(() => {
      if (disposed) {
        return;
      }
      o.onNext(++i);
      setTimeout(next, Math.random() * 10 + 10);
    });
  };

  next();

  // Disposer: stops the emit loop once the subscription ends (for example
  // after take(100) completes) instead of letting it keep acquiring slots.
  return () => {
    disposed = true;
  };
}).take(100);
/**
 * Simulates slow downstream work: resolves with the input after a random
 * delay between 100 ms and 1100 ms.
 *
 * Declared with `const` rather than assigned as an implicit global, which
 * throws a ReferenceError in strict/module code.
 *
 * @param {*} x - Item being "processed".
 * @returns {Promise<*>} Resolves with `x` once the timer fires.
 */
const consumer = x =>
  new Promise(resolve => {
    setTimeout(() => resolve(x), Math.random() * 1000 + 100);
  });
// Drive the pipeline: for every emitted item, log the semaphore size, hand the
// item to the consumer, and free the semaphore slot once it finishes.
source.subscribe(
  item => {
    console.log(`rp, ${lock.getSize()}`);
    consumer(item).then(() => {
      console.log(item);
      lock.release();
    });
  },
  err => {
    console.error(err);
  },
  () => {
    console.info("completed");
  }
);
//source.request(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment