- Reactive programming is a programming paradigm that focuses on data streams and the propagation of changes.
- RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.
- Observables:
- An observable represents a stream of data that can be observed over time.
- Observables can emit multiple values over time.
- Observables can be synchronous or asynchronous.
- Observables represent invokable collections of future values or events.
- Observers:
- An observer is an object that listens to the observable stream.
- An observer can receive notifications of new values, errors, or completion.
- Subscriptions:
- A subscription is a connection between an observable and an observer.
- A subscription is created by calling the
subscribemethod on an observable. - A subscription can be used to unsubscribe from an observable.
-
- From arrays:
import { of } from 'rxjs'; const numbers = of(1, 2, 3, 4, 5); numbers.subscribe(x => console.log(x));
- From events:
import { fromEvent } from 'rxjs'; const button = document.getElementById('myButton'); const clicks = fromEvent(button, 'click'); clicks.subscribe(event => console.log(event));
- From promises:
import { from } from 'rxjs'; const promise = new Promise((resolve, reject) => { setTimeout(() => { resolve('Hello from Promise!'); }, 1000); }); const observable = from(promise); observable.subscribe(value => console.log(value));
- From interval:
import { interval } from 'rxjs'; const secondsCounter = interval(1000); secondsCounter.subscribe(n => console.log(`It's been ${n} seconds since subscribing!`));
- From arrays:
- Notes on the above examples:
- The observables are created using factory functions like
of,fromEvent,from, andinterval. - The observers are the arrow functions that receive the emitted values.
- The subscriptions are created by calling the
subscribemethod on the observables.
- The observables are created using factory functions like
- Creation:
- Transformation:
- Filtering:
- Combination:
- Multicasting:
- Error Handling:
- Utility:
- Conditional and Boolean:
map:filter:mergeMap:switchMap:concatMap:merge:combineLatest:forkJoin:reduce:reduceis an operator that applies an accumulator function over the source observable, and returns the accumulated result when the source completes.- Example:
import { of } from 'rxjs'; import { reduce } from 'rxjs/operators'; const numbers = of(1, 2, 3, 4, 5); const sum = numbers.pipe(reduce((acc, val) => acc + val, 0)); sum.subscribe(result => console.log(result)); // Output: 15
scan:scanis similar toreduce, but it emits the intermediate results of the accumulator function.- Example:
import { of } from 'rxjs'; import { scan } from 'rxjs/operators'; const numbers = of(1, 2, 3, 4, 5); const sum = numbers.pipe(scan((acc, val) => acc + val, 0)); sum.subscribe(result => console.log(result)); // Output: 1, 3, 6, 10, 15
share:shareis an operator that multicasts the source observable and shares the subscription among multiple subscribers.- Example:
import { interval } from 'rxjs'; import { take, share } from 'rxjs/operators'; const source = interval(1000).pipe(take(3), share()); source.subscribe(value => console.log(`Subscriber 1: ${value}`)); setTimeout(() => { source.subscribe(value => console.log(`Subscriber 2: ${value}`)); }, 1500);
shareReplay:shareReplayis similar toshare, but it repeats the prior emitted values for any later subscribers.- Example:
import { interval } from 'rxjs'; import { take, shareReplay } from 'rxjs/operators'; const source = interval(1000).pipe(take(3), shareReplay(1)); source.subscribe(value => console.log(`Subscriber 1: ${value}`)); setTimeout(() => { source.subscribe(value => console.log(`Subscriber 2: ${value}`)); }, 1500);
throwError:throwErroris an operator that creates an observable that emits an error.- Example:
import { throwError } from 'rxjs'; const error = throwError('This is an error message'); error.subscribe({ next: value => console.log(value), error: err => console.error('Error:', err) });
- Pipeable Operators:
- Pipeable operators are functions that can be chained together using the
pipemethod on an observable. - Pipeable operators are preferred over creation operators for better code readability and maintainability.
- Example:
import { of } from 'rxjs'; import { map, filter } from 'rxjs/operators'; const numbers = of(1, 2, 3, 4, 5); numbers.pipe( filter(value => value % 2 === 0), // Filter out odd numbers map(value => value * 2) // Double the value of each even number ).subscribe(result => console.log(result)); // Output: 4, 8
- Pipeable operators are functions that can be chained together using the
- All operators return a new observable and do not modify the source observable.
- Importing individual operators from
rxjs/operatorscan help reduce bundle size.
- Subjects:
- A
Subjectis a special type of observable in RxJS that allows for both emitting and subscribing to values. - Unlike regular observables,
Subjectsact as both an observable and an observer. Subjectsare often used to broadcast values to multiple subscribers.
- A
- Multicasting:
- When a
Subjectis used, multiple subscribers can receive the same emitted values from the subject. - This is useful for scenarios where multiple parts of an application need to respond to the same event or data stream.
- When a
- Key Differences:
- Observables:
- Push values to subscribers.
- Emit values only when subscribed to.
- Subjects:
- Can both emit values (as an observer) and allow multiple subscribers to listen to those values (as an observable).
- Allow for more flexible data sharing and event handling.
- Observables:
- Example of Using a Subject to Manage State:
// Product service class with a selectedProductSource Subject private selectedProductSource = new Subject<Product>(); // Exposes the Observable associated with a Subject. selectedProductChanges$ = this.selectedProductSource.asObservable();
- Explanation:
selectedProductSourceis aSubjectthat emitsProductobjects, used internally within a class to manage the selected product's state.selectedProductChanges$is an observable derived fromselectedProductSourceusing.asObservable(). It is publicly exposed to allow other parts of the application to react to changes in the selected product.- This pattern ensures that other parts of the application can subscribe to
selectedProductChanges$to be notified when a new product is selected, while maintaining control over when and howProductvalues are emitted.
- Use Case:
- This pattern is commonly used in services to manage state changes, such as when a user selects a different product in an e-commerce application. The selected product can be shared across multiple components using this subject-observable pattern.
- Explanation:
catchError:catchErroris an operator that intercepts errors emitted by the source observable and replaces them with a new observable or value.catchErrorcan return a new observable or value to continue the stream, or throw an error to terminate the stream.- Example:
// Return new observable to continue the stream import { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; const source = of('a', 'b', 3, 'd'); source.pipe( map(value => value.toUpperCase()), catchError(error => of('X')) // Replace error with 'X' ).subscribe( value => console.log(value), error => console.log('Error:', error) ); // throw an error to terminate the stream source.pipe( map(value => value.toUpperCase()), catchError(error => { throw new Error('Custom Error') }) ).subscribe( value => console.log(value), error => console.log('Error:', error) );
retry:retryis an operator that resubscribes to the source observable when an error occurs, up to a specified number of times.- Example:
import { of } from 'rxjs'; import { map, catchError, retry } from 'rxjs/operators'; const source = of('a', 'b', 3, 'd'); source.pipe( map(value => value.toUpperCase()), catchError(error => of('X')), retry(2) // Retry up to 2 times ).subscribe( value => console.log(value), error => console.log('Error:', error) );
retryWhen:retryWhenis an operator that resubscribes to the source observable when an error occurs, based on a custom logic defined by a notifier observable.- Example:
import { of, timer } from 'rxjs'; import { map, catchError, retryWhen, delayWhen } from 'rxjs/operators'; const source = of('a', 'b', 3, 'd'); source.pipe( map(value => value.toUpperCase()), catchError(error => of('X')), retryWhen(errors => errors.pipe(delayWhen(() => timer(1000))) // Retry after a delay of 1 second ).subscribe( value => console.log(value), error => console.log('Error:', error) );
async:queue:animationFrame:
switchMap:mergeMap:concatMap:exhaustMap:
- Redux with RxJS: