RxJS
Caching with publishReplay() and refCount
Fastest way to cache for lazy developers — Angular with RxJS
Using publishReplay()
and refCount()
export class ItemService {
private cache$: Observable<Item[]>;
constructor(private httpClient: HttpClient) { }
// Get items from server | HTTP GET
getItems(): Observable<Item[]> {
if (!this.cache$) { // lazy
this.items = this.httpClient.get(`${api_url}/items`).pipe(
publishReplay(1), // cache the latest emitted
refCount() // keep Observable alive if Subscribers > 0
);
}
return this.cache$;
}
clearCache() {
this.cache$ = null;
}
}
Advanced Observable Caching with shareReplay() and time triggered expiry
Source: Advanced caching with RxJS code on stackblitz
export class ItemService {
private cache$: Observable<Array<Item>>;
constructor(private http: HttpClient) { }
get items() {
if (!this.cache$) {
// Set up timer that ticks every 10 seconds
const timer$ = timer(0, 10000);
// For each tick make an http request to fetch new data
this.cache$ = timer$.pipe(
switchMap(_ => this.requestItems()),
shareReplay(1) // replay only most recent value, hence 1
);
}
return this.cache$;
}
private requestItems() {
return this.http.get<ItemResponse>(API_ENDPOINT).pipe(
map(response => response.value)
);
}
}
Unsubscribe from subscriptions with takeUntil
Articles
-
6 Ways to Unsubscribe from Observables in Angular …
-
unsubscribe()
-
async pipe
-
take* operator (take(n), takeUntil(notifier), takeWhile(predicate))
-
-
first (similar to take(1) + takeWhile)
-
Angular: Implement Unsubscribe Decorator (TODO Checkout)
-
tslint custom rule to remind e.g. absence of ngOnDestroy hook
Guidelines
-
Usually ubsubscribe not needed for Angular Services (which are singletons)
-
Also not needed for http-requests, as they only call onNext once and then they call onComplete.
-
Also not needed in DialogRef afterClosed
-
Needed for components etc. there’s a recommendation for the
takeUntil()
pattern from RxJs core team member Nicholas Jamieson and a tslint rule to help enforce -
the general rule is that takeUntil should be the last operator in the sequence:
export class BooksComponent implements OnDestroy, OnInit { private ngUnsubscribe = new Subject(); ngOnInit() { this.booksService.getBooks() .pipe( startWith([]), filter(books => books.length > 0), takeUntil(this.ngUnsubscribe) ) .subscribe(books => console.log(books)); // subscribe to other observables using same ngUnsubscribe subject } ngOnDestroy() { this.ngUnsubscribe.next(); this.ngUnsubscribe.complete(); } }
-
When you don’t expect any updates, you can use
first()
operator which "will complete after the first item is emitted from the observable." source on stackoverflow, so there’s no need to unbsubscribethis.linkService.getLinkMediaTypes$() .pipe(first()) .subscribe(items => this.mediaTypes = items, err => this.logger.error(err), () => this.logger.debug('Complete')); // for demo will be executed after first fetch
Communication via EventBus (Mediator Pattern)
@Injectable()
export class EventBusService {
private sub$ = new Subject();
emit(event: EmitEvent) {
this.sub$.next(event);
}
on(event: Events, action: any): Subscription {
return this.sub$.pipe(
filter((e: EmitEvent) => e.name === event),
map((e: EmitEvent) => e.value)
).subscribe(action);
}
}
export class SomeSevice {
constructor(private bus: EventBusService) {}
doSomething(item: Item) {
this.bus.emit(new EmitEvent(Events.SomeEvent,item ));
}
}
export class SomeComponent {
item: Item;
subs: Subscription;
constructor(private bus: EventBusService) {}
ngOnInit() {
subs=this.bus.on(Events.SomeEvent,item => this.item = item)
}
ngOnDestory() {
this.subs.unsubscribe();
}
}
-
Subject: Send data to subscribed Observers. Previously emitted data is not sent to new Observers
-
BehaviorSubject: Sent last data value to new observers (replay with 1)
-
ReplaySubect: Already sent data can be replayed to new Observers
-
AsyncSubject: Emits the last and only the last value when sequence is complete
Naming reactive "things"
Source: Observables are “just functions”, but also collections… How do I name them? by Ben Lesh (RxJS Lead)
-
If the observable is all about the side effect, name it like you might name a function e.g. `performBounceAnimation$
-
Pluralize observables that could give you many values e.g.
accounts$
, Singular nouns for observables that just give you one value e.g.person$
-
$ is OPTIONAL (Finnish Notation postfix). do not include the $ like an “s” for pluralization, as it’s confusing
Expose Subject as Observable (but don’t wrap)
Don’t use Subject.asObservable()
in Typescript, just cast as Observable see this dicussion and this
Dynamic Filtering with combineLatest
Based on Dynamic filtering with RxJs and Angular forms — A tutorial, code on stackblitz fork
<input type="text" [formControl]="filterCtl" placeholder="Filter states...">
<ul> <li *ngFor="let state of filteredStates$ | async">{{state.name}}</li></ul>
export class AppComponent implements OnInit {
states$: Observable<State[]>;
filteredStates$: Observable<State[]>;
filterCtl: FormControl;
filter$: Observable<string>;
constructor() {}
ngOnInit(): void {
this.states$ = of(states);
this.filterCtl = new FormControl('');
this.filter$ = this.filterCtl.valueChanges.pipe(startWith(''));
this.filteredStates$ = combineLatest(this.states$, this.filter$).pipe(
map(([states, filterString]) =>
states.filter(
(state) =>
state.name.toLowerCase().indexOf(filterString.toLowerCase()) !== -1
)
)
);
}
}
Watch valueChanges & trigger service call with switchMap
Every FormControl, FormGroup and FormArray supports valueChanges and statusChanges properties
that return Observables!
|
this.myForm.get('coordinates').valueChanges.pipe(
debounceTime(500),
switchMap( coordinates => this.geoservice.getLocation(coordinates))
).subscribe(location => console.log(location))
Conditional switchmap based on result of first observable
RxJS iif() operator is a creation operator used to decide which observable will be subscribed at subscription time.
dialogRef.afterClosed()
.pipe(
switchMap(dialogResult =>
iif(() => (dialogResult as ConfirmDialogResult).confirmed
, this.authService.removeMe$() // if true
, of({result: false})) // if false
)
)
.subscribe({
next: (response) => console.log("response of observable 1 or 2"),
error: (error) => console.log("error"),
complete: () => console.log("error")
});