Created
June 8, 2016 20:03
-
-
Save superherointj/d53886ac8cece5c689f25d4f218dea93 to your computer and use it in GitHub Desktop.
Take 2 items per time and execute a list of synchronous operation. 2 * 6 * 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Rx = require('rxjs'); | |
// This function is expected to be executed serially. One item per time. So it should take 6 seconds. | |
function processItem(item){ | |
let queueTransformed = [ | |
Rx.Observable.timer(1000).do(x=>console.log('1',item)).map(x=>'1'+item), | |
Rx.Observable.timer(1000).do(x=>console.log('2',item)).map(x=>'2'+item), | |
Rx.Observable.timer(1000).do(x=>console.log('3',item)).map(x=>'3'+item), | |
Rx.Observable.timer(1000).do(x=>console.log('4',item)).map(x=>'4'+item), | |
Rx.Observable.timer(1000).do(x=>console.log('5',item)).map(x=>'5'+item), | |
Rx.Observable.timer(1000).do(x=>console.log('6',item)).map(x=>'6'+item) | |
] | |
return Rx.Observable.from(queueTransformed).concatAll() | |
} | |
//'arr' items should be processed two per time. It should take 4 iterations to finish this array. | |
let arr = ['a','b','c','d','e','f','g']; | |
Rx.Observable.from(arr) | |
.windowCount(2) // Splits in different observable. | |
.map(win => { // Per observable item do processItem. | |
return win.mergeMap(item => processItem(item)) | |
}) | |
.concatAll() // Wait for each Window Observable to finish before taking next. | |
.subscribe(x=>console.log('Z-nxt =>',x), err=>console.log('Z-err =>',err), ()=>console.log('Z-end.')); | |
/* === CURRENT OUTPUT === | |
c:\dev\_sandbox\rxwindow>node app.js | |
1 a | |
Znxt => 1a | |
1 b | |
Znxt => 1b | |
2 a | |
Znxt => 2a | |
2 b | |
Znxt => 2b | |
3 a | |
Znxt => 3a | |
3 b | |
Znxt => 3b | |
4 a | |
Znxt => 4a | |
4 b | |
Znxt => 4b | |
5 a | |
Znxt => 5a | |
5 b | |
Znxt => 5b | |
6 a | |
Znxt => 6a | |
6 b | |
Znxt => 6b | |
C:\dev\_sandbox\rxwindow\node_modules\rxjs\scheduler\QueueScheduler.js:24 | |
throw action.error; | |
^ | |
ObjectUnsubscribedError: | |
c:\dev\_sandbox\rxwindow> | |
-------------------------------- | |
==> The desirable behaviour would be instead of throwing for 'ObjectUnsubscribedError' taking next set of items (c,d) and keep doing same operation. | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment