Last active
October 4, 2018 18:31
-
-
Save evaldosantos/25b10df2426bda257f1e343ebe0d34ba to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
RxJS | |
RxJS is a library for composing asychronous and event-based programs by using observable sequences. | |
It combines the Observer pattern with the Iterator pattern and functional programming with collections. | |
Essential conecpts: | |
- Observable: (event stream) | |
represents the idea of an invokable collection of furure values or events. | |
- Observer (callback) | |
is a collection of callbacks that knows how to listen to values delivered by the Observable | |
- Subscription (execution) | |
represents the execution of an Observable, is primarily useful for cancelling the execution | |
- Operators | |
are pure functions that enable a function programming style of dealing with collections with operations like map, filter... | |
- Subject | |
Is the equivalent to an EventEmitter, and the only way of multicasting a value of event to multiple Observers. | |
- Schedulers | |
are centralized dispatchers to controlconcurrency, allowing us to coordinate when computattion happens | |
## Observable | |
Observables are lazy Push collections of multiple values. | |
The following is an observable that pushes the values 1,2,3 immediately (synchronously) whe subscribed, | |
and the value 4 after one second has passed since the subscribe call, than completes | |
var observable = Rx.Observable.create(observer => { | |
observer.next(1); | |
observer.next(2); | |
observer.next(3); | |
setTimeout(() => { | |
observer.next(4); | |
observer.complete(); | |
}, 1000) | |
}); | |
To invoque the Observable and see these values, we need to subscribe to it: | |
console.log('just before subscribe'); | |
observable.subscribe({ | |
next: x => console.log('got value', x), | |
error: err => console.log('something wrong occurred: ', err), | |
complete: () => console.log('done') | |
}) | |
console.log('just after subscribe'); | |
## Pull versus Push | |
Pull/Push are two different protocols that describes how a data producer can communicate with a data Consumer. | |
- Pull (Single: function, multiple: iterator) | |
The consumer determines when it receives data from the data Producer. | |
The producer itself is unaware of when the data will be delivered to the Consumer. | |
Producer is passive: produces data when requested | |
Consumer is active: decides when data is requested | |
- Push (single: promise, multiple: observable ) | |
The producer determines when to send data to the consumer. | |
The consumer is unaware of when it will receive that data. | |
Producer is active: produces data at its own pace. | |
Consumer is passive: reacts to received data | |
## Observables as generalizations of functions | |
Observables are not like EventEmitters not are they like Promises for multiple values. | |
Observables are like functions with zero arguments, but generalize those to allow multiple values. | |
function foo() { | |
console.log('Hello'); | |
return 42; | |
} | |
var x = foo.call(); | |
console.log(x) | |
var y = foo.call() | |
console.log(y) | |
var foo = Rx.Observable.create(function(observer) { | |
console.log('Hello'); | |
observer.next(42); | |
}); | |
foo.subscribe({ | |
x => console.log(x) | |
}) | |
foo.subscribe({ | |
y => console.log(y) | |
}) | |
If you don't call the function, the console.log('Hello') won't happen. | |
Also with Observables, if you don't "call" it (with subscribe), the console.log('Hello') won't happen. | |
Plus, "calling" or "subscribing" is an isolated operation: two function calls trigger two separate side effects, | |
and two Observable subscribes trigger two separate side effects. | |
Subscribing to an Observable is analogous to calling a function. | |
Obsevables are able to deliver values either synchronously or asynchronously | |
The difference between an observable and a function is that an observable can return multiple values over time, | |
and function cannot. | |
Conclusion: | |
func.call() - means give one value synchronously | |
observable.subscribe() means give me any amount of values, either synchronously or asynchronously | |
# Anatomy of an Observable | |
Observables are created using Rx.Observable.create or a createion operator, are subscribed to with an Observer, | |
execute to deliver next / error / complete notification to the Observer, and their executiono may be disposed. | |
- Creating Observables | |
Rx.Observable.create is an alias for the Observable constructor, and it takes one argument: the subscribe function. | |
var observable = Rx.Observable.create(function subscribe(observer) { | |
var id = setInterval(() => { | |
observer.next('hi'); | |
}, 1000); | |
}); | |
Observables can be created with create, but suallly we use the so-called creation operators, like of, from, interval, etc | |
- Subscribing to Observables | |
The observable above can be subscribed to, like this: | |
observable.subscribe(x => console.log(x)) | |
It is not a coincidence that observable.subscribe and subscribe in Observable.create(function subscribe(observer) { ... }) | |
have the same name. In the library, they are differente, but for pratical purposes you can consider them conceptually equal. | |
This shows how subscribe calls are not shared among multiple Observers of the same Observable. | |
When calling observable.subscribe with an Observer, the function subscribe in Observable.create(function subscribe(observer) { ... }) | |
is run for that given Observer. Each call to observable.subscribe triggers its own independent setup for that given Observer. | |
Subscribing to an observable is like calling a function , providing callbacks where the data will be delivered to. | |
- Executing the Observables | |
The code inside Obserbeble.create(function subscribe(observer) { ... }) represents an "Observable execution", a lazy computation that | |
only happens for each Observer that subscribes. | |
There are three types of values an Observable execution can deliver: | |
- Next notification: sends a value such a Number, a String, an Object, etc. | |
- Error notification: sends a javascript Error o exception | |
- Complete notification: does not send a value | |
Next notification are the most important and most common type: they represent actual data being delivered to an Observer. | |
Error and Complete notifications may happen only once during the Observable Execution, and there can only be either one of them. | |
These constraints are expressed best in the so-called Observable Grammar or Contract, written as a regular expression: | |
next*(error|complete)? | |
In an Observable Execution, zero to infinite Next notifications may be delivered. If either an Error or Complete notification | |
is delivered, then nothing else can be delivered afterwards. | |
It is a good idea to wrap any code in subscribe with try/catch block that will deliver an Error notification if it catches an exception | |
- Disposing Observables | |
Because Observable Exxecutions may be infinite, and it's common for an Observer to want to abort execution in fininte time, | |
we need an API for canceling an execution. Since each execution is exclusive to one Observer only, once the observer is done | |
receiving values, it has to have a way to stop the execution, in order to avoid wasting computation power or memory resources. | |
When observable.subscribe is called, the Observer gets attached to the newly created Observable execution. This call also | |
returns an object, the Subscription: | |
var subscription = observable.subscribe(x => console.log(x)); | |
The subscription represents the ongoing execution, and has a minimal API which allows you to cancel that execution. | |
With subscription.unsubscribe() you can cancel the ongoing execution | |
Just llike observable.subscribe resembles Observable.create(function subscribe() { ... }), the unbsubscribe we return from subscribe | |
is conceptually equal to subscription.usubscribe. In fact, if we remove the ReactiveX types surrounding these concepts, we're left | |
with rather straightforrward JAVASCRIPT. | |
function subscribe(observer) { | |
var intervalID = setInterval(() => { | |
observer.next('hi'); | |
}, 1000); | |
return function unsubscribe() { | |
clearInterval(intervalID); | |
}; | |
} | |
var unsubscribe = subscribe({ next: (x) => console.log(x) }); | |
unsubscribe(); | |
### Observer | |
An observer is a consumer of values delivered by an observable. Observers are simplly a set of callbacks, one for each type of notifcation delivered by the observable? next, error, and complete: | |
var observer = { | |
next: x => console.log('Observer got a next value: ' + x), | |
error: err => console.log('Observer got an error' + err), | |
complete: () => console.log('Observer got a complete notification') | |
} | |
To use the Observer, provide it to the subscribe of an observable: | |
observable.subscribe(observer); | |
Observers in RxJS may also be partial. If you don't provide one of the callbacks, the execution of the Observable will still happen normally, except some types of notifications will be ignored, because they don't have a corresponding callback in the Observer. | |
var observer = { | |
next: x => console.log('Observer got a next value: ' + x), | |
error: err => console.log('Observer got an error', err); | |
When subscribing to an Observable, you may also just provide the callbacks as arguments, without being attached to an Observer object, for instance like this: | |
observable.subscribe(x => console.log('Observer got a next value: ' + x)); | |
} | |
### Subscription | |
A Subscription is an object that represents a disposable resource, usually the execution of an Observable. | |
A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription. | |
Subscriptions can also be put together, so that a call to an unsubscribe() of one Subscription may unsubscribe multiple subscriptions. You can do this by "adding" one subscription into another. | |
Subscriptions also have a remove(otherSbuscription) method, in order to undo the addition of a child Subscription. | |
### Subject | |
An RxJS Subject is a special type of observable that allows values to be muticasted to many Observers. | |
While plain Observables are unicast (each subscribed Observer owns an independent execution of the Observable), Subjects are multicast. | |
Every subject is an Observable. given a Subject, you can subscribe to it, providing an Observer, which will start receiving values normally. from the perspective of the Observer, it cannot tell whether the observable execution is coming from a plain unicast Observable or a Subject. | |
Internally to the Subject, subscribe does not invoke a new execution that delivers values. | |
It simply registers the given Observer in a list of Observers, similarly to how addListener usually works in other libraries and languages. | |
Every Subject is an Observer.. It is an object with the methods next(v), error9e), and complete(). To feed a new value to the subject, just call next(thevalue) and it will be multicasted to the observers reistered to listen to the Subject. | |
var subject = new Rx.Subject(); | |
subject.subscribe({ | |
next: v => console.log('A', v) | |
}) | |
subject.subscribe({ | |
next: v => console.log('B', v) | |
}) | |
subject.next(1) | |
Since a Subject is an Observer, this also means you may provide a subject as the argument to the subscribe of any Observable | |
var subject = new Rx.Subject() | |
subject.subscribe({ | |
next: v => console.log('A', v) | |
}) | |
subject.subscribe({ | |
next: v => console.log('B', v) | |
}) | |
var observable = Rx.Observable.from([1,2,3]) | |
observable.subscribe(subject); | |
Withe the approach above, we essentially just converted a unicast Observable execution to multicast, through the subject. This demonstrates how Subjects are the only way of making any Observable execution be shared to multiple Observers. | |
#### Multicasted Observables | |
A "multicasted Observable" passes notification through a Subject which may have many subscribers, wheres a plan "unicast observable" only sends notifications to a single Observer. | |
A multicasted Observable uses a subject under the hood to make multiple Observers see the same Observable execution. | |
Under the hood, this is how the multicast operator works: Observers subscribe to an underlying Subject, and the subject subscribes to the source Observable. | |
var source = Rx.Observable.from([1,2,3]); | |
var subject = new Rx.Subject(); | |
var multicasted = source.multicast(subject); | |
// These are, under the hood, `subject.subscribe({...})`; | |
multicasted.subscribe({ | |
next: v => console.log('A', v); | |
}); | |
multicasted.subscribe({ | |
next: v => console.log('B', v); | |
}); | |
// This is under the hood, `source.subscribe(subject)`; | |
multicasted.connect(); | |
Multicast returns an Observable that looks like a normal Observable, but works like a subject when it comes to subscribing. | |
Multicast returns a Connectableobservable, which is simply an an Observable with the connect() method. | |
If we with to avoid explicit calls to connect(), we can use ConnectableObservable's refCount(0 method(reference counting), which returns an Observable that keeps track of how many subscribers it has. When the number of subscribers increases from 0 to 1, it will call connect() for us, which starts the shared execution. Only when the number of subscribers decreases from 1 to 0 will it be fully unsubscribed, stopping further execution. | |
### BehaviourSubject | |
One of the variants of Subjects is the BehaviourSubject, which has a notion of the "current value". It stores the latest value emitted to its consumers, and whenever a new Observersubscribes, it will immediately receive the "current value" from the BehaviourSubject. | |
BehaviourSubjects are useful for representing "values over time". For instance, an event stream of birthdays is a Subject, but the stream of a person's age would be a BehaviourSubject. | |
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value | |
subject.subscribe({ | |
next: (v) => console.log('observerA: ' + v) | |
}); | |
subject.next(1); | |
subject.next(2); | |
subject.subscribe({ | |
next: (v) => console.log('observerB: ' + v) | |
}); | |
subject.next(3); | |
### ReplaySubject | |
A Replaysubject is similar to a behaviourSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution. | |
A replaySubject records multiple values from the Observable execution and replays them to new subscribers. | |
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers | |
subject.subscribe({ | |
next: (v) => console.log('observerA: ' + v) | |
}); | |
subject.next(1); | |
subject.next(2); | |
subject.next(3); | |
subject.next(4); | |
subject.subscribe({ | |
next: (v) => console.log('observerB: ' + v) | |
}); | |
subject.next(5) | |
You can also specify a window time in milliseconds, bseides of the buffer size, to determine how old the recorded values can be. | |
### AsyncSubject | |
The AsyncSubject is a variant where only the last value of the Observable execution is sent to its observers, and only when the execution completes. | |
var subject = new Rx.AsyncSubject(); | |
subject.subscribe({ | |
next: (v) => console.log('observerA: ' + v) | |
}); | |
subject.next(1); | |
subject.next(2); | |
subject.next(3); | |
subject.next(4); | |
subject.subscribe({ | |
next: (v) => console.log('observerB: ' + v) | |
}); | |
subject.next(5); | |
subject.complete(); | |
### Operators | |
RxJS is mostly useful for its ooperatos, even though the Observable is the foundation. | |
Operators are methods on the Observable type, sucha as .map(), .filter(), .merge(), ets. | |
When called, they do not change the existing Observable instance. | |
Instead, they return a new Observable, whose subscription logic is based on the first Observable. | |
An operator is a function which creates a new Observable based on the current Observable. This is a pure operation: the previous Observable stays unmodified. | |
An operato is essentially a pure function which takes one Observer as input and generates another Observable as outpu. Subscribing to the output Observable will also subscribe to the input Observable. | |
#### Instance operator versus static operators | |
Typically when referring to operators, we assume instance operators, which are methods on Observable instances. | |
Rx.Observable.prototype.multiplyByTen = function multiplyByTen() { | |
var input = this; | |
return Rx.Observable.create(function subscribe(observer) { | |
input.subscribe({ | |
next: (v) => observer.next(10 * v), | |
error: (err) => observer.error(err), | |
complete: () => observer.complete() | |
}); | |
}); | |
} | |
Instance operators are functions that use the this keyword to infer what is the input Observable. | |
var observable = Rx.Observable.from([1, 2, 3, 4]).multiplyByTen(); | |
Statis Operators are functions attached to the Observable class directly. A static operator uses no this keyword internally, but instead relies entirely on its arguments. | |
Static operators are pure functions attached to the observable class, and usually are used to create observables form scratch. | |
The most common common type of static operators are the so-called Creation Operators. | |
Instead of trnsforming an input Observable to an output Observable, they simply take a non-observable argument, like a number, and create a new Observable. | |
### Scheduler | |
A scheduler controls when a subscription starts and when notifications are delivered. It consists of three components. | |
- A scheduler is a data structure. It knows how to store and queue tasks based on priority or other criteria | |
- A shceduler is an executiono context. It denotes where and when the taks is executed (e.g. immediately, or in another callback mechanism such as setTimeout or process.nexttick, or the animation frame). | |
- A Scheduler has a (virtual) clock. It provides a notion of "time" by a getter method now() on the scheduler. Tasks being shceduled on a particular scheduler will adhere only to the time denoted by that clock. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment