Rx.js Operators, Part II

Post Editor

It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe method back when I wrote the first piece.

6 min read
post

Rx.js Operators, Part II

It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe method back when I wrote the first piece.

post
post
6 min read

It’s been more than 2 years after I published my first article on Rx.js, and now it’s time to talk more about its operators. Lots of stuff has changed since then; we didn’t even have the .pipe method back when I wrote the first piece.

In the meantime I used Rx.js more and more in the applications I’ve been working on, and the main insight I got from that experience is this:

Developers’ biggest problem with Rx.js is not knowing more operators

The amount of code that was made better just by using one or another (or a combination of) operators has been ridiculous. Of course, expecting someone to know all of them is an overkill (granted there are over a 100 operators in Rx.js), but at least understanding some of the important groups can make coding (especially in Angular) so much easier.

So, without a further ado, let’s dive in

debounceTime vs auditTime
Link to this section

If you ever tried to make an autocomplete input with Rx.js, then you probably heard about debounceTime: this operator, as the documentation says,

Emits a value from the source Observable only after a particular time span has passed without another source emission.

Which means, if we debounceTime(3000), for example, we will be getting notifications after the source has sort-of settled down for at least 3000 milliseconds. This is useful if we want to make a request for information from a server after the user finished typing, and don’t want to hit the server with useless requests before the user has actually finished typing. Here is an example:

<>Copy
const inputEl = document.querySelector('input'); fromEvent(inputEl, 'input') .pipe( debounceTime(300), map(event => event.target.value), switchMap(query => from(fetch(`https://some-url?q=${query}`))) ) .subscribe(console.log);

If we now remove the debounceTime call and type “Hello” really fast, we will make 5 requests to https://some-url?q=, 4 of which are completely unnecessary. But with debounceTime we will only hit the server after the user stops typing.

So far so good. But what is auditTime then and what use cases does it have?

As the documentations states, auditTime

Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.

This may feel a lot like debounceTime, but it really is not. While both of this functions ignore some of the emissions based on a time interval, there is a significant difference: debounceTime waits after each emission for a given time period, and if there are no new emissions, it will allow that last emission to pass; auditTime does not care about emissions; instead, it comes back every given interval, checks if there have been any emissions in between the previous checking and the current one, and if there are, it lets the last one of them to pass. Imagine having an Observable of Facebook messages coming from all your different friends. If you auditTime(3000) this Observable, and in 3 seconds Anthony and Julia send you a message in that order, then at the top of the checking you will receive a message from Julia, but the message from Anthony will be lost forever.

But what is a use case for this operator? Imagine a situation where we receive emissions from a source that emits very frequently, say, a WebSocket conveying live data on stock exchange rates fluctuations. Every time we receive an emission from that source, a chart representing stock values is being repainted, depending on new values, which is a costly operation. Because this rates can change very fast, and the user won’t notice the smallest fluctuations that take place in <500ms, we can auditTime it to only repaint the chart every 500 milliseconds:

<>Copy
observableFromSocket$ .pipe( auditTime(500), ) .subscribe(repaintChart);

scan vs reduce
Link to this section

Both of this functions are aggregating emitted values, for example, counting the sum of numbers in an Observable of an Array. The main difference is that reduce emits only once — as soon as the source Observable completes. So we can use it to calculate the average age of users, for example:

<>Copy
of(23,25,24, 25, 25, 25) .pipe( map(age => [age, 1]), reduce(([accAge, accCount], [age, count]) => [accAge + age, accCount + count]), map(([sum, count]) => sum / count) ) .subscribe(console.log);

In this example, we map ages of people to a tuple containing the age and the number 1 (which is the count). We then add up the ages and increment the count separately, still keeping the tuple, and after the observable completes, map the the result to sum / count, which is the average.

scan, on the other hand, emits each aggregated value. Every time it adds up a new number, an emission will fire. Here is a use case: imagine a webpage that displays donations from different people. Every time someone makes a donation, and Observable notifies us of the amount being donated. It does not show us the entire collected money, it only shows the amount of a new donation. So we essentially want to aggregate the donations as they arrive and update the DOM every time a new donation arrives:

<>Copy
donations$ .pipe( scan((acc, next) => acc + next), ) .subscribe(updateAmount);

distinct vs distinctUntilChange vs distinctUntilKeyChange
Link to this section

The name of the distinct operator speaks for itself — it only allows emissions that haven’t already happened. Here is a quick example:

<>Copy
of(1, 2, 3, 2, 4, 4) .pipe( distinct() ) .subscribe(console.log);

This example will log 1, 2, 3, 4 , omitting the duplicate emissions of 2 and 4.

What is a good use case for this operator? Mainly, disallowing repeating actions on the same data entry. For example, say we have a Subject which fires a product object every time the user deletes one. Of curse, we can only delete a product once, but user may click on the button twice before our request is fulfilled, so we may want to use only distinct values. In this case we want the distinct operator to work on the user id-s instead of the object references, but Rx.js has got us covered: the distinct operator can accept a mapping function which can help it determine by what part of the emission to make the distinction. Here is an example:

<>Copy
interface Product { id: number; title: string; } deleteProduct$ // a Subject emitting Products .pipe( distinct(product => product.id), ) .subscribe(product => ProductService.delete(product));

