How to not create your RxJS Observables

Post Editor

In this article I explain how to not create your RxJS Observables.

4 min read
post

How to not create your RxJS Observables

In this article I explain how to not create your RxJS Observables.

post
post
4 min read

Last week a friend and college of mine was stuck with a problem.
An NgRx effect was subscribed to a stream of WebSocket messages, but the effect didn't receive any messages.
Though, we saw that the server was sending them and that they reached the client.

The problem wasn't the Effect, but the WebSocket stream that was wrongfully initialized.

The use case was to only establish the WebSocket connection for the users that had enough permissions to start a process.
The WebSocket was created to report the progress of this process to the rest of the users.

The simplified version looks like this:

<>Copy
stream$ = of({ status: 'idle' }) // and later... if (userCanEstablishConnection()) { this.stream$ = fromEvent(this.hub, 'ReportProgress') }

This doesn't work because the steam$ is reassignment to the "real" WebSocket stream after the Effect was initialized.
When the WebSocket stream emits a new progress value, the Effect doesn't receive the update because it is listening to of({ status: 'idle' })

So how do we solve this?
Mostly, the answer to that question when it comes to RxJS is to wrap the Observable inside another Observable.

Simplified reproduction
Link to this section

To reproduce this in a simple way, I created 2 streams.
One stream is listening for "a" keydown events, the second stream is listening to "b" keydown events.
At first we're interested in the "a" events, and when the button (toggle) is clicked, we only want to receive the "b" events.

<>Copy
// a helper to listen to keystrokes by key // will be used in all of the examples const fromKeydown = (filterKey: string) => fromEvent<KeyboardEvent>(document, 'keydown').pipe( map(e => e.key), filter(key => key === filterKey), scan((acc, key) => acc + ' ' + key, ''), ) // a toggle to switch between the two streams // will be used in all of the examples let toggle$ = fromEvent(document.querySelector('button'), 'click') // start listening to "a" keydowns let source = fromKeydown('a') // log keydown strokes source.subscribe(key => console.log('[wrong]', key)) // switch the stream on click toggle$.subscribe(() => { console.log('[wrong]', 'listening to b') source = fromKeydown('b') })
Content imageContent image

Implementation One: The Imperative Way
Link to this section

To stay in the imperative world we can re-create this if statement inside an outer Observable.
So, we start off by using the "a"-events and when the button is clicked, we switch the inner Observable to return the "b"-event stream.
In the code below we use a RxJS Subject to recreate the toggle.

<>Copy
// create the toggle const toggleSubject = new Subject<boolean>() // create an outer Observable based on toggleSubject let source2 = toggleSubject.pipe( // switch the inner stream based on the toggle switchMap(toggle => (toggle ? fromKeydown('b') : fromKeydown('a'))), ) // flip the toggle on button click toggle$.subscribe(() => { console.log('[imperative]', 'listening to b') toggleSubject.next(true) }) // log keydown strokes source2.subscribe(key => console.log('[imperative]', key)) // start the strean toggleSubject.next(false)
Content imageContent image

While this works, we can do better.

Implementation Two: Let's Think In Streams
Link to this section

Instead of re-creating a new stream with a Subject, why not re-use a stream?
The toggle$ stream is exactly what we need to switch between the two streams, and it's already there!

<>Copy
// when toggle$ receives a new value (on click) // switch to the "b"-stream let source3 = toggle$.pipe(switchMap(() => fromKeydown('b'))) // log keydown strokes source3.subscribe(key => console.log('[reactive]', key))
Content imageContent image

The above doesn't take the "a"-stream into account, it just creates the "b"-stream when the toggle emits a value.
For our use-case this was perfect, but if needed we can provide an initial value.

With an initial value
Link to this section

By using the startWith operator, we can start-off the stream with a single "a" value.

<>Copy
let source4 = toggle$.pipe( switchMap(() => fromKeydown('b')), startWith('a'), ) source4.subscribe(key => console.log('[reactive with initial value]', key))
Content imageContent image

With an initial stream
Link to this section

Or, if you're interested in the "a"-stream you can use the concat method
in combination with the takeUntil operator.
This will handle all streams sequentially.

For our code this means that it will first emit all the "a" events, and when the toggle is clicked it switches to the "b" events.

<>Copy
let source5 = concat( fromKeydown('a').pipe(takeUntil(toggle$)), fromKeydown('b'), ) source5.subscribe(key => console.log('[reactive with initial steam]', key))
Content imageContent image

Conclusion
Link to this section

By wrapping the Observable (the inner-Observable) in an other Observable (the outer-Observable), the reference to the Observable remains the same. In the Observable we forsee a way to switch between the two Observables.
This way, in our case, the NgRx Effect works as intended.

There are multiple operators that provide a solution to different use case, try the RxJS Operator Decision Tree to find your solution

Share

About the author

author_image

NgRx team member

author_image

About the author

Tim Deschryver

NgRx team member

About the author

author_image

NgRx team member

Looking for a JS job?
Job logo
Sr. Angular Developer (NgRx,RxJS)

SVTC IT

United States
Remote
$135k - $155k
More jobs

Featured articles

Angularpost
17 January 202323 min read
Improve page performance and LCP with NgOptimizedImage

Explore mechanisms of NgOptimizedImage directive to improve overall page performance, targeting especially the Largest Contentful Paint (LCP) metric from Core Web Vitals. Enhance pages, make the best user experience and improve the web.