@evaldosantos
Last active October 4, 2018 18:31
RxJS
RxJS is a library for composing asynchronous and event-based programs by using observable sequences.
It combines the Observer pattern with the Iterator pattern and functional programming with collections.
Essential concepts:
- Observable (event stream)
represents the idea of an invokable collection of future values or events.
- Observer (callback)
is a collection of callbacks that knows how to listen to values delivered by the Observable.
- Subscription (execution)
represents the execution of an Observable, and is primarily useful for cancelling the execution.
- Operators
are pure functions that enable a functional programming style of dealing with collections, with operations like map, filter...
- Subject
is the equivalent of an EventEmitter, and the only way of multicasting a value or event to multiple Observers.
- Schedulers
are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens.
## Observable
Observables are lazy Push collections of multiple values.
The following is an Observable that pushes the values 1, 2, 3 immediately (synchronously) when subscribed,
and the value 4 after one second has passed since the subscribe call, then completes:
var observable = Rx.Observable.create(observer => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});
To invoke the Observable and see these values, we need to subscribe to it:
console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value', x),
  error: err => console.log('something wrong occurred: ', err),
  complete: () => console.log('done')
});
console.log('just after subscribe');
## Pull versus Push
Pull and Push are two different protocols that describe how a data Producer can communicate with a data Consumer.
- Pull (single: function, multiple: iterator)
The Consumer determines when it receives data from the data Producer.
The Producer itself is unaware of when the data will be delivered to the Consumer.
Producer is passive: produces data when requested.
Consumer is active: decides when data is requested.
- Push (single: promise, multiple: observable)
The Producer determines when to send data to the Consumer.
The Consumer is unaware of when it will receive that data.
Producer is active: produces data at its own pace.
Consumer is passive: reacts to received data.
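The two protocols can be contrasted in plain JavaScript. This is only a sketch: `getAnswer`, `countToThree` and `pushNumbers` are hypothetical names chosen for illustration, and none of them is part of RxJS.

```javascript
// Pull, single value: the consumer calls the function when it wants the value.
function getAnswer() {
  return 42;
}

// Pull, multiple values: the consumer asks the iterator for each next value.
function* countToThree() {
  yield 1;
  yield 2;
  yield 3;
}

// Push, multiple values: the producer decides when to call the consumer's callback.
// (hypothetical producer, not an RxJS API)
function pushNumbers(onValue) {
  [1, 2, 3].forEach(n => onValue(n));
}

// The consumer pulls: it drives the function call and the iteration.
const pulled = [getAnswer(), ...countToThree()];

// The consumer is pushed to: it only reacts inside the callback.
const pushed = [];
pushNumbers(n => pushed.push(n));

console.log(pulled); // [42, 1, 2, 3]
console.log(pushed); // [1, 2, 3]
```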
## Observables as generalizations of functions
Observables are not like EventEmitters, nor are they like Promises for multiple values.
Observables are like functions with zero arguments, but generalize those to allow multiple values.
function foo() {
  console.log('Hello');
  return 42;
}
var x = foo.call();
console.log(x);
var y = foo.call();
console.log(y);
var foo = Rx.Observable.create(function(observer) {
  console.log('Hello');
  observer.next(42);
});
foo.subscribe(x => console.log(x));
foo.subscribe(y => console.log(y));
If you don't call the function, the console.log('Hello') won't happen.
The same holds for Observables: if you don't "call" it (with subscribe), the console.log('Hello') won't happen.
Plus, "calling" or "subscribing" is an isolated operation: two function calls trigger two separate side effects,
and two Observable subscribes trigger two separate side effects.
Subscribing to an Observable is analogous to calling a function.
Observables are able to deliver values either synchronously or asynchronously.
The difference between an Observable and a function is that an Observable can "return" multiple values over time,
and a function cannot.
Conclusion:
func.call() means "give me one value synchronously"
observable.subscribe() means "give me any amount of values, either synchronously or asynchronously"
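To see several values arriving synchronously from one "call", here is a small sketch without RxJS. `createObservable` is a hypothetical stand-in that only mimics the shape of subscribe; it is not how the library is implemented.

```javascript
// Hypothetical stand-in for an Observable: subscribe() runs the producer
// function once per call, handing it an observer with a next() method.
function createObservable(producer) {
  return { subscribe: next => producer({ next }) };
}

const foo = createObservable(observer => {
  observer.next(42);
  observer.next(100); // a function could not "return" a second value
  observer.next(200);
});

const log = [];
log.push('before');
foo.subscribe(x => log.push(x)); // all three values arrive during this call
log.push('after');

console.log(log); // ['before', 42, 100, 200, 'after']
```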
# Anatomy of an Observable
Observables are created using Rx.Observable.create or a creation operator, are subscribed to with an Observer,
execute to deliver next / error / complete notifications to the Observer, and their execution may be disposed.
- Creating Observables
Rx.Observable.create is an alias for the Observable constructor, and it takes one argument: the subscribe function.
var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('hi');
  }, 1000);
});
Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc.
- Subscribing to Observables
The observable above can be subscribed to, like this:
observable.subscribe(x => console.log(x))
It is not a coincidence that observable.subscribe and subscribe in Observable.create(function subscribe(observer) { ... })
have the same name. In the library, they are different, but for practical purposes you can consider them conceptually equal.
This shows how subscribe calls are not shared among multiple Observers of the same Observable.
When calling observable.subscribe with an Observer, the function subscribe in Observable.create(function subscribe(observer) { ... })
is run for that given Observer. Each call to observable.subscribe triggers its own independent setup for that given Observer.
Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
- Executing the Observables
The code inside Observable.create(function subscribe(observer) { ... }) represents an "Observable execution", a lazy computation that
only happens for each Observer that subscribes.
There are three types of values an Observable execution can deliver:
- Next notification: sends a value such as a Number, a String, an Object, etc.
- Error notification: sends a JavaScript Error or exception.
- Complete notification: does not send a value.
Next notifications are the most important and most common type: they represent actual data being delivered to an Observer.
Error and Complete notifications may happen only once during the Observable execution, and there can only be either one of them.
These constraints are expressed best in the so-called Observable Grammar or Contract, written as a regular expression:
next*(error|complete)?
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification
is delivered, then nothing else can be delivered afterwards.
It is a good idea to wrap any code in subscribe with a try/catch block that will deliver an Error notification if it catches an exception.
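The contract and the try/catch advice can be sketched together in plain JavaScript. `guard` is a hypothetical helper written for this note: it drops everything after the first error or complete. RxJS enforces the contract internally in its own way, so treat this only as an illustration.

```javascript
// Hypothetical helper: wraps an observer so that nothing is delivered
// after the first error or complete notification.
function guard(observer) {
  let closed = false;
  return {
    next(value) {
      if (!closed && observer.next) observer.next(value);
    },
    error(err) {
      if (closed) return;
      closed = true;
      if (observer.error) observer.error(err);
    },
    complete() {
      if (closed) return;
      closed = true;
      if (observer.complete) observer.complete();
    }
  };
}

// A subscribe function whose body is wrapped in try/catch.
function subscribe(observer) {
  const safe = guard(observer);
  try {
    safe.next(1);
    safe.next(2);
    throw new Error('boom'); // simulated failure inside the producer
  } catch (err) {
    safe.error(err); // delivered as an Error notification
  }
  safe.next(3); // ignored: the execution already ended with an error
}

const events = [];
subscribe({
  next: x => events.push('next ' + x),
  error: err => events.push('error ' + err.message),
  complete: () => events.push('complete')
});

console.log(events); // ['next 1', 'next 2', 'error boom']
```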
- Disposing Observables
Because Observable executions may be infinite, and it's common for an Observer to want to abort execution in finite time,
we need an API for canceling an execution. Since each execution is exclusive to one Observer only, once the observer is done
receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources.
When observable.subscribe is called, the Observer gets attached to the newly created Observable execution. This call also
returns an object, the Subscription:
var subscription = observable.subscribe(x => console.log(x));
The subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution.
With subscription.unsubscribe() you can cancel the ongoing execution.
Just like observable.subscribe resembles Observable.create(function subscribe() { ... }), the unsubscribe we return from subscribe
is conceptually equal to subscription.unsubscribe. In fact, if we remove the ReactiveX types surrounding these concepts, we're left
with rather straightforward JavaScript.
function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);
  return function unsubscribe() {
    clearInterval(intervalID);
  };
}
var unsubscribe = subscribe({ next: (x) => console.log(x) });
unsubscribe();
### Observer
An Observer is a consumer of values delivered by an Observable. Observers are simply a set of callbacks, one for each type of notification delivered by the Observable: next, error, and complete:
var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.log('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification')
};
To use the Observer, provide it to the subscribe of an observable:
observable.subscribe(observer);
Observers in RxJS may also be partial. If you don't provide one of the callbacks, the execution of the Observable will still happen normally, except some types of notifications will be ignored, because they don't have a corresponding callback in the Observer.
var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.log('Observer got an error: ' + err)
};
When subscribing to an Observable, you may also just provide the callbacks as arguments, without being attached to an Observer object, for instance like this:
observable.subscribe(x => console.log('Observer got a next value: ' + x));
### Subscription
A Subscription is an object that represents a disposable resource, usually the execution of an Observable.
A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.
Subscriptions can also be put together, so that a call to unsubscribe() on one Subscription may unsubscribe multiple Subscriptions. You can do this by "adding" one Subscription into another.
Subscriptions also have a remove(otherSubscription) method, in order to undo the addition of a child Subscription.
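The parent/child relationship can be sketched in plain JavaScript. `createSubscription` is a hypothetical helper written for this note; it mirrors the add / remove / unsubscribe idea described above and makes no claim about how RxJS implements it.

```javascript
// Hypothetical minimal Subscription: holds a teardown function and a list of children.
function createSubscription(teardown) {
  const children = [];
  let closed = false;
  return {
    add(child) {
      children.push(child);
    },
    remove(child) {
      const i = children.indexOf(child);
      if (i !== -1) children.splice(i, 1);
    },
    unsubscribe() {
      if (closed) return; // unsubscribing twice is a no-op
      closed = true;
      teardown();
      children.forEach(child => child.unsubscribe());
    }
  };
}

const disposed = [];
const parent = createSubscription(() => disposed.push('parent'));
const childA = createSubscription(() => disposed.push('childA'));
const childB = createSubscription(() => disposed.push('childB'));

parent.add(childA);
parent.add(childB);
parent.remove(childB); // childB is no longer tied to parent

parent.unsubscribe(); // disposes parent and the remaining child
console.log(disposed); // ['parent', 'childA']
```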
### Subject
An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers.
While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast.
Every Subject is an Observable. Given a Subject, you can subscribe to it, providing an Observer, which will start receiving values normally. From the perspective of the Observer, it cannot tell whether the Observable execution is coming from a plain unicast Observable or a Subject.
Internally to the Subject, subscribe does not invoke a new execution that delivers values.
It simply registers the given Observer in a list of Observers, similarly to how addListener usually works in other libraries and languages.
Every Subject is an Observer. It is an object with the methods next(v), error(e), and complete(). To feed a new value to the Subject, just call next(theValue) and it will be multicasted to the Observers registered to listen to the Subject.
var subject = new Rx.Subject();
subject.subscribe({
  next: v => console.log('A', v)
});
subject.subscribe({
  next: v => console.log('B', v)
});
subject.next(1);
Since a Subject is an Observer, this also means you may provide a Subject as the argument to the subscribe of any Observable:
var subject = new Rx.Subject();
subject.subscribe({
  next: v => console.log('A', v)
});
subject.subscribe({
  next: v => console.log('B', v)
});
var observable = Rx.Observable.from([1,2,3]);
observable.subscribe(subject);
With the approach above, we essentially just converted a unicast Observable execution to multicast, through the Subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers.
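The "list of Observers" idea is small enough to sketch without the library. `createSubject` below is a hypothetical, stripped-down stand-in (no error or complete handling), meant only to show that subscribe registers an Observer and next fans a value out to all of them.

```javascript
// Hypothetical minimal Subject: subscribe() only registers the observer,
// and next() forwards the value to every registered observer.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      return {
        unsubscribe() {
          const i = observers.indexOf(observer);
          if (i !== -1) observers.splice(i, 1);
        }
      };
    },
    next(value) {
      // copy the list so an observer may unsubscribe during delivery
      observers.slice().forEach(o => o.next(value));
    }
  };
}

const subject = createSubject();
const received = [];
const subA = subject.subscribe({ next: v => received.push('A' + v) });
subject.subscribe({ next: v => received.push('B' + v) });

subject.next(1); // both observers see the same value
subA.unsubscribe();
subject.next(2); // only B is still registered

console.log(received); // ['A1', 'B1', 'B2']
```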
#### Multicasted Observables
A "multicasted Observable" passes notifications through a Subject which may have many subscribers, whereas a plain "unicast Observable" only sends notifications to a single Observer.
A multicasted Observable uses a subject under the hood to make multiple Observers see the same Observable execution.
Under the hood, this is how the multicast operator works: Observers subscribe to an underlying Subject, and the subject subscribes to the source Observable.
var source = Rx.Observable.from([1,2,3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: v => console.log('A', v)
});
multicasted.subscribe({
  next: v => console.log('B', v)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
Multicast returns an Observable that looks like a normal Observable, but works like a Subject when it comes to subscribing.
Multicast returns a ConnectableObservable, which is simply an Observable with the connect() method.
If we wish to avoid explicit calls to connect(), we can use ConnectableObservable's refCount() method (reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution.
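The 0 to 1 and 1 to 0 transitions can be sketched without RxJS. `refCounted` and its `connect` callback are hypothetical names for this note: `connect` starts the shared execution and returns a function that stops it. The real refCount() is built on ConnectableObservable and differs in detail.

```javascript
// Hypothetical sketch of reference counting around a shared source.
function refCounted(connect) {
  const observers = [];
  let disconnect = null;
  return {
    subscribe(observer) {
      observers.push(observer);
      if (observers.length === 1) {
        // 0 -> 1 subscribers: start the shared execution
        disconnect = connect(value => observers.slice().forEach(o => o.next(value)));
      }
      return {
        unsubscribe() {
          const i = observers.indexOf(observer);
          if (i === -1) return;
          observers.splice(i, 1);
          if (observers.length === 0) {
            // 1 -> 0 subscribers: stop the shared execution
            disconnect();
            disconnect = null;
          }
        }
      };
    }
  };
}

const log = [];
let emit = null; // lets the example push values by hand
const shared = refCounted(push => {
  log.push('connected');
  emit = push;
  return () => log.push('disconnected');
});

const subA = shared.subscribe({ next: v => log.push('A' + v) });
const subB = shared.subscribe({ next: v => log.push('B' + v) });
emit(1);
subA.unsubscribe();
emit(2);
subB.unsubscribe();

console.log(log); // ['connected', 'A1', 'B1', 'B2', 'disconnected']
```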
### BehaviorSubject
One of the variants of Subjects is the BehaviorSubject, which has a notion of "the current value". It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the "current value" from the BehaviorSubject.
BehaviorSubjects are useful for representing "values over time". For instance, an event stream of birthdays is a Subject, but the stream of a person's age would be a BehaviorSubject.
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
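To make the "current value" behaviour concrete, here is a plain JavaScript sketch that repeats the same sequence of calls. `createBehaviorSubject` is a hypothetical stand-in, not the RxJS class, and it leaves out error and complete handling.

```javascript
// Hypothetical minimal BehaviorSubject: remembers the latest value and
// hands it to each new observer as soon as it subscribes.
function createBehaviorSubject(initialValue) {
  let current = initialValue;
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      observer.next(current); // new observers get the current value at once
    },
    next(value) {
      current = value;
      observers.forEach(o => o.next(value));
    }
  };
}

const lines = [];
const subject = createBehaviorSubject(0); // 0 is the initial value
subject.subscribe({ next: v => lines.push('observerA: ' + v) });
subject.next(1);
subject.next(2);
subject.subscribe({ next: v => lines.push('observerB: ' + v) });
subject.next(3);

console.log(lines.join('\n'));
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
```

Note that observerB starts at 2, the value that was current when it subscribed.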
### ReplaySubject
A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.
A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers.
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
You can also specify a window time in milliseconds, besides the buffer size, to determine how old the recorded values can be.
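The buffering can be sketched in plain JavaScript with the same sequence of calls. `createReplaySubject` is a hypothetical stand-in that models only the buffer size; the window time is left out.

```javascript
// Hypothetical minimal ReplaySubject: keeps the last `bufferSize` values
// and replays them to each new observer before delivering live values.
function createReplaySubject(bufferSize) {
  const buffer = [];
  const observers = [];
  return {
    subscribe(observer) {
      observers.push(observer);
      buffer.forEach(v => observer.next(v)); // replay the recorded values
    },
    next(value) {
      buffer.push(value);
      if (buffer.length > bufferSize) buffer.shift(); // drop the oldest value
      observers.forEach(o => o.next(value));
    }
  };
}

const lines = [];
const subject = createReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({ next: v => lines.push('observerA: ' + v) });
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({ next: v => lines.push('observerB: ' + v) });
subject.next(5);

console.log(lines.join('\n'));
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
```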
### AsyncSubject
The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes.
var subject = new Rx.AsyncSubject();
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
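The "last value, only on completion" rule can also be sketched without the library. `createAsyncSubject` is a hypothetical stand-in that ignores errors, run here with the same sequence of calls.

```javascript
// Hypothetical minimal AsyncSubject: remembers only the latest value and
// delivers it to every observer when complete() is called.
function createAsyncSubject() {
  const observers = [];
  let hasValue = false;
  let last;
  let completed = false;
  return {
    subscribe(observer) {
      if (completed) {
        // late subscribers get the final value right away
        if (hasValue) observer.next(last);
        if (observer.complete) observer.complete();
        return;
      }
      observers.push(observer);
    },
    next(value) {
      if (completed) return;
      hasValue = true;
      last = value; // nothing is delivered yet
    },
    complete() {
      if (completed) return;
      completed = true;
      observers.forEach(o => {
        if (hasValue) o.next(last);
        if (o.complete) o.complete();
      });
    }
  };
}

const lines = [];
const subject = createAsyncSubject();
subject.subscribe({ next: v => lines.push('observerA: ' + v) });
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({ next: v => lines.push('observerB: ' + v) });
subject.next(5);
subject.complete();

console.log(lines.join('\n'));
// observerA: 5
// observerB: 5
```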
### Operators
RxJS is mostly useful for its operators, even though the Observable is the foundation.
Operators are methods on the Observable type, such as .map(), .filter(), .merge(), etc.
When called, they do not change the existing Observable instance.
Instead, they return a new Observable, whose subscription logic is based on the first Observable.
An operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified.
An operator is essentially a pure function which takes one Observable as input and generates another Observable as output. Subscribing to the output Observable will also subscribe to the input Observable.
#### Instance operators versus static operators
Typically when referring to operators, we assume instance operators, which are methods on Observable instances.
Rx.Observable.prototype.multiplyByTen = function multiplyByTen() {
  var input = this;
  return Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
};
Instance operators are functions that use the this keyword to infer what is the input Observable.
var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen();
Static operators are functions attached to the Observable class directly. A static operator uses no this keyword internally, but instead relies entirely on its arguments.
Static operators are pure functions attached to the Observable class, and usually are used to create Observables from scratch.
The most common type of static operators are the so-called Creation Operators.
Instead of transforming an input Observable to an output Observable, they simply take a non-Observable argument, like a number, and create a new Observable.
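The difference between creating and transforming can be sketched with two plain functions that rely only on their arguments. `fromArray` and `multiplyBy` are hypothetical stand-ins working on bare objects with a subscribe method; they are not the RxJS operators.

```javascript
// Creation helper: builds an "observable" from a non-observable argument.
// (hypothetical stand-in, not an RxJS API)
function fromArray(values) {
  return {
    subscribe(observer) {
      values.forEach(v => observer.next(v));
      if (observer.complete) observer.complete();
    }
  };
}

// Operator as a pure function: takes an input observable, returns a new one
// and leaves the input untouched.
function multiplyBy(factor, input) {
  return {
    subscribe(observer) {
      // subscribing to the output subscribes to the input
      input.subscribe({
        next: v => observer.next(factor * v),
        complete: () => observer.complete && observer.complete()
      });
    }
  };
}

const source = fromArray([1, 2, 3, 4]);
const output = multiplyBy(10, source);

const results = [];
output.subscribe({
  next: v => results.push(v),
  complete: () => results.push('done')
});

console.log(results); // [10, 20, 30, 40, 'done']
```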
### Scheduler
A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components.
- A scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria
- A scheduler is an execution context. It denotes where and when the task is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nextTick, or the animation frame).
- A scheduler has a (virtual) clock. It provides a notion of "time" by a getter method now() on the scheduler. Tasks being scheduled on a particular scheduler will adhere only to the time denoted by that clock.
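The three components can be illustrated with a toy virtual-time scheduler in plain JavaScript. Everything here (`createVirtualScheduler`, `schedule`, `advanceBy`) is a hypothetical sketch for this note rather than the RxJS Scheduler API; only the now() getter is taken from the description above.

```javascript
// Hypothetical virtual-time scheduler: tasks are queued with a due time and
// run in order when the virtual clock is advanced by hand.
function createVirtualScheduler() {
  let time = 0;
  const queue = []; // data structure: pending tasks ordered by due time
  return {
    now: () => time, // the scheduler's own clock
    schedule(task, delay = 0) {
      queue.push({ due: time + delay, task });
      queue.sort((a, b) => a.due - b.due);
    },
    // execution context: tasks only run inside advanceBy()
    advanceBy(ms) {
      const target = time + ms;
      while (queue.length > 0 && queue[0].due <= target) {
        const { due, task } = queue.shift();
        time = due; // the task sees the time it was scheduled for
        task();
      }
      time = target;
    }
  };
}

const scheduler = createVirtualScheduler();
const ran = [];
scheduler.schedule(() => ran.push('b@' + scheduler.now()), 200);
scheduler.schedule(() => ran.push('a@' + scheduler.now()), 100);
scheduler.schedule(() => ran.push('c@' + scheduler.now()), 500);

scheduler.advanceBy(300); // no real time passes

console.log(ran); // ['a@100', 'b@200']
console.log(scheduler.now()); // 300
```

The task scheduled for 500 stays queued, because the virtual clock has only reached 300.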