If you have been using TypeScript/ES7+ lately, you have to be familiar with the async/await keywords and their usefulness when working with asynchronous code. Here is a basic example to freshen up your memory:

async function request() {
  const result = await fetch('some-api-url');
  const data = await result.json();
  return data.items; // for example a nested array;
}

const list = await request();
list.map(/*do stuff with the array*/);

(notice I used top-level await which is available in TypeScript v3.8)

In this example we create a basic Promise and handle it in a sync-style code, without calling .then and callbacks. This makes our code look more linear and the lack of callbacks make our code easier to reason about.

Then what is for-await?#

Of curse, Promises represent a result of an asynchronous operation, but what if we have a process (instead of a singular task) that will emit events on a timeline? (sounds familiar, right?). Here is where for await comes into play:

for await (const event of events) {
  handleEvent(event);
}

So here we have a stream of events (important: this is not an array, but a stream and emits events asynchronously), but instead of providing a callback to handle events over time, we use for await to deal with them in a more linear fashion. This, together with usual async/await allows us to escape callbacks in our codebase entirely!

Or does it?…

RxJS Observables are more powerful than Promises

Obviously, not only are RxJS Observables capable of streaming lots of events (as opposed to a singular result from a Promise), but they also have powerful operators to deal with the data in a beautiful, functional way.

But can we use async/await with RxJS? Turns out not. Or at least not entirely.

Of course, if our Observable is going to emit just one value and then complete, we can always use toPromise and then apply async/await. And before you think “why would I need an Observable of just one value” bear in mind, that Angular’s HttpClient is built on the premise of Observables emitting a singular value and then shutting down. SO with HttpClient, we can cast the resulting Observable to a Promise and leverage the powers of both async/await and RxJS operators!

Of, so far so good. But what about streams of events?

import { fromEvent } from 'rxjs'; 

const clicks$ = fromEvent(document.body, 'click');

Here I have a stream of all the clicks on the document’s body. Obviously — this does not do much, and I need to handle those events — log them to the console, at least. Here is how would we do that:

const clicks$ = fromEvent(document.body, 'click').subscribe(event => console.log(`Event ${event} occured`));

But this is a callback inside the subscribe function. Can we use async/await with this code to avoid callbacks? Not really, toPromise relies on our source Observable to complete, and this particular Observable never completes. So are we stuck with the callback?

So what is it? As seen in the screenshot, this library provides a eachValueFrom function which converts a source Observable into an AsyncInterable (read more on AsyncIterables here). This, in its turn, allows us to use for await on an Observable stream! Here is how we can refactor our code:

import { fromEvent } from 'rxjs'; 
import { eachValueFrom } from 'rxjs-for-await';

const clicks$ = fromEvent(document.body, 'click');

async function handleClicks() {
  for await (const event of eachValueFrom(clicks$)) {
    console.log(event);
  }
}

handleClicks();

Now we don’t have to write any callbacks at all — we can just use for await with RxJS Observables now.

Potential use cases#

Of course this does not mean that you have to run and refactor all your apps right away; but there are cases where this neat little thing will come into help.

One of those cases is, actually, unit tests. When writing tests, it is important to keep them as plain and simple as possible. Introducing subscribe calls with callbacks introduces unnecessary complexity and nesting inside unit tests (remember — tests should read like plain English to properly reflect what the app does). For example, think about this piece of code:

describe('AppComponent', () => {
  beforeEach(async(() => {
    TestBed.configureTestingModule({
      imports: [
        RouterTestingModule
      ],
      declarations: [
        AppComponent
      ],
    }).compileComponents();
  }));

  it(`should have a source$ Observable emit numbers`, () => {
    const fixture = TestBed.createComponent(AppComponent);
    const app = fixture.debugElement.componentInstance as AppComponent;
    app.start();
    let counter = 1;
    app.source$.subscribe(data => {
      expect(data).toBe(counter);
      counter++;
    });
  });
});

This test calls for a start method on the AppComponent, which is supposed to create a stream of numbers incrementing by one. Then we check if the stream does, in fact, emit such numbers. Obviously, we used subscribe to get the emitted values, which created a third level of callback nesting. If we had to check something with a callback inside that last callback, which happens fairly often with unit tests, we would already have callback hell. rxjs-for-await will help us deal with it:

it(`should have a source$ Observable emit numbers`, async () => {
  const fixture = TestBed.createComponent(AppComponent);
  const app = fixture.debugElement.componentInstance as AppComponent;
  app.start();
  let counter = 1;
  for await (const data of eachValueFrom(app.source$)) {
    expect(data).toBe(counter);
    counter++;
  }
});

Here we just used eachValueFrom to be able to implement the same assertions with for await, and thus avoided nesting callbacks. Also, if we need to check another async thing inside, we can use async/await, so no callbacks at all.

Conclusion#

While we shouldn’t be running to change our codebases completely, this library provides us with the ability to utilize newest features of ES while enjoying the power of RxJS.