But what if we don’t want two subsequent identical emissions, but two identicals in the overall stream are acceptable? For example, say we have a game in which the player has to answer some questions from different fields of science. One can choose from math, physics, history, geography and literature. The user chooses a field of science and is presented with the question. But we have to ensure that the player must answer question from at least two different areas, so they cannot choose the same field twice in a row. Here is where distinctUntilChanged comes to play:

<>Copy
fieldsOfStudy$ .pipe( distinctUntilChanged(), ) .subscribe(field => presentQuestion(field));

This will now allow duplicates, but not one after another.

Of course, as with the distinct operator, we may provide a function to check distinction on a nested value rather then the emitted value itself. But Rx.js went as far as to provide a shorthand for this: distinctUntilKeyChanged. Here is how it works:

<>Copy
of<Product>( {id: 1, title: 'Chair'}, {id: 1, title: 'Chair'}, {id: 2, title: 'Table'}, {id: 1, title: 'Chair'}, ).pipe( distinctUntilKeyChanged('id'), map(product => product.title), ).subscribe(console.log);

This code will log “Chair, Table, Chair”.
Link to this section

takeWhile vs takeUntil
Link to this section

Sometimes we just need manual methods of stopping emissions from an Observable source. Actually, there are three possible scenarios:

  1. Take only a fixed amount of emissions
  2. Take emissions while they satisfy a constraint (example: read a list of numbers while they are lower than 5)
  3. Take emissions until some external event happens.

Actually, the names of the methods that Rx.js has for this are exactly descriptive of these scenarios. I am not going to cover the take operator, as it is pretty straightforward. As to the other two, here is a simple example:

<>Copy
of(1, 2, 3, 4, 5) .pipe( takeWhile(n => n < 4), ) .subscribe(console.log);

This will only log “1, 2, 3”. This behavior may seem similar to filter, but it is very important to understand that takeWhile actually unsubscribes from the source completely — filter on the other hand will just disallow some emissions. It will resume after some other emissions do satisfy the constraint. takeWhile, on the other hand, will cut off forever.

But what if I want to stop emissions based on some external event? Say, I want to log something every second, but only for 10 seconds? So I will need to stop the emissions after that amount of time?

We can actually achieve it by using takeWhile:

<>Copy
let stopped = false; setTimeout(() => stopped = true, 10000); interval(1000) .pipe( takeWhile(() => !stopped), ) .subscribe(console.log);

This works fine, but looks pretty terrible. As a matter of fact, Rx.js does provide a way of handling things like this: the takeUntil operator accepts an Observable, and will continue reading emissions until the said Observable emits some value. Thus we can use it by passing an Observable to it, which will fire in 10 seconds (using timer). Here is the code:

<>Copy
interval(1000) .pipe( takeUntil(timer(10000)), ) .subscribe(console.log);

This pattern is actually quite popular in Angular, when unsubscribing from Observables in our components. I elaborate on it in my article Harnessing the Power of Mixins in Angular.

See the reference section to more about takeWhile and takeUntil.

Conclusion
Link to this section

As I mentioned, Rx.js contains hundreds of operators, each of them accomplishing some fascinating features. I will continue talking about them in my follow-up articles.

Share

About the author

author_image

Senior Angular developer from Armenia. Passionate about WebDev, Football, Chess and Music

author_image

About the author

Armen Vardanyan

Senior Angular developer from Armenia. Passionate about WebDev, Football, Chess and Music

About the author

author_image

Senior Angular developer from Armenia. Passionate about WebDev, Football, Chess and Music

NxAngularCli
NxAngularCli
NxAngularCli

Featured articles

Angularpost
13 September 20218 min read
Tracking user interaction area

Explore one of the most complex pieces of Taiga UI — ActiveZone directive that keeps an eye on what region user is working with. It touches on low-level native DOM events API, advanced RxJS and Dependency Injection, ShadowDOM and more!

Angularpost
13 September 20218 min read
Tracking user interaction area

Explore one of the most complex pieces of Taiga UI — ActiveZone directive that keeps an eye on what region user is working with. It touches on low-level native DOM events API, advanced RxJS and Dependency Injection, ShadowDOM and more!

Read more
AngularpostTracking user interaction area

13 September 2021

8 min read

Explore one of the most complex pieces of Taiga UI — ActiveZone directive that keeps an eye on what region user is working with. It touches on low-level native DOM events API, advanced RxJS and Dependency Injection, ShadowDOM and more!

Read more
Angularpost
7 September 202122 min read
Designing Angular architecture - Container-Presentation pattern

Designing architecture could be tricky, especially in the agile world, where requirement changes are frequent. So your design has to support that and provides extendibility without the need for serious modification. In such cases, you will find the Container-Presentation pattern instrumental.

micro frontendspost
6 September 202125 min read
Taking micro-frontends to the next level

The micro-frontends concept has been out there for quite a while. We’ve been using this architecture in Wix since around 2013, long before it was even given this name. In this article I’d like to share some of the things we did in order to evolve the concept of developing big scale micro-frontends.

micro frontendspost
6 September 202125 min read
Taking micro-frontends to the next level

The micro-frontends concept has been out there for quite a while. We’ve been using this architecture in Wix since around 2013, long before it was even given this name. In this article I’d like to share some of the things we did in order to evolve the concept of developing big scale micro-frontends.

Read more
micro frontendspostTaking micro-frontends to the next level

6 September 2021

25 min read

The micro-frontends concept has been out there for quite a while. We’ve been using this architecture in Wix since around 2013, long before it was even given this name. In this article I’d like to share some of the things we did in order to evolve the concept of developing big scale micro-frontends.

Read more