Angular 6 RxJS 6 Tutorial & Example

In this tutorial, we'll learn to use the RxJS 6 library with Angular 6. We'll see how to import the Observable class, how to subscribe to and unsubscribe from Observables, and how to import operators and chain them with the pipe() method. We'll also see how to use the async pipe to subscribe to Observables from templates. Finally, we'll look at two popular pipeable operators, map() and filter(), and their new import paths in RxJS 6.

Angular uses observables, implemented with the RxJS library, for many of its asynchronous APIs, such as HTTP requests, router events and form value changes. If you are using Angular CLI 6, RxJS 6 is installed in your project by default. Otherwise, you can install it via npm using:

$ npm install rxjs --save 

To be able to use the Observable class in your code, you first need to import it:

import { Observable } from 'rxjs';

This is the new import path in RxJS 6. In RxJS 5, the class was imported from 'rxjs/Observable'.

Working with the HttpClient Module and Observables

The new Angular HttpClient works with Observables by default. Methods such as get(), post(), put() and delete() return an Observable. These Observables are lazy: the HTTP request is only sent when we subscribe.

This is an example of making an HTTP request:

getItems(): Observable<Item[]> {
   return this.httpClient.get<Item[]>(this.itemUrl);
}

This assumes that we've injected the HttpClient service as httpClient.
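To see this lazy behaviour without a running backend, here is a minimal sketch that uses a hand-written Observable in place of HttpClient. The fakeGet() function, the Item shape and the '/api/items' URL are assumptions made for illustration; they are not part of Angular.

```typescript
import { Observable } from 'rxjs';

// Hypothetical item shape, used only for this illustration.
interface Item {
  id: number;
  name: string;
}

// Records every simulated request so we can see when one is actually made.
const requestedUrls: string[] = [];

// Hypothetical stand-in for httpClient.get<Item[]>(url): the function passed
// to the Observable constructor runs once per subscription, not at creation.
function fakeGet(url: string): Observable<Item[]> {
  return new Observable<Item[]>(subscriber => {
    requestedUrls.push(url);
    subscriber.next([{ id: 1, name: 'First item' }]);
    subscriber.complete();
  });
}

const items$ = fakeGet('/api/items');
const requestsBeforeSubscribe = requestedUrls.length; // nothing sent yet

const received: Item[][] = [];
items$.subscribe(items => received.push(items));
const requestsAfterSubscribe = requestedUrls.length; // one simulated request
```

Subscribing to items$ a second time would run the producer again. HttpClient Observables behave the same way: each subscription sends its own request.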

Using Observable with AsyncPipe

Angular's AsyncPipe subscribes to an Observable and returns the latest value it has emitted. For example, suppose we have this method:

getItems(): void {
   // Store the Observable on the component; the template subscribes to it.
   this.items$ = this.httpClient.get<Item[]>(this.itemUrl);
}

The items$ variable is of type Observable<Item[]>.

After calling the getItems() method on the component we can use the async pipe in the component template to subscribe to the returned Observable:

<ul>
  <li *ngFor="let item of items$ | async" >

  </li>
</ul>  
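As a rough mental model of what the async pipe does for us, the sketch below subscribes to an Observable, remembers the latest emitted value, and unsubscribes when told to. The LatestValue class and its destroy() method are hypothetical names invented for this illustration. This is not Angular's actual AsyncPipe implementation, which also takes care of change detection.

```typescript
import { Observable, Subscription, of } from 'rxjs';

// Hypothetical helper that mimics the subscribe / keep latest / unsubscribe
// cycle of the async pipe. It is NOT how Angular implements AsyncPipe.
class LatestValue<T> {
  // Stays null until the source emits its first value.
  value: T | null = null;
  private subscription: Subscription;

  constructor(source$: Observable<T>) {
    this.subscription = source$.subscribe(next => {
      this.value = next;
    });
  }

  // Comparable to the component being destroyed: stop listening.
  destroy(): void {
    this.subscription.unsubscribe();
  }
}

// of() emits its argument synchronously, so the value is available at once.
const holder = new LatestValue<string[]>(of(['pen', 'notebook']));
const latestItems = holder.value;
holder.destroy();
```

The practical benefit in a real template is that we never call subscribe() or unsubscribe() ourselves; the pipe manages the subscription for the lifetime of the component.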

Subscribing to Observables

Observables provide support for event handling, asynchronous programming, and handling multiple values over time. When you define an Observable to publish values for a consumer, no values are emitted until a consumer actually subscribes to it.

The consumer that subscribes to the Observable keeps receiving values until the Observable completes, emits an error, or the consumer unsubscribes from it.

Let's start by defining an observable that provides a stream of updates.
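A minimal sketch of such an observable, assuming the updates are plain numbers. The names updates$ and log are made up for this example. The producer emits three values and never completes, so the consumer ends the stream itself by calling unsubscribe(), which runs the teardown function returned by the producer.

```typescript
import { Observable } from 'rxjs';

// Collects everything that happens, in order, so the flow is easy to follow.
const log: string[] = [];

// The producer function only runs when someone subscribes.
const updates$ = new Observable<number>(subscriber => {
  log.push('producer started');
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  // Teardown logic: called when the consumer unsubscribes.
  return () => {
    log.push('producer stopped');
  };
});

const logLengthBeforeSubscribe = log.length; // still empty: nothing has run

const subscription = updates$.subscribe({
  next: value => log.push(`received ${value}`),
  complete: () => log.push('completed'),
});

// The stream never completes by itself, so we stop listening explicitly.
subscription.unsubscribe();
```

After unsubscribe(), subscription.closed is true and the observer receives no further values. The 'completed' entry never appears in the log because the producer never calls complete().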

Using the map() Operator

The map() operator is similar to the Array.prototype.map() method. It transforms each value emitted by an Observable into another value. For example:

import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

getItems(): Observable<Array<any>> {
  return this.aService.getItems()
    .pipe(map(response => response.data));
}

The getItems() method returns an Observable. We use the map() operator to replace each response object emitted by the stream with its data property.

We import the pipeable operator map() from the rxjs/operators path and use the pipe() method, which takes a variable number of pipeable operators, to apply it to the Observable.
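Because pipe() accepts several operators, they can be chained in one call, each one receiving the output of the previous one. Here is a small sketch of that idea; the ApiResponse interface and the getResponse() function are hypothetical stand-ins for the service used above, built with of() so the example runs without a server.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical response shape, assumed only for this illustration.
interface ApiResponse {
  code: number;
  data: string[];
}

// Hypothetical stand-in for aService.getItems(): emits one response object.
function getResponse(): Observable<ApiResponse> {
  return of({ code: 200, data: ['pen', 'notebook'] });
}

const results: string[][] = [];

getResponse()
  .pipe(
    // First operator: keep only the data property of the response.
    map(response => response.data),
    // Second operator: works on the output of the first one.
    map(names => names.map(name => name.toUpperCase()))
  )
  .subscribe(names => results.push(names));
```

The source Observable is left untouched: pipe() returns a new Observable, so other consumers can still subscribe to the original response stream.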

Using the filter() Operator

The filter() operator is similar to the Array.prototype.filter() method. It returns a new Observable that only emits the values from the source stream that satisfy a given condition. For example:

import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

filter(): Observable<Array<any>> {
  return this.aService.getItems()
    .pipe(
      // Inside the method body, filter refers to the imported operator.
      filter(response => response.code === 200));
}

We use the filter() operator to only emit a notification to observers of the observable stream when the status code of the HTTP response is 200.
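To make the effect visible, the sketch below runs filter() over a stream that emits several responses. The ApiResponse interface and the sample values are assumptions made for this illustration; from() simply turns the array into an Observable that emits its elements one by one.

```typescript
import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical response shape, assumed only for this illustration.
interface ApiResponse {
  code: number;
  data: string;
}

const responses: ApiResponse[] = [
  { code: 200, data: 'first' },
  { code: 404, data: 'missing' },
  { code: 200, data: 'second' },
];

const accepted: string[] = [];
let completed = false;

from(responses)
  .pipe(
    // Drop every response whose code is not 200.
    filter(response => response.code === 200),
    // Keep only the payload of the responses that passed the filter.
    map(response => response.data)
  )
  .subscribe({
    next: data => accepted.push(data),
    complete: () => {
      completed = true;
    },
  });
```

Values that fail the condition are silently dropped; they do not cause an error. If no value passes, the observer only receives the completion notification.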

