用例子讲解 RxJS 5 的 Operators 。
原文链接:RxJS 5 Operators By Example by @btroncone
- buffer
- bufferCount
- bufferTime
- bufferToggle
- bufferWhen
- combineAll
- combineLatest
- concat
- concatAll
- concatMap
- concatMapTo
- count
- debounce
- debounceTime
- defaultIfEmpty
- delay
- delayWhen
- dematerialize
- distinctUntilChanged
- do
- every
- expand
- filter
- first
- groupBy
- ignoreElements
- last
- map
- mapTo
- merge
- mergeMap
- pluck
- publish
- race
- repeat
- retry
- retryWhen
- sample
- scan
- share
- single
- skip
- skipUntil
- skipWhile
- startWith
- switchMap
- window
- windowCount
- windowTime
- windowToggle
- windowWhen
- withLatestFrom
- zip
缓冲所有输出的值,直到他们被提交。重复之。。。
(demo | official docs)
// 创建一个 observable ,每秒 emit 一个值
const myInterval = Rx.Observable.interval(1000)
// 创建一个 observable ,每当点击 document 的时候,emit 一个值
const bufferBy = Rx.Observable.fromEvent(document, 'click')
// 缓冲 interval observable emit 的所有值
// 直到我们点击了 document ,这会使得 bufferBy Observable emit 一个值
// 最后,把缓冲好的值,当作一个数组输出
const myBufferedInterval = myInterval.buffer(bufferBy)
// 打印到控制台
// [1,2,3]...[4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val => console.log('Buffered Values', val))
缓冲所有的输出值,直到某个数字被 fullfilled ,然后把它们 emit 。重复之。。。
// 创建一个 Observable ,每秒 emit 一个 value 。
const myInterval = Rx.Observable.interval(1000)
// 当 emit 了三个值之后,把它们作为数组,输出。
const bufferThree = myInterval.bufferCount(3)
// 打印到控制台
// 比如,输出 [0,1,2]...[3,4,5]
const subscribe = bufferThree.subscribe(val => console.log(`Buffered Values:`, val))
/*
`bufferCount` 的第2个参数:什么时候开始下一个 buffer
比如,bufferCount(3, 1)
第一个时间间隔后:
buffer 1: [0]
第二个时间间隔后:
buffer 1: [0,1]
buffer 2: [1]
第三个时间间隔后:
buffer 1: [0,1,2]
buffer 2: [1,2]
buffer 3: [2]
第四个时间间隔后:
buffer 2: [1,2,3]
buffer 3: [2,3]
buffer 4: [3]
*/
const bufferEveryOne = myInterval.bufferCount(3, 1)
// 打印到控制台
const secondSubscribe = bufferEveryOne.subscribe(val => console.log('Start Buffer Every 1:', val))
signature: bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, scheduler: Scheduler): Observable<T[]>
缓冲输出的值,直到达到指定的时间点,然后把他们 emit 。重复之。。。
// 创建一个 observable ,每隔 500ms 就会 emit 一个值
const myInterval = Rx.Observable.interval(500)
// 2秒后,把所有缓冲的值,作为一个数组输出
const bufferTime = myInterval.bufferTime(2000)
// 输出值到控制台
// 比如,输出 [0,1,2]...[3,4,5,6]
const subscribe = bufferTime.subscribe(val => console.log(`Buffered with Time:`, val))
/*
类似地,`bufferTime` 的第2个参数:什么时候开始下一个 buffer
比如,bufferTime(2000, 1000)
可能会输出:[0,1,2]...[1,2,3,4,5]...[3,4,5,6,7]
*/
const bufferTimeTwo = myInterval.bufferTime(2000, 1000)
// 输出到控制台
const secondSubscribe = bufferTimeTwo.subscribe(val => console.log(`Start buffer Every 1s`, val))
打开 buffer ,使其捕捉 source emit 的值,关闭 buffer ,使其 emit 缓冲了的值。
// 每隔1秒 emit 值
const sourceInterval = Rx.Observable.interval(1000)
// 在5秒后,emit 值。接着每隔5秒 emit 值
const startInterval = Rx.Observable.interval(5000)
// 在3s后 emit 值,并且关闭对应的 buffer
const closingInterval = val => {
console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`)
return Rx.Observable.interval(3000)
}
// 每隔5s就会开始一个新的 buffer ,缓冲3s内的所有 emit 过的值,然后 emit 缓冲好的值
const bufferToggleInterval = sourceInterval.bufferToggle(startInterval, closingInterval)
// 打印到控制台
// Emitted Buffer:[4,5,6]...[9,10,11]
const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:', val))
缓冲所有的值直到 closing selector emit 值,接着 emit 缓冲好的值
// 每隔1s emit 一个值
const oneSecondInterval = Rx.Observable.interval(1000)
// 每隔5秒 emit 一个值
const fiveSecondInterval = Rx.Observable.interval(5000)
// 每隔5秒 emit 缓冲了的值
const bufferWhenExample = oneSecondInterval.bufferWhen(fiveSecondInterval)
// 打印到控制台
//[0,1,2,3]...[4,5,6,7,8]
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer', val))
当 outer Observable complete 的时候,输出 inner Observable 最新的值
// 5s 后 emit 值,然后 complete
const fiveSecondTimer = Rx.Observable.timer(5000)
// 当 timer(outer Observable) 触发以及完成时,
// 输出 inner Observable 最新的值,在例子中就是一个单独的值
const example = fiveSecondTimer.mapTo(Rx.Observable.of('Hello', 'World'))
const combined = example.combineAll()
// 输出:['Hello']...['World']
const subscribe = combined.subscribe(val => console.log(`Values from inner observable`, val))
// 我们也可以传一个 project function 去接收 emit 的值
const fiveSecondTimer = Rx.Observable.timer(5000)
const example = fiveSecondTimer.mapTo(Rx.Observable.of('Hello', 'Goodbye'))
const combined = example.combineAll(val => `${val} Friend!`)
// 输出:`Hello Friend!' ... 'Goodbye Friend!'
const subscribeProjected = combined.subscribe(val => console.log(`Values Using Projection`, val))
给定一组 Observable ,当其中一个 emit 的时候,其他的 emit 最新的值
// `timerOne` 会在第1s的时候 emit 第一个值,接着每隔4s emit
const timerOne = Rx.Observable.timer(1000, 4000)
// `timerTwo` 会在第2s的时候 emit 第一个值,接着每隔4s emit
const timerTwo = Rx.Observable.timer(2000, 4000)
// `timerThree` 会在第3s的时候 emit 第一个值,接着每隔4s emit
const timerThree = Rx.Observable.timer(3000, 4000)
// 当其中一个 timer emit 的时候,其他的也同时 emit 最新的值,作为一个数组返回
const combined = Rx.Observable
.combineLatest(
timeOne,
timeTwo,
timeTrhee
)
const subscribe = combined.subscribe(latestValues => {
const [timerValOne, timerValTwo, timerValThree] = latestValues
/*
* timerOne first tick: timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0
* timerOne first tick: timer One Latest: 1, Timer Two Latest: 1, Timer Trhee Latest: 0
* timerOne first tick: timer One Latest: 1, Timer Two Latest: 1, Timer Trhee Latest: 1
*/
console.log(
`Timer One Latest: ${timerValOne}`,
`Timer Two Latest: ${timerValTwo}`,
`Timer Three Latest: ${timerValThree}`
)
})
// `combineLastest` 也可以接收一个可选的参数:projection function
const combineProject = Rx.Observable(
timerOne,
timerTwo,
timerThree,
(one, two, three) => {
return `Timer One (Proj) Latest: ${one},
Timer Two (Proj) Latest: ${two},
Timer Three (Proj) Latest: ${three}
`
)
const subscribe = combineProject.subscribe(latestValuesProject => console.log(latestValuesProject))
就像在 ATM 交易的队伍一样,直到上一个交易完毕,下一个才能开始
// emit 1,2,3
const sourceOne = Rx.Observable.of(1,2,3)
// emit 4,5,6
const sourceTwo = Rx.Observable.of(4,5,6)
// emit `sourceOne`,直到他 complete ,才开始 subscribe `sourceTwo`
const concatSource = sourceOne.concat(sourceTwo)
// 输出:1,2,3,4,5,6
const subscribe = concatSource.subscribe(val => console.log('Example 1: Basic concat:', val))
// 延迟3s再 emit
const delayedSourceOne = sourceOne.delay(3000)
// 等到 `delaySourceOne` complete 之后才开始 subscibe `sourceTwo`
const concatDelayedSource = delayedSourceOne.concat(sourceTwo)
// 输出:1,2,3,4,5,6
const subscribeDelayed = concatDelayedSource.subscribe(val => console.log('Example 2: Delayed source one:', val))
// 如果 `sourceOne` 永远不 complete ,那么永远也不会 subcribe 第二个 Observable
const sourceOneNeverComplete = Rx.Observable
.concat(
Rx.Observable.interval(1000),
Rx.Observable.of('Thiis', 'Never', 'Runs')
)
.delay(5000)
// 输出:1,2,3,4....
const subscribeNeverComplete = sourceOneNeverComplete.subscribe(val =>
console.log('Example 3: Source never complete, Second Observable never run', val)
)
用于 nestet Observalbe(observable of observables) ,当上一个 complete 的时候,subscribe 每个,然后合并(merge)每个 emit 的值
// 每隔2s emit 一个值
const sourceOne = Rx.Observable.interval(2000)
const example = sourceOne
// 加10,并且返回一个 Observable
.map(val => Rx.Observable.of(val + 10)
// 合并从 inner Observable 返回的值
.concatAll()
// 输出:'Example with Basic Observable 10', `Example with Basic Observable 11'...
const subscribe = example.subscribe(val => console.log('Exmaple with Basic Observable', val))
// 创建一个只有 resolve 的 promise
const samplePromise = val => new Promise(resolve => resolve(val))
const exampleTwo = sourceOne
.map(val => samplePromise(val))
// 合并从 samplePromise 来的值
.concatAll()
// 输出:`Example with Promise 0`, `Example with Promise 1`
const subscribeTwo = exampleTwo.subscribe(val => console.log('Example with Promise', val))
把 source Observable 的值映射到 inner Observable ,接着按顺序 subscribe 和 emit inner Observable 的值
相等于:map
-> concat
// emit 'Hello' 和 'Goodbye'
const source = Rx.Observable.of('Hello', 'Goodbye')
// 把 source value 映射到 inner Observable ,当 complete 时,emit 结果,接着下一个。
const exampleOne = source.concatMap(val => Rx.Observable.of(`${val} World!`))
// 输出:`Example One: 'Hello World', Example One: 'Goodbye World'
const subscribe = exampleOne.subscribe(val => console.log('Example One:', val))
// 结合 promise 的例子
const examplePromise = val => new Promise(resolve => resolve(`${val} World`))
// 把 source value 映射到 inner Observable ,当 complete 时,emit 结果,接着下一个。
const exampleTwo = source.concatMap(val => examplePromise(val))
// 输出:`Example w/ Promise: 'Hello World', Exmaple w/ Promise: 'Goodbye World'
const subscribeTwo = exampleTwo.subscribe(val => console.log('Exmaple w/ Promise', val))
// 第一个参数返回的值,会传给第二个参数
const exampleWithSelector = source.concatMap(val => examplePromise(val), result => `${result} w/ selector!`)
// 输出:Example w/ Selector: 'Hello w/ Selector', Example w/ Selector: 'Goodbye w/ Selector'
const subscribeThree = exampleWithSelector
.delay(2000)
.subscribe(val => console.log('Exmaple w/ Selector', val))
当 source emit 的时候,总是 subscribe 相同的 Observable ,当 complete 时合并结果
// 每隔2s emit 一个值
const interval = Rx.Observable.interval(2000)
const message = Rx.Observable.of(`Second(s) elapsed!`)
// 当 `interval` emit 时,subscribe `message` 直到他 complete ,最后合并结果
const example = interval.concatMapTo(message, (time, msg) => `${time} ${msg}`)
// 输出:`0 Second(s) elapsed! 1 Second(s) elapsed!`
const subscribe = example.subscribe(val => console.log(val))
// 每隔1s emit 一个值,取前5个
const basicTimer = Rx.Observable.interval(1000).take(5)
/*
* ***注意***
* 像这种 source Observable emit 的速度比 inner Observable complete 快的情景下,
* 就会出现内存问题
* (`interval` emit 时间间隔为1s ,而 `basicTimer` complete 则需要5s)
*/
// `basicTimer` 会在 5s 后 complete ,emit 的值有:0,1,2,3,4
const exampleTwo = interval
.concatMapTo(basicTimer, (firstInterval, secondInterval) => `${fisrtInterval} ${secondInterval}`)
/* 输出:
* 0 0
* 0 1
* 0 2
* 0 3
* 0 4
* 1 0
* 1 1
* continue...
*/
const subscribe = exampleTwo.subscribe(val => console.log(val))
计算 source emit 值的数量,直到 complete 。
// emit 1,2,3 然后 complete
const threeItems = Rx.Observable.of(1, 2, 3)
// 当 `threeItems` complete 的时候,计算他 emit 值得数量
const exampleOne = threeItems.count()
// 输出:`Count from Example One: 3'
const subscribe = exampleOne.subscribe(val => console.log(`Count from Example One: `, val))
// 数组的计数
const myArray = [1, 2, 3, 4, 5]
const myObsArray = Rx.Observable.from(myArray)
const exampleTwo = myObsArray.count()
// 输出:'Count of Basic Array: 5'
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Count of Basic Array: ${val}`))
// 可选参数:计数符合 predicate function 的值
const evensCount = myObsArray.count(val => val % 2 === 0)
// 输出:'Count of Even Numbers: 2'
const subscribeThree = evensCount.subscribe(val => console.log(`Count of Even Number: ${val}`))
忽略 source Observable 某段时间内的 emit 的值。
// emit 4个字符串
const example = Rx.Observable.of('WAIT', 'ONE', 'SECOND', 'Last will display')
/*
* 仅仅 emit 最后一个 emission 过后的1s的值
* 其他的值丢掉
*/
const debouncedExmaple = example.debounce(() => Rx.Observable.timer(1000))
/*
* 在这个例子中,所有的值除了最后一个之外全部都忽略掉
* 输出:'Last will display'
*/
const subscribe = deboundedExmaple.subscibe(val => console.log(val))
// 每隔1s emit 一个值
const interval = Rx.Observable.interval(1000)
// 每隔1s 增加我们的 dobounce time
const debouncedInterval = interval.debounce(val => Rx.Observable.timer(val * 200))
/*
* 在5s后,debounce time 就大于 interval time
* 因此,未来的值都会被丢掉
* 输出:0...1...2...3...4......(此时 debounce time 大于1s ,再也没有值 emit)
*/
const subscribeTwo = debounceInterval.subscribe(val => console.log(`Example Two: ${val}`))
忽略 source Observable 某段时间内的 emit 的值。
const input = document.getElementById('example')
// 对于每个 keyup 事件,映射为当前 input 的值
const example = Rx.observable
.fromEvent(input, 'keyup')
.map(i => i.currentTarget.value)
// 在 keyups 和 emit 当前值之间,等待 .5s
const debouncedInput = example.debounceTime(500)
// 输出
const subscribe = debouncedInput.subscribe(val => {
console.log(`Debounced Input: ${val}`)
}
当 Observable 为空时,使用这个设定的默认值,否则值为 null
。
const empty = Rx.Observable.of()
// 当 source Observable 的值为空时,使用这个默认值
const exampleOne = empty.defaultIfEmpty('Observable.of() Empty!')
// 输出:'Observable.of() Empty!'
const subscribe = exampleOne.subscribe(val => console.log(val))
// 空的 Observable
const emptyTwo = Rx.Observable.empty()
const exampleTwo = emptyTwo.defaultIfEmpty('Observable.empty()!')
// 输出:'Observable.empty()!'
const subscribe = exampleTwo.subscribe(val => console.log(val))
延迟 Observable emit 的时间。
const example = Rx.Observable.of(null)
const merge = example.merge(
example.mapTo('Hello'),
example.mapTo('World').delay(1000),
example.mapTo('Goodbye').delay(2000),
example.mapTo('World!').delay(3000)
)
// 输出:'Hello'...'World'...'Goodbye'...'World!'
const subscribe = merge.subscribe(val => console.log(val))
根据指定的函数,延迟 emit 的时间。
// 每隔1s emit 一个值
const message = Rx.Observable.interval(3000)
// 在5s后 emit 值
const delayForFiveSeconds = () => Rx.Observable.timer(5000)
// 在5s后,开始 emit 值
const delayWhenExample = message.delayWhen(delayForFiveSeconds)
// 输出:延迟5s后,
// 5s....1...2...3
const subscribe = delayWhenExample.subscribe(val => console.log(val))
把 notification object 变成 notification values
// emit next notification 和 error notification
const source = Rx.Observable
.from([
Rx.Notification.createNext('SUCCESS'),
Rx.Notification.createError('ERROR!')
])
// 把 notification object 变成 notification values
.dematerialize()
// 输出:`NEXT VALUE: SUCCESS' 'ERROR VALUE: 'ERROR!'
const subscriptionn = source.subscribe({
next: val => console.log(`NEXT VALUE: ${val}`),
error: val => console.log(`ERROR VALUE: ${val}`),
})
只有当前的值与上个值不同,才 emit 。
// 仅仅输出那些与上个值不同的值
const myArrayWithDuplicatesInARow = Rx.Observable
.from([1, 1, 2, 2, 3, 1, 2, 3])
const distinctSub = myArrayWithDuplicatesInARow
.distinctUntilChanged()
// 输出:1, 2, 3, 1, 2, 3
.subscribe(val => console.log('DISTINCT SUB: ', val))
const nonDistinctSub = myArrayWithDuplicatesInARow
// 输出:1, 1, 2, 2, 3, 1, 2, 3
.subscribe(val => console.log('NON DISTINCT SUB:', val))
const sampleObject = { name: 'Test' }
const myArrayWithDuplicateObjects = Rx.Observable.from([sampleObject, sampleObject])
const nonDisinctObjects = myArrayWithDuplicateObjects
.distinctUntilChanged()
// 输出:`DISTINCT OBJECTS: { name: 'Test' }`
.subscribe(val => console.log('DISTINCT OBJECTS', val))
显式地进行某些 action ,比如 logging
const source = Rx.Observable.of(1, 2, 3, 4, 5)
// 通过 do ,显式地打印某些值
const example = source
.do(val => console.log(`BEFORE MAP: ${val}`)
.map(val => val + 10)
.do(val => console.log(`AFTER MAP: ${val}`)
// `do` 是不会 emit 值
const subscribe = example.subscribe(val => console.log(val))
检测是否「所有 emit 的值」都符合某个条件。
// emit 5个值
const source = Rx.Observable.of(1, 2, 3, 4, 5)
const example = source
.every(val => val % 2 == 0)
// 输出:false
const subscribe = example.subscribe(val => console.log(val))
// emit 5个值
const allEvens = Rx.Observable.of(2, 4, 6, 8, 10)
const exampleTwo = allEvens
// 是否每个值都是偶数
.every(val => val % 2 === 0)
// 输出:true
const subscribe = exampleTwo.subscribe(val => console.log(val))
递归地调用给定的函数。
// emit 2
const source = Rx.Observable.of(2)
const example = source
.expand(val => {
// 2, 3, 4, 5
console.log(`Passed value: ${val}`)
// 3, 4, 5, 6
return Rx.Observable.of(1 + val)
})
// 只调用 5 次
.take(5)
/*
* RESULT: 2
* Passed value: 2
* RESULT: 3
* Passed value: 3
* RESULT: 4
* Passed value: 4
* RESULT: 5
* Passed value: 5
* RESULT: 6
* Passes value: 6
*/
// 输出:2, 3, 4, 5, 6
const subscribe = exmaple.subscribe(val => console.log(`RESULT: ${val}`))
只返回符合给定条件的值。
// emit (1, 2, 3, 4, 5)
const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 过滤掉不是偶数的值
const example = source.filter(num => num % 2 === 0)
// 输出:Even Number: 2, Even Number: 4
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`))
// emit ({ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 })
const sourceTwo = Rx.Observable.from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]
// 过滤掉30岁以下的
const exampleTwo = sourceTwo.filter(person => person.age >= 300)
// 输出:Over 30: Joe
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Over 30: ${val.name}`))
emit 第一个值,或者第一个符合给定条件的值。
const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 没有传任何参数,那么就 emit 第一个值
const example = source.first()
// 输出:First Value: 1
const subscribe = example.subscribe(val => console.log(`First Value: ${val}`))
// emit 第一个符合给定条件的值
const exampleTwo = source.first(num => num === 5)
// 输出:First to pass test: 5
const subscribe = exampleTwo.subscribe(val => console.log(`First to pass test: ${val}`))
// 传递可选参数 project function
const exampleThree = source.first(
num => num % 2 === 0,
(result, index) => `First even: ${result} is at index: ${index}`
)
// 输出:First even: 2 is at index 1
const subscribeThree = exampleThree.subscribe(val => console.log(val))
按照给定的值,分组到 Observable。
const people = [{name: 'Sue', age:25},{name: 'Joe', age: 30},{name: 'Frank', age: 25}, {name: 'Sarah', age: 35}];
const source = Rx.Observable.from(people)
// 按年龄分组
const example = source
.groupBy(person => person.age)
// 每个分组作为数组返回
.flatMap(group => group.reduce((acc, curr) => [...acc, curr], []))
/*
* 输出:
* [{age: 25, name: 'Sue'}, {age: 25, name: 'Frank'}]
* [{age: 30, name: 'joe'}]
* [{age: 35, name: 'Sarah'}]
*/
const subscribe = example.subscribe(val => console.log(val))
忽略所有的东西,除了 complete 和 error 。
// 每隔500ms emit 值
const source = Rx.Observable.interval(100)
// 忽略一切东西除了 complete
const example = source
.take(5)
.ignoreElements()
// 输出:'COMPLETE!'
const subscribe = example.subscribe(
val => console.log(`NEXT: ${val}`),
val => console.log(`ERROR: ${val}`),
() => console.log(`COMPLETE`)
)
// 忽略一切东西除了 error
const error = source
.flatMap(val => {
if (val === 4) return Rx.Observable.throw(`ERROR AT ${val}`)
return Rx.Observable.of(val)
})
.ignoreElements()
// 输出:ERROR: ERROR at 4
const subscribeTwo = error.subscribe(
val => console.log(`NEXT: ${val}`),
err => console.log(`ERROR: ${err}`),
() => console.log('SECOND COMPLETE')
)
emit 最后一个值,或者最后一个通过 test 的值。
const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 没有传参数,emit 最后一个值
const example = source.last()
// 输出:"Last value: 5"
const subscribe = example.subscribe(val => console.log(`Last value: ${val}`))
// emit 最后一个偶数
const exampleTwo = source.last(num => num % 2 === 0)
// 输出:Last to pass test: 4
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Last to pass test: ${val}`))
把每个值都应用到 project function ,映射为新的值。
// emit (1, 2, 3, 4, 5)
const source = Rx.Observable.of([1, 2, 3, 4, 5])
// 每个值都加10
const example = source.map(val => val + 10)
// 输出:11, 12, 13, 14, 15
const subscribe = example.subscribe(val => console.log(val))
// emit ({name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50})
const sourceTwo = Rx.Observable.from([{name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50}]);
// 获取每个人的名字
const exmapleTwo = sourceTwo.map(person => person.name)
// 输出:"Joe", "Frank", "Ryan"
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
每一次都映射(map)为一个常量。
const source = Rx.Observable.interval(2000)
// 每个值都映射为一个常量
const example = source.mapTo('HELLO WORLD!')
// 输出:'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'...
const subscribe = example.subscribe(val => console.log(val))
// 每次点击 document 时都 emit
const clickSource = Rx.Observable.fromEvent(document, 'click')
// 把所有的 emission 都设为一个值
const exampleTwo = clickSource.mapTo('GOODBYE WORLD!')
// 输出:(click)'GOODBYE WORLD!'...
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
把多个 Observable 压扁为一个 Observable 。
// 每隔2.5s emit 值
const first = Rx.Observable.interval(2500)
// 每隔2s emit 值
const second = Rx.Observable.interval(2000)
// 每隔1.5s emit 值
const third = Rx.Observable.interval(1500)
// 每隔1s emit 值
const fourth = Rx.Observable.interval(1000)
// 把多个 Observable 合并为一个单独的 Observable
const example = Rx.Observable.merge(
first.mapTo('FIRST'),
second.mapTo('SECOND'),
third.mapTo('THIRD'),
fourth.mapTo('FOURTH')
)
// 输出:'FOURTH', 'THIRD', 'SECOND', 'FOURTH', 'FIRST', 'THIRD', 'FOURTH'...
const subscribe = example.subscribe(val => console.log(val))
// merge 可以当作 Observable instance 的一个方法
const exampleTwo = first.merge(fourth)
// 输出:0, 1, 0, 2...
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
signature: mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable
把 source 的值先 map 到 inner Observable ,最后压扁返回。简而言之:map
=> mergeAll
。
const source = Rx.Observable.of('Hello')
// map 到 inner Observable ,并且 flatten 。
const example = source.mergeMap(val => Rx.Observable.of(`${val} World!`)
// 输出:Hello World!
const subscribe = example.subscribe(val => console.log(val))
// mergeMap 也可以 emit Promise
const myPromise = val => new Promise(resolve => resolve(`${val} World from Promise!`))
// map 到 promise ,然后 emit 最后结果
const exampleTwo = source.mergeMap(val => myPromise(val))
// 输出:Hello World From Promise!
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
/*
* 你提供第2个参数,他可以接收 source 传来的值,然后在 inner Observable emit
*/
const exampleThree = source
.mergeMap(val => myPromise(val),
(valueFromSource, valueFromPromise) => {
return `Source: ${valueFromSource}, Promise: ${valueFromPromise}`
}
)
// 输出:Source: Hello, Promise: Hello World From Promise!
const subscribeThree = exampleThree.subscribe(val => console.log(val))
挑选出嵌套的属性(nested property)。
const source = Rx.Observable.from([
{ name: 'Joe', age: 30 },
{ name: 'Sarah', age: 35 },
])
// 抓取所有的名字(name)
const example = source.pluck('name')
// 输出:"Joe", "Sarch"
const subscribe = example.subscribe(val => console.log(val))
const sourceTwo = Rx.Observable.from([
{ name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' }},
// 如果没有找到 `job` 的话,就会返回 undefined
{ name: 'Sarah', age: 25 },
])
// 抓取在 `job` 里面的 `title`
const exampleTwo = sourceTwo.pluck('job', 'title')
// 输出:'Developer', undefined
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
什么事情都不做,直到调用 connect
,共享 source 。
const source = Rx.Observable.interval(1000)
const example = source
// side effect
.do(() => console.log('Do something'))
// 直到调用 connect() ,否则什么事情都不会发生
.publish()
/*
* source 不会 emit 值,直到调用 connect 方法
* 输出:(5s 后)
* 'Do Something'
* 'Subscriber One: 0'
* 'Subscriber Two: 0'
* 'Do something'
* 'Subscriber One: 1'
* 'Subscriber Two: 1'
*/
const subscribe = example.subscribe(val => console.log(val))
const subscribeTwo = example.subscribe(val => console.log(val))
// 5秒后调用 connect 方法,这会引起 source 开始 emit 值
setTimeout(() => {
example.connect()
}, 5000)
让第一个 Observable emit 。
// 让第一个 Observable emit
const example = Rx.Observable.race(
Rx.Observable.interval(1500),
Rx.Observable.interval(1000).mapTo('1s won!'),
Rx.Observable.interval(2000),
Rx.Observable.interval(2500)
)
// 输出:'1s won!'...'1s won!'...
const subscribe = example.subscribe(val => console.log(val))
指定 source 重复的次数。
// emit 'Repeat this!'
const source = Rx.Observable.of('Repeat this')
// 重复 source emit 的值3次
const example = source.repeat(3)
// 输出:Repeat this! ... Repeat this! ... Repeat this
const subscribe = example.subscribe(val => console.log(val))
const sourceTwo = Rx.Observable.interval(1000)
// 取前5个值,重复2次
const exampleTwo = source.take(5).repeat(2)
// 输出:0,1,2,3,4 ... 0,1,2,3,4
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))
当发生 error 的时候,指定 retry 的次数。
const source = Rx.Observable.interval(1000)
const example = source
.flatMap(val => {
// 抛出 error
if (val > 5) {
return Rx.Observable.throw('Errro!')
}
return Rx.Observable.of(val)
})
// retry 2 times on error
.retry(2)
/*
* 输出:
* 0..1..2..3..4..5..
* 0..1..2..3..4..5..
* 0..1..2..3..4..5..
* Error! Retried 2 times when quit!
*/
const subscribe = example
.subscribe({
next: val => console.log(val),
error: val => console.log(`${va}: Retried 2 times when quit!`)
})
在额外的逻辑的 retry 。
const source = RxObservable.interval(1000)
const example = source
.map(val => {
if (val > 5) {
throw val
}
return val
})
.retryWhen(errors => errors
.do(val => console.log(`Value ${val} was too high!`)
.delayWhen(val => Rx.Observable.timer(val * 1000))
)
/*
* 输出:
* 0
* 1
* 2
* 3
* 4
* 5
* Value 6 was too high!
* ... wait 5s then repeat
*/
const subscribe = example.subscribe(console.log)
当给定的 Observable emit 的时候,返回 source 最新的一个样本(sample)。
// 每隔1s emit 值
const source = Rx.Observable.interval(1000)
// 每隔2s ,返回 source 最新的值
const example = source.sample(Rx.Observble.interval(2000))
// 输出:2..4..6..8
const subscribe = example.subscribe(val => console.log(val))
const sourceTwo = Rx.Observable.zip(
Rx.Observable.from(['Joe', 'Frank', 'Bo']),
Rx.Observable.interval(2000)
)
// 每隔2.5s ,返回 sourceTwo 的最新值
const exampleTwo = sourceTwo.sample(Rx.Observable.interval(2500))
// 输出:['Joe', 0]...['Frank', 1]...
const subscribeTwo = exampleTwo.subscribe(console.log)
reducer / 累加器
const testSubject = new Rx.Subject()
// 最简单的 scan 的例子,从零开始累加
const basicScan = testSubject
.startWith(0)
.scan((acc, curr) => acc + curr)
// 输出累加后的值
const subscribe = basiScan.subscribe(val => console.log('Accumulated total: ', val))
testSubject.next(1) // 1
testSubject.next(2) // 3
testSubject.next(3) // 6
const testSubjectTwo = new Rx.Subject()
const objectScan = testSubject.scan((acc, curr) => Object.assign({}, acc, curr), {})
const subscribe = objectScan.subscribe(val => console.log('Accumulated object: ', val))
testSubject.next({ name: 'Joe' }) // { name: 'Joe' }
testSubject.next({ age: 30 }) // { name: 'Joe', age: 30 }
testSubject.next({ favoriteLanguage: 'JavaScript' }) // { name: 'Joe', age: 30, favoriteLanguage: 'JavaScript' }
在多个 subscriber 中共享 observable 。
const source = Rx.Observable.timer(1000)
const example = source
.do(() => console.log('*** SIDE EFFECT ***'))
.mapTo('*** RESULT ***')
/*
* 如果不 share 的话,SIDE EFFECT 会执行两次
* 输出:
* *** SIDE EFFET ***
* *** RESULT ***
* *** SIDE EFFET ***
* *** RESULT ***
*/
const subscribe = example.subscribe(console.log)
const subscribeTwo = example.subscribe(console.log)
// 在 subscriber 中共享 observable
const shareExample = example.share()
/*
* 如果 share 的话,SIDE EFFECT 只会执行一次
* 输出:
* *** SIDE EFFECT ***
* *** RESULT ***
* *** RESULT ***
*/
const subscribeThree = shareExample.subscribe(console.log)
const subscribeFour = shareExample.subscribe(console.log)
emit 符合条件的一个单独的值。
// emit (1,2,3,4,6)
const source = Rx.Observable.of([1,2,3,4,5])
// emit 符合条件的一个 value
const example = source.single(val => val === 4)
// 输出:4
const subscribe = example.subscribe(console.log)
跳过指定数量的 emitted value 。
const source = Rx.Observable.interval(1000)
// 跳过前5个 emitted value
const example = source.skip(5)
// 输出:5..6..7..8...
const subscribe = example.subscribe(console.log)
跳过 source emit 的值,直到 inner Observable emit 。
const source = Rx.Observable.interval(1000)
// 跳过 source emit 的值,直到 inner Observable emit 。
const example = source.skipUntil(Rx.Observable.timer(6000))
// 输出:5...6...7...8...
const subscribe = example.subscribe(console.log)
跳过 source emit 的值,直到给定的条件为 false 。
const source = Rx.Observable.interval(1000)
const example = source.skipWhile(val => val < 5)
// 输出:5...6...7...8.......
const subscribe = example.subscribe(console.log)
指定第一个 emit 的值。
const source = Rx.Observable.of([1, 2, 3])
const example = source.startWith(0)
// 输出:0,1,2,3
const subscribe = example.subscribe(console.log)
const sourceTwo = Rx.Observable.of('World! ', 'Goodbye', 'World!')
const exampleTwo = sourceTwo
.startWith('Hello')
.scan((acc, curr) => `${acc} ${curr}`)
// 输出:
// Hello
// Hello Wrold!
// Hello World! Goodybe
// Hello World! Goodbye World!
const subscribe = exmapleTwo.subscribe(console.log)
当 source emit 的时候,切换到 inner Observable ,并且 emit 他已经 emit 过的值。
const source = Rx.Observable.timer(0, 5000)
// 切换到 inner Observable ,emit 已经 emit 过的值
const example = source.switchMap(() => Rx.Observable.interval(500))
// 输出:0,1,2,3,4,5,6,7,8,9.....0,1,2,3,4,5,6,7,8,9
const subscribe = example.subscribe(console.log)
const sourceTwo = Rx.Observable.fromEvent(document, 'click')
// 如果下一个 click 在 3s 内发生的话,不会产生新的 msg('Hello, I made it!')
const exampleTwo = sourcwTwo.switchMap(val => Rx.Observable.interval(3000).mapTo('Hello, I made it!'))
// 输出:(click)...3s...'Hello, I made it!'...(click)...2s(click)...
const subscribeTwo = exampleTwo.subscribe(console.log)
类似于 buffer
,但是返回的是 nested Observable 。
// 马上 emit ,然后每隔1s emit
const source = Rx.Observable.timer(0, 1000)
const example = source
.window(Rx.Observable.interval(3000))
const count = example.scan((acc, curr) => acc + 1, 0)
/*
* window 1:
* 0
* 1
* 2
* window 2:
* 3
* 4
* 5
*/
const subscribe = count.subscribe(val => console.log(`Window ${val}:`))
const subscribeTwo = example.mergeAll().subscribe(val => console.log(val))
source emit 的值是 Observable ,emit 的间隔是指定的时间。
// 每隔1s emit
const source = Rx.Observable.interval(1000)
const example = source
// 每隔4个值就开始新的 window
.windowCount(4)
.do(() => console.log('NEW WINDOW!'))
const subscribeTwo = example
// window emit 的是 nested Observable
.mergeAll()
.sbuscribe(console.log)
/*
* 输出:
* NEW WINDOW!
* 0
* 1
* 2
* 3
* NEW WINDOW!
* 4
* 5
* 6
* 7
*/
signature: windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler: Scheduler): Observable
跟 bufferTime
一样,除了 emit 的值是 nested Observable 而不是一个 array 。
const source = Rx.Observable.timer(0, 1000)
const example = source
// 每隔3s开始一个新的 window
.windowTime(3000)
.do(() => console.log('New Window!'))
const subscribe = example
// window emit 的值为 nested Observable
.mergeAll()
/*
* 输出
* New Window:
* 0
* 1
* 2
* New Window:
* 3
* 4
* 5
*/
.subscribe(console.log)
signature: windowToggle(openings: Observable, closingSelector: function(value): Observable): Observable
跟 bufferTime
一样,除了 emit 的值是 nested Observable 而不是一个 array 。
const source = Rx.Observable.timer(0, 1000)
// 在第5s的时候 toggle window
const toggle = Rx.Observable.interval(5000)
const example = source
// 每隔5s 打开 window
.windowToggle(toggle, val => Rx.Observable.interval(val * 1000))
.do(() => console.log('New Window!'))
const subscribe = example
// window emit 的值为 nested Observable
.mergeAll()
/*
* New Window!
* New Window!
* 10
* New Window!
* 15
* 16
* ...
*/
跟 bufferWhen
一样,除了 emit 的值是 nested Observable 而不是一个 array 。
const source = Rx.Observable.timer(0, 1000)
const example = source
// 每隔5s就关闭wndow,并且 emit 从 source 中缓冲好的值
.windowWhen(val => Rx.Observable.interval(5000))
.do(() => console.log('New Window!'))
const subscribe = example
.mergeAll()
/*
* New Window!
* 0
* 1
* 2
* 3
* 4
* New Window!
* 5
* 6
* 7
* 8
* 9
*/
.subscribe(console.log)
当 source emit 的时候,同时也返回另一个 Obserable 最近 emit 的那个值。
const source = Rx.Observable.interval(5000)
const secondSource = Rx.Observable.interval(1000)
const example = source
.withLatestFrom(secondSource)
.map(([first, second]) => `First Source(5s): ${first} Second Source(1s): ${second}`)
/*
* First Source(5s): 0 Second Source(1s): 4
* First Source(5s): 1 Second Source(1s): 9
* First Source(5s): 2 Second Source(1s): 14
*/
const subscribe = example.subscribe(console.log)
// withLastest 比 source 慢
const exampleTwo = secondSource
.withLatestFrom(source)
.map(([first, second]) => `Source(1s): ${first} Latest from(5s): ${second}`)
/*
* Source(1s): 4 Latest Lastest from(5s): 0
* Source(1s): 5 Latest Lastest from(5s): 0
* Source(1s): 6 Latest Lastest from(5s): 0
*/
const subscribeTwo = exampleTwo.subscribe(console.log)
等到所有的 Observable 都 emit 之后,才作为数组返回。
const sourceOne = Rx.Observable.of('Hello')
const sourceTwo = Rx.Observable.of('World')
const sourceThree = Rx.Observable.of('Goodbye')
const sourceFour = Rx.Observable.of('World!')
// 等到所有的 Observable 都 emit 之后,才把它们作为数组 emit 出去
const example = Rx.Observable
.zip(
sourceOne,
sourceTwo.delay(1000),
sourceThree.delay(2000),
sourceFour.delay(3000)
)
// 输出:['Hello', 'World', 'Goodbye', 'World!']
const subscribe = example.subscribe(console.log)
// 每隔1s emit
const interval = Rx.Observable.interval(1000)
// 当一个 Observable complete 后,再也不会 emit 任何值
const exampleTwo = Rx.Observable
.zip(
interval,
iterval.take(2)
)
// 输出:[0, 0]...[1,1]
const subscribeTwo = exampleTwo.subscribe(console.log)