Prerequisites: you should be familiar with RxJS basics and higher-order observables.

The Issues section of the RxJS repo contains a lot of interesting information, including challenging tasks that developers meet in their work. For example:

We are implementing a custom notification system for our web messenger app.

1. When a message arrives, the user should see a notification with details of who wrote it. The notification disappears after 3s.

2. If user A writes to me twice within these 3s, I should see only one notification. The system starts counting 3s when user A writes and ignores subsequent messages from that user (throttling), but AFTER the 3s it listens to messages from that user again.

3. If, however, some other user B writes to me within these 3s, I should see a new notification, this time counting (for B only) from the moment I got the first message from B.

Quite interesting, isn’t it? :-)


Before we start, a few words about what throttling is (and how it differs from debouncing). I will take the definition from the throttleTime operator description:

throttleTime(durationMs, scheduler, config)
Emits a value from the source Observable, then ignores subsequent source values for duration milliseconds, then repeats this process.

throttleTime operator diagram from @Michael_Hladky.

*The config param also lets us specify whether throttling is leading (the default) or trailing; here we will talk about leading throttling only.
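To make the leading behaviour concrete, here is a minimal sketch in plain JavaScript. It is a simplified model over a list of timestamped events, not the RxJS implementation, and the name throttleLeading and the event shape are assumptions made for illustration only.

```javascript
// Leading throttle over timestamped events (plain JS model, not RxJS code).
// events: [{ time: number, value: any }], sorted by time.
function throttleLeading(events, durationMs) {
  const emitted = [];
  let silentUntil = -Infinity; // end of the current "ignore" window
  for (const { time, value } of events) {
    if (time >= silentUntil) {
      emitted.push(value);             // first value outside the window: emit it
      silentUntil = time + durationMs; // and open a new ignore window
    }
    // otherwise the value falls inside the window and is dropped
  }
  return emitted;
}

const fromUserA = [
  { time: 100, value: 'A1' },
  { time: 1500, value: 'A2' },
  { time: 2500, value: 'A3' },
  { time: 3500, value: 'A4' },
];
const shownForA = throttleLeading(fromUserA, 3000);
console.log(shownForA); // [ 'A1', 'A4' ]
```

A1 opens a 3-second window, A2 and A3 fall inside it, and A4 arrives after the window has closed.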

Don’t confuse throttling with debouncing. Debounce works differently: it waits for a specified timespan after the LAST source emission, then emits only the most recent source value, and after that is ready to handle the next emissions. If another value arrives before the timespan is over, nothing is emitted and the whole timespan starts over, and so on.
debounceTime operator diagram from @Michael_Hladky.
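
For comparison, debouncing can be modelled the same way. The sketch below is again a plain JavaScript approximation rather than RxJS code; debounceModel and the sample data are hypothetical. It returns only which values survive and does not model when they are emitted.

```javascript
// Debounce over timestamped events (plain JS model, not RxJS code).
// events: [{ time: number, value: any }], sorted by time.
function debounceModel(events, durationMs) {
  const emitted = [];
  events.forEach((event, i) => {
    const next = events[i + 1];
    // A value survives only if no other value arrives within durationMs after it.
    if (!next || next.time - event.time >= durationMs) {
      emitted.push(event.value);
    }
  });
  return emitted;
}

const keystrokes = [
  { time: 0, value: 'r' },
  { time: 100, value: 'rx' },
  { time: 200, value: 'rxj' },
  { time: 1000, value: 'rxjs' },
];
const debounced = debounceModel(keystrokes, 300);
console.log(debounced); // [ 'rxj', 'rxjs' ]
```

Throttling keeps the first value of a burst, while debouncing keeps the last one.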

Now, with that in mind, we can start.


Packtpub.com and I prepared a whole RxJS course with many other details on how you can solve your everyday development tasks with this amazing library. It can be interesting for beginners but also covers advanced topics. Take a look!


Without any throttling, our code will look like this:

let Rx = window['rxjs'];
let {from, of, asyncScheduler} = Rx;
let {mergeMap, filter, delay} = Rx.operators;
console.clear();

let notifications = [
  { userId: 1, name: 'A1', delay: 100 }, // should be shown
  { userId: 1, name: 'A2', delay: 1500 }, // shouldn't be shown
  { userId: 1, name: 'A3', delay: 2500 }, // shouldn't be shown
  { userId: 1, name: 'A4', delay: 3500 }, // should be shown
  { userId: 2, name: 'B1', delay: 200 }, // should be shown
  { userId: 2, name: 'B2', delay: 300 }, // shouldn't be shown
  { userId: 2, name: 'B3', delay: 3500 }, // should be shown
]
//mock source that emits notifications
let mockSource$ = from(notifications).pipe(
  mergeMap((notif) => {
    return of(notif).pipe(delay(notif.delay));
  }),
)

//display notifications widget
let container = document.querySelector('.container');
function showNotification(notif) {
  const newElem = document.createElement('div');
  newElem.classList.add('item');
  newElem.innerHTML = notif.name;
  container.appendChild(newElem);
  setTimeout(() => {newElem.remove()}, 800); // remove notification element
}

//subscribe only after the widget container is available
mockSource$.subscribe(showNotification);

And it works like this:

No throttling — try it in a codepen.

So what we need is throttling for each user's notifications separately! This means that the throttleTime operator alone will not help us here, because it doesn’t distinguish between users and handles all notifications in the same manner.
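
To see why, here is a plain JavaScript model (not RxJS code) that applies one shared 3-second leading throttle to the mock notifications, ordered by arrival time. The helper name throttleAll is hypothetical, and the model assumes notifications with equal delays arrive in their array order.

```javascript
const mockNotifications = [
  { userId: 1, name: 'A1', delay: 100 },
  { userId: 1, name: 'A2', delay: 1500 },
  { userId: 1, name: 'A3', delay: 2500 },
  { userId: 1, name: 'A4', delay: 3500 },
  { userId: 2, name: 'B1', delay: 200 },
  { userId: 2, name: 'B2', delay: 300 },
  { userId: 2, name: 'B3', delay: 3500 },
];

// Order by arrival time (Array.prototype.sort is stable in modern engines).
const arrivals = [...mockNotifications].sort((a, b) => a.delay - b.delay);

// One ignore window shared by every user.
function throttleAll(items, durationMs) {
  const shown = [];
  let silentUntil = -Infinity;
  for (const item of items) {
    if (item.delay >= silentUntil) {
      shown.push(item.name);
      silentUntil = item.delay + durationMs;
    }
  }
  return shown;
}

const shownGlobally = throttleAll(arrivals, 3000);
console.log(shownGlobally); // [ 'A1', 'A4' ]
```

In this model user B is never shown at all: B1 and B2 fall into the window opened by A1, and B3 falls into the window opened by A4.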

What if we could somehow split the source observable sequence into many observables using our selector function, and then just apply throttleTime to each of them?
This is where the groupBy operator can help us.

groupBy(keySelector, elementSelector?, durationSelector?, subjectSelector?)
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as GroupedObservables, one GroupedObservable per group.

So for our case it will work like this:

  1. Take the keySelector function and apply it to the source value.
Given:
nextSourceValue = { userId: 1, name: 'A1', delay: 100 }
keySelector = (notif) => notif.userId
Result:
let GroupedObservablesIdentifier = keySelector(nextSourceValue)

  2. Then check an internal cache for an Observable that corresponds to this GroupedObservablesIdentifier. If there is none, create one and emit it; if there is one, just take it and emit the value through it to its subscribers. Since groupBy emits Observables, we should use mergeMap to handle them.
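
The two steps above can be sketched in plain JavaScript. This is only a model of the assumed lookup, using arrays as stand-ins for group Observables; groupIntoBuckets and the sample data are hypothetical names, not part of RxJS.

```javascript
// Plain JS model of "compute the key, then look it up in a cache".
function groupIntoBuckets(values, keySelector) {
  const cache = new Map(); // key -> bucket holding that group's values
  const newGroups = [];    // keys in the order their group was first created
  for (const value of values) {
    const key = keySelector(value);
    let bucket = cache.get(key);
    if (!bucket) {
      bucket = [];
      cache.set(key, bucket);
      newGroups.push(key); // a group is announced only once, on its first value
    }
    bucket.push(value.name);
  }
  return { cache, newGroups };
}

const incoming = [
  { userId: 1, name: 'A1' },
  { userId: 2, name: 'B1' },
  { userId: 2, name: 'B2' },
  { userId: 1, name: 'A2' },
];
const { cache, newGroups } = groupIntoBuckets(incoming, (notif) => notif.userId);
console.log(newGroups);    // [ 1, 2 ]
console.log(cache.get(1)); // [ 'A1', 'A2' ]
console.log(cache.get(2)); // [ 'B1', 'B2' ]
```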
With all that, our solution looks like this:

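// groupBy and throttleTime are taken from Rx.operators, just like mergeMap
// source$ is the notification stream (mockSource$ in the example above)
const selector = (x) => x.userId;
const throttleTimeout = 3000;
source$
  .pipe(
    groupBy(selector), // one inner observable per userId
    mergeMap((group$) => group$.pipe(throttleTime(throttleTimeout))) // throttle each user separately
  )
  .subscribe(showNotification);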
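
As a sanity check of the expected result, the same per-user rule can be modelled in plain JavaScript without RxJS. This is a simplified sketch, not what groupBy and throttleTime do internally; throttlePerKey is a hypothetical helper, and the data mirrors the mock notifications sorted by arrival time.

```javascript
// Notifications in arrival order (same data as the mock source).
const arrivalsByTime = [
  { userId: 1, name: 'A1', delay: 100 },
  { userId: 2, name: 'B1', delay: 200 },
  { userId: 2, name: 'B2', delay: 300 },
  { userId: 1, name: 'A2', delay: 1500 },
  { userId: 1, name: 'A3', delay: 2500 },
  { userId: 1, name: 'A4', delay: 3500 },
  { userId: 2, name: 'B3', delay: 3500 },
];

// Leading throttle with a separate ignore window per key.
function throttlePerKey(items, keySelector, durationMs) {
  const silentUntilByKey = new Map(); // key -> end of that key's ignore window
  const shown = [];
  for (const item of items) {
    const key = keySelector(item);
    const silentUntil = silentUntilByKey.has(key) ? silentUntilByKey.get(key) : -Infinity;
    if (item.delay >= silentUntil) {
      shown.push(item.name);
      silentUntilByKey.set(key, item.delay + durationMs);
    }
  }
  return shown;
}

const shownPerUser = throttlePerKey(arrivalsByTime, (notif) => notif.userId, 3000);
console.log(shownPerUser); // [ 'A1', 'B1', 'A4', 'B3' ]
```

The output matches the "should be shown" comments in the mock data: each user gets an independent 3-second window.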

And it works as expected:

Here is a codepen

How does ‘groupBy’ really work?

In the previous section we made some assumptions about how the groupBy operator may be implemented. Now let's check how it really works by reading its source code.

It is typical for a built-in RxJS operator to be implemented in 3 main parts (you can read more here):

  1. The pure function “operatorName” (in our case, groupBy).
groupBy

  2. The “operatorName” function uses an “operatorNameOperator” class, which creates a new Subscriber and subscribes to the source (the GroupByOperator class in our case).

GroupByOperator

  3. The “operatorNameOperator” class usually creates the subscriber with an “operatorNameSubscriber” class (here, GroupBySubscriber). The Subscriber class contains the operator’s main logic. Let's check it out.

Our first assumption was that the operator should calculate GroupedObservablesIdentifier from the source value and keySelector. And here it is:

Let's check our second assumption: that the operator should have a cache where it stores groupObservables, and that it emits them or creates new ones if needed:

OK, here is the cache: groupBy uses a Map for it. And on line 170 it checks whether we already have a groupObservable for the respective group:


But what if the groupObservable has not been created yet?

  1. Then we create a new Subject for this group (to be able to re-emit the appropriate source values for this specific group) (line 184).
  2. Save this group Subject to the cache (this.groups) (line 185).
  3. Create a new GroupObservable that is subscribed to the group Subject (line 186).
  4. Emit this GroupObservable (remember, groupBy emits groupObservables) (line 187). It is emitted only once.

Once the group Subject is in the cache (just created or found there), we emit the source value through this Subject, so the respective groupObservable re-emits it to the mergeMap operator (in our solution), and mergeMap re-emits it to the final subscribers.
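
The flow described above can be condensed into a small stand-alone sketch. It is a simplified model and not the RxJS source: MiniSubject and createMiniGroupBy are hypothetical names, the Subject stand-in only forwards values, and the group Subject is handed to the consumer directly instead of being wrapped in a GroupedObservable.

```javascript
// Bare-bones stand-in for a Subject: it only forwards values to its listeners.
class MiniSubject {
  constructor() { this.listeners = []; }
  subscribe(listener) { this.listeners.push(listener); }
  next(value) { this.listeners.forEach((listener) => listener(value)); }
}

// Returns a function that should be called with every source value.
function createMiniGroupBy(keySelector, onNewGroup) {
  const groups = new Map(); // the cache: key -> MiniSubject
  return function handleSourceValue(value) {
    const key = keySelector(value);
    let group = groups.get(key);
    if (!group) {
      group = new MiniSubject(); // create a Subject for the new group
      groups.set(key, group);    // save it to the cache
      onNewGroup(key, group);    // emit the group to the consumer, only once
    }
    group.next(value);           // route the value through the group's Subject
  };
}

const log = [];
const pushValue = createMiniGroupBy(
  (notif) => notif.userId,
  (key, group) => {
    log.push(`new group ${key}`);
    group.subscribe((notif) => log.push(`group ${key}: ${notif.name}`));
  }
);

[
  { userId: 1, name: 'A1' },
  { userId: 2, name: 'B1' },
  { userId: 1, name: 'A2' },
].forEach(pushValue);

console.log(log);
// [ 'new group 1', 'group 1: A1', 'new group 2', 'group 2: B1', 'group 1: A2' ]
```

In the solution, the role of the onNewGroup callback is played by mergeMap, which subscribes to each emitted group and applies throttleTime to it.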

Conclusion:

  1. The groupBy operator is a powerful tool for splitting a source sequence into several sequences by a specified key.
  2. Review RxJS issues to find more challenging topics :-)
  3. Want to dig deeper into the RxJS sources? Check the articles by Nate Lapinski and Nicholas Jamieson, and my other articles.
  4. Nice article: “RxJS switchMap, concatMap, mergeMap, exhaustMap” from angular-academy.com
  5. @Michael_Hladky is working on creating easy-to-understand RxJS marble diagrams — you can take a look and leave your feedback for him here.
  6. Have interesting findings in RxJS sources? Leave comments!
  7. Like this article? Let’s keep in touch on Twitter.

Starting from section 4, my RxJS video course covers advanced material, so if you are already familiar with RxJS you can find something useful there as well: higher-order observables, anti-patterns, schedulers, unit testing, etc. Give it a try!