import { Observable } from "rxjs/Observable";
import "rxjs/add/observable/interval";
import "rxjs/add/operator/map";
import "rxjs/add/operator/take";
import "rxjs/add/operator/startWith";
import "rxjs/add/operator/mergeMap";
import "rxjs/add/operator/concatMap";
import "rxjs/add/operator/switchMap";
import "rxjs/add/operator/exhaustMap";

// Builds a lower-order timer for a given index; `oper` only labels the output.
const getValuesOnHalfSecondIntervalTimesIndex = oper => idx =>
    Observable
        .interval(idx * 50)
        .take(5)
        .map(val => `(${oper}) ${idx}: ${(val + 1) * idx}`);

// Runs the same higher-order timer through each flattening operator in turn.
["merge", "concat", "exhaust", "switch"].forEach(oper =>
    Observable
        .interval(100)
        .startWith(-1)
        .map(idx => idx + 2)
        .take(5)[`${oper}Map`](getValuesOnHalfSecondIntervalTimesIndex(oper))
        .subscribe(ev => console.log(ev)));

import { interval } from 'rxjs/observable/interval';
import {
map,
take,
startWith,
mergeMap,
concatMap,
switchMap,
exhaustMap,
} from 'rxjs/operators';

// Same example written with pipeable operators (available from RxJS 5.5).
const getValuesOnHalfSecondIntervalTimesIndex = oper => idx =>
    interval(idx * 50).pipe(
        take(5),
        map(val => `(${oper}) ${idx}: ${(val + 1) * idx}`),
    );

[mergeMap, concatMap, switchMap, exhaustMap].forEach(oper =>
    interval(100)
        .pipe(
            startWith(-1),
            map(idx => idx + 2),
            take(5),
            oper(getValuesOnHalfSecondIntervalTimesIndex(oper.name)),
        )
        .subscribe(ev => console.log(ev)),
);

{
  "babel": {
    "env": {
      "development": {
        "presets": [
          [
            "env",
            {
              "targets": {
                "node": true
              }
            }
          ]
        ]
      }
    }
  },
  "eslintConfig": {
    "parser": "babel-eslint",
    "env": {
      "browser": true,
      "commonjs": true,
      "es6": true,
      "node": true
    },
    "extends": "eslint:recommended",
    "parserOptions": {
      "sourceType": "module",
      "ecmaVersion": 8
    },
    "rules": {
      "indent": [
        "error",
        4,
        {
          "SwitchCase": 1
        }
      ],
      "linebreak-style": [
        "error",
        "unix"
      ],
      "semi": [
        "error",
        "always"
      ],
      "no-console": 0
    }
  },
  "devDependencies": {
    "@types/node": "^8.0.57",
    "babel-cli": "^6.24.0",
    "babel-eslint": "^7.2.1",
    "babel-preset-env": "^1.3.2",
    "ts-node": "^3.3.0",
    "typescript": "^2.6.2"
  },
  "dependencies": {
    "rxjs": "^5.5.0"
  },
  "scripts": {
    "start": "babel-node ."
  }
}

https://medium.com/@ExplosionPills/rxjs-switch-switchmap-and-other-map-operations-e8ccdfb7e5a9 is more up-to-date.

Exploring RxJS still feels like a jungle to me. Even when I think I understand something, I find out that I actually don't understand it. In my quest to truly understand it, I end up learning quite a lot more about some other topics or operations than the one I was originally trying to understand. This is generally a positive thing, but it still feels like traveling deep into a jungle to me.

Just today I was trying to learn how to use ngrx/store with ngrx/effects to use http requests with an ngrx/store-backed app. This introduced me to the RxJS Observable switchMap operator that I was not familiar with. The main question I came up with -- the question that I usually have when dealing with RxJS operators -- was

How would I know when to use this operator?

This led me into the jungle as I worked to unravel what switchMap actually did and to understand the difference between switch and switchMap. While trying to build a working example, I also finally managed to understand what other xMap operators do including mergeMap and concatMap. These require understanding their non-map counterparts, merge/mergeAll, etc. as well as the map operator. I'm going to talk about all of these.

Knowing Where to Look to Learn About RxJS

One key hiccup in my research was caused by the branching of the RxJS project into separate repositories:

Just to reiterate, be very careful about the docs you are using. v4 and v5 operators can work very differently. I spent a lot of time trying to use v4 examples with v5.

This article uses v5 (specifically 5.3.0, the latest as of this writing).

There are tons and tons of articles and examples out there as well. Just be careful that they are consistent with the version you are trying to learn. I wasted a lot of time trying to get the .merge example from v4 to work with my v5 code.

A Brief Introduction to RxJS / Observables

Of course there are lots of other articles out there that explain RxJS and Observables. I particularly like this one about understanding Observable by building one.

Observables are very difficult to explain: they are meant to be conceptually simple, but that simplicity makes them harder to pin down. I can say that they are "event streams," but this is not totally accurate either. In fact, I'm not even quite sure why it's not accurate. This Stack Overflow answer says of them:

The Observable object represents a push based collection.

I particularly like this explanation. Essentially an Observable is a container for event emission that an observer can attach to. In fact, RxJS has Observers, but this is often just done ad-hoc by using a .subscribe method on the Observable.
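To make "push based collection" a little more concrete, here is a minimal sketch in plain JavaScript. It is not how RxJS is implemented, and `createObservable` and `fromValues` are hypothetical names used only for illustration. What it tries to show is that the producer calls the observer, rather than the consumer pulling values out.

```javascript
// A toy Observable: an object whose subscribe() hands an observer to a producer.
// Illustrative only; RxJS adds completion rules, error handling, teardown, and operators.
function createObservable(producer) {
  return {
    subscribe(next) {
      // Accept a bare function and wrap it in an observer object.
      const observer = { next, complete() {} };
      producer(observer);
    },
  };
}

// Hypothetical helper that pushes a fixed list of values, similar in spirit to Observable.of.
function fromValues(...values) {
  return createObservable(observer => {
    values.forEach(value => observer.next(value)); // the producer pushes each value
    observer.complete();
  });
}

const received = [];
fromValues(1, 2, 3).subscribe(value => received.push(value));
console.log(received); // [ 1, 2, 3 ]
```

The ad-hoc function passed to `subscribe` plays the role of the observer here, which mirrors how `.subscribe` is usually called in practice.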

An Observable has a source, or multiple sources. When it receives data from that source, it broadcasts it to its observers. Often an Observable creates its own source as in Observable.interval (emit on a timer) and Observable.range (emit numbers synchronously). However, there are many ways to set the source of an Observable that have practical usefulness. These sources can include http requests, user events such as clicks, and much more.

Observables allow you to operate on the results of these emissions in a variety of ways as they come in. Observables come with a lot of useful functional operators such as .map and .reduce that you would also be able to call on Arrays. Rather than operating on an array in memory, Observables work online in that the entire collection doesn't have to exist in memory for you to operate on it. In fact, the collection can be infinite.

There are also a lot of Observable-specific operators that make it easier to work with observable-specific behavior ... particularly emitting observables in response to observable events. Confused yet? Maybe some examples will help.

Understanding the .map Operator

.map works on an Observable just like it works on an array: apply a projection function to the collection and get a collection as output.

[1, 2, 3].map(num => num * 2) // [2, 4, 6]

For Observables, an analogous operation would be:

Observable.of(1, 2, 3).map(num => num * 2)

However, observables do not emit unless they have an observer. Therefore you would have to call .subscribe before you started to get the doubled values.

Observable.of(1, 2, 3).map(num => num * 2).subscribe(doubled => console.log(doubled))

This is an important concept to understand about Observables: they do not start emitting values unless they have an observer they can emit to. Pretty much every example of Observables that you see will include a .subscribe. In some cases such as with Angular, you can use the async pipe as in {{someObservable$ | async}} in which case there won't be a .subscribe in the application code, but it is still in the framework code.

The reason I bring this up is because it's helpful when you're writing your own Observable code trying to understand it: just don't forget to use .subscribe to get values out of your observable.

Observable.of(1, 2, 3).subscribe(x => console.log(x))
// 1
// 2
// 3

Just remember that you always need an Observable to subscribe to, and you have to call .subscribe and do something with the values that are emitted. If you're not building towards that, you may have to rethink how you're trying to use Observables.
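The "nothing happens until you subscribe" behavior can be sketched without RxJS at all. This is a toy model, assuming a source whose producer runs once per `subscribe` call; `lazySource`, `doubled`, and `producerRuns` are hypothetical names for this sketch.

```javascript
// Toy model of a lazy source: the producer only runs when subscribe() is called.
let producerRuns = 0;

const lazySource = {
  subscribe(next) {
    producerRuns += 1; // side effect so we can see when the producer actually runs
    [1, 2, 3].forEach(value => next(value));
  },
};

// Deriving a mapped version of the source does not run the producer either.
const doubled = {
  subscribe(next) {
    lazySource.subscribe(value => next(value * 2));
  },
};
const runsBeforeSubscribe = producerRuns;

const output = [];
doubled.subscribe(value => output.push(value));

console.log(runsBeforeSubscribe); // 0
console.log(producerRuns); // 1
console.log(output); // [ 2, 4, 6 ]
```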

Using .map to Operate on Values

The reason .map is significant is because it's often used to operate on data that's just been emitted from an Observable. This is the case even if you only expect one value to come from an Observable such as an Http response. In fact, in RxJS v4 .select was an alias for .map. Typically you think of .map as operating on a collection, but it's useful for operating on single values in this context as well. You can see this in Angular's http documentation:

this.http
    .get(url)
    .map(response => response.json())

this.http.get(url) returns an Observable, but it will only emit one value: the http response. Here we get the response body as a JavaScript object parsed from JSON. We could also do this at the subscriber level, but this transformation is so typical we might as well do it at the Observable level.

Returning Observables from .map -- A Collection of Observables!

It is very common to return an Observable from a .map projection. This results in a collection of Observables, or an Observable of Observables as it were. For example, you could do:

[1,2,3].map(x => Observable.of(x))
// [Observable<1>, Observable<2>, Observable<3>]

This will give you an array of Observables. Of course, with Observable being its own collection you could do this instead:

Observable.of(1, 2, 3).map(x => Observable.of(x)).subscribe(num$ => console.log(num$));
// Observable<1>
// Observable<2>
// Observable<3>

Now we might have a bit of a problem ... you might be expecting a logging of 1... 2... 3... but instead we're getting Observable logged. This is because an Observable is emitted. If we want the value from this inner -- or wrapped -- Observable, we would have to subscribe to it as well. We could do this:

Observable.of(1, 2, 3).map(x => Observable.of(x)).subscribe(num$ =>
  num$.subscribe(num => console.log(num))
);
// 1
// 2
// 3

By the way, it's a convention to end a variable with $, as in num$, when it contains an Observable or asynchronous value.

However, this requires the subscriber to handle its own manipulation of the data it's receiving from the events. We generally want to avoid this. It also looks ugly and confusing.

Instead, we want to flatten the observables in some way and get the values out. RxJS provides many ways to do this for a lot of different scenarios. We'll go over some of these and ultimately explain and reveal the uses for switch and switchMap.

Our Example Problem: A Timer That Creates Timers

As an illustration, take this example:

Start a timer. Every two seconds, this creates another timer. The lower-order timers will emit every half second multiplied by their index. They should emit a string containing their index counter multiplied by their index. Both the lower and higher order timers should emit at most 5 times.

There is more that we can do for this problem, but for now we'll start with this. Essentially we want output like:

1: 1    2: 2     3: 3     4: 4     5: 5
1: 2    2: 4     3: 6     4: 8     5: 10
1: 3    2: 6     3: 9     4: 12    5: 15
1: 4    2: 8     3: 12    4: 16    5: 20
1: 5    2: 10    3: 15    4: 20    5: 25
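As a sanity check on the target, the table above can be generated with two plain loops. This is only a sketch of the arithmetic (timer index times number of ticks so far) and involves no timers or Observables; `TIMERS` and `TICKS` are names made up here.

```javascript
// Builds the target table: column = timer index, row = how many times that timer has fired.
const TIMERS = 5; // the higher-order timer emits at most 5 times
const TICKS = 5;  // each lower-order timer emits at most 5 times

const rows = [];
for (let tick = 1; tick <= TICKS; tick++) {
  const cells = [];
  for (let timer = 1; timer <= TIMERS; timer++) {
    cells.push(`${timer}: ${timer * tick}`);
  }
  rows.push(cells.join("    "));
}
console.log(rows.join("\n"));
// First row: "1: 1    2: 2    3: 3    4: 4    5: 5"
// Last row:  "1: 5    2: 10    3: 15    4: 20    5: 25"
```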

Emitting these values with .map

First things first, we have Observable.interval which emits an integer counter on the specified interval in milliseconds. If we do Observable.interval(2000).subscribe(num => console.log(num)) this will log 0, 1, 2, and so on (0 after 2 seconds, 1 after 4 seconds, 2 after 6 seconds). This makes .interval ideal for creating our timers. We want to create other timers on a timer, so we'll use .map to do this:

Observable.interval(2000).map(num => Observable.interval(num * 500));

The .map projection operation is called when the outer timer emits its values. Thus, .map is called every 2 seconds so it creates a lower-order timer every 2 seconds.

The lower-order timer should emit every half second multiplied by its index, hence the num * 500. num is the index, which will be 0, 1, 2,... With this we have laid the groundwork for our solution.

Next up, we want to make sure we only emit 5 values for each timer. That is 5 timers for the higher-order, and 5 values for each lower-order.

Fortunately, this is very easy to do with the .take operator that Observables have. They will only emit up to the number of values you provide to .take. Once that many values have been emitted, the Observable will complete and stop emitting values even if its source could provide more.

Observable
  .interval(2000)
  .take(5)
  .map(num =>
    Observable
      .interval(num * 500)
      .take(5)
  );
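
The cut-off behavior of .take on an unbounded source can be sketched with generators. This is a pull-based analogy, not RxJS code (RxJS pushes values instead), and `counter` and `take` are hypothetical helpers written for this example.

```javascript
// Pull-based analogy using generators; the stop-after-N logic is the part that carries over.
function* counter() {
  let n = 0;
  while (true) yield n++; // an unbounded source, like a timer that never stops
}

function* take(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of source) {
    yield value;
    taken += 1;
    if (taken >= count) return; // complete and stop consuming the source
  }
}

const firstFive = [...take(counter(), 5)];
console.log(firstFive); // [ 0, 1, 2, 3, 4 ]
```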

Flattening Observables

If we wanted to log out the values we got from the lower-order timers in the previous example, we could perform a nested subscribe. Instead we will use an Observable operation to flatten (or unwrap) the lower-order timers so we only need to subscribe to the higher-order timer.

We can do this using the merge operation. Specifically, we will use .mergeAll which combines all emitted observables into a single observable and emits their values.

Observable
  .interval(2000)
  .take(5)
  .map(num =>
    Observable
      .interval(num * 500)
      .take(5)
  ).mergeAll();

Now we can .subscribe to this first-order observable and get all of the values of the inner observables as they are emitted.

const timers = Observable /* code from above would go here */

// String concatenation because `process.stdout.write` doesn't like numbers
timers.subscribe(ev => process.stdout.write(ev + ""));

This prints out:

012340123401203140213...

These are the counters of the inner timers. You might notice that they interleave as later timers take more than two seconds to emit all of their values. Newer timers get created before some older timers can finish emitting. No problem here, it's just something to note.

Note: we use .mergeAll instead of .merge. .merge is used to merge a second observable into the first, e.g. timer1.merge(timer2). The outcome is similar, but .mergeAll works on a collection of observables.

Flattening Observables While Mapping

It is common practice to create a collection -- specifically an Observable -- of Observables and then flatten them using something like .mergeAll. RxJS offers some convenient methods for doing both of these at the same time, typically with xMap operators. For example, you can use .mergeMap to combine .map and .mergeAll into one operation:

Observable
  .interval(2000)
  .take(5)
  .mergeMap(num =>
    Observable
      .interval(num * 500)
      .take(5)
  );

This is identical to the previous example, but we've combined the merge and map operations into one operator.

This works for a few other operators that emit values from collections of Observables, namely combineAll, concatAll, exhaust, mergeAll, and switch. There are also concatMap, exhaustMap, mergeMap, and switchMap which combine their corresponding operations while mapping.

You'll notice that I use the word flatten here or there to describe the concept of pulling values out of a collection of Observables. If you don't understand this, it might help to think of a data structure like Observable<Observable<value>>. This is in essence a tree structure with an outer Observable that points to indexed Observables that point to values. If you were to pull the values up, this would flatten the data structure to Observable<value>. In point of fact, .mergeMap used to be called .flatMap.
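The same idea exists for plain arrays, which may make the naming easier to remember. In this sketch arrays stand in for Observables. Arrays have no notion of time, so the analogy says nothing about how values interleave; it only shows the change in shape.

```javascript
// Array analogy for flattening; arrays stand in for Observables here.
const nested = [1, 2, 3].map(x => [x, x * 10]);       // like .map returning inner collections
const flattened = nested.flat();                      // like flattening the result afterwards
const inOneStep = [1, 2, 3].flatMap(x => [x, x * 10]); // like mapping and flattening at once

console.log(nested);    // [ [ 1, 10 ], [ 2, 20 ], [ 3, 30 ] ]
console.log(flattened); // [ 1, 10, 2, 20, 3, 30 ]
console.log(inOneStep); // [ 1, 10, 2, 20, 3, 30 ]
```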

Why You Have to Use .mergeAll instead of .merge

This took me some time to understand, and I believe this functionality has also changed between v4 and v5, which has the potential to make things even more confusing. What it comes down to is: use All operators on collections of Observables. .map operates on a collection and returns a collection. This is true of Observables and other ordinal data structures. Using .map allows you to return a collection of Observables. .merge and .concat operate on single Observables, not collections. That is, if we replace .mergeAll with .merge in our example from a little while back, it will attempt to merge an Observable that we provide as an argument into the higher-order timer. It will not merge the lower-order timers together.

The xMap functions operate on collections as well. They will return a collection, but they flatten the Observable, or unwrap the values from the inner Observables, in order to create the collection. xAll does this too, but it does not give you the opportunity to do any other operations before flattening. You would use the xAll operators to flatten a collection of Observables you already had.

Explanation of higher/lower/first-order Observables

The RxJS documentation makes reference to higher-order and first-order observables. Their explanations are not prominent, but the docs do specifically say

an Observable that emits Observables, also known as a higher-order Observable

So there you go: a higher-order Observable is an Observable that emits Observables. I've also been calling this a collection of Observables. Observables are collections.

"First-order" Observable is not defined explicitly in the documentation, but I would consider it to be an Observable that emits values that are not asynchronous primitives. An asynchronous primitive wraps a value. Essentially it is a data structure that either has or will eventually have a value. Observables and Promises are examples. First-order Observables will not emit these. They will emit other values such as numbers, strings, Objects, and so on.

I've been referring to "lower-order" Observables, but these are not actually defined in the documentation. I use this term to mean an Observable that is emitted by a higher-order Observable. In this article's examples they are always first-order Observables.

Thus, "converting a higher-order Observable to a first-order Observable", as operations like .mergeAll and .concatAll do, means emitting all of the values of a collection of Observables from a single Observable.

Continuing Our Example Problem

So far we've managed to create code that emits the counters of the inner timers, but the original ask was

[The lower-order timers] emit a string containing their index counter multiplied by their index

This is pretty simple to do: we have the index of the timer from the value emitted from the higher-order timer, and we can perform a .map on the lower-order timers to create this string from each counter value the lower-order timers themselves emit. Building on our previous example, we'll have:

Observable
  .interval(2000)
  .take(5)
  .mergeMap(higherCounter =>
    Observable
      .interval(higherCounter * 500)
      .take(5)
      .map(lowerCounter => higherCounter + ": " + lowerCounter * higherCounter)
);

This is mostly the same as the last code we had, but now we've added a projection that creates the output string we want. This will print

0: 0 (5 rows)
1: 0
1: 1
1: 2
1: 3
1: 4
2: 0
2: 2
2: 4
3: 0
2: 6 ...

This is more in line with what we want, but we can clean it up a little bit more. Multiplying everything by 0 makes the first timer relatively useless. Instead, let's go ahead and bump the values up by 1:

Observable
  .interval(2000)
  .take(5)
  .mergeMap(higherCounterIdx => {
    const higherCounterVal = higherCounterIdx + 1;
    // A block-bodied arrow function needs an explicit return
    return Observable
      .interval(higherCounterVal * 500)
      .take(5)
      .map(lowerCounter => higherCounterVal + ": " + lowerCounter * higherCounterVal);
  });

This will start the first timer emitting every half second instead of all at once and make the emitted values more interesting. We can also use lowerCounter + 1 to start the inner counter's index from 1 instead of 0.

We are operating on the higherCounterIdx and adding one to get a new value which we emit each time. A keen reader may pick up on the fact that this is a map operation. We can write this even more concisely as:

Observable
  .interval(2000)
  .map(idx => idx + 1)
  .take(5)
  .mergeMap(higherCounter =>
    Observable
      .interval(higherCounter * 500)
      .take(5)
      .map(lowerCounter => higherCounter + ": " + lowerCounter * higherCounter)
);

Optional: More Tricks For a Cleaner Experience

Our timer above works fine, but it does not start emitting values until after two seconds. Whether or not this is a problem is up to the user, but it is possible to emit an initial value before .interval starts emitting by using .startWith. .startWith will emit a value immediately -- even before the interval starts emitting. We can use this to create a lower-order timer immediately instead of having to wait two seconds:

Observable
  .interval(2000)
  .startWith(0)
  .map(idx => idx + 1)

However, this doesn't quite work as expected. Our output is:

1: 1
1: 2
1: 3
1: 4
1: 1
1: 5
1: 2

That is, we have two timers indexed at 1. This is because the first value emitted from .interval is 0. .startWith also emits a 0, so we end up with two 0's emitted.

We can get around this by starting at -1 instead: .startWith(-1). This also means we need to add 2 to the index. 1 will be the index of the first timer from our immediate -1 .startWith value. 2 will be the index from the first value emitted from the higher-order timer, which is 0.

Observable
  .interval(2000)
  .startWith(-1)
  .map(idx => idx + 2)

This will work almost exactly the same as the code we had before, but now we don't have to wait for the first lower-order timer to start.
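The index arithmetic behind the two .startWith variants can be checked with plain arrays. This is a sketch that models the emitted values only, not their timing; `intervalValues` stands for the first few values .interval would emit.

```javascript
// Values the higher-order timer produces before .map, modelled as plain arrays.
// interval emits 0, 1, 2, ...; startWith prepends one extra value.
const intervalValues = [0, 1, 2, 3];

const withStartZero = [0, ...intervalValues].map(idx => idx + 1);
const withStartMinusOne = [-1, ...intervalValues].map(idx => idx + 2);

console.log(withStartZero);     // [ 1, 1, 2, 3, 4 ]  <- index 1 appears twice
console.log(withStartMinusOne); // [ 1, 2, 3, 4, 5 ]
```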

Different Flatten operations and finally explaining switch and switchMap

We have our timers working the way we want, and .mergeMap has served us well. We could use .concatMap, .exhaustMap, and .switchMap instead to get different effects. Let's talk about what each of these will do.

We'll use the example code:

Observable
  .interval(100)
  .startWith(-1)
  .map(idx => idx + 2)
  .take(5)
  [YOUR_OPERATOR_HERE](idx =>
    Observable
      .interval(idx * 50)
      .take(5)
      .map(val => idx + ": " + (val + 1) * idx)
  )
  .subscribe(ev => console.log(ev));

Here we create a timer every 10th of a second, and each timer emits on a multiple of a 20th of a second (idx * 50 milliseconds). This interleaves values more frequently for some operators. Let's talk about them.

.mergeMap

We already know what the merge operation does: combines the lower-order observables and emits their values as they come. This is what we have been working with in our example.

.concatMap

This is very similar to merge, but it will not emit values from the next lower-order Observable in a collection until the previous one has completed. An example illustrates this clearly:

merge   concat
1: 1    1: 1
1: 2    1: 2
2: 2    1: 3
1: 3    1: 4
1: 4    1: 5
2: 4    2: 2
1: 5    2: 4
3: 3    2: 6
2: 6    2: 8
4: 4    2: 10
3: 6    3: 3
2: 8    3: 6

When you use concatenation instead of merging, the values from a timer will only be emitted when the previous timer has completed, i.e. counted five times.
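The waiting behavior can be worked out on paper, or with a few lines of plain JavaScript. The sketch below is an idealized model, not RxJS itself: `concatSchedule` is a hypothetical helper, it assumes a perfect clock, and it assumes each timer has already been created by the time the previous one completes (which holds for the periods used in this example).

```javascript
// Idealized model of concatenation: each inner timer starts only when the previous one completes.
function concatSchedule(indexes, unitMs, ticks) {
  const events = [];
  let startsAt = 0;
  for (const idx of indexes) {
    const period = idx * unitMs;
    for (let tick = 1; tick <= ticks; tick++) {
      events.push({ at: startsAt + tick * period, label: `${idx}: ${tick * idx}` });
    }
    startsAt += ticks * period; // the next timer waits for this one to finish
  }
  return events;
}

const schedule = concatSchedule([1, 2, 3, 4, 5], 50, 5);
console.log(schedule.slice(4, 7));
// [ { at: 250, label: '1: 5' }, { at: 350, label: '2: 2' }, { at: 450, label: '2: 4' } ]
console.log(schedule[schedule.length - 1]); // { at: 3750, label: '5: 25' }
```

Under these assumptions the second timer's first value cannot appear before 350ms, because it only starts counting once the first timer completes at 250ms.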

.exhaustMap

If a lower-order timer is emitted before a previous timer has completed, it is discarded. This results in very straightforward output:

1: 1
1: 2
1: 3
1: 4
1: 5
5: 5
5: 10
5: 15
5: 20
5: 25

We can do the math here. The initial timer takes a quarter of a second to emit all of its values (50ms * 5). It is created instantly along with the second timer. The third timer is created at 100ms, and the fourth timer at 200ms. The first timer has not completed yet... it takes 250ms to complete. However, it does complete at 250ms and the fifth timer is created at 300ms. All of the interim timers were discarded, but the fifth timer emits because no lower-order timer was still emitting when it was created.
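The bookkeeping in that walkthrough can be written down as a small function. This is a sketch of the rule "ignore new inner timers while one is still running", not RxJS code. `acceptedByExhaust` is a hypothetical helper, and the `created` values are copied from the paragraph above rather than measured; with different creation times a different timer may be the one that gets through.

```javascript
// Idealized bookkeeping for "exhaust": ignore new inner timers while one is still running.
function acceptedByExhaust(timers, unitMs, ticks) {
  const accepted = [];
  let busyUntil = -Infinity; // time at which the currently running timer completes
  for (const { idx, created } of timers) {
    if (created >= busyUntil) {
      accepted.push(idx);
      busyUntil = created + ticks * idx * unitMs;
    }
    // otherwise the timer is discarded and never subscribed to
  }
  return accepted;
}

// Creation times (ms) as stated in the walkthrough above; an assumption of this sketch.
const timers = [
  { idx: 1, created: 0 },
  { idx: 2, created: 0 },
  { idx: 3, created: 100 },
  { idx: 4, created: 200 },
  { idx: 5, created: 300 },
];
console.log(acceptedByExhaust(timers, 50, 5)); // [ 1, 5 ]
```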

.switchMap

This operation will discard any previous lower-order Observable when a new lower-order Observable is emitted. As the docs say, only the most recently created Observable is subscribed to. This generates this output:

1: 1
2: 2
5: 5 (this timer finishes)

You can think of newer timers as taking over for previous timers when they are created. A new timer is created every 100ms. The first and second timers can both emit their initial values in under 100ms, but the third and fourth timers cannot. They both require more than 100ms to emit a value, so a new timer supersedes them before they can even emit anything. The final timer does not get superseded by anything, so it gets to emit all its values.

I originally learned about .switchMap from the ngrx/effects example. In this case it is used to make sure that only the most recent search attempt dispatches to the action. If multiple searches are attempted, only the latest one is actually used as this is the one that is of interest to the user.
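The "only the latest search counts" idea can be sketched in plain JavaScript with a counter that marks older requests as stale. This is an analogy only: .switchMap achieves the effect by unsubscribing from the previous inner Observable, and `latestOnly` and `track` are hypothetical names for this example.

```javascript
// Plain-JavaScript analogy: only the most recently started request may deliver its result.
function latestOnly() {
  let latestId = 0;
  return function track(onResult) {
    const id = ++latestId; // starting a new request makes every earlier one stale
    return result => {
      if (id === latestId) onResult(result);
    };
  };
}

const track = latestOnly();
const shown = [];

const deliverFirst = track(result => shown.push(result));  // user searches "rx"
const deliverSecond = track(result => shown.push(result)); // user searches "rxjs"

deliverFirst("results for rx");    // stale: ignored
deliverSecond("results for rxjs"); // latest: shown

console.log(shown); // [ 'results for rxjs' ]
```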

Difference between switch and switchMap

I originally wanted to write this article to learn and explain the difference between .switch and .switchMap, and instead I learned a lot more about RxJS and Observables, and higher-order and first-order Observables. The difference between .switch and .switchMap is very simple if you've been paying attention: .switchMap operates on an Observable of plain values, maps each value to an inner Observable, and returns the flattened result. .switch operates on a collection of Observables that already exists. Our very last example will work exactly the same way if we write it like this:

Observable
  .interval(100)
  .startWith(-1)
  .map(idx => idx + 2)
  .take(5)
  .map(idx =>
    Observable
      .interval(idx * 50)
      .take(5)
      .map(val => idx + ": " + (val + 1) * idx)
  )
  .switch()
  .subscribe(ev => console.log(ev));

.switch Is Not Called .switchAll

You might have noticed that .switch operates similarly to .mergeAll, .concatAll. However, like .exhaust it doesn't have All in the name.

The reason for this is that .switch and .exhaust only emit the values of one lower-order Observable at a time. If another lower-order Observable arrives while one is still emitting, one of the two is discarded: .switch discards the one it was emitting, and .exhaust discards the one it just received. Thus, All does not make sense semantically.

Conclusion

You can clone the gist that contains these examples combined and emits the timer index and counters along with the operator emitting them. Clone with git, use yarn install and yarn start to run it. You can pipe the output through something like grep as you see fit if you want to see the emissions of a particular operator, mess with the timers, and do anything you want to do.

I hope that these examples and explanations will help someone understand RxJS and Observables more. If I've said anything that's wrong or unclear, please feel free to leave a comment and let me know. If I'm on the right track but can improve with better examples or illustrations, I'd love to hear those suggestions too.

{
  "compilerOptions": {
    "target": "ES5",
    "lib": ["ES2015"],
    "experimentalDecorators": true
  }
}
@sebastien-p

Awesome, thank you!