Skip to content

Instantly share code, notes, and snippets.

@homam
Last active April 26, 2016 08:25
Show Gist options
  • Save homam/a125c1958278ecc6e6afdc86b53b9c94 to your computer and use it in GitHub Desktop.
Save homam/a125c1958278ecc6e6afdc86b53b9c94 to your computer and use it in GitHub Desktop.
// Debug tap: prints `msg` with console.log, then returns `x` unchanged so
// the call can sit inside a map() without altering the value passing through.
const trace = (msg, x) => {
  const passedThrough = x
  console.log(msg)
  return passedThrough
}
// Stand-in for an unreliable remote call. Draws one random number per call:
// above 0.5 it returns a promise resolved with `x`, otherwise a promise
// rejected with the string `not lucky <x>`.
const serviceCall = x => {
  if (Math.random() > 0.5) {
    return Promise.resolve(x)
  }
  return Promise.reject(`not lucky ${x}`)
}
// Pull-based variant: the range 1..10 is wrapped with controlled(), and the
// code below asks for values one at a time through source.request(1).
var source = Rx.Observable.range(1, 10).controlled()

source
  .flatMap(value => {
    // defer() builds the promise inside the factory, so serviceCall(value)
    // is invoked each time this inner observable is subscribed to.
    const attempt = Rx.Observable.defer(() =>
      Rx.Observable.fromPromise(serviceCall(value))
    )
    // Each failure is logged through trace() and passed on after a 200 ms delay.
    return attempt.retryWhen(failures =>
      failures
        .map(reason => trace(`err = ${reason}`, reason))
        .delay(200)
    )
  })
  .subscribe(
    value => {
      console.log(value)
      // Ask for the next item only after the current one has been delivered.
      source.request(1)
    },
    error => console.error(error),
    () => console.info("completed")
  )

// Kick things off by requesting the first item.
source.request(1)
// Logging pass-through helper.
//   msg - whatever should be written with console.log
//   x   - value handed back to the caller as-is
const trace = (msg, x) => {
  console.log(msg)

  return x
}
// Fake flaky service: one Math.random() draw decides the outcome.
//   draw > 0.5  -> promise resolved with `x`
//   otherwise   -> promise rejected with the string `not lucky <x>`
const serviceCall = x => {
  const succeeded = Math.random() > 0.5
  return succeeded
    ? Promise.resolve(x)
    : Promise.reject(`not lucky ${x}`)
}
// Push-based variant: every number in 1..10 is mapped to a serviceCall()
// wrapped in defer(), with failures logged via trace() and delayed 200 ms
// inside retryWhen(). Results, errors and completion go to the console.
Rx.Observable.range(1, 10)
  .flatMap(n => {
    // The promise is created inside the defer() factory rather than up front.
    const callOnce = Rx.Observable.defer(() =>
      Rx.Observable.fromPromise(serviceCall(n))
    )
    return callOnce.retryWhen(failures =>
      failures
        .map(reason => trace(`err = ${reason}`, reason))
        .delay(200)
    )
  })
  .subscribe(
    result => console.log(result),
    error => console.error(error),
    () => console.info("completed")
  )
// Number of mkp() promises that have been started and are still waiting on
// their timer.
var count = 0

// Promise factory with a crude concurrency cap.
//   x - value the promise resolves with.
// Returns a promise that
//   - rejects immediately with a "Limit Exceeded..." string when more than
//     three calls are already pending (count > 3), or
//   - resolves with `x` about 1 ms later, holding a slot in `count` meanwhile.
//
// Declared with `var`: a bare `mkp = ...` creates an implicit global and
// throws a ReferenceError in strict mode / ES modules, while `const`/`let`
// would clash with the second `mkp` assignment further down this file.
var mkp = x => {
  return new Promise((resolve, reject) => {
    if (count > 3) {
      reject(`Limit Exceeded. Count = ${count}, x = ${x}`)
      return
    }
    count++
    setTimeout(_ => {
      // Release the slot before resolving so follow-up calls see the free slot.
      count--
      resolve(x)
    }, 1)
  })
}
// Randomly failing promise factory: one Math.random() draw per call; above
// 0.2 the promise resolves with `x`, otherwise it rejects with `x`.
//
// Declared with `var` so the name is a real binding (a bare `mkp = ...`
// throws in strict mode / ES modules) while remaining a legal re-assignment
// of the earlier `mkp` defined above in this file.
var mkp = x =>
  Math.random() > 0.2 ? Promise.resolve(x) : Promise.reject(x)
// Feeds 1..10 through mkp(), wrapping each call in defer() and passing its
// failures through retryWhen() with a 200 ms delay. Both values and errors
// are written with console.log.
Rx.Observable.range(1, 10)
  .flatMap(n => {
    const attempt = Rx.Observable.defer(() =>
      Rx.Observable.fromPromise(mkp(n))
    )
    return attempt.retryWhen(failures => failures.delay(200))
  })
  .subscribe(
    value => console.log(value),
    error => console.log(error)
  )
