Create a tapOnce custom Rxjs Operator

Create a custom operator to run a function on first emission only

3 min read

If you use rxjs in your project, you are most likely using the tap operator.
tap does not change the stream; it lets you execute a function (a side-effect) that receives the value of each emission as its parameter. For instance, we can console.log each emitted value like this:

source.pipe(
  tap(val => console.log(val))
);

What happens if we want to execute a side-effect but only on the first emission? rxjs does not include an operator for this, but no worries, we can write our own!

Writing a custom operator

If you are not familiar with the internals of an rxjs operator: in a nutshell, an operator is just a function that takes an observable and returns an observable. That's it.
Most of the time we'll transform the observable, or do something with it, before we return it. If you want to read more on writing custom operators you can check this post or this one.
So, following this guideline, this is what a basic custom operator would look like:

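import { Observable } from 'rxjs';

// Operator factory: returns a function that maps an observable to an observable.
function basicCustomOperator<T>() {
    return function(source: Observable<T>) {
        // No transformation: hand the source back unchanged.
        return source;
    };
}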

And we can use it like this:

source.pipe(
  basicCustomOperator()
)
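
To see why "a function that takes an observable and returns an observable" is all an operator needs to be, here is a dependency-free toy model. It is only a sketch: ToyObservable, ToyOperator, double and toyPipe are made-up names for illustration, not rxjs APIs, and the model ignores errors, completion and unsubscription.

```typescript
// Toy stand-in for an observable (NOT the rxjs class): just a subscribe method.
interface ToyObservable<T> {
  subscribe(next: (value: T) => void): void;
}

// In this model an operator is a plain function from observable to observable.
type ToyOperator<A, B> = (source: ToyObservable<A>) => ToyObservable<B>;

// Hypothetical source that emits 1, 2, 3 synchronously to every subscriber.
const numbers: ToyObservable<number> = {
  subscribe: (next) => [1, 2, 3].forEach((n) => next(n)),
};

// Operator factory with the same shape as basicCustomOperator,
// except that it transforms each value instead of returning the source as-is.
function double(): ToyOperator<number, number> {
  return (source) => ({
    subscribe: (next) => source.subscribe((n) => next(n * 2)),
  });
}

// Piping is just function application: toyPipe(src, op) is op(src).
function toyPipe<A, B>(source: ToyObservable<A>, op: ToyOperator<A, B>): ToyObservable<B> {
  return op(source);
}

const received: number[] = [];
toyPipe(numbers, double()).subscribe((n) => received.push(n));
// received is now [2, 4, 6]
```

Note that the operator function itself runs as soon as it is piped; only the work placed inside subscribe is delayed until someone subscribes.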

Now that we have the basic shape of a custom operator, let's write tapOnce.

tapOnce

An initial approach could be to use the existing take operator. We apply take(1) to the source observable and create an inner subscription on it. We then return the original source back to the stream.

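import { Observable } from 'rxjs';
import { take, tap } from 'rxjs/operators';

function tapOnce<T>(fn: (value: T) => void) {
    return function(source: Observable<T>) {
        // Inner subscription: created as soon as the operator is applied.
        source
            .pipe(
                take(1),
                tap(value => fn(value))
            )
            .subscribe();

        return source;
    };
}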

This approach seems OK and will work for most cases, but it has some problems.
First, it creates a subscription on the source as soon as the operator is applied. This can lead to unexpected behaviour, like executing the side-effect even if we never subscribe to the resulting observable.

const sourceSubject = new Subject();

const source = sourceSubject.pipe(
  tapOnce(x => console.log(`tapOnce ${x}`)),
);

sourceSubject.next("1");
sourceSubject.next("2");

// tapOnce will execute even though we never subscribed to source
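
The underlying reason is that the operator function runs as soon as it is applied in pipe(), so anything it does eagerly (here, subscribing) happens at that moment. The following dependency-free sketch makes this visible. ToySubject and naiveTapOnce are made-up names for illustration, not rxjs APIs, and the toy subject only supports next and subscribe.

```typescript
// Toy subject: keeps a list of listeners and pushes each value to all of them.
class ToySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(next: (value: T) => void): void {
    this.listeners.push(next);
  }

  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }

  listenerCount(): number {
    return this.listeners.length;
  }
}

// Toy version of the first tapOnce attempt: it subscribes to the source
// immediately, inside the operator function, and returns the source untouched.
function naiveTapOnce<T>(fn: (value: T) => void) {
  return (source: ToySubject<T>): ToySubject<T> => {
    let done = false;
    source.subscribe((value) => {
      if (!done) {
        done = true;
        fn(value);
      }
    });
    return source;
  };
}

const subject = new ToySubject<string>();
const log: string[] = [];

// Applying the operator is all we do; nobody subscribes to the result.
naiveTapOnce<string>((v) => log.push(`tapOnce ${v}`))(subject);
const listenersAfterApply = subject.listenerCount(); // already 1

subject.next("1");
subject.next("2");
// log is now ["tapOnce 1"]
```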

Also, since the inner subscription is created only once, when the operator is applied, the side-effect runs only once even if we subscribe multiple times.

const sourceSubject = new Subject();

const source = sourceSubject.pipe(
  tapOnce(x => console.log(`tapOnce ${x}`))
);

source.subscribe();
source.subscribe();

sourceSubject.next("1");
sourceSubject.next("2");

// tapOnce will execute only once, when we would expect it to run for both subscriptions

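Another problem appears when you combine the stream with the takeUntil operator.

const source = new Subject<string>();
const takeUntilSource = new Subject<void>();

source.pipe(
  tapOnce(x => console.log(`tapOnce ${x}`)),
  takeUntil(takeUntilSource)
).subscribe(x => console.log(x));

takeUntilSource.next();
source.next('1');
source.next('2');
source.next('3');

// logs "tapOnce 1" even though takeUntil already completed the stream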

takeUntil fires before the first source emission, so we would expect tapOnce not to execute at all, yet it does. The reason is that the inner subscription in our custom operator knows nothing about the takeUntil that comes later in the pipe.

Using defer

Let's see how we can fix this. What we need is a way to run our code only when the resulting observable is subscribed to, and that won't share state when multiple subscriptions occur. Luckily, rxjs includes the defer function, which helps us with this.
So, this would be the refactored version of tapOnce:

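import { defer, Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

function tapOnce<T>(fn: (value: T) => void) {
    return (source: Observable<T>) =>
        defer(() => {
            // A fresh flag is created for every subscription.
            let first = true;
            return source.pipe(
                tap<T>((payload) => {
                    if (first) {
                        fn(payload);
                    }
                    first = false;
                })
            );
        });
}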

Let's dissect the code to understand what's going on. We are returning a defer() observable, which calls its factory function and creates the inner observable only when it is subscribed to.

defer(() => {
  // ...
  return source.pipe(
    // ...
  );
});

It also creates a fresh observable for each subscription, keeping its state "per-subscription". We want this so that each subscription has its own value of the first variable. You can read more about this here.
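
To make "per-subscription" concrete, here is a dependency-free toy model of the same idea. It is a sketch under simplifying assumptions: ToyObservable, toyDefer and toyTapOnce are hypothetical names rather than rxjs APIs, and the real defer also deals with errors, completion and unsubscription, which this model leaves out.

```typescript
// Toy stand-in for an observable (NOT the rxjs class): just a subscribe method.
interface ToyObservable<T> {
  subscribe(next: (value: T) => void): void;
}

// Hypothetical stand-in for defer: the factory runs once per subscribe call,
// and never before someone subscribes.
function toyDefer<T>(factory: () => ToyObservable<T>): ToyObservable<T> {
  return { subscribe: (next) => factory().subscribe(next) };
}

// Synchronous source that emits "a", "b", "c" to every subscriber.
const letters: ToyObservable<string> = {
  subscribe: (next) => ["a", "b", "c"].forEach((v) => next(v)),
};

// Same shape as the refactored tapOnce: the flag lives inside the factory,
// so every subscription starts with its own `first = true`.
function toyTapOnce<T>(fn: (value: T) => void) {
  return (source: ToyObservable<T>): ToyObservable<T> =>
    toyDefer<T>(() => {
      let first = true;
      return {
        subscribe: (next) =>
          source.subscribe((value) => {
            if (first) {
              first = false;
              fn(value);
            }
            next(value);
          }),
      };
    });
}

const sideEffects: string[] = [];
const tapped = toyTapOnce<string>((v) => sideEffects.push(v))(letters);

// Applying the operator has not run anything yet.
const effectsBeforeSubscribe = sideEffects.length; // 0

tapped.subscribe(() => {});
tapped.subscribe(() => {});
// sideEffects is now ["a", "a"]: once per subscription, first value only
```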

Finally, we just check whether it is the first emission and run the function only in that case.

// ...
let first = true;
return source.pipe(
  tap((payload) => {
    if (first) {
      fn(payload);
    }
    first = false; 
  })
);

Now we have a tapOnce operator that behaves as expected in all the scenarios above!

Hope you enjoyed the post! You can play with the code here:

StackBlitz

About the author

Joaquin Cid

I’m a freelance full-stack developer. ng-rosario meetup co-organizer. Speaker and tech blog writer.