ReplaySubject
ReplaySubject
is a variant of a Subject which keeps a cache of previous values emitted by a source observable and sends them to all new observers immediately on subscription. This behavior of replaying a sequence of old values to new subscribes is where the name for this type of a subject comes from. When an observer subscribes to a ReplaySubject, the subject begins by first emitting all values from the cache and then continues to emit any other items emitted by the source observable after the subscription. ReplaySubject
will replay the cached sequence of values even if the observer subscribes much later than the values were cached.
ReplaySubject
is similar to the BehaviorSubject in the way that it can send cached values to new subscribers, but instead of just one current value, it can record and replay a whole series of values.
There’s another more crucial difference. Once BehaviorSubject
receives the complete or the error notification and transitions into a stopped state, all subsequent subscriptions will only receive the complete or the error notification and will not receive the current value.
In contrast, even in the stopped state in case of a completion or an error on the source observable, ReplaySubject
still replays the cached values before sending the complete or the error error notification to new subsequent subscriptions.
When creating a ReplaySubject
you can specify how many values you want to store in the buffer (bufferSize) and the amount of time to hold a value in the buffer before removing it from it (windowTime). Both configurations may exist simultaneously.
So if you would like to buffer a maximum of 3 values, as long as the values are less than 2 seconds old, you could do so with a new ReplaySubject(3, 2000)
. Another way to think about windowTime
is to define it as the amount of time that has passed prior to a new subscription. In other words, the configuration above means “I want to store the last 3 values, that have been executed in the last 2 seconds prior to a new subscription”.
ReplaySubject
works in the following way:
- Create an internal subscriptions container
- When a new subscription occurs, add it to the container and replay the values from the cache if any to the corresponding observer
- When a source observable emits a new value or the method
next
is called on the subject, add the emitted value to the cache overriding earliest values if needed and send the new value to observers for all subscriptions - If the source observable completes or the method
complete
is called on the subject, set the state of the subject tostopped
and add the complete notification to the cache; send the complete notification to all existing subscriptions, remove them from the container - If the source observable throws an error or the method
error
is called on the subject, set the state of the subject tostopped
and add theerror
notification to the cache; send theerror
notification to all existing subscriptions, remove them from the container - If the subject is
stopped
, do not add any new subscriptions to the container and instead replay the values from the cache including complete or error notification immediately on subscription to the corresponding observer - If the
stopped
subject is subscribed to a new source observable, ignore the values from this source
The following diagram demonstrates this sequence of steps:
UsageLink to this section
ReplaySubject
is commonly used when you need to replay an event or a series of events. Since ReplaySubject
doesn’t need a default value as opposed to BehaviorSubject
, it’s a handy mechanism to use if an event may never even occur.
Imagine you lazy load a library that needs to process user events. Some events will occur before the library is loaded, so you’ll need to replay them to the library. To do that, simply create a ReplaySubject
, push events to it and let the library subscribe to it when loaded.
Here’s a code block that demonstrates this:
<>Copyconst events = setUpListeners(); emulateLibraryLoad(events); function emulateLibraryLoad(events) { setTimeout(() => { events.subscribe((event) => console.log(event)); }, 3000); } function setUpListeners() { const events = new ReplaySubject(); const clicks = fromEvent(document, 'click'); const spacebars = fromEvent(document, 'keyup').pipe(filter((event: any) => event.code === 'Space')); merge(clicks, spacebars).pipe( tap((event) => events.next(event)) ).subscribe(); return events.asObservable(); }