// Side-effecting identity: writes `msg` to the console and evaluates to `x`.
// Handy for peeking at a stream inside map() without changing its contents.
const trace = (msg, x) => {
  console.log(msg)
  const unchanged = x
  return unchanged
}
// Simulated unreliable call. A single random roll picks the outcome: a roll
// above 0.5 yields a promise resolved with `x`; any other roll yields a
// promise rejected with the string `not lucky <x>`.
const serviceCall = x => {
  const roll = Math.random()
  if (roll > 0.5) {
    return Promise.resolve(x)
  } else {
    return Promise.reject(`not lucky ${x}`)
  }
}
// var source = Rx.Observable.range(1, 10).controlled()
// Tiny open/closed gate used to pause a producer.
//
//   lock.whenFree(cb) - runs `cb` right away when the gate is open; otherwise
//                       parks it until the gate opens again.
//   lock.lock()       - closes the gate.
//   lock.unlock()     - opens the gate and runs parked callbacks in the order
//                       they were registered.
//
// Differences from a single-slot implementation:
//   - the binding is declared (a bare `lock = ...` is an implicit global and
//     throws in strict mode / ES modules);
//   - every parked callback is kept, so a second whenFree() while closed no
//     longer overwrites the first;
//   - the gate is marked open *before* callbacks run, so a callback may call
//     whenFree()/lock() itself without its registration being wiped, and a
//     throwing callback cannot leave the gate stuck closed.
const lock = (function() {
  // Callbacks parked while the gate is closed, oldest first.
  const waiters = []
  let free = true

  // Runs parked callbacks while the gate stays open. If a callback closes the
  // gate again, the remaining ones stay parked for the next unlock().
  const drain = () => {
    while (free && waiters.length > 0) {
      const cb = waiters.shift()
      cb()
    }
  }

  return {
    whenFree: cb => {
      if (free) {
        cb()
      } else {
        waiters.push(cb)
      }
    },
    lock: _ => {
      free = false
    },
    unlock: _ => {
      free = true
      drain()
    }
  }
})()
// Producer of the integers 1, 2, 3, ... capped at 1000 items by take(1000).
// After each emission it waits for `lock` to be free and then schedules the
// next emission 10-20 ms later, so closing the lock pauses the producer.
//
// The counter is a local (`i` used to leak as an implicit global), and the
// subscribe function returns a disposer: without it the setTimeout loop kept
// running after take(1000) completed or the subscriber unsubscribed.
var source = Rx.Observable.create(o => {
  let i = 0
  let timer = null
  let disposed = false

  const next = () => {
    if (disposed) {
      return
    }
    o.onNext(++i)
    lock.whenFree(() => {
      // The lock may release a parked callback after disposal; ignore it then.
      if (disposed) {
        return
      }
      timer = setTimeout(next, Math.random() * 10 + 10)
    })
  }

  next()

  // Disposal function: stops the loop and cancels any pending timer.
  return () => {
    disposed = true
    clearTimeout(timer)
  }
}).take(1000)
// Count of consumer() calls started by the subscriber in this file that have
// not resolved yet. Declared with `let` because a bare `runningProcs = 0` is
// an implicit global and throws in strict mode / ES modules.
let runningProcs = 0

// Simulated slow worker.
//   x - value to hand back.
// Returns a promise that resolves with `x` after a random delay of
// Math.random() * 1000 ms; it never rejects.
const consumer = x => new Promise(resolve => {
  setTimeout(() => resolve(x), Math.random() * 1000)
})
// Drains `source` while throttling the producer through `lock`: every value
// starts a consumer() call and bumps runningProcs; once the counter reaches 5
// the lock is closed, and each finished consumer() call decrements the
// counter and reopens the lock.
source
// Disabled retry stage, kept for reference:
// .flatMap(x =>
// Rx.Observable.defer(_ =>
// Rx.Observable.fromPromise(serviceCall(x))
// ).retryWhen(errors =>
// errors.map(x => trace(`err = ${x}`, x)).delay(200)
// )
// )
  .subscribe(
    item => {
      runningProcs += 1
      console.log(`rp, ${runningProcs}`)

      // Runs when this item's consumer() promise resolves.
      const onFinished = () => {
        runningProcs -= 1
        console.log(item)
        lock.unlock()
      }
      consumer(item).then(onFinished)

      // Close the gate as soon as five calls are in flight.
      if (runningProcs >= 5) {
        lock.lock()
      }
    },
    error => console.error(error),
    () => console.info("completed")
  )
//source.request(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment