If you are not familiar with Schedulers in RxJS, there is a short overview in my article about queueScheduler. In short, `asyncScheduler` lets you emit each value in a separate macrotask (in terms of the browser event-loop task queue). If you want to find out more about how the event loop works, watch this wonderful video and read this great article.
If you want to apply a scheduler to an observable sequence, you can do that in two ways:
1. You can pass it as the last argument to a creation function such as `of` or `range`.
2. You can apply it with the `observeOn` operator.
Both of these solutions make each data emission happen in a separate macrotask (so each emission is queued in the browser event-loop macrotask queue).
But they do that in slightly different ways. Usually you don't have to worry about it, but in some specific cases this difference can be important.
Let’s review how each solution works in detail:
1. `of` emits the first value according to the scheduler (in the next macrotask in this example).
2. It puts the next value into the scheduler queue (another macrotask).
3. Steps 1 and 2 are repeated for each of the following emissions.
Since only the next value's emission is scheduled in the browser event-loop queue at any given time, other macrotasks can, in theory, run between the values being produced.
What about the `observeOn` operator?
`of` produces all the values, and they are buffered by the `observeOn` operator.
`observeOn` then schedules every value separately with the specified scheduler (a separate macrotask for each value in this example) and puts the respective tasks in the event-loop queue.
This means that all the emission tasks are put into the event-loop queue one after another, so it isn’t possible for other tasks to run between them.
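The two flows described above can be compared with a small toy model. This is only a sketch under stated assumptions: `createLoop`, `emitOneAtATime`, `emitAllUpFront` and `trace` are hypothetical helpers written for this illustration, the "event loop" is a plain FIFO array, and none of it reflects how RxJS or the browser is actually implemented. Where exactly an unrelated task lands in real RxJS depends on its internal timer handling; the model only shows whether it can land between emissions at all.

```javascript
// Toy model of an event loop: a FIFO queue of macrotasks.
// Illustration only, not how RxJS or the browser is implemented.
function createLoop() {
  const queue = [];
  return {
    enqueue: (task) => queue.push(task),
    run() {
      while (queue.length > 0) {
        queue.shift()(); // take the oldest task and run it
      }
    },
  };
}

// Scheduler passed as an argument: emit one value, then queue the next emission.
function emitOneAtATime(loop, values, onValue) {
  const step = (i) => {
    if (i >= values.length) return;
    onValue(values[i]);
    loop.enqueue(() => step(i + 1));
  };
  loop.enqueue(() => step(0));
}

// observeOn: one task per value, all queued before the first one runs.
function emitAllUpFront(loop, values, onValue) {
  values.forEach((v) => loop.enqueue(() => onValue(v)));
}

// Runs a strategy; while the first value is handled, an unrelated macrotask is queued.
function trace(strategy) {
  const loop = createLoop();
  const log = [];
  strategy(loop, [1, 2, 3], (v) => {
    log.push(`value ${v}`);
    if (v === 1) loop.enqueue(() => log.push('other task'));
  });
  loop.run();
  return log;
}

console.log(trace(emitOneAtATime).join(', ')); // value 1, other task, value 2, value 3
console.log(trace(emitAllUpFront).join(', ')); // value 1, value 2, value 3, other task
```

In the first trace the unrelated task slips in between two emissions; in the second it can only run after every emission task that was queued up front.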
You can check these statements in a codepen.
Here is our code for the scheduler passed as an argument:
In the subscription handler I use `console.log` to print the value in the current macrotask. `Promise.resolve().then` is also used to print the value in a microtask right after the current macrotask. And finally, `setTimeout` prints the value in another macrotask, according to the event-loop queue order.
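A handler along these lines might look like the sketch below. It is a hedged reconstruction, not the exact code from the codepen: `createHandler` and the `log` array are hypothetical names introduced here so the order can be inspected, and the handler is called directly instead of through an RxJS subscription.

```javascript
// Hypothetical handler in the spirit of the description above.
// `log` stands in for console output so the order of entries can be inspected.
function createHandler(log) {
  return (v) => {
    // Runs synchronously, inside the current macrotask.
    log.push(`${v}`);
    // Runs in a microtask, right after the current macrotask finishes.
    Promise.resolve().then(() => log.push(`Microtask value ${v}`));
    // Runs in a later macrotask, in event-loop queue order.
    setTimeout(() => log.push(`Macrotask value ${v}`), 0);
  };
}

const log = [];
const handler = createHandler(log);

// With RxJS the handler would be passed to subscribe, for example
// of(1, 2, 3, asyncScheduler).subscribe(handler) (the values are illustrative).
handler(1);
console.log(log.join(', ')); // 1

// A little later both deferred entries have been added, microtask first.
setTimeout(() => console.log(log.join(', ')), 10); // 1, Microtask value 1, Macrotask value 1
```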
And this is its output:
OK, so what do we have here?
`Macrotask value 1` is printed after `Microtask value 2`. So what? This means that before the `of` observable made the first emission, it had already scheduled the next emission in the event-loop queue.
Packtpub.com and I prepared a whole RxJS course with many other details on how you can solve your everyday development tasks with this amazing library. It can be interesting for beginners but also contains advanced topics. Take a look!
OK, now let's run the code with `asyncScheduler` applied by the `observeOn` operator:
Now the output is a bit different:
The `of` observable emitted all the values. Then `observeOn` scheduled all the emissions in separate macrotasks and put them in the event-loop queue. That's why all the `setTimeout` log outputs from the subscription handler run at the very end: the event-loop queue already contained the tasks scheduled by `observeOn`.
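Applying a scheduler as a factory function argument is much more CPU- and memory-efficient if the number of emitted values is substantial. With `observeOn`, every value is produced and scheduled up front, whereas the argument form keeps only the next emission pending. So if you plan to apply `asyncScheduler` to an expression like `range(0, 1e10).pipe(tap(v => doSomething(v)))`, you should definitely prefer applying it as a factory function argument: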
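```javascript
import { asyncScheduler, range } from 'rxjs';
import { observeOn, tap } from 'rxjs/operators';

// doSomething is a placeholder for your own per-value logic.

// Inefficient: observeOn queues a task for every value up front.
range(0, 1e10).pipe(
  tap(v => doSomething(v)),
  observeOn(asyncScheduler)
);

// Efficient: only the next emission is pending at any time.
range(0, 1e10, asyncScheduler).pipe(
  tap(v => doSomething(v))
);
```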
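To get a feel for why the number of values matters, here is a rough sketch that counts how many tasks are pending at once under each approach. It is a toy model under stated assumptions: `createLoop`, `peakOneAtATime` and `peakAllUpFront` are hypothetical helpers, the queue is a plain array, and the numbers say nothing about the real memory footprint of RxJS.

```javascript
// Toy FIFO macrotask queue that records the largest number of pending tasks.
// Illustration only, not how RxJS or the browser is implemented.
function createLoop() {
  const queue = [];
  let peak = 0;
  return {
    enqueue(task) {
      queue.push(task);
      peak = Math.max(peak, queue.length);
    },
    run() {
      while (queue.length > 0) queue.shift()();
      return peak; // highest queue length seen so far
    },
  };
}

// Argument style: each emission task queues only its successor.
function peakOneAtATime(count) {
  const loop = createLoop();
  const step = (i) => {
    if (i >= count) return;
    // The value i would be delivered to the subscriber here.
    loop.enqueue(() => step(i + 1));
  };
  loop.enqueue(() => step(0));
  return loop.run();
}

// observeOn style: one task per value, all queued before the first one runs.
function peakAllUpFront(count) {
  const loop = createLoop();
  for (let i = 0; i < count; i++) loop.enqueue(() => {});
  return loop.run();
}

console.log(peakOneAtATime(1000)); // 1
console.log(peakAllUpFront(1000)); // 1000
```

In this model the pending work stays constant for the argument style and grows linearly with the number of values for the up-front style.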
I hope you enjoyed this article. Please leave comments with your personal use-cases of RxJS schedulers!
Like this article? Let’s keep in touch on Twitter.
Starting from section 4 of my RxJS video course, advanced material is covered, so if you are already familiar with RxJS, you can find something useful as well: higher-order observables, anti-patterns, schedulers, unit testing, etc. Give it a try!