NestJS with RxJS Observables: Playing with RxJS Observables || Reactive Programming

In this post, we cover Reactive Programming with RxJS when building NestJS APIs.

We will dive deep into the fundamental parts of working with RxJS and how it benefits us when building complex applications. You can build on this knowledge further after understanding these basic concepts.

RxJS helps developers author declarative code for handling side effects and asynchronous actions with continuous data streams and subscriptions. Think of a stream as a collection of data that arrives asynchronously over time.

The main building blocks of RxJS include:

  1. Observable - An object responsible for handling data streams and notifying observers when new data arrives.
  2. Observer - A consumer of the data stream emitted by an observable. Usually, it's a simple handler function that runs each time a new value arrives.
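To make the two roles concrete, here is a minimal sketch, assuming RxJS 7 is installed. The names `numbers$`, `received`, and `observer` are illustrative only and not part of any API.

```typescript
import { of } from 'rxjs';

// The observable: a source that emits 1, 2, 3 and then completes.
const numbers$ = of(1, 2, 3);

// Collected here so the effect of each callback is easy to inspect.
const received: string[] = [];

// The observer: an object with up to three callbacks.
const observer = {
  next: (value: number) => { received.push(`next:${value}`); },
  error: (err: unknown) => { received.push(`error:${String(err)}`); },
  complete: () => { received.push('complete'); },
};

// Nothing is emitted until an observer subscribes.
numbers$.subscribe(observer);

console.log(received); // ['next:1', 'next:2', 'next:3', 'complete']
```

A plain function passed to `subscribe` is shorthand for an observer that only has a `next` callback.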

Creating an Observable

Let's explore the anatomy of an observable and how to use it.

One of the many benefits of using RxJS is the abundance of utility methods for creating observables from all kinds of sources. You can make observables from DOM events, promises, data streams, and other sources of asynchronous data. For this example, we'll create a brand-new observable from scratch.

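const { Observable } = require('rxjs');

const observable = new Observable((subscriber) => {
  let counter = 0;
  subscriber.next(counter);
  // Emit an incremented counter every second and complete after 5.
  const timer = setInterval(() => {
    counter++;
    subscriber.next(counter);
    if (counter === 5) {
      subscriber.complete();
    }
  }, 1000);
  // Teardown: runs on completion or unsubscribe, so the timer never leaks.
  return () => clearInterval(timer);
});

// Pass an observer object; separate callback arguments are deprecated in RxJS 7.
observable.subscribe({
  next: (value) => {
    console.log({ value });
  },
  complete: () => {
    console.log('Done!');
  },
});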
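Besides the constructor, the creation helpers mentioned earlier cover most everyday cases. The following is a small sketch, assuming RxJS 7; the `seen` array is only there to make the results visible.

```typescript
import { from, of } from 'rxjs';

const seen: string[] = [];

// of(): emits its arguments one by one, then completes.
of('a', 'b').subscribe((v) => { seen.push(`of:${v}`); });

// from(): turns an array (or any iterable) into a stream of its items.
from([10, 20]).subscribe((v) => { seen.push(`from:${v}`); });

// from() also accepts a Promise; its value is delivered asynchronously,
// so this log line appears after the synchronous code below has run.
from(Promise.resolve('done')).subscribe((v) => console.log(`promise:${v}`));

console.log(seen); // ['of:a', 'of:b', 'from:10', 'from:20']
```

`of` and `from` with an array emit synchronously on subscribe, which is why `seen` is already filled when it is logged.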

Role of Operators with Observables (Reference https://www.tutorialspoint.com/rxjs/rxjs_operators.htm)

Operators are an important part of RxJS: they sit between the source and the subscriber and change what flows through the stream.

Working with Operators

An operator is a pure function that takes an observable as input and returns a new observable as output.

To work with operators we need the pipe() method. Once we have created an observable from any source, operators such as map, filter, tap, mergeMap, switchMap, and many more can work on the stream of data it emits.

Example of using pipe()

const obs = of(1, 2, 3); // an observable
obs.pipe(
   operator1(), // operator1..operator4 are placeholders for real operators
   operator2(),
   operator3(),
   operator4(),
);

In the above example we created an observable using the of() method, which takes the values 1, 2 and 3. On this observable you can perform different operations using any number of operators inside pipe(), as shown above. The operators are applied sequentially, in the order they are listed.

Below is a working example:

import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';

let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
   filter(x => x % 2 === 0),
   reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));

Output

30

In the above example, the filter operator keeps only the even numbers, and the reduce() operator then adds them up and emits the total when subscribed.
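Because an operator is just a function from one observable to another, you can write your own by composing existing ones. The sketch below assumes RxJS 7; `doubleEvens` is a hypothetical name chosen for illustration.

```typescript
import { Observable, filter, map, of } from 'rxjs';

// Hypothetical custom operator: keeps even numbers and doubles them.
// It takes an observable in and returns a new observable.
function doubleEvens(source: Observable<number>): Observable<number> {
  return source.pipe(
    filter((n) => n % 2 === 0),
    map((n) => n * 2),
  );
}

const source$ = of(1, 2, 3, 4);
const viaPipe: number[] = [];
const untouched: number[] = [];

// Use it inside pipe() like any built-in operator.
source$.pipe(doubleEvens).subscribe((n) => { viaPipe.push(n); });

// The source itself is not modified by the operator.
source$.subscribe((n) => { untouched.push(n); });

console.log(viaPipe);   // [4, 8]
console.log(untouched); // [1, 2, 3, 4]
```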

RxJS operators are grouped into the following categories:

  • Creation
  • Mathematical
  • Join
  • Transformation
  • Filtering
  • Utility
  • Conditional
  • Multicasting
  • Error handling
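To give a feel for a few of these categories, here is a short sketch with one representative operator each, assuming RxJS 7.2 or later (where operators can be imported from 'rxjs' directly). The choice of operators is mine, not an exhaustive tour.

```typescript
import { catchError, concat, filter, map, max, of, range, throwError } from 'rxjs';

const results: Array<number | string> = [];

// Creation (range) + filtering (filter) + transformation (map).
range(1, 5) // emits 1, 2, 3, 4, 5
  .pipe(
    filter((n) => n % 2 === 1), // 1, 3, 5
    map((n) => n * 10),         // 10, 30, 50
  )
  .subscribe((n) => { results.push(n); });

// Mathematical (max): emits the largest value once the source completes.
of(4, 9, 2).pipe(max()).subscribe((n) => { results.push(n); });

// Join (concat): subscribes to each source in order.
concat(of('a'), of('b')).subscribe((s) => { results.push(s); });

// Error handling (catchError): replace a failed stream with a fallback.
throwError(() => new Error('boom'))
  .pipe(catchError(() => of('fallback')))
  .subscribe((s) => { results.push(s); });

console.log(results); // [10, 30, 50, 9, 'a', 'b', 'fallback']
```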

Let's explore Observables in the context of NestJS

In a NestJS microservice we deal with REST or GraphQL based services, where we read data from a database or external layer and expose that data as a JSON response.

In most cases this data is asynchronous because we are reading from a database, and we use simple promises, which can either be resolved or rejected. Instead, I want to use streams and observables to make the application more reactive, so that I can work on the stream with different operators before returning data to the client.

RxJS provides many operators that make it easy to fetch, merge, join, and filter async data coming from different sources. RxJS is popular largely because of its operators.
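As a rough illustration of joining data from different sources, the sketch below combines two lookups with forkJoin and reshapes the result before it would be returned to a client. It assumes RxJS 7; `findUser`, `findFileNames`, and the data they return are hypothetical stand-ins for a database or external call.

```typescript
import { Observable, forkJoin, map, of } from 'rxjs';

interface UserRow {
  id: number;
  name: string;
}

// Hypothetical data sources; real ones would wrap a database or HTTP call.
function findUser(id: number): Observable<UserRow> {
  return of({ id, name: 'Ada' });
}

function findFileNames(ownerId: number): Observable<string[]> {
  return of([`report-${ownerId}.pdf`, `invoice-${ownerId}.pdf`]);
}

// forkJoin waits for every source to complete and emits their last values together.
const summary$ = forkJoin({
  user: findUser(1),
  files: findFileNames(1),
}).pipe(
  map(({ user, files }) => ({ owner: user.name, fileCount: files.length })),
);

const summaries: Array<{ owner: string; fileCount: number }> = [];
summary$.subscribe((s) => { summaries.push(s); });

console.log(summaries); // [{ owner: 'Ada', fileCount: 2 }]
```

With promises the equivalent would be Promise.all followed by a then; the observable version keeps the door open for further operators in the same pipe.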

Let's create a simple controller in NestJS that uses RxJS

// Native.
import { format } from "util";

// Package.
import {
  Body,
  Catch,
  Controller,
  Delete,
  Get,
  HttpCode,
  HttpStatus,
  InternalServerErrorException,
  Param,
  Post,
  Query,
  Req,
  Res,
  UploadedFiles,
  UseGuards,
  UseInterceptors,
  UsePipes,
  ValidationPipe,
} from "@nestjs/common";
import { FilesInterceptor } from "@nestjs/platform-express";
import {
  ApiBearerAuth,
  ApiConsumes,
  ApiHeader,
  ApiNoContentResponse,
  ApiOkResponse,
  ApiOperation,
  ApiResponse,
  ApiTags,
} from "@nestjs/swagger";
import debug from "debug";

// Internal.
import { UserMetaData } from "@auth/interface/user";
import { User } from "@auth/decorator/user";
import { RolesAllowed } from "@core/decorator/role.decorator";
import { RolesGuard } from "@core/guard/role.guard";
import { Roles } from "@core/interface/roles";
import {
  ApiFilterQuery,
  uploadFiles,
} from "@core/decorator/apiFilterQuery.decorator";
import {
  FileMetadataParam,
  FileOperationParam,
  FileQueryParam,
  FilterDtoParam,
  RequiredFileQueryParam,
  FilesCreateResponse,
  FilesListResponse,
} from "./document.dto";
import { FileService } from "./documnent.service";
import { origins } from "@domains/shared/interfaces/origin";
import { Request, Response } from "express";
import { catchError, map, throwError } from "rxjs";
const verbose = debug("apis:verbose:middleware:auth");


@ApiBearerAuth("authorization")
@Controller("apis")
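@UsePipes(
  new ValidationPipe({
    whitelist: true,
    transform: true,
  })
)
// Class name inferred from the log messages below; the service is injected by Nest.
export class FileController {
  constructor(private readonly service: FileService) {}
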
  @ApiTags("documents")
  @RolesAllowed(
    Roles["mercanis-root"],
    Roles["purchaser-admin"],
    Roles["purchaser-user"]
  )
  @HttpCode(HttpStatus.NO_CONTENT)
  @ApiConsumes("application/json")
  @Delete("/:id/files/:file_id")
  public deleteFile(
    @User() user: UserMetaData,
    @Res() res: Response,
    @Param() param: FileOperationParam
  ) {
    // deleteById returns an Observable, so awaiting it does nothing and
    // try/catch cannot see its errors; handle them with catchError instead.
    return this.service.deleteById(user, param).pipe(
      map((data) => {
        return res.status(204).send(data);
      }),
      catchError((err) => {
        verbose(format(err, "FileController -> deleteFile"));
        return throwError(() => err);
      })
    );
  }
}

The call from the controller to the service looks a little different here, because the service returns an Observable rather than a Promise:

    return this.service.deleteById(user, param).pipe(
      map((data) => {
        return res.status(204).send(data);
      }),
      catchError((err) => {
        verbose(format(err, "FileController -> deleteFile"));
        return throwError(() => err);
      })
    );
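One detail worth spelling out: errors raised inside an observable travel through the stream, so a surrounding try/catch does not see them. The sketch below shows this with plain RxJS 7, outside of Nest; `failing$` is a hypothetical stand-in for a failing service call.

```typescript
import { catchError, of, throwError } from 'rxjs';

const log: string[] = [];

// Hypothetical service call that fails inside the stream.
const failing$ = throwError(() => new Error('delete failed'));

try {
  // Building the pipeline does not run it, so nothing is thrown here.
  const handled$ = failing$.pipe(
    catchError((err: Error) => {
      log.push(`catchError: ${err.message}`);
      return of('recovered'); // replace the failed stream with a fallback value
    }),
  );
  handled$.subscribe((value) => { log.push(`next: ${value}`); });
} catch {
  log.push('try/catch'); // not reached: the error was handled in the stream
}

console.log(log); // ['catchError: delete failed', 'next: recovered']
```

Similarly, `await` on an observable does not subscribe to it, because an Observable is not a thenable; the expression simply evaluates to the observable itself.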

Our Service Method

  deleteById(user: UserMetaData, param: FileOperationParam) {
    const { authorization, origin } = user;
    const { id, file_id } = param;
    return this.HttpProxyService.deleteById(`files/${file_id}`, {
      params: { external_id: id },
      authorization,
      origin,
    }).pipe(
      // Identity map: a placeholder where the response could be reshaped.
      map((res) => {
        return res;
      })
    );
  }

The document service, in turn, calls an external interface through the HTTP proxy service to perform this action:

  deleteById(url: string, config: ApiRequestConfig) {
    return this.http
      .delete(url, {
        baseURL: this.baseUrl,
        params: config.params,
        headers: this.configureHeaders(config),
      })
      .pipe(
        map((res) => {
          return res.data;
        }),
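        // First handler: log the raw HTTP error and rethrow it unchanged.
        catchError((err: AxiosError) => {
          verbose("[axios http error]", err);
          return throwError(() => err);
        }),
        // Second handler: translate the error before it reaches the caller.
        catchError((err: AxiosError) => {
          verbose("[logging for debugging error]", err);
          return throwError(() => this.handleError(err));
        })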
      );
  }
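The pattern of two chained catchError calls (log first, then translate) can be tried in isolation. This is a minimal sketch assuming RxJS 7; `request`, `deleteRemote`, and `UpstreamError` are hypothetical names, and `UpstreamError` only stands in for whatever `handleError` produces in the real service.

```typescript
import { Observable, catchError, map, of, throwError } from 'rxjs';

// Hypothetical domain error carrying an HTTP-like status.
class UpstreamError extends Error {
  readonly status: number;
  constructor(status: number, message: string) {
    super(message);
    this.status = status;
  }
}

const trace: string[] = [];

// Hypothetical HTTP call: succeeds or fails depending on the flag.
function request(fail: boolean): Observable<{ data: string }> {
  return fail
    ? throwError(() => new Error('socket hang up'))
    : of({ data: 'deleted' });
}

function deleteRemote(fail: boolean): Observable<string> {
  return request(fail).pipe(
    map((res) => res.data),
    // First handler: log and rethrow unchanged.
    catchError((err: Error) => {
      trace.push(`log: ${err.message}`);
      return throwError(() => err);
    }),
    // Second handler: translate into a domain error.
    catchError((err: Error) => throwError(() => new UpstreamError(502, err.message))),
  );
}

deleteRemote(false).subscribe({ next: (v) => { trace.push(`ok: ${v}`); } });
deleteRemote(true).subscribe({
  error: (err: UpstreamError) => { trace.push(`failed: ${err.status} ${err.message}`); },
});

console.log(trace);
// ['ok: deleted', 'log: socket hang up', 'failed: 502 socket hang up']
```

On the success path neither handler runs; on the failure path they run in the order they are listed, each receiving the error rethrown by the previous one.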

There are many other use cases where we can use RxJS operators. One is a simple logging interceptor:

import { Injectable, NestInterceptor, ExecutionContext, CallHandler } from '@nestjs/common';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';

@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    console.log('Before...');
    const now = Date.now();
    return next
      .handle()
      .pipe(
        tap(() => console.log(`After... ${Date.now() - now}ms`)),
      );
  }
}
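The interceptor relies on tap, which runs a side effect without changing the value. The sketch below shows the difference between tap and map using plain RxJS 7, without Nest; `handler$` is a hypothetical stand-in for what `next.handle()` would return.

```typescript
import { Observable, map, of, tap } from 'rxjs';

const messages: string[] = [];

// Hypothetical stand-in for next.handle(): the stream produced by a route handler.
const handler$: Observable<{ id: number }> = of({ id: 7 });

// tap: observe the value (here, log it) and pass it through unchanged.
const logged$ = handler$.pipe(
  tap((body) => { messages.push(`response body: ${JSON.stringify(body)}`); }),
);

// map: replace the value, for example by wrapping it in an envelope.
const wrapped$ = logged$.pipe(map((body) => ({ data: body })));

const out: Array<{ data: { id: number } }> = [];
wrapped$.subscribe((v) => { out.push(v); });

console.log(messages); // ['response body: {"id":7}']
console.log(out);      // [{ data: { id: 7 } }]
```

An interceptor that should reshape every response would use map in the same position where the logging interceptor uses tap.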
