Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save wjkhappy14/a59a5c7855af7e0b07da088a6ac68be6 to your computer and use it in GitHub Desktop.
Save wjkhappy14/a59a5c7855af7e0b07da088a6ac68be6 to your computer and use it in GitHub Desktop.
「译」RxJS 5 Operators By Example

用例子讲解 RxJS 5 的 Operators 。

原文链接:RxJS 5 Operators By Example by @btroncone

目录

buffer

signature: buffer<T>(closingNotifier: Observable<any>): Observable<T[]>

缓冲所有输出的值,直到他们被提交。重复之。。。

(demo | official docs)

// 创建一个 observable ,每秒 emit 一个值
const myInterval = Rx.Observable.interval(1000)

// 创建一个 observable ,每当点击 document 的时候,emit 一个值
const bufferBy = Rx.Observable.fromEvent(document, 'click')

// 缓冲 interval observable emit 的所有值
// 直到我们点击了 document ,这会使得 bufferBy Observable emit 一个值
// 最后,把缓冲好的值,当作一个数组输出
const myBufferedInterval = myInterval.buffer(bufferBy)

// 打印到控制台
// [1,2,3]...[4,5,6,7,8]
const subscribe = myBufferedInterval.subscribe(val => console.log('Buffered Values', val))

bufferCount

signature: bufferCount<T>(bufferSize: number, startBufferEvery: number = null): Observable<T[]>

缓冲所有的输出值,直到某个数字被 fullfilled ,然后把它们 emit 。重复之。。。

(demo | docs)

// 创建一个 Observable ,每秒 emit 一个 value 。
const myInterval = Rx.Observable.interval(1000)

// 当 emit 了三个值之后,把它们作为数组,输出。
const bufferThree = myInterval.bufferCount(3)

// 打印到控制台
// 比如,输出 [0,1,2]...[3,4,5]
const subscribe = bufferThree.subscribe(val => console.log(`Buffered Values:`, val))

/*
`bufferCount` 的第2个参数:什么时候开始下一个 buffer
比如,bufferCount(3, 1)

第一个时间间隔后:
buffer 1: [0]

第二个时间间隔后:
buffer 1: [0,1]
buffer 2: [1]

第三个时间间隔后:
buffer 1: [0,1,2]
buffer 2: [1,2]
buffer 3: [2]

第四个时间间隔后:
buffer 2: [1,2,3]
buffer 3: [2,3]
buffer 4: [3]
*/

const bufferEveryOne = myInterval.bufferCount(3, 1)
// 打印到控制台
const secondSubscribe = bufferEveryOne.subscribe(val => console.log('Start Buffer Every 1:', val))

bufferTime

signature: bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, scheduler: Scheduler): Observable<T[]>

缓冲输出的值,直到达到指定的时间点,然后把他们 emit 。重复之。。。

(demo | docs)

// 创建一个 observable ,每隔 500ms 就会 emit 一个值
const myInterval = Rx.Observable.interval(500)

// 2秒后,把所有缓冲的值,作为一个数组输出
const bufferTime = myInterval.bufferTime(2000)

// 输出值到控制台
// 比如,输出 [0,1,2]...[3,4,5,6]
const subscribe = bufferTime.subscribe(val => console.log(`Buffered with Time:`, val))

/*
类似地,`bufferTime` 的第2个参数:什么时候开始下一个 buffer
比如,bufferTime(2000, 1000)
可能会输出:[0,1,2]...[1,2,3,4,5]...[3,4,5,6,7]
*/
const bufferTimeTwo = myInterval.bufferTime(2000, 1000)
// 输出到控制台
const secondSubscribe = bufferTimeTwo.subscribe(val => console.log(`Start buffer Every 1s`, val))

bufferToggle

signature: bufferToggle(openings: Observable<O>, closingSelector: Function): Observable<T[]>

打开 buffer ,使其捕捉 source emit 的值,关闭 buffer ,使其 emit 缓冲了的值。

(demo | docs)

// 每隔1秒 emit 值
const sourceInterval = Rx.Observable.interval(1000)
// 在5秒后,emit 值。接着每隔5秒 emit 值
const startInterval = Rx.Observable.interval(5000)

// 在3s后 emit 值,并且关闭对应的 buffer
const closingInterval = val => {
  console.log(`Value ${val} emitted, starting buffer! Closing in 3s!`)
  return Rx.Observable.interval(3000)
}

// 每隔5s就会开始一个新的 buffer ,缓冲3s内的所有 emit 过的值,然后 emit 缓冲好的值
const bufferToggleInterval = sourceInterval.bufferToggle(startInterval, closingInterval)

// 打印到控制台
// Emitted Buffer:[4,5,6]...[9,10,11]
const subscribe = bufferToggleInterval.subscribe(val => console.log('Emitted Buffer:', val))

bufferWhen

signature: bufferWhen(closingSelector: function): Observable<T[]>

缓冲所有的值直到 closing selector emit 值,接着 emit 缓冲好的值

(demo | docs)

// 每隔1s emit 一个值
const oneSecondInterval = Rx.Observable.interval(1000)
// 每隔5秒 emit 一个值
const fiveSecondInterval = Rx.Observable.interval(5000)

// 每隔5秒 emit 缓冲了的值
const bufferWhenExample = oneSecondInterval.bufferWhen(fiveSecondInterval)

// 打印到控制台
//[0,1,2,3]...[4,5,6,7,8]
const subscribe = bufferWhenExample.subscribe(val => console.log('Emitted Buffer', val))

combineAll

signature: combineAll(project: function): Observable

当 outer Observable complete 的时候,输出 inner Observable 最新的值

(demo | docs)

// 5s 后 emit 值,然后 complete
const fiveSecondTimer = Rx.Observable.timer(5000)

