RxJS

Caching with publishReplay() and refCount

export class ItemService {

    private cache$: Observable<Item[]>;
    constructor(private httpClient: HttpClient) { }

    // Get items from server | HTTP GET
    getItems(): Observable<Item[]> {
        if (!this.cache$) { // lazy
            this.items = this.httpClient.get(`${api_url}/items`).pipe(
                publishReplay(1), // cache the latest emitted
                refCount() //  keep  Observable alive if Subscribers > 0
            );
        }
        return this.cache$;
    }

    clearCache() {
        this.cache$ = null;
    }
}

Advanced Observable Caching with shareReplay() and time triggered expiry

export class ItemService {
  private cache$: Observable<Array<Item>>;

  constructor(private http: HttpClient) { }

  get items() {
    if (!this.cache$) {
      // Set up timer that ticks every 10 seconds
      const timer$ = timer(0, 10000);
      // For each tick make an http request to fetch new data
      this.cache$ = timer$.pipe(
        switchMap(_ => this.requestItems()),
        shareReplay(1) //  replay only most recent value, hence 1
      );
    }
    return this.cache$;
  }

  private requestItems() {
    return this.http.get<ItemResponse>(API_ENDPOINT).pipe(
      map(response => response.value)
    );
  }
}

Unsubscribe from subscriptions with takeUntil

Articles

Guidelines

  • Usually ubsubscribe not needed for Angular Services (which are singletons)

  • Also not needed for http-requests, as they only call onNext once and then they call onComplete.

  • Also not needed in DialogRef afterClosed

  • Needed for components etc. there’s a recommendation for the takeUntil() pattern from RxJs core team member Nicholas Jamieson and a tslint rule to help enforce

  • the general rule is that takeUntil should be the last operator in the sequence:

    export class BooksComponent implements OnDestroy, OnInit {
        private ngUnsubscribe = new Subject();
    
        ngOnInit() {
            this.booksService.getBooks()
                .pipe(
                   startWith([]),
                   filter(books => books.length > 0),
                   takeUntil(this.ngUnsubscribe)
                )
                .subscribe(books => console.log(books));
              // subscribe to other observables using same ngUnsubscribe subject
            }
    
        ngOnDestroy() {
            this.ngUnsubscribe.next();
            this.ngUnsubscribe.complete();
        }
    }
  • When you don’t expect any updates, you can use first() operator which "will complete after the first item is emitted from the observable." source on stackoverflow, so there’s no need to unbsubscribe

        this.linkService.getLinkMediaTypes$()
          .pipe(first())
          .subscribe(items => this.mediaTypes = items,
            err => this.logger.error(err),
            () => this.logger.debug('Complete')); // for demo will be executed after first fetch

Communication via EventBus (Mediator Pattern)

@Injectable()
export class EventBusService {
    private sub$ = new Subject();
    emit(event: EmitEvent) {
        this.sub$.next(event);
    }
    on(event: Events, action: any): Subscription {
        return this.sub$.pipe(
            filter((e: EmitEvent) => e.name === event),
            map((e: EmitEvent) => e.value)
        ).subscribe(action);
    }
}

export class SomeSevice {
    constructor(private bus: EventBusService) {}
    doSomething(item: Item) {
        this.bus.emit(new EmitEvent(Events.SomeEvent,item ));
    }
}

export class SomeComponent {
    item: Item;
    subs: Subscription;
    constructor(private bus: EventBusService) {}
    ngOnInit() {
        subs=this.bus.on(Events.SomeEvent,item => this.item = item)

    }
    ngOnDestory() {
        this.subs.unsubscribe();
    }
}
  • Subject: Send data to subscribed Observers. Previously emitted data is not sent to new Observers

  • BehaviorSubject: Sent last data value to new observers (replay with 1)

  • ReplaySubect: Already sent data can be replayed to new Observers

  • AsyncSubject: Emits the last and only the last value when sequence is complete

Naming reactive "things"

  • If the observable is all about the side effect, name it like you might name a function e.g. `performBounceAnimation$

  • Pluralize observables that could give you many values e.g. accounts$, Singular nouns for observables that just give you one value e.g. person$

  • $ is OPTIONAL (Finnish Notation postfix). do not include the $ like an “s” for pluralization, as it’s confusing

Expose Subject as Observable (but don’t wrap)

Don’t use Subject.asObservable() in Typescript, just cast as Observable see this dicussion and this

Dynamic Filtering with combineLatest

html
<input type="text" [formControl]="filterCtl" placeholder="Filter states...">
<ul> <li *ngFor="let state of filteredStates$ | async">{{state.name}}</li></ul>
ts
export class AppComponent implements OnInit {
  states$: Observable<State[]>;
  filteredStates$: Observable<State[]>;
  filterCtl: FormControl;
  filter$: Observable<string>;

  constructor() {}
  ngOnInit(): void {
    this.states$ = of(states);
    this.filterCtl = new FormControl('');
    this.filter$ = this.filterCtl.valueChanges.pipe(startWith(''));
    this.filteredStates$ = combineLatest(this.states$, this.filter$).pipe(
      map(([states, filterString]) =>
        states.filter(
          (state) =>
            state.name.toLowerCase().indexOf(filterString.toLowerCase()) !== -1
        )
      )
    );
  }
}

Watch valueChanges & trigger service call with switchMap

Every FormControl, FormGroup and FormArray supports valueChanges and statusChanges properties that return Observables!
Source Angular Book p299
this.myForm.get('coordinates').valueChanges.pipe(
    debounceTime(500),
    switchMap( coordinates => this.geoservice.getLocation(coordinates))
).subscribe(location => console.log(location))

Conditional switchmap based on result of first observable

RxJS iif() operator is a creation operator used to decide which observable will be subscribed at subscription time.

    dialogRef.afterClosed()
      .pipe(
        switchMap(dialogResult =>
          iif(() => (dialogResult as ConfirmDialogResult).confirmed
            , this.authService.removeMe$() //  if true
            , of({result: false}))   // if false
        )
      )
      .subscribe({
        next: (response) => console.log("response of observable 1 or 2"),
        error: (error) => console.log("error"),
        complete: () => console.log("error")
      });