RxJS is my favorite library. I love how observables offer a declarative, composable twist on async (and sync )programming. Now that I finally understand how to model events as streams, I don’t ever want to look back. For me, RxJS is just downright fun.

But the sources have always remained something of a mystery to me. I’ve written a small series on how to build your own observable from scratch. If you want to understand the fundamentals of observables, please check it out.

However, that series really had nothing to do with the RxJS library.
Since I’ve been reading the Angular and React sources, I figure it’s time to dig into my favorite library.

Target Audience

You like RxJS, you want to know more about how it’s implemented, you’re curious about this stuff, or you just have too much time on your hands ?

Personally, I want to contribute to the RxJS library one day soon, so it’s time to dig in.

Disclaimer/Format

This series will be my notes as I try to understand the sources. I’ll try to condense things, so that the articles stay somewhat coherent.

But, these are really just my notes. I’ll publish them in case anyone wants to follow along while I read through the sources. I’ll also post summaries at the end so that you don’t have to go trudging through every word if you don’t want to.

If you don’t want to read all the details, there is a summary at the end.
(but all of the fun is in the details)

I’m not an RxJS expert, so a lot of what I say and do here will be a hypothesis. If you need a definitive answer for something RxJS related, please ask someone from the core team.

On My Marks, Go!

Whenever I dive into a new codebase, I like to start with a simple program whose behavior I know, and then set breakpoints inside, and step through its execution. Along the way, I’ll end up inside other functions and classes, which is a great way to start painting the broader picture of the library.

I’m going to start with the default RxJS application on Stackblitz.
https://stackblitz.com/edit/read-rxjs-sources

map and pipe are coming soon

I’ll even simplify things further by commenting out pipe and map for now. All I’m doing is taking the string 'World' and wrapping it inside of an observable using the of operator. This creates a cold observable which will emit the value 'World' when it is subscribed to, then it will complete.

I’ve added a debugger statement on line 12 inside of the observer’s nextcallback, so that I’ll have a good spot in the code to start examining the stack traces. Here’s what the stack looks like when I hit line 12:

ye gods man

Don’t strain eyes reading that just yet. It says, “I have a long way to go” ?

Let’s start at the top. Remember, observables are lazy, so nothing is really run until I call subscribe on line 11. Naturally, Observable.subscribe is the first function I’ll jump to in the call stack.

Notes on Observable.subscribe

Here’s the code from the official RxJS sources:

 subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
            error?: (error: any) => void,
            complete?: () => void): Subscription {

  const { operator } = this;
  const sink = toSubscriber(observerOrNext, error, complete);

  if (operator) {
    operator.call(sink, this.source);
  } else {
    sink.add(
      this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?
      this._subscribe(sink) :
      this._trySubscribe(sink)
    );
  }

  if (config.useDeprecatedSynchronousErrorHandling) {
    if (sink.syncErrorThrowable) {
      sink.syncErrorThrowable = false;
      if (sink.syncErrorThrown) {
        throw sink.syncErrorValue;
      }
    }
  }

  return sink;
}

Note the parameters to subscribe. It’s just an observer (next, error, complete), which can come in one of two forms:

  • three functions: next, error, and complete
  • an object which contains those three functions as properties

This must be what the PartialObserver type defines, so I’ll make a note to look into that later.

Also note that subscribe will return something of type Subscription.

Lines 5–16 are what I care about for now. 18–24 seem to deal with an edge case.

Line 5: Use destructuring to pull operator out of this. Here’s what thislooks like for me:

no operator property, so { operator } will be undefined. I’ll come back to this later. My guess is that operator is defined in situations where we are chaining operators together.

On line 6 I step into toSubscriber, and pass it the observer (whether it’s three functions or an object. In this case, I’ve just given it one function).

In my case, I’m just passing one function into subscribe, which is why my nextOrObserver parameter is a function (next, the one I passed into subscribe in the stackblitz example). I didn’t pass anything for error or complete. In this case, I’ll hit the bottom line of toSubscriber.

return new Subscriber(nextOrObserver, error, complete)

I think the point of toSubscribe is just to take whatever form of observer is passed in, and return an instance of Subscriber. Stepping into that:

toSubscriber

The call to super calls into Subscription, which is:

I’ll make a map of these class hierarchies later. For now, it’s worth knowing that Subscriptions are pretty important in RxJS. They are the disposable resources which let us consume data.

Continuing in subscriber.ts, we return an instance of SafeSubscriber. SafeSubscriber is something I have seen a lot of over the past year whenever I’ve ventured into the RxJS sources, so I will dedicate an article to it shortly. it extends Subscription as well. Again, at this point, it seems like all this code is determining which type of subscription object to return, depending on the type of observer we have passed into Observable.subscribe…it’s getting kind of messy in here. Time to get back to the main thread of this article.

Bubbling back up to subscribe:

sink is the subscriber, in this case, it’s just a Subscriber, with a destinationof type SafeSubscriber.

subscriber.add does the following

duckies!! (163)

More notes on add below. The important thing to keep in mind is that Observable.subscribe returns a Subscriber, the point of which is to let us unsubscribe whenever we want. I’m pretty sure that operators can use this stuff to unsubscribe too (for instance, takeUntil).

That’s enough for now. Need to see what happens before and after subscribe is called.

Notes on Subscriptions and Their Hierarchy

The first class to focus on is Subscription

As the sources say:

/**
Represents a disposable resource, such as the execution of an Observable. A Subscription has one important method, `unsubscribe`, that takes no argument and just disposes the resource held by the subscription. Additionally, subscriptions may be grouped together through the `add()` method, which will attach a child Subscription to the current Subscription.
When a Subscription is unsubscribed, all its children (and its grandchildren)will be unsubscribed as well.
*/

Very interesting. I know from my series on Building Your Own Observable that with something like of(1,2,3).map(x => x + 1).filter(x > 2)observables are subscribing to other observables. So filter is subscribing to the observable returned by map, which is subscribing to the observable returned by of, and so on. In this way, unsubscribing from the observable returned by filter would let you effectively unsubscribe from everything. Maybe that’s what the add method does internally?

The unsubscribe sets closed to true, and sets some other references to null. I’ll worry about these details more later. No reason to get too bogged down at the moment

add has this to say:

Adds a tear down to be called during the unsubscribe() of this Subscription. Can also be used to add a child subscription.
If the tear down being added is a subscription that is already unsubscribed, is the same reference `add` is being called on, or is`Subscription.EMPTY`, it will not be added.
If this subscription is already in an `closed` state, the passed
tear down logic will be executed immediately.
When a parent subscription is unsubscribed, any child subscriptions that were added to it are also unsubscribed.

‘teardown’ refers to something of type TearDownLogic. Most likely a function to be called on unsubscribe. Let’s test this out!

I used interval(1000) instead of of in order to print a value every 1000ms, and I call unsubscribe after 5000ms. Just as I suspected, the lambda that was passed into sub.add is executed on unsubscribe. It is also executed when the source observable completes (as in the case of of).

Neat, so now we know what add does. It lets you register some logic to be executed on unsubscription or completion.

*Note: RxJS Core Team member Nicholas Jamieson provides an excellent explanation of Subscribers and add in this article.

I think I have a good enough understanding of Subscription at this point, so I’ll move on.


How of() works

Now, I’ll put a breakpoint inside of of to see what is happening before subscribe is called.

The of() creational method

In this example, ...args is simply ['World'], since that’s the single value we passed in.

Lines 76–81 have to do with schedulers, which basically control the timing of an observable (is it sync, or async. if it’s async, is it a microtask or a macrotask or should it run on an animation frame, etc). Eventually, I’ll dig into schedulers and understand them better, but they can be skipped for now. By convention, you can always specify a scheduler as the last argument to a creational method like of, which is what is going on on line 76.

Now, args.length will be 1 in this case, so I see that line 86 will execute. Time to dig into scalar(args[0] as T).

When in doubt, always return a new Observable

Short functions like this are a breath of fresh air when doing this ?

Since we have only one value (‘World’), it will be treated as a scalar. On line 4, a new Observable is created (but not executed. they are lazy, remember?). This Observable will take a subscriber, like any other observable. subscriber is the stuff we just looked at in the previous section (next, error, complete), in other words, an observer.

On lines 5 and 6, we see that the subscriber's next method will be called with ‘World’, and then it will complete (since it’s just a scalar value, i.e. singular).

lines 8 and 9 do some bookkeeping, and on line 10, this observable is returned. Thus, the observable defined on line 4 is what is returned when you do something like:

let source = of('World'); //of returns the observable from line 4

This idea is absolutely critical if you want to understand observables. I cannot stress this enough.

Observable creators, and operators, always return observables.

Why? Otherwise, composition of observables would not be possible. You wouldn’t be able to do things like of(1,2,3).map(x=>x+1).filter(x=>x>2). That chaining of of and map and filter is composition, and it is possible because each of those is returning an Observable (which has access to map, and filter, etc). In the old days, all of these operators were on Observable.prototype, but the team has moved away from that recently, for better tree-shaking. That’s what the new pipe operator is for. I’ll explore the new pipe syntax later in this series.

Anyways, after of returns that observable, we just subscribe to it, and we’ve already seen what happens there. Observable.subscribe is called, the magic on line 205 is called:

on 205, this._trySubscribe(sink) will eventually get to the `next` callback

which calls next on the subscriber which is passed into the observable that was created in scalar!

value is ‘World’. There are a couple of other objects to step through, but at the end of the day, subscriber.next is the callback I passed into subscribe:

We’re home!

Thus we’ve done it! We’ve seen how an observable is created using of, and then how that data flows into the observer’s next method. Notice how the string value 'World' was unpacked from the observable along the way… ?

Btw, the best variable name ever:

love the sense of humor ?

:D

of() with multiple args

Now I’ll try passing the strings 'Hello' and 'World' into of, because I want to see how fromArray works:

On line 88, args is ['Hello', 'World'] and scheduler is undefined. Stepping into fromArray.

I see that !scheduler is true, so it returns a new Observable, much like scalar did in the last example. However, this time, it’s going to call subscribeToArray(['Hello', 'World']), I wonder what that does. (note subscribeToArray is being invoked here, with the [‘Hello’, ‘World’] as an argument. that’s going to be important in a second)

I’m going to ignore lines 10–23 for now, but those look interesting! There’s that add function again, and this time, it looks like it is being used to schedule some tasks with a scheduler (doesn’t matter here, since I didn’t provide a scheduler).

Here’s subscribeToArray:

Interesting. It’s a function which takes an ArrayLike (in this case, [‘Hello’, ‘World’]) and then returns a function which takes a subscriber (again, think observer), and then loops over each element in the array, calls subscriber.next(array[i]), and then completes the subscriber on line 12 once it has iterated through each value in the array. Awesome! This is very similar to how scalar worked, expect it calls next for each value in the array ?
Remember, subscribeToArray is just returning a function which accepts a subscriber, and that is being passed into the returned observable (line 8 of fromArray). No data is running through the observable just yet, but once I subscribe to the observable returned from fromArray, it knows to iterate through the array [‘Hello’, ‘World’] and call the next method of whatever observer I pass into subscribe once for each item in the array. So in this case, it will call next twice, and then complete.
That covers how of works with scalars, and arrays of values. That leaves one more thing.
of() of of()
Time to nest some observables.

In this case, nothing too interesting happens. of('World') returns a scalar observable as seen before, and immediately passes that value into the second call to of, which looks like this:

It’s going to call scalar as well, and pass in the observable which is wrapping 'World'.

Eventually, when our next method is hit, the value it receives is a nested observable:

This is because of doesn’t contain any logic for flattening observables. In future articles, we’ll look into operators that can do this for us, like flatMap.

Summary

In summary, here are the takeaways from this session:

  • of will always return an observable
  • depending on how many values you provide to of it will use either scalar or fromArray to create that observable
  • the returned observable accepts a subscriber argument, which is what is passed into subscribe. This is either three functions next, error, complete, or an object containing these functions as properties. Really, only next is needed.
  • of doesn’t know how to unpack nested observables
  • Subscriptions are pretty fascinating, and will warrant their own article at some point
  • The same is true for schedulers

Stepping through the sources for a very simple example can take some time! In the next article, I’ll uncomment map and pipe, and see what happens. Stay tuned. Please let me know in the comments if this kind of article is useful or fun.

But wait, there’s more!

If you want to learn the basics of building a simple observable from scratch, I’ve written a small series of articles:

Part 1: Arrays
Part 2: Containers
Part 3: Creating Observables with of, from, and fromEvent
Part 4: Operators