// 当 timer(outer Observable) 触发以及完成时,
// 输出 inner Observable 最新的值,在例子中就是一个单独的值
const example = fiveSecondTimer.mapTo(Rx.Observable.of('Hello', 'World'))
const combined = example.combineAll()

// 输出:['Hello']...['World']
const subscribe = combined.subscribe(val => console.log(`Values from inner observable`, val))

// 我们也可以传一个 project function 去接收 emit 的值
const fiveSecondTimer = Rx.Observable.timer(5000)
const example = fiveSecondTimer.mapTo(Rx.Observable.of('Hello', 'Goodbye'))
const combined = example.combineAll(val => `${val} Friend!`)

// 输出:`Hello Friend!' ... 'Goodbye Friend!'
const subscribeProjected = combined.subscribe(val => console.log(`Values Using Projection`, val))

combineLatest

signature: combineLatest(observables: ...Observable, project: function): Observable

给定一组 Observable ,当其中一个 emit 的时候,其他的 emit 最新的值

(demo | docs)

// `timerOne` 会在第1s的时候 emit 第一个值,接着每隔4s emit
const timerOne = Rx.Observable.timer(1000, 4000)
// `timerTwo` 会在第2s的时候 emit 第一个值,接着每隔4s emit
const timerTwo = Rx.Observable.timer(2000, 4000)
// `timerThree` 会在第3s的时候 emit 第一个值,接着每隔4s emit
const timerThree = Rx.Observable.timer(3000, 4000)

// 当其中一个 timer emit 的时候,其他的也同时 emit 最新的值,作为一个数组返回
const combined = Rx.Observable
  .combineLatest(
    timeOne,
    timeTwo,
    timeTrhee
  )
  
const subscribe = combined.subscribe(latestValues => {
  const [timerValOne, timerValTwo, timerValThree] = latestValues
  
  /*
  * timerOne first tick: timer One Latest: 1, Timer Two Latest: 0, Timer Three Latest: 0
  * timerOne first tick: timer One Latest: 1, Timer Two Latest: 1, Timer Trhee Latest: 0
  * timerOne first tick: timer One Latest: 1, Timer Two Latest: 1, Timer Trhee Latest: 1
  */
  console.log(
    `Timer One Latest: ${timerValOne}`,
    `Timer Two Latest: ${timerValTwo}`,
    `Timer Three Latest: ${timerValThree}`
  )
})

