RxJS custom operators


Undoubtedly, it’s the wide array of operators that makes the RxJS library such a powerful utility in a developer’s tool belt. I’ve recently written several custom operators to reuse some repetitive combinations of operators in a nifty way. In this blog post, I will introduce you to the concept of custom RxJS operators and present example implementations.

Identity operator

An RxJS operator is simply a function which takes a source observable as an input and returns a resulting stream. Therefore, the task of creating a custom RxJS operator comes down to writing a regular JavaScript (TypeScript) function. Let’s start with a basic identity operator which simply mirrors a source observable:

```typescript
import { interval, Observable } from "rxjs";
import { take } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function identity<T>(source$: Observable<T>): Observable<T> {
  return source$;
}

const results$ = source$.pipe(identity);

results$.subscribe(console.log);

// console output: 0, 1, 2
```
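An operator does not have to return the source itself or compose built-in operators; it can also build the resulting stream by hand. The following is a minimal sketch of that idea, assuming a hypothetical `double` operator (it is not part of RxJS) that subscribes to the source and forwards each value multiplied by two:

```typescript
import { Observable, of } from "rxjs";

// Hypothetical operator for illustration: doubles every number from the source.
function double(source$: Observable<number>): Observable<number> {
  return new Observable<number>(subscriber => {
    // Subscribe to the source only when someone subscribes to the result.
    const subscription = source$.subscribe({
      next: value => subscriber.next(value * 2),
      error: err => subscriber.error(err),
      complete: () => subscriber.complete(),
    });

    // Teardown: unsubscribe from the source when the result is unsubscribed.
    return () => subscription.unsubscribe();
  });
}

const doubled: number[] = [];

// `of` emits synchronously, so the array is filled right after subscribing.
of(1, 2, 3).pipe(double).subscribe(value => doubled.push(value));

console.log(doubled); // [2, 4, 6]
```

In everyday code, composing built-in operators is usually shorter and less error-prone than wiring `next`, `error` and `complete` manually.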

In the next step, let’s add some basic logic to a custom operator.

Logging operator

The following custom operator performs a side effect (logs values to the console) for each value of a source stream:

```typescript
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function log<T>(source$: Observable<T>): Observable<T> {
  return source$.pipe(tap(v => console.log(`log: ${v}`)));
}

const results$ = source$.pipe(log);

results$.subscribe(console.log);

// console output (the operator's log line, then the value printed by the subscriber):
// log: 0, 0, log: 1, 1, log: 2, 2
```

The resulting stream is built from source$ by applying built-in operators via the pipe method.

Operator factory

In certain scenarios, it’s convenient to provide a context for a custom operator. You can accomplish this by defining a function that returns an operator. The factory’s arguments are then available in the operator’s lexical scope:

```typescript
import { interval, Observable } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): (source$: Observable<T>) => Observable<T> {
  return source$ =>
    source$.pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);

// console output (the operator's log line, then the value printed by the subscriber):
// logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
```

You can simplify the return type annotation by using the MonoTypeOperatorFunction type provided by RxJS. In addition, the static pipe function lets you define an operator in a less verbose way:

```typescript
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function logWithTag<T>(tag: string): MonoTypeOperatorFunction<T> {
  return pipe(tap(v => console.log(`logWithTag(${tag}): ${v}`)));
}

const results$ = source$.pipe(logWithTag("RxJS"));

results$.subscribe(console.log);

// console output (the operator's log line, then the value printed by the subscriber):
// logWithTag(RxJS): 0, 0, logWithTag(RxJS): 1, 1, logWithTag(RxJS): 2, 2
```
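MonoTypeOperatorFunction<T> fits operators whose output type is the same as the input type. When the two differ, RxJS also exports the more general OperatorFunction<T, R> type. A small sketch, assuming a hypothetical `toLabel` factory (the name and behaviour are made up for illustration) that turns numbers into prefixed strings:

```typescript
import { of, OperatorFunction } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical factory: the input stream emits numbers, the output emits strings.
function toLabel(prefix: string): OperatorFunction<number, string> {
  return source$ => source$.pipe(map(value => `${prefix}${value}`));
}

const labels: string[] = [];

// `of` emits synchronously, so the array is filled right after subscribing.
of(1, 2).pipe(toLabel("item-")).subscribe(label => labels.push(label));

console.log(labels); // ["item-1", "item-2"]
```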


Observer-unique lexical scope

An operator’s factory function is called only once, at the moment the stream is defined. As a result, all observers share a single lexical scope:

```typescript
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnce<T>(job: Function): MonoTypeOperatorFunction<T> {
  // Created once, when tapOnce() is called, so every observer shares this flag.
  let isFirst = true;

  return pipe(
    tap(v => {
      if (!isFirst) {
        return;
      }

      job(v);
      isFirst = false;
    })
  );
}

const results$ = source$.pipe(tapOnce(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);

// console output: First value emitted, 0, 0, 1, 1, 2, 2
```

If you want to get a unique lexical scope for each observer, you can make use of the defer function:

```typescript
import { defer, interval, MonoTypeOperatorFunction } from "rxjs";
import { take, tap } from "rxjs/operators";

const source$ = interval(1000).pipe(take(3));

function tapOnceUnique<T>(job: Function): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      // The defer callback runs on every subscription, so each observer gets its own flag.
      let isFirst = true;

      return source$.pipe(
        tap(v => {
          if (!isFirst) {
            return;
          }

          job(v);
          isFirst = false;
        })
      );
    });
}

const results$ = source$.pipe(tapOnceUnique(() => console.log("First value emitted")));

results$.subscribe(console.log);
results$.subscribe(console.log);

// console output: First value emitted, 0, First value emitted, 0, 1, 1, 2, 2
```
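The same defer pattern should suit any operator that keeps mutable state between emissions. Below is a sketch under that assumption, using a hypothetical `withCount` operator (not an RxJS built-in) that pairs each value with a running count which restarts for every observer:

```typescript
import { defer, Observable, of, OperatorFunction } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical operator: emits [count, value] pairs, counting from 1 per observer.
function withCount<T>(): OperatorFunction<T, [number, T]> {
  return (source$: Observable<T>) =>
    defer(() => {
      let count = 0; // created anew on every subscription

      return source$.pipe(map((value): [number, T] => [++count, value]));
    });
}

const counted$ = of("a", "b").pipe(withCount());

const first: Array<[number, string]> = [];
const second: Array<[number, string]> = [];

// `of` emits synchronously, so each array is filled right after subscribing.
counted$.subscribe(pair => first.push(pair));
counted$.subscribe(pair => second.push(pair));

console.log(first); // [[1, "a"], [2, "b"]]
console.log(second); // [[1, "a"], [2, "b"]]
```

If `count` were declared outside the defer callback, the second observer would continue from the first observer's count instead of starting at 1.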

If you want to see a different way to solve the tapOnce task, take a look at one of my other blog posts.

Real-life examples

firstTruthy operator:

```typescript
import { MonoTypeOperatorFunction, of, pipe } from "rxjs";
import { first } from "rxjs/operators";

const source1$ = of(0, "", "foo", 69);

function firstTruthy<T>(): MonoTypeOperatorFunction<T> {
  return pipe(first(v => Boolean(v)));
}

const result1$ = source1$.pipe(firstTruthy());

result1$.subscribe(console.log);

// console output: foo
```

evenMultiplied operator:

```typescript
import { interval, MonoTypeOperatorFunction, pipe } from "rxjs";
import { filter, map, take } from "rxjs/operators";

const source2$ = interval(10).pipe(take(3));

function evenMultiplied(multiplier: number): MonoTypeOperatorFunction<number> {
  return pipe(
    filter(v => v % 2 === 0),
    map(v => v * multiplier)
  );
}

const result2$ = source2$.pipe(evenMultiplied(3));

result2$.subscribe(console.log);

// console output: 0, 6
```

liveSearch operator:

```typescript
import { ObservableInput, of, OperatorFunction, pipe } from "rxjs";
import { debounceTime, delay, distinctUntilChanged, switchMap } from "rxjs/operators";

const source3$ = of("politics", "sport");

type DataProducer<T> = (q: string) => ObservableInput<T>;

function liveSearch<R>(
  time: number,
  dataProducer: DataProducer<R>
): OperatorFunction<string, R> {
  return pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(dataProducer)
  );
}

const newsProducer = (q: string) =>
  of(`Data fetched for ${q}`).pipe(delay(2000));

const result3$ = source3$.pipe(liveSearch(500, newsProducer));

result3$.subscribe(console.log);

// console output: Data fetched for sport
```

Conclusions

Common combinations of RxJS operators can be extracted into custom operators and reused when implementing similar features in the future. Generics enable proper type inference for emitted values further down the pipe sequence.
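To illustrate the point about type inference, here is a brief sketch. The `everyNth` operator is hypothetical (it is not an RxJS built-in); because it is generic, the operator placed after it in the pipe still sees string values:

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { filter, map } from "rxjs/operators";

// Hypothetical generic operator: lets through every n-th value (1-based).
function everyNth<T>(n: number): MonoTypeOperatorFunction<T> {
  return source$ => source$.pipe(filter((_, index) => (index + 1) % n === 0));
}

const picked: string[] = [];

of("a", "b", "c", "d")
  .pipe(
    everyNth(2),
    // `letter` is inferred as string, so toUpperCase() type-checks without a cast.
    map(letter => letter.toUpperCase())
  )
  .subscribe(letter => picked.push(letter));

console.log(picked); // ["B", "D"]
```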


I hope you liked the post and learned something new.

About the author

Wojciech Trawiński

I'm a freelance Frontend Developer, passionate about Angular and reactive programming, and always seeking new coding challenges. Chocolate lover.
