AsyncSubject
AsyncSubject
is a variant of a Subject which keeps the last value emitted by a source observable before completion and sends it to all new subscriptions. AsyncSubject
needs to wait until the source observable completes before identifying the current value as the latest and only then emit it to existing or future subscribers.
This behavior means that you can always directly get the last emitted value from the AsyncSubject
even if the subscriber subscribes much later than the value was stored.
In a way, this AsyncSubject
is similar to how Promise works. The biggest difference between the two is that a Promise is always eager, meaning it will execute the function you pass to it immediately. With AsyncSubject
, however, you can control when you want to subscribe to a source observable. And because all observables are lazy, only when you subscribe the producer function in the source observable will be executed.
In case of an error on the source observable, AsyncSubject
will not emit the latest value to subscriptions. Instead, it will simply pass along the error notification from the source Observable to new subscriptions.
AsyncSubject
works in the following way:
- Create an internal subscriptions container
- When a new subscription occurs, add it to the container
- When a source observable emits a new value or the method
next
is called on the subject, set the emitted value as the current latest value overriding any existing value if any - If the source observable completes or the method
complete
is called on the subject, set the state of the subject tostopped
and send the current latest value alongside the complete notification to all existing subscriptions; remove the subscriptions from the container - If the source observable throws an error or the method
error
is called on the subject, set the state of the subject tostopped
and set theerror
notification as the current value; send theerror
notification to all existing subscriptions; remove the subscriptions from the container - If the subject is
stopped
, do not add any new subscriptions to the container. Instead, if the subject hasn’t been errored, send the current latest alongside complete or notification immediately on subscription to the corresponding observer, otherwise send the error notification without the latest vlue - If the
stopped
subject is subscribed to a new source observable, ignore the values from this source
The following diagram demonstrates this sequence of steps:
UsageLink to this section
AsyncSubject
is a great choice if you need to fetch and cache (one-shot) resources. When making a network request we’re generally interested in the final output, which is a response. And to get it we need to wait until the request is fully loaded at which point an observable stream completes.
Here’s an example that demonstrates that scenario:
<>Copyconst cache = {}; function getResource(url) { if (!cache[url]) { cache[url] = new AsyncSubject(); fetch(url) .then((response) => response.json()) .then((data) => { cache[url].next(data); cache[url].complete(); }); } return cache[url].asObservable(); } const url = 'https://api.mocki.io/v1/ce5f60e2'; getResource(url).subscribe((data) => console.log(data)); setTimeout(() => { // no request is made, data is served from the AsyncSubject's cache getResource(url).subscribe((data) => console.log(data)); }, 3000);