RxJS custom operators

Undoubtedly, it’s the wide array of operators that makes the RxJS library an extremely powerful utility in a developer’s tool belt. I’ve recently written several custom operators to reuse some repetitive combinations of operators in a nifty way. In this blog post, I will introduce you to the concept of custom RxJS operators and present example implementations.


Identity operator

An RxJS operator is simply a function which takes a source observable as an input and returns a resulting stream. Therefore, the task of creating a custom RxJS operator comes down to writing a regular JavaScript (TypeScript) function. Let’s start with a basic identity operator which simply mirrors a source observable:

import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

const results$ = source$.pipe(identity);

results$.subscribe(console.log);
  
// console output: 0, 1, 2
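
Since an operator is only a function from one observable to another, handing it to pipe has the same effect as calling it directly. The sketch below is one way to check that. It assumes a synchronous source created with of (instead of interval) so the values can be collected immediately, and collect is a hypothetical helper that is not part of RxJS:

```typescript
import { Observable, of } from "rxjs";

// The identity operator from the article: it returns the source untouched.
function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

// Hypothetical helper: gathers the values of a synchronous observable.
function collect<T>(source$: Observable<T>): T[] {
  const values: T[] = [];
  source$.subscribe(v => values.push(v));
  return values;
}

const numbers$ = of(0, 1, 2);

// Both expressions pass numbers$ to the operator function.
const viaPipe = collect(numbers$.pipe(identity));
const viaCall = collect(identity(numbers$));

console.log(viaPipe); // [ 0, 1, 2 ]
console.log(viaCall); // [ 0, 1, 2 ]
```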

In the next step, let’s add some basic logic to a custom operator.


Logging operator

The following custom operator performs a side effect (logs values to the console) for each value of a source stream:

import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function log<T>(source$: Observable<T>): Observable<T> {
  return source$.pipe(tap(v => console.log(`log: ${v}`)));
}

const results$ = source$.pipe(log);

results$.subscribe(console.log);
  
// console output: log: 0, 0, log: 1, 1, log: 2, 2

The resulting stream is built from source$ by applying built-in operators through the pipe method.
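
Built-in operators are not strictly required. An operator can also create the resulting observable by hand with the Observable constructor and forward the notifications from the source itself. The following is a minimal sketch of that approach, not a replacement for the tap-based version. logManual and the logged array are hypothetical names, and the lines are stored in the array instead of being printed so that they are easy to inspect:

```typescript
import { Observable, of } from "rxjs";

// Hypothetical store for the log lines (the article's version prints them).
const logged: string[] = [];

function logManual<T>(source$: Observable<T>): Observable<T> {
  return new Observable<T>(subscriber => {
    // Subscribe to the source only when someone subscribes to the result.
    const subscription = source$.subscribe({
      next: v => {
        logged.push(`log: ${v}`); // the side effect
        subscriber.next(v); // pass the value through unchanged
      },
      error: err => subscriber.error(err),
      complete: () => subscriber.complete()
    });

    // Teardown: unsubscribing from the result unsubscribes from the source.
    return () => subscription.unsubscribe();
  });
}

const received: number[] = [];
of(0, 1, 2).pipe(logManual).subscribe(v => received.push(v));

console.log(logged); // [ 'log: 0', 'log: 1', 'log: 2' ]
console.log(received); // [ 0, 1, 2 ]
```

The tap-based version is shorter and already handles errors, completion, and teardown, which is why composing built-in operators is usually the more convenient choice.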


Operator factory

In certain scenarios, it’s convenient to provide a context for a custom operator. You can accomplish this by defining a function that returns an operator. The factory’s arguments are captured in the operator’s lexical scope:

import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  return source$ =>
    source$.pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2

You can simplify the return type annotation by using the MonoTypeOperatorFunction type provided by RxJS. In addition, the static pipe function lets you define an operator in a less verbose way:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  return pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);
  
// console output: logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
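
MonoTypeOperatorFunction<T> fits operators whose output type equals the input type. When a custom operator changes the type of the emitted values, RxJS also provides OperatorFunction<T, R>. Here is a small sketch under that assumption. tagged is a hypothetical operator that is not part of the examples above, and a synchronous of source is used so the result can be read immediately:

```typescript
import { of, OperatorFunction } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical factory: turns every value of type T into a tagged string.
function tagged<T>(tag: string): OperatorFunction<T, string> {
  return source$ => source$.pipe(map(v => `${tag}: ${v}`));
}

const labels: string[] = [];

of(1, 2, 3)
  .pipe(
    tagged("RxJS"),
    // s is inferred as string, so string methods type-check here.
    map(s => s.toUpperCase())
  )
  .subscribe(s => labels.push(s));

console.log(labels); // [ 'RXJS: 1', 'RXJS: 2', 'RXJS: 3' ]
```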


Observer-unique lexical scope

An operator’s factory function is called only once, at the moment the stream is defined. As a result, all observers share a single lexical scope:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnce<T>(job: Function): MonoTypeOperatorFunction<T> {
  let isFirst = true;

  return pipe(
    tap(v => {
      if (!isFirst) {
        return;
      }

      job(v);
      isFirst = false;
    })
  );
}

const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, 0, 1, 1, 2, 2

If you want to get a unique lexical scope for each observer, you can make use of the defer function:

import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnceUnique<T>(job: Function): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      let isFirst = true;

      return source$.pipe(
        tap(v => {
          if (!isFirst) {
            return;
          }

          job(v);
          isFirst = false;
        })
      );
    });
}

const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);
  
// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
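
The difference between the two variants can also be observed without timers. The sketch below assumes a synchronous of source, subscribes twice to each result, and counts how often job runs. tapOnceShared mirrors tapOnce but is written with an explicit source$ parameter, and the counters are hypothetical names introduced for the illustration:

```typescript
import { defer, MonoTypeOperatorFunction, of } from "rxjs";
import { tap } from "rxjs/operators";

// State lives in the factory call, so every subscription shares it.
function tapOnceShared<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  let isFirst = true;

  return source$ =>
    source$.pipe(
      tap(v => {
        if (isFirst) {
          job(v);
          isFirst = false;
        }
      })
    );
}

// State lives in the defer callback, which runs once per subscription.
function tapOnceUnique<T>(job: (value: T) => void): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      let isFirst = true;

      return source$.pipe(
        tap(v => {
          if (isFirst) {
            job(v);
            isFirst = false;
          }
        })
      );
    });
}

let sharedCalls = 0;
let uniqueCalls = 0;

const shared$ = of(1, 2, 3).pipe(tapOnceShared(() => sharedCalls++));
const unique$ = of(1, 2, 3).pipe(tapOnceUnique(() => uniqueCalls++));

shared$.subscribe();
shared$.subscribe();
unique$.subscribe();
unique$.subscribe();

console.log(sharedCalls); // 1
console.log(uniqueCalls); // 2
```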

For a different approach to the tapOnce task, take a look at one of my other blog posts.


Real-life examples

firstTruthy operator:

import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";

const source1$ = of(0, "", "foo", 69);

function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  return pipe(first(v => Boolean(v)));
}

const result1$ = source1$.pipe(firstTruthy());

result1$.subscribe(console.log);

// console output: foo
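
One caveat with first: when the source completes without any value satisfying the predicate, first signals an EmptyError. first also accepts a default value as its second argument, which avoids that error. The sketch below shows both behaviours. firstTruthyOr and its fallback parameter are hypothetical additions, not part of the original firstTruthy:

```typescript
import { MonoTypeOperatorFunction, of, OperatorFunction } from "rxjs";
import { first } from "rxjs/operators";

// The article's operator, written with an explicit source$ parameter.
function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  return source$ => source$.pipe(first(v => Boolean(v)));
}

// Hypothetical variant: emits fallback when no truthy value arrives.
function firstTruthyOr<T, D>(fallback: D): OperatorFunction<T, T | D> {
  return source$ => source$.pipe(first((v: T) => Boolean(v), fallback));
}

const outcomes: string[] = [];

// No truthy value before completion: the error callback is invoked.
of(0, "", null)
  .pipe(firstTruthy())
  .subscribe({
    next: v => outcomes.push(`next: ${v}`),
    error: (err: Error) => outcomes.push(`error: ${err.name}`)
  });

// Same source with a fallback: the default value is emitted instead.
of(0, "", null)
  .pipe(firstTruthyOr("none"))
  .subscribe(v => outcomes.push(`next: ${v}`));

console.log(outcomes); // [ 'error: EmptyError', 'next: none' ]
```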

evenMultiplied operator:

import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";

const source2$ = interval(10).pipe(take(3));

function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  return pipe(
    filter(v => v % 2 === 0),
    map(v => v * multiplier)
  );
}

const result2$ = source2$.pipe(evenMultiplied(3));

result2$.subscribe(console.log);
  
// console output: 0, 6

liveSearch operator:

import { ObservableInput, of, OperatorFunction, pipe } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";

const source3$ = of("politics", "sport");

type DataProducer<T> = (q: string) => ObservableInput<T>;

function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  return pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(dataProducer)
  );
}

const newsProducer = (q: string) =>
  of(`Data fetched for ${q}`).pipe(delay(2000));

const result3$ = source3$.pipe(liveSearch(500, newsProducer));

result3$.subscribe(console.log);
  
// console output: Data fetched for sport

Conclusions

Common combinations of RxJS operators can be extracted into custom operators and reused when implementing similar features in the future. Generics enable proper type inference for the emitted values further down the pipe sequence.

I hope you liked the post and learned something new.

About the author

Wojciech Trawiński

I'm a freelance Frontend Developer, passionate about Angular and reactive programming, and always looking for new coding challenges. Chocolate lover.
