The essential concepts in RxJS which solve async event management are:
- Observable: represents the idea of an invokable collection of future values or events.
- Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.
- Subscription: represents the execution of an Observable and is primarily useful for cancelling the execution.
- Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, reduce, etc.
- Subject: is equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
- Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.
Observables are lazy Push collections of multiple values. Observables are like functions with zero arguments, but generalize those to allow multiple values. Observables have no shared execution and are lazy. Subscribing to an Observable is analogous to calling a Function. Observables are created using new Observable. The Observable constructor takes one argument: the subscribe function.
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
  const id = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
});
The Observable observable in the example can be subscribed to, like this:
observable.subscribe((x) => console.log(x));
When calling observable.subscribe with an Observer, the function subscribe in new Observable(function subscribe(subscriber) {...}) is run for that given subscriber. Each call to observable.subscribe triggers its own independent setup for that given subscriber.
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
The code inside new Observable(function subscribe(subscriber) {...}) represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes.
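To make the laziness and the per-subscriber execution concrete, here is a minimal sketch. The `executions` counter, the `received` array, and the `greeting$` name are illustrative and not part of RxJS; the example emits synchronously so the effect is easy to observe.

```typescript
import { Observable } from 'rxjs';

// Counts how many times the subscribe function has run (illustrative only).
let executions = 0;
const received: string[] = [];

const greeting$ = new Observable<string>((subscriber) => {
  executions += 1;
  subscriber.next(`hello from execution ${executions}`);
  subscriber.complete();
});

// Creating the Observable does not run the subscribe function.
const executionsBeforeSubscribe = executions;

// Each subscribe call runs the subscribe function again, independently.
greeting$.subscribe((x) => received.push(`A: ${x}`));
greeting$.subscribe((x) => received.push(`B: ${x}`));

console.log(executionsBeforeSubscribe); // 0
console.log(received);
// ['A: hello from execution 1', 'B: hello from execution 2']
```

Each subscriber sees a message from its own execution, which is the sense in which Observables have no shared execution.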
There are three types of values an Observable Execution can deliver:
- "Next" notification: sends a value such as a Number, a String, an Object, etc.
- "Error" notification: sends a JavaScript Error or exception.
- "Complete" notification: does not send a value.
The following is an example of an Observable execution that delivers three Next notifications, then completes:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
Observables strictly adhere to the Observable Contract, so the following code would not deliver the Next notification 4:
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  subscriber.next(4); // Is not delivered because it would violate the contract
});
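One way to check the contract is to subscribe with a full Observer and record what actually arrives. In this sketch the `log` array and the `source$` name are illustrative helpers, not part of RxJS.

```typescript
import { Observable } from 'rxjs';

const log: string[] = [];

const source$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  subscriber.next(3); // ignored: the execution has already completed
});

source$.subscribe({
  next: (v) => log.push(`next ${v}`),
  error: (err) => log.push(`error ${err}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next 1', 'next 2', 'complete']
```

Nothing is recorded after the Complete notification, even though the subscribe function keeps calling `next`.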
Each Observable must define how to dispose of the resources of its execution when we create the Observable using new Observable. You can do that by returning a custom unsubscribe function from within function subscribe().
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
  // Keep track of the interval resource
  const intervalId = setInterval(() => {
    subscriber.next('hi');
  }, 1000);
  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalId);
  };
});
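The teardown function only runs when the consumer cancels. A minimal sketch of that step follows, assuming the same interval-based Observable; the `tornDown` flag and the `ticks$` name are added purely for illustration.

```typescript
import { Observable } from 'rxjs';

// Illustrative flag so we can see that the teardown ran.
let tornDown = false;

const ticks$ = new Observable<string>((subscriber) => {
  const intervalId = setInterval(() => subscriber.next('hi'), 1000);
  // Teardown: runs when the subscriber unsubscribes.
  return () => {
    clearInterval(intervalId);
    tornDown = true;
  };
});

// subscribe() returns a Subscription representing the ongoing execution.
const subscription = ticks$.subscribe((x) => console.log(x));

// Cancel the execution; this invokes the teardown function returned above.
subscription.unsubscribe();

console.log(tornDown); // true
console.log(subscription.closed); // true
```

Because the interval is cleared before its first tick, nothing is logged by the subscriber in this run.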
An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete.
const observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};
observable.subscribe(observer);
Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver. When subscribing to an Observable, you may also just provide the next callback as an argument, without being attached to an Observer object.
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners.
Every Subject is an Observable. Given a Subject, you can subscribe to it, providing an Observer, which will start receiving values normally. Internally to the Subject, subscribe does not invoke a new execution that delivers values. It simply registers the given Observer in a list of Observers, similarly to how addListener usually works in other libraries and languages.
Every Subject is an Observer. It is an object with the methods next(v), error(e), and complete(). To feed a new value to the Subject, just call next(theValue), and it will be multicast to the Observers registered to listen to the Subject.
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
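Since every Subject is also an Observer, it can itself be handed to the subscribe method of another Observable. The sketch below shows that usage; the `seen` array is an illustrative helper for collecting output.

```typescript
import { Subject, from } from 'rxjs';

const subject = new Subject<number>();
const seen: string[] = [];

subject.subscribe({ next: (v) => seen.push(`observerA: ${v}`) });
subject.subscribe({ next: (v) => seen.push(`observerB: ${v}`) });

// The Subject acts as the Observer of the source and relays each value
// to both registered Observers.
from([1, 2, 3]).subscribe(subject);

console.log(seen);
// ['observerA: 1', 'observerB: 1', 'observerA: 2',
//  'observerB: 2', 'observerA: 3', 'observerB: 3']
```

This turns a single execution of the source into a multicast delivery to both Observers.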
- Pipeable Operators: A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified. Subscribing to the output Observable will also subscribe to the input Observable.
import { of, map } from 'rxjs';
of(1, 2, 3)
  .pipe(map((x) => x * x))
  .subscribe((v) => console.log(`value: ${v}`));
// Logs:
// value: 1
// value: 4
// value: 9
- Scan operator: Useful for encapsulating and managing state. It's like reduce, but emits the current accumulation state after each update.
observable.pipe(scan((previous, current) => previous + current, initialValue));
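The difference from reduce is easiest to see side by side. This is a small sketch assuming RxJS 7.2 or later, where operators can be imported from 'rxjs'; the `runningTotals` and `finalTotal` arrays are illustrative.

```typescript
import { of, scan, reduce } from 'rxjs';

const runningTotals: number[] = [];
const finalTotal: number[] = [];

// scan emits the accumulated state after every source value.
of(1, 2, 3)
  .pipe(scan((total, value) => total + value, 0))
  .subscribe((total) => runningTotals.push(total));

// reduce emits only once, when the source completes.
of(1, 2, 3)
  .pipe(reduce((total, value) => total + value, 0))
  .subscribe((total) => finalTotal.push(total));

console.log(runningTotals); // [1, 3, 6]
console.log(finalTotal); // [6]
```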
- You want a HOT observable when you don’t want to create your producer over and over again.
COLD is when your observable creates the producer:
// COLD
var cold = new Observable((observer) => {
  var producer = new Producer();
  // have observer listen to producer here
});
HOT is when your observable closes over the producer:
// HOT
var producer = new Producer();
var hot = new Observable((observer) => {
  // have observer listen to producer here
});
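The two snippets above leave Producer abstract. As a rough sketch, the following fills it in with a hypothetical in-memory `Producer` class (its `addListener`, `removeListener`, and `emit` methods and the `created` counter are assumptions made for illustration) so that the difference can be observed by counting producers.

```typescript
import { Observable } from 'rxjs';

// Hypothetical producer, standing in for a socket or other event source.
class Producer {
  static created = 0;
  private listeners: Array<(value: number) => void> = [];
  constructor() {
    Producer.created += 1;
  }
  addListener(listener: (value: number) => void): void {
    this.listeners.push(listener);
  }
  removeListener(listener: (value: number) => void): void {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }
  emit(value: number): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// COLD: the producer is created inside the subscribe function.
const cold$ = new Observable<number>((observer) => {
  const producer = new Producer();
  const listener = (v: number) => observer.next(v);
  producer.addListener(listener);
  return () => producer.removeListener(listener);
});

cold$.subscribe();
cold$.subscribe();
const createdByCold = Producer.created; // one producer per subscription

// HOT: the observable closes over a producer created outside of it.
const sharedProducer = new Producer();
const hot$ = new Observable<number>((observer) => {
  const listener = (v: number) => observer.next(v);
  sharedProducer.addListener(listener);
  return () => sharedProducer.removeListener(listener);
});

const hotValues: string[] = [];
hot$.subscribe((v) => hotValues.push(`A: ${v}`));
hot$.subscribe((v) => hotValues.push(`B: ${v}`));
sharedProducer.emit(42);

console.log(createdByCold); // 2
console.log(Producer.created - createdByCold); // 1
console.log(hotValues); // ['A: 42', 'B: 42']
```

Two cold subscriptions create two producers, while two hot subscriptions listen to the single shared one.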
- Observables are just functions!: Observables are functions that tie an observer to a producer. That’s it. They don’t necessarily set up the producer, they just set up an observer to listen to the producer, and generally return a teardown mechanism to remove that listener. The act of subscription is the act of “calling” the observable like a function, and passing it an observer.
- What’s a “Producer”?: A producer is the source of values for your observable. It could be a web socket, it could be DOM events, it could be an iterator, or something looping over an array. Basically, it’s anything you’re using to get values and pass them to observer.next(value).
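To illustrate the "just functions" idea without any library, here is a hand-rolled sketch in which the producer is a loop over an array. The `SimpleObserver` and `SimpleObservable` types and the `fromArray` helper are made up for this example and are not how RxJS is implemented.

```typescript
// An observer is a set of callbacks.
type SimpleObserver<T> = {
  next: (value: T) => void;
  complete: () => void;
};

// An observable is a function that takes an observer and returns a teardown.
type SimpleObservable<T> = (observer: SimpleObserver<T>) => () => void;

// The producer here is simply a loop over the given array.
function fromArray<T>(items: T[]): SimpleObservable<T> {
  return (observer) => {
    items.forEach((item) => observer.next(item));
    observer.complete();
    // A synchronous loop holds no resources, so the teardown does nothing.
    return () => {};
  };
}

const letters = fromArray(['a', 'b', 'c']);
const got: string[] = [];

// "Subscribing" is just calling the function and passing it an observer.
const teardown = letters({
  next: (value) => got.push(value),
  complete: () => got.push('done'),
});
teardown();

console.log(got); // ['a', 'b', 'c', 'done']
```

A producer that holds a real resource, such as a timer or a socket, would release it inside the returned teardown function instead of returning a no-op.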
An observable is “cold” if its underlying producer is created and activated during subscription. In other words, if observables are functions, then the producer is created and activated by calling that function. A cold observable:
- creates the producer
- activates the producer
- starts listening to the producer
- unicast
The example below is “cold” because it creates and listens to the WebSocket inside of the subscriber function that is called when you subscribe to the Observable:
const source = new Observable((observer) => {
  const socket = new WebSocket('ws://someurl');
  socket.addEventListener('message', (e) => observer.next(e));
  return () => socket.close();
});
So anything that subscribes to source above will get its own WebSocket instance, and when it unsubscribes, it will close() that socket. This means that our source is really only ever unicast, because the producer can only send to one observer.
An observable is “hot” if its underlying producer is either created or activated outside of subscription. A hot observable:
- shares a reference to a producer
- starts listening to the producer
- multicast (usually)
If we were to take our example above and move the creation of the WebSocket outside of our observable, it would become “hot”:
const socket = new WebSocket('ws://someurl');
const source = new Observable((observer) => {
  socket.addEventListener('message', (e) => observer.next(e));
});
Now anything that subscribes to source will share the same WebSocket instance. It will effectively multicast to all subscribers now. But we have a little problem: we’re no longer carrying the logic to tear down the socket with our observable. That means that things like errors and completions, as well as unsubscribe, will no longer close the socket for us.
From the first example above showing a cold observable, you can see that there might be some problems with having all cold observables all the time. For one thing, if you’re subscribing to an observable more than once that is creating some scarce resource, like a web socket connection, you don’t want to create that web socket connection over and over. It’s actually really easy to create more than one subscription to an observable without realizing it too. Let’s say you want to filter all of the “odd” and “even” values out of your web socket subscription. You’ll end up creating two subscriptions in the following scenario:
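import { filter } from 'rxjs';
// Each subscribe call below starts its own execution of source
source.pipe(filter((x) => x % 2 === 0))
  .subscribe((x) => console.log('even', x));
source.pipe(filter((x) => x % 2 === 1))
  .subscribe((x) => console.log('odd', x));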
A Subject has the following properties:
- It’s an observable. It’s shaped like an observable, and has all the same operators.
- It’s an observer. It duck-types as an observer. When subscribed to as an observable, it will emit any value you “next” into it as an observer.
- It multicasts. All observers passed to it via subscribe() are added to an internal observers list.
- When it’s done, it’s done. Subjects cannot be reused after they’re unsubscribed, completed or errored.
- It passes values through itself. This restates the second point: if you next a value into it, it will come out of the observable side of itself.
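The "when it’s done, it’s done" property can be checked directly. The following sketch completes a Subject and then tries to keep using it; the `events` array is an illustrative helper.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const events: string[] = [];

subject.subscribe({
  next: (v) => events.push(`next ${v}`),
  complete: () => events.push('complete'),
});

subject.next(1); // passes through to the subscriber
subject.complete(); // the Subject is now finished
subject.next(2); // ignored: a completed Subject delivers no more values

// A late subscriber receives only the complete notification.
subject.subscribe({
  next: (v) => events.push(`late next ${v}`),
  complete: () => events.push('late complete'),
});

console.log(events); // ['next 1', 'complete', 'late complete']
```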
function makeHot(cold) {
  const subject = new Subject();
  cold.subscribe(subject);
  return new Observable((observer) => subject.subscribe(observer));
}
You probably shouldn’t use the makeHot function above, and instead should use operators like publish() and share(). There are a lot of ways and means to make a cold observable hot, and in Rx there are efficient and concise ways to perform each of those things.
In RxJS 5, the operator share() makes a hot, refCounted observable that can be retried on failure, or repeated on success. Because subjects cannot be reused once they’ve errored, completed or otherwise unsubscribed, the share() operator will recycle dead subjects to enable resubscription to the resulting observable.
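As a rough illustration of share() applied to the odd/even scenario, the sketch below counts how many times the source is subscribed to. It assumes RxJS 7.2 or later import paths rather than RxJS 5, and the `connections` counter and `emit` function are hypothetical stand-ins for a scarce resource such as a socket.

```typescript
import { Observable, filter, share } from 'rxjs';

// Illustrative stand-ins: how often the source was set up, and a handle
// that lets us push values into the current execution.
let connections = 0;
let emit: (value: number) => void = () => {};

const source$ = new Observable<number>((subscriber) => {
  connections += 1;
  emit = (value) => subscriber.next(value);
  return () => {
    emit = () => {};
  };
});

// share() multicasts one execution of source$ to all current subscribers.
const shared$ = source$.pipe(share());

const out: string[] = [];
const evenSub = shared$
  .pipe(filter((x) => x % 2 === 0))
  .subscribe((x) => out.push(`even ${x}`));
const oddSub = shared$
  .pipe(filter((x) => x % 2 === 1))
  .subscribe((x) => out.push(`odd ${x}`));

emit(1);
emit(2);

console.log(connections); // 1
console.log(out); // ['odd 1', 'even 2']

// When the last subscriber leaves, the shared execution is torn down.
evenSub.unsubscribe();
oddSub.unsubscribe();
```

Without share(), the same two subscriptions would run the subscribe function twice, which is exactly the duplicated-connection problem described earlier.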