// `combineLastest` 也可以接收一个可选的参数:projection function
const combineProject = Rx.Observable(
  timerOne,
  timerTwo,
  timerThree,
  (one, two, three) => {
    return `Timer One (Proj) Latest: ${one},
      Timer Two (Proj) Latest: ${two},
      Timer Three (Proj) Latest: ${three}
    `
  )
  
const subscribe = combineProject.subscribe(latestValuesProject => console.log(latestValuesProject))

concat

signature: concat(observables: ...*): Observable

就像在 ATM 交易的队伍一样,直到上一个交易完毕,下一个才能开始

(demo | docs)

// emit 1,2,3
const sourceOne = Rx.Observable.of(1,2,3)
// emit 4,5,6
const sourceTwo = Rx.Observable.of(4,5,6)

// emit `sourceOne`,直到他 complete ,才开始 subscribe `sourceTwo`
const concatSource = sourceOne.concat(sourceTwo)

// 输出:1,2,3,4,5,6
const subscribe = concatSource.subscribe(val => console.log('Example 1: Basic concat:', val))

// 延迟3s再 emit
const delayedSourceOne = sourceOne.delay(3000)
// 等到 `delaySourceOne` complete 之后才开始 subscibe `sourceTwo`
const concatDelayedSource = delayedSourceOne.concat(sourceTwo)

// 输出:1,2,3,4,5,6
const subscribeDelayed = concatDelayedSource.subscribe(val => console.log('Example 2: Delayed source one:', val))


// 如果 `sourceOne` 永远不 complete ,那么永远也不会 subcribe 第二个 Observable
const sourceOneNeverComplete = Rx.Observable
  .concat(
    Rx.Observable.interval(1000),
    Rx.Observable.of('Thiis', 'Never', 'Runs')
  )
  .delay(5000)

// 输出:1,2,3,4....
const subscribeNeverComplete = sourceOneNeverComplete.subscribe(val => 
  console.log('Example 3: Source never complete, Second Observable never run', val)
)

concatAll

signature: concatAll(): Observable

用于 nestet Observalbe(observable of observables) ,当上一个 complete 的时候,subscribe 每个,然后合并(merge)每个 emit 的值

(demo | docs)

// 每隔2s emit 一个值
const sourceOne = Rx.Observable.interval(2000)
const example = sourceOne
  // 加10,并且返回一个 Observable
  .map(val => Rx.Observable.of(val + 10)
  // 合并从 inner Observable 返回的值
  .concatAll()
  
// 输出:'Example with Basic Observable 10', `Example with Basic Observable 11'...
const subscribe = example.subscribe(val => console.log('Exmaple with Basic Observable', val))


// 创建一个只有 resolve 的 promise
const samplePromise = val => new Promise(resolve => resolve(val))

const exampleTwo = sourceOne
  .map(val => samplePromise(val))
  // 合并从 samplePromise 来的值
  .concatAll()
  
// 输出:`Example with Promise 0`, `Example with Promise 1`
const subscribeTwo = exampleTwo.subscribe(val => console.log('Example with Promise', val))

concatMap

signature: concatMap(project: function, resultSelector: function): Observable

把 source Observable 的值映射到 inner Observable ,接着按顺序 subscribe 和 emit inner Observable 的值

相等于:map -> concat

(demo | docs)

// emit 'Hello' 和 'Goodbye'
const source = Rx.Observable.of('Hello', 'Goodbye')
// 把 source value 映射到 inner Observable ,当 complete 时,emit 结果,接着下一个。
const exampleOne = source.concatMap(val => Rx.Observable.of(`${val} World!`))

// 输出:`Example One: 'Hello World', Example One: 'Goodbye World'
const subscribe = exampleOne.subscribe(val => console.log('Example One:', val))


// 结合 promise 的例子
const examplePromise = val => new Promise(resolve => resolve(`${val} World`))
// 把 source value 映射到 inner Observable ,当 complete 时,emit 结果,接着下一个。
const exampleTwo = source.concatMap(val => examplePromise(val))

// 输出:`Example w/ Promise: 'Hello World', Exmaple w/ Promise: 'Goodbye World'
const subscribeTwo = exampleTwo.subscribe(val => console.log('Exmaple w/ Promise', val))


// 第一个参数返回的值,会传给第二个参数
const exampleWithSelector = source.concatMap(val => examplePromise(val), result => `${result} w/ selector!`)

// 输出:Example w/ Selector: 'Hello w/ Selector', Example w/ Selector: 'Goodbye w/ Selector'
const subscribeThree = exampleWithSelector
  .delay(2000)
  .subscribe(val => console.log('Exmaple w/ Selector', val))

concatMapTo

signature: concatMapTo(observable: Observable, resultSelector: function): Observable

当 source emit 的时候,总是 subscribe 相同的 Observable ,当 complete 时合并结果

(demo | docs)

// 每隔2s emit 一个值
const interval = Rx.Observable.interval(2000)
const message = Rx.Observable.of(`Second(s) elapsed!`)

// 当 `interval` emit 时,subscribe `message` 直到他 complete ,最后合并结果
const example = interval.concatMapTo(message, (time, msg) => `${time} ${msg}`)

// 输出:`0 Second(s) elapsed! 1 Second(s) elapsed!`
const subscribe = example.subscribe(val => console.log(val))

// 每隔1s emit 一个值,取前5个
const basicTimer = Rx.Observable.interval(1000).take(5)

/*
* ***注意*** 
* 像这种 source Observable emit 的速度比 inner Observable complete 快的情景下,
* 就会出现内存问题
* (`interval` emit 时间间隔为1s ,而 `basicTimer` complete 则需要5s)
*/

// `basicTimer` 会在 5s 后 complete ,emit 的值有:0,1,2,3,4
const exampleTwo = interval
  .concatMapTo(basicTimer, (firstInterval, secondInterval) => `${fisrtInterval} ${secondInterval}`)

/* 输出:
* 0 0
* 0 1
* 0 2
* 0 3
* 0 4
* 1 0
* 1 1
* continue...
*/

const subscribe = exampleTwo.subscribe(val => console.log(val))

count

signature: count(predicate: function): Observable

计算 source emit 值的数量,直到 complete 。

(demo | docs)

// emit 1,2,3 然后 complete
const threeItems = Rx.Observable.of(1, 2, 3)
// 当 `threeItems` complete 的时候,计算他 emit 值得数量
const exampleOne = threeItems.count()

// 输出:`Count from Example One: 3'
const subscribe = exampleOne.subscribe(val => console.log(`Count from Example One: `, val))


// 数组的计数
const myArray = [1, 2, 3, 4, 5]
const myObsArray = Rx.Observable.from(myArray)
const exampleTwo = myObsArray.count()

// 输出:'Count of Basic Array: 5'
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Count of Basic Array: ${val}`))


// 可选参数:计数符合 predicate function 的值
const evensCount = myObsArray.count(val => val % 2 === 0)
// 输出:'Count of Even Numbers: 2'
const subscribeThree = evensCount.subscribe(val => console.log(`Count of Even Number: ${val}`))

debounce

signature: debounce(durationSelector: function): Observable

忽略 source Observable 某段时间内的 emit 的值。

(demo | docs)

// emit 4个字符串
const example = Rx.Observable.of('WAIT', 'ONE', 'SECOND', 'Last will display')

/*
* 仅仅 emit 最后一个 emission 过后的1s的值
* 其他的值丢掉
*/
const debouncedExmaple = example.debounce(() => Rx.Observable.timer(1000))

/*
* 在这个例子中,所有的值除了最后一个之外全部都忽略掉
* 输出:'Last will display'
*/
const subscribe = deboundedExmaple.subscibe(val => console.log(val))


// 每隔1s emit 一个值
const interval = Rx.Observable.interval(1000)
// 每隔1s 增加我们的 dobounce time
const debouncedInterval = interval.debounce(val => Rx.Observable.timer(val * 200))

/*
* 在5s后,debounce time 就大于 interval time
* 因此,未来的值都会被丢掉
* 输出:0...1...2...3...4......(此时 debounce time 大于1s ,再也没有值 emit)
*/

const subscribeTwo = debounceInterval.subscribe(val => console.log(`Example Two: ${val}`))

debounceTime

signature: debounceTime(dueTime: number, scheduler: Scheduler): Observable

忽略 source Observable 某段时间内的 emit 的值。

(demo | docs)

const input = document.getElementById('example')

// 对于每个 keyup 事件,映射为当前 input 的值
const example = Rx.observable
  .fromEvent(input, 'keyup')
  .map(i => i.currentTarget.value)
  
// 在 keyups 和 emit 当前值之间,等待 .5s
const debouncedInput = example.debounceTime(500)

// 输出
const subscribe = debouncedInput.subscribe(val => {
  console.log(`Debounced Input: ${val}`)
}

defaultIfEmpty

signature: defaultIfEmpty(defaultValue: any): Observable

当 Observable 为空时,使用这个设定的默认值,否则值为 null

(demo | docs)

const empty = Rx.Observable.of()
// 当 source Observable 的值为空时,使用这个默认值
const exampleOne = empty.defaultIfEmpty('Observable.of() Empty!')
// 输出:'Observable.of() Empty!'
const subscribe = exampleOne.subscribe(val => console.log(val))

// 空的 Observable
const emptyTwo = Rx.Observable.empty()
const exampleTwo = emptyTwo.defaultIfEmpty('Observable.empty()!')
// 输出:'Observable.empty()!'
const subscribe = exampleTwo.subscribe(val => console.log(val))

delay

signature: delay(delay: number | Date, scheduler: Scheduler): Observable

延迟 Observable emit 的时间。

(demo | docs)

const example = Rx.Observable.of(null)

const merge = example.merge(
  example.mapTo('Hello'),
  example.mapTo('World').delay(1000),
  example.mapTo('Goodbye').delay(2000),
  example.mapTo('World!').delay(3000)
)

// 输出:'Hello'...'World'...'Goodbye'...'World!'
const subscribe = merge.subscribe(val => console.log(val))

delayWhen

signature: delayWhen(selector: Function, sequence: Observable): Observable

根据指定的函数,延迟 emit 的时间。

(demo | docs)

// 每隔1s emit 一个值
const message = Rx.Observable.interval(3000)
// 在5s后 emit 值
const delayForFiveSeconds = () => Rx.Observable.timer(5000)
// 在5s后,开始 emit 值
const delayWhenExample = message.delayWhen(delayForFiveSeconds)

// 输出:延迟5s后,
// 5s....1...2...3
const subscribe = delayWhenExample.subscribe(val => console.log(val))

dematerialize

sigature: dematerialize(): Observable

把 notification object 变成 notification values

(demo | docs)

// emit next notification 和 error notification
const source = Rx.Observable
  .from([
    Rx.Notification.createNext('SUCCESS'),
    Rx.Notification.createError('ERROR!')
  ])
  // 把 notification object 变成 notification values
  .dematerialize()
  
// 输出:`NEXT VALUE: SUCCESS' 'ERROR VALUE: 'ERROR!'
const subscriptionn = source.subscribe({
  next: val => console.log(`NEXT VALUE: ${val}`),
  error: val => console.log(`ERROR VALUE: ${val}`),
})

distinctUntilChanged

signature: distinctUntilChanged(compare: function): Observable

只有当前的值与上个值不同,才 emit 。

(demo | docs)

// 仅仅输出那些与上个值不同的值
const myArrayWithDuplicatesInARow = Rx.Observable
  .from([1, 1, 2, 2, 3, 1, 2, 3])
  
const distinctSub = myArrayWithDuplicatesInARow
  .distinctUntilChanged()
  // 输出:1, 2, 3, 1, 2, 3
  .subscribe(val => console.log('DISTINCT SUB: ', val))
  
const nonDistinctSub = myArrayWithDuplicatesInARow
  // 输出:1, 1, 2, 2, 3, 1, 2, 3
  .subscribe(val => console.log('NON DISTINCT SUB:', val))
  
const sampleObject = { name: 'Test' }
const myArrayWithDuplicateObjects = Rx.Observable.from([sampleObject, sampleObject])

const nonDisinctObjects = myArrayWithDuplicateObjects
  .distinctUntilChanged()
  // 输出:`DISTINCT OBJECTS: { name: 'Test' }`
  .subscribe(val => console.log('DISTINCT OBJECTS', val))

do

signature: do(nextOrObserver: function, error: function, complete: function): Observable

显式地进行某些 action ,比如 logging

(demo | docs)

const source = Rx.Observable.of(1, 2, 3, 4, 5)

// 通过 do ,显式地打印某些值
const example = source
  .do(val => console.log(`BEFORE MAP: ${val}`)
  .map(val => val + 10)
  .do(val => console.log(`AFTER MAP: ${val}`)
  
// `do` 是不会 emit 值
const subscribe = example.subscribe(val => console.log(val))

every

signature: every(predicate: function, thisArg: any): Observable

检测是否「所有 emit 的值」都符合某个条件。

(demo | docs)

// emit 5个值
const source = Rx.Observable.of(1, 2, 3, 4, 5)
const example = source
  .every(val => val % 2 == 0)
  
// 输出:false
const subscribe = example.subscribe(val => console.log(val))


// emit 5个值
const allEvens = Rx.Observable.of(2, 4, 6, 8, 10)
const exampleTwo = allEvens
  // 是否每个值都是偶数
  .every(val => val % 2 === 0)
  
// 输出:true
const subscribe = exampleTwo.subscribe(val => console.log(val))

expand

signature: expand(project: function, concurrent: number, scheduler: Scheduler): Observable

递归地调用给定的函数。

(demo | docs)

// emit 2
const source = Rx.Observable.of(2)

const example = source
  .expand(val => {
    // 2, 3, 4, 5
    console.log(`Passed value: ${val}`)
    // 3, 4, 5, 6
    return Rx.Observable.of(1 + val)
  })
  // 只调用 5 次
  .take(5)
  
/*
* RESULT: 2
* Passed value: 2
* RESULT: 3
* Passed value: 3
* RESULT: 4
* Passed value: 4
* RESULT: 5
* Passed value: 5
* RESULT: 6
* Passes value: 6
*/

// 输出:2, 3, 4, 5, 6
const subscribe = exmaple.subscribe(val => console.log(`RESULT: ${val}`))

filter

signature: filter(select: Function, thisArg: any): Observable

只返回符合给定条件的值。

(demo | docs)

// emit (1, 2, 3, 4, 5)
const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 过滤掉不是偶数的值
const example = source.filter(num => num % 2 === 0)
// 输出:Even Number: 2, Even Number: 4
const subscribe = example.subscribe(val => console.log(`Even number: ${val}`))

// emit ({ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 })
const sourceTwo = Rx.Observable.from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]
// 过滤掉30岁以下的
const exampleTwo = sourceTwo.filter(person => person.age >= 300)
// 输出:Over 30: Joe
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Over 30: ${val.name}`))

first

signature: first(predicate: function, select: function)

emit 第一个值,或者第一个符合给定条件的值。

(demo | docs)

const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 没有传任何参数,那么就 emit 第一个值
const example = source.first()
// 输出:First Value: 1
const subscribe = example.subscribe(val => console.log(`First Value: ${val}`))

// emit 第一个符合给定条件的值
const exampleTwo = source.first(num => num === 5)
// 输出:First to pass test: 5
const subscribe = exampleTwo.subscribe(val => console.log(`First to pass test: ${val}`))

// 传递可选参数 project function
const exampleThree = source.first(
  num => num % 2 === 0,
  (result, index) => `First even: ${result} is at index: ${index}`
)
// 输出:First even: 2 is at index 1
const subscribeThree = exampleThree.subscribe(val => console.log(val))

groupBy

signature: groupBy(keySelector: Function, elementSelector: Function): Observable

按照给定的值,分组到 Observable。

(demo | docs)

const people = [{name: 'Sue', age:25},{name: 'Joe', age: 30},{name: 'Frank', age: 25}, {name: 'Sarah', age: 35}];
const source = Rx.Observable.from(people)
// 按年龄分组
const example = source
  .groupBy(person => person.age)
  // 每个分组作为数组返回
  .flatMap(group => group.reduce((acc, curr) => [...acc, curr], []))
  
/*
* 输出:
* [{age: 25, name: 'Sue'}, {age: 25, name: 'Frank'}]
* [{age: 30, name: 'joe'}]
* [{age: 35, name: 'Sarah'}]
*/

const subscribe = example.subscribe(val => console.log(val))

ignoreElements

signature: ignoreElements(): Observable

忽略所有的东西,除了 complete 和 error 。

(demo | docs)

// 每隔500ms emit 值
const source = Rx.Observable.interval(100)
// 忽略一切东西除了 complete
const example = source
  .take(5)
  .ignoreElements()
// 输出:'COMPLETE!'
const subscribe = example.subscribe(
  val => console.log(`NEXT: ${val}`),
  val => console.log(`ERROR: ${val}`),
  () => console.log(`COMPLETE`)
)

// 忽略一切东西除了 error
const error = source
  .flatMap(val => {
    if (val === 4) return Rx.Observable.throw(`ERROR AT ${val}`)
    return Rx.Observable.of(val)
  })
  .ignoreElements()
  
// 输出:ERROR: ERROR at 4
const subscribeTwo = error.subscribe(
  val => console.log(`NEXT: ${val}`),
  err => console.log(`ERROR: ${err}`),
  () => console.log('SECOND COMPLETE')
)

last

signature: last(predicate: function): Observable

emit 最后一个值,或者最后一个通过 test 的值。

(demo | docs)

const source = Rx.Observable.from([1, 2, 3, 4, 5])
// 没有传参数,emit 最后一个值
const example = source.last()
// 输出:"Last value: 5"
const subscribe = example.subscribe(val => console.log(`Last value: ${val}`))

// emit 最后一个偶数
const exampleTwo = source.last(num => num % 2 === 0)
// 输出:Last to pass test: 4
const subscribeTwo = exampleTwo.subscribe(val => console.log(`Last to pass test: ${val}`))

map

signature: map(project: Function, thisArg: any): Observable

把每个值都应用到 project function ,映射为新的值。

(demo | docs)

// emit (1, 2, 3, 4, 5)
const source = Rx.Observable.of([1, 2, 3, 4, 5])
// 每个值都加10
const example = source.map(val => val + 10)
// 输出:11, 12, 13, 14, 15
const subscribe = example.subscribe(val => console.log(val))


// emit ({name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50})
const sourceTwo =  Rx.Observable.from([{name: 'Joe', age: 30}, {name: 'Frank', age: 20},{name: 'Ryan', age: 50}]);
// 获取每个人的名字
const exmapleTwo = sourceTwo.map(person => person.name)
// 输出:"Joe", "Frank", "Ryan"
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

mapTo

signature: mapTo(value: any): Observable

每一次都映射(map)为一个常量。

(demo | docs)

const source = Rx.Observable.interval(2000)
// 每个值都映射为一个常量
const example = source.mapTo('HELLO WORLD!')
// 输出:'HELLO WORLD!'...'HELLO WORLD!'...'HELLO WORLD!'...
const subscribe = example.subscribe(val => console.log(val))


// 每次点击 document 时都 emit
const clickSource = Rx.Observable.fromEvent(document, 'click')
// 把所有的 emission 都设为一个值
const exampleTwo = clickSource.mapTo('GOODBYE WORLD!')
// 输出:(click)'GOODBYE WORLD!'...
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

merge

signature: merge(input: Observable): Observable

把多个 Observable 压扁为一个 Observable 。

(demo | docs)

// 每隔2.5s emit 值
const first = Rx.Observable.interval(2500)
// 每隔2s emit 值
const second = Rx.Observable.interval(2000)
// 每隔1.5s emit 值
const third = Rx.Observable.interval(1500)
// 每隔1s emit 值
const fourth = Rx.Observable.interval(1000)

// 把多个 Observable 合并为一个单独的 Observable
const example = Rx.Observable.merge(
  first.mapTo('FIRST'),
  second.mapTo('SECOND'),
  third.mapTo('THIRD'),
  fourth.mapTo('FOURTH')
)

// 输出:'FOURTH', 'THIRD', 'SECOND', 'FOURTH', 'FIRST', 'THIRD', 'FOURTH'...
const subscribe = example.subscribe(val => console.log(val))

// merge 可以当作 Observable instance 的一个方法
const exampleTwo = first.merge(fourth)
// 输出:0, 1, 0, 2...
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

mergeMap

signature: mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable

把 source 的值先 map 到 inner Observable ,最后压扁返回。简而言之:map => mergeAll

(demo | docs)

const source = Rx.Observable.of('Hello')
// map 到 inner Observable ,并且 flatten 。
const example = source.mergeMap(val => Rx.Observable.of(`${val} World!`)
// 输出:Hello World!
const subscribe = example.subscribe(val => console.log(val))

// mergeMap 也可以 emit Promise
const myPromise = val => new Promise(resolve => resolve(`${val} World from Promise!`))
// map 到 promise ,然后 emit 最后结果
const exampleTwo = source.mergeMap(val => myPromise(val))
// 输出:Hello World From Promise!
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

/*
* 你提供第2个参数,他可以接收 source 传来的值,然后在 inner Observable emit
*/
const exampleThree = source
  .mergeMap(val => myPromise(val),
    (valueFromSource, valueFromPromise) => {
      return `Source: ${valueFromSource}, Promise: ${valueFromPromise}`
    }
  )
  
// 输出:Source: Hello, Promise: Hello World From Promise!
const subscribeThree = exampleThree.subscribe(val => console.log(val))

pluck

signature: pluck(properties: ...args): Observable

挑选出嵌套的属性(nested property)。

(demo | docs)

const source = Rx.Observable.from([
  { name: 'Joe', age: 30 },
  { name: 'Sarah', age: 35 },
])

// 抓取所有的名字(name)
const example = source.pluck('name')
// 输出:"Joe", "Sarch"
const subscribe = example.subscribe(val => console.log(val))

const sourceTwo = Rx.Observable.from([
  { name: 'Joe', age: 30, job: { title: 'Developer', language: 'JavaScript' }},
  // 如果没有找到 `job` 的话,就会返回 undefined
  { name: 'Sarah', age: 25 },
])
// 抓取在 `job` 里面的 `title`
const exampleTwo = sourceTwo.pluck('job', 'title')
// 输出:'Developer', undefined
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

publish

signature: publish() : ConnectableObservable

什么事情都不做,直到调用 connect ,共享 source 。

(demo | docs)

const source = Rx.Observable.interval(1000)
const example = source
  // side effect
  .do(() => console.log('Do something'))
  // 直到调用 connect() ,否则什么事情都不会发生
  .publish()
  
/*
 * source 不会 emit 值,直到调用 connect 方法
 * 输出:(5s 后)
 * 'Do Something'
 * 'Subscriber One: 0'
 * 'Subscriber Two: 0'
 * 'Do something'
 * 'Subscriber One: 1'
 * 'Subscriber Two: 1'
 */
const subscribe = example.subscribe(val => console.log(val))
const subscribeTwo = example.subscribe(val => console.log(val))

// 5秒后调用 connect 方法,这会引起 source 开始 emit 值
setTimeout(() => {
  example.connect()
}, 5000)

race

signature: race(): Observable

让第一个 Observable emit 。

(demo | docs)

// 让第一个 Observable emit
const example = Rx.Observable.race(
  Rx.Observable.interval(1500),
  Rx.Observable.interval(1000).mapTo('1s won!'),
  Rx.Observable.interval(2000),
  Rx.Observable.interval(2500)
)
// 输出:'1s won!'...'1s won!'...
const subscribe = example.subscribe(val => console.log(val))

repeat

signature: repeat(scheduler: Scheduler, count: number): Observable

指定 source 重复的次数。

(demo | docs)

// emit 'Repeat this!'
const source = Rx.Observable.of('Repeat this')
// 重复 source emit 的值3次
const example = source.repeat(3)
// 输出:Repeat this! ... Repeat this! ... Repeat this
const subscribe = example.subscribe(val => console.log(val))

const sourceTwo = Rx.Observable.interval(1000)
// 取前5个值,重复2次
const exampleTwo = source.take(5).repeat(2)
// 输出:0,1,2,3,4 ... 0,1,2,3,4
const subscribeTwo = exampleTwo.subscribe(val => console.log(val))

retry

signature: retry(number: number): Observable

当发生 error 的时候,指定 retry 的次数。

(demo | docs)

const source = Rx.Observable.interval(1000)
const example = source
  .flatMap(val => {
    // 抛出 error
    if (val > 5) {
      return Rx.Observable.throw('Errro!')
    }
    return Rx.Observable.of(val)
  })
  // retry 2 times on error
  .retry(2)
  
/*
 * 输出:
 * 0..1..2..3..4..5..
 * 0..1..2..3..4..5..
 * 0..1..2..3..4..5..
 * Error! Retried 2 times when quit!
 */
const subscribe = example
  .subscribe({
    next: val => console.log(val),
    error: val => console.log(`${va}: Retried 2 times when quit!`)
  })

retryWhen

signture: etryWhen(receives: notificationHandler, the: scheduler): Observable

在额外的逻辑的 retry 。

(demo | docs)

const source = RxObservable.interval(1000)
const example = source
  .map(val => {
    if (val > 5) {
      throw val
    }
    return val
  })
  .retryWhen(errors => errors
    .do(val => console.log(`Value ${val} was too high!`)
    .delayWhen(val => Rx.Observable.timer(val * 1000))
  )
  
/*
 * 输出:
 * 0
 * 1
 * 2
 * 3
 * 4
 * 5
 * Value 6 was too high!
 * ... wait 5s then repeat
 */
const subscribe = example.subscribe(console.log)

sample

signature: sample(sampler: Observable): Observable

当给定的 Observable emit 的时候,返回 source 最新的一个样本(sample)。

(demo | docs)

// 每隔1s emit 值
const source = Rx.Observable.interval(1000)
// 每隔2s ,返回 source 最新的值
const example = source.sample(Rx.Observble.interval(2000))
// 输出:2..4..6..8
const subscribe = example.subscribe(val => console.log(val))


const sourceTwo = Rx.Observable.zip(
  Rx.Observable.from(['Joe', 'Frank', 'Bo']),
  Rx.Observable.interval(2000)
)
// 每隔2.5s ,返回 sourceTwo 的最新值
const exampleTwo = sourceTwo.sample(Rx.Observable.interval(2500))
// 输出:['Joe', 0]...['Frank', 1]...
const subscribeTwo = exampleTwo.subscribe(console.log)

scan

signature: scan(accumulator: function, seed: any): Observable

reducer / 累加器

(demo | docs)

const testSubject = new Rx.Subject()
// 最简单的 scan 的例子,从零开始累加
const basicScan = testSubject
  .startWith(0)
  .scan((acc, curr) => acc + curr)
// 输出累加后的值
const subscribe = basiScan.subscribe(val => console.log('Accumulated total: ', val))

testSubject.next(1) // 1
testSubject.next(2) // 3
testSubject.next(3) // 6

const testSubjectTwo = new Rx.Subject()
const objectScan = testSubject.scan((acc, curr) => Object.assign({}, acc, curr), {})
const subscribe = objectScan.subscribe(val => console.log('Accumulated object: ', val))

testSubject.next({ name: 'Joe' }) // { name: 'Joe' }
testSubject.next({ age: 30 }) // { name: 'Joe', age: 30 }
testSubject.next({ favoriteLanguage: 'JavaScript' }) // { name: 'Joe', age: 30, favoriteLanguage: 'JavaScript' }

share

signature: share(): Observable

在多个 subscriber 中共享 observable 。

(demo | dcos)

const source = Rx.Observable.timer(1000)
const example = source
  .do(() => console.log('*** SIDE EFFECT ***'))
  .mapTo('*** RESULT ***')
  
/*
 * 如果不 share 的话,SIDE EFFECT 会执行两次
 * 输出:
 * *** SIDE EFFET ***
 * *** RESULT ***
 * *** SIDE EFFET ***
 * *** RESULT ***
*/
const subscribe = example.subscribe(console.log)
const subscribeTwo = example.subscribe(console.log)

// 在 subscriber 中共享 observable
const shareExample = example.share()

/*
 * 如果 share 的话,SIDE EFFECT 只会执行一次
 * 输出:
 * *** SIDE EFFECT ***
 * *** RESULT ***
 * *** RESULT ***
 */
const subscribeThree = shareExample.subscribe(console.log)
const subscribeFour = shareExample.subscribe(console.log)

single

signature: signature: single(a: Function): Observable

emit 符合条件的一个单独的值。

(demo | docs)

// emit (1,2,3,4,6)
const source = Rx.Observable.of([1,2,3,4,5])
// emit 符合条件的一个 value 
const example = source.single(val => val === 4)
// 输出:4
const subscribe = example.subscribe(console.log)

skip

signature: skip(the: Number): Observable

跳过指定数量的 emitted value 。

(demo | docs)

const source = Rx.Observable.interval(1000)
// 跳过前5个 emitted value 
const example = source.skip(5)
// 输出:5..6..7..8...
const subscribe = example.subscribe(console.log)

skipUntil

signature: skipUntil(the: Observable): Observable

跳过 source emit 的值,直到 inner Observable emit 。

(demo | docs)

const source = Rx.Observable.interval(1000)
// 跳过 source emit 的值,直到 inner Observable emit 。
const example = source.skipUntil(Rx.Observable.timer(6000))
// 输出:5...6...7...8...
const subscribe = example.subscribe(console.log)

skipwhile

signature: skipWhile(predicate: Function): Observable

跳过 source emit 的值,直到给定的条件为 false 。

(demo | docs)

const source = Rx.Observable.interval(1000)
const example = source.skipWhile(val => val < 5)
// 输出:5...6...7...8.......
const subscribe = example.subscribe(console.log)

startWith

signature: startWith(an: Values): Observable

指定第一个 emit 的值。

(demo | docs)

const source = Rx.Observable.of([1, 2, 3])
const example = source.startWith(0)
// 输出:0,1,2,3
const subscribe = example.subscribe(console.log)

const sourceTwo = Rx.Observable.of('World! ', 'Goodbye', 'World!')
const exampleTwo = sourceTwo
  .startWith('Hello')
  .scan((acc, curr) => `${acc} ${curr}`)
  
// 输出:
// Hello
// Hello Wrold!
// Hello World! Goodybe
// Hello World! Goodbye World!
const subscribe = exmapleTwo.subscribe(console.log)

switchMap

signature: switchMap(a: Observable): Observable

当 source emit 的时候,切换到 inner Observable ,并且 emit 他已经 emit 过的值。

(demo | docs)

const source = Rx.Observable.timer(0, 5000)
// 切换到 inner Observable ,emit 已经 emit 过的值
const example = source.switchMap(() => Rx.Observable.interval(500))
// 输出:0,1,2,3,4,5,6,7,8,9.....0,1,2,3,4,5,6,7,8,9
const subscribe = example.subscribe(console.log)

const sourceTwo = Rx.Observable.fromEvent(document, 'click')
// 如果下一个 click 在 3s 内发生的话,不会产生新的 msg('Hello, I made it!')
const exampleTwo = sourcwTwo.switchMap(val => Rx.Observable.interval(3000).mapTo('Hello, I made it!'))
// 输出:(click)...3s...'Hello, I made it!'...(click)...2s(click)...
const subscribeTwo = exampleTwo.subscribe(console.log)

window

signature: window(windowBoundaries: Observable): Observable

类似于 buffer ,但是返回的是 nested Observable 。

(demo | docs)

// 马上 emit ,然后每隔1s emit
const source = Rx.Observable.timer(0, 1000)
const example = source
  .window(Rx.Observable.interval(3000))
  
const count = example.scan((acc, curr) => acc + 1, 0)
  
/*
 * window 1:
 * 0
 * 1
 * 2
 * window 2:
 * 3
 * 4
 * 5
*/
const subscribe = count.subscribe(val => console.log(`Window ${val}:`))
const subscribeTwo = example.mergeAll().subscribe(val => console.log(val))

windowCount

signature: windowCount(windowSize: number, startWindowEvery: number): Observable

source emit 的值是 Observable ,emit 的间隔是指定的时间。

(demo | docs)

// 每隔1s emit
const source = Rx.Observable.interval(1000)
const example = source
  // 每隔4个值就开始新的 window
  .windowCount(4)
  .do(() => console.log('NEW WINDOW!'))
  
const subscribeTwo = example
  // window emit 的是 nested Observable
  .mergeAll()
  .sbuscribe(console.log)
/*
 * 输出:
 * NEW WINDOW!
 * 0
 * 1
 * 2
 * 3
 * NEW WINDOW!
 * 4
 * 5
 * 6
 * 7
*/

windowTime

signature: windowTime(windowTimeSpan: number, windowCreationInterval: number, scheduler: Scheduler): Observable

bufferTime 一样,除了 emit 的值是 nested Observable 而不是一个 array 。

(demo | docs)

const source = Rx.Observable.timer(0, 1000)
const example = source
  // 每隔3s开始一个新的 window
  .windowTime(3000)
  .do(() => console.log('New Window!'))
  
const subscribe = example
  // window emit 的值为 nested Observable
  .mergeAll()
  
/*
 * 输出
 * New Window:
 * 0
 * 1
 * 2
 * New Window:
 * 3
 * 4
 * 5
 */
 .subscribe(console.log)

windowToggle

signature: windowToggle(openings: Observable, closingSelector: function(value): Observable): Observable

bufferTime 一样,除了 emit 的值是 nested Observable 而不是一个 array 。

(demo | docs)

const source = Rx.Observable.timer(0, 1000)
// 在第5s的时候 toggle window
const toggle = Rx.Observable.interval(5000)
const example = source
  // 每隔5s 打开 window
  .windowToggle(toggle, val => Rx.Observable.interval(val * 1000))
  .do(() => console.log('New Window!'))
  
const subscribe = example
  // window emit 的值为 nested Observable
  .mergeAll()
  
/*
 * New Window!
 * New Window!
 * 10
 * New Window!
 * 15
 * 16
 * ...
 */

windowWhen

signature: windowWhen(closingSelector: function(): Observable): Observable

bufferWhen 一样,除了 emit 的值是 nested Observable 而不是一个 array 。

(demo | docs)

const source = Rx.Observable.timer(0, 1000)
const example = source
  // 每隔5s就关闭wndow,并且 emit 从 source 中缓冲好的值
  .windowWhen(val => Rx.Observable.interval(5000))
  .do(() => console.log('New Window!'))
  
const subscribe = example
  .mergeAll()
  /*
   * New Window!
   * 0
   * 1
   * 2
   * 3
   * 4
   * New Window!
   * 5
   * 6
   * 7 
   * 8
   * 9
   */
    .subscribe(console.log)

withLatestFrom

signature: withLatestFrom(other: Observable, project: Function): Observable

当 source emit 的时候,同时也返回另一个 Obserable 最近 emit 的那个值。

(demo | docs)

const source = Rx.Observable.interval(5000)
const secondSource = Rx.Observable.interval(1000)
const example = source
  .withLatestFrom(secondSource)
  .map(([first, second]) => `First Source(5s): ${first} Second Source(1s): ${second}`)
  
/*
 * First Source(5s): 0 Second Source(1s): 4
 * First Source(5s): 1 Second Source(1s): 9
 * First Source(5s): 2 Second Source(1s): 14
 */
const subscribe = example.subscribe(console.log)


// withLastest 比 source 慢
const exampleTwo = secondSource
  .withLatestFrom(source)
  .map(([first, second]) => `Source(1s): ${first} Latest from(5s): ${second}`)
  
/*
 * Source(1s): 4 Latest Lastest from(5s): 0
 * Source(1s): 5 Latest Lastest from(5s): 0
 * Source(1s): 6 Latest Lastest from(5s): 0
 */
const subscribeTwo = exampleTwo.subscribe(console.log)

zip

signature: zip(observables: *): Observable

等到所有的 Observable 都 emit 之后,才作为数组返回。

(demo | docs)

const sourceOne = Rx.Observable.of('Hello')
const sourceTwo = Rx.Observable.of('World')
const sourceThree = Rx.Observable.of('Goodbye')
const sourceFour = Rx.Observable.of('World!')
// 等到所有的 Observable 都 emit 之后,才把它们作为数组 emit 出去
const example = Rx.Observable
  .zip(
    sourceOne,
    sourceTwo.delay(1000),
    sourceThree.delay(2000),
    sourceFour.delay(3000)
  )
// 输出:['Hello', 'World', 'Goodbye', 'World!']
const subscribe = example.subscribe(console.log)

// 每隔1s emit
const interval = Rx.Observable.interval(1000)
// 当一个 Observable complete 后,再也不会 emit 任何值
const exampleTwo = Rx.Observable
  .zip(
    interval,
    iterval.take(2)
  )
// 输出:[0, 0]...[1,1]
const subscribeTwo = exampleTwo.subscribe(console.log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment