NestJS with AWS components like SNS, SQS, S3 and many more


Recently I was exploring existing modules for working with AWS from NestJS:

  • SQS with nestjs
  • SNS with nestjs
  • S3 with nestjs

There are some modules that make it easier to deal with SNS and SQS.

First of all, install the dependencies below:

npm i @ssut/nestjs-sqs
npm i aws-sdk

I will mainly tackle two scenarios:

  • Producing messages to an existing SQS queue.
  • Consuming messages from an existing SQS queue.

Setting up the producer:

Create producer.module.ts

import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { MessageProducer } from './producer.service';
import * as AWS from 'aws-sdk';
import { config } from '../config';

AWS.config.update({
    region: config.AWS_REGION, // aws region
    accessKeyId: config.ACCESS_KEY_ID, // aws access key id
    secretAccessKey: config.SECRET_ACCESS_KEY, // aws secret access key
});

@Module({
    imports: [
        SqsModule.register({
            consumers: [],
            producers: [
                {
                    name: config.TEST_QUEUE, // name of the queue
                    queueUrl: config.TEST_QUEUE_URL, // url of the queue
                    region: config.AWS_REGION, // aws region
                },
            ],
        }),
    ],
    controllers: [],
    providers: [MessageProducer],
    exports: [MessageProducer]
})
export class ProducerModule { }
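
The modules in this post import a `config` object from `../config`, which is not shown. As a rough sketch of what such a file could contain, the snippet below reads the keys used above from a plain key/value map and fails fast when one is missing. The `AppConfig` interface and the `loadConfig` and `required` helpers are assumptions made for illustration; only the key names come from the code in this post.

```typescript
// Hypothetical shape of the `config` object imported from '../config'.
interface AppConfig {
  AWS_REGION: string;
  ACCESS_KEY_ID: string;
  SECRET_ACCESS_KEY: string;
  TEST_QUEUE: string;
  TEST_QUEUE_URL: string;
}

// A plain key/value map such as process.env.
type Env = Record<string, string | undefined>;

// Read one required variable and fail fast when it is missing or empty.
function required(env: Env, key: keyof AppConfig): string {
  const value = env[key];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Build the config object from the given environment map.
function loadConfig(env: Env): AppConfig {
  return {
    AWS_REGION: required(env, 'AWS_REGION'),
    ACCESS_KEY_ID: required(env, 'ACCESS_KEY_ID'),
    SECRET_ACCESS_KEY: required(env, 'SECRET_ACCESS_KEY'),
    TEST_QUEUE: required(env, 'TEST_QUEUE'),
    TEST_QUEUE_URL: required(env, 'TEST_QUEUE_URL'),
  };
}
```

In a real `config.ts` you would probably finish with something like `export const config = loadConfig(process.env);` so that a missing variable stops the application at startup instead of failing on the first message.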

Create the producer function in producer.service.ts

import { Injectable } from '@nestjs/common';
import { SqsService } from '@ssut/nestjs-sqs';
import { randomUUID } from 'crypto';
import { config } from '../config';

@Injectable()
export class MessageProducer {
    constructor(private readonly sqsService: SqsService) { }

    async sendMessage(body: any) {
        try {
            // send() takes a message object rather than a bare string:
            // `id` identifies the entry, `body` carries the serialized payload.
            await this.sqsService.send(config.TEST_QUEUE, {
                id: randomUUID(),
                body: JSON.stringify(body),
            });
        } catch (error) {
            console.log('error in producing message!', error);
        }
    }
}

Lastly, import the producer module in the root app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ProducerModule } from './producer/producer.module';

@Module({
  imports: [
    ProducerModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Setting up the consumer:

Create consumer.module.ts

import { Module } from '@nestjs/common';
import { SqsModule } from '@ssut/nestjs-sqs';
import { MessageHandler } from './messageHandler';
import * as AWS from 'aws-sdk';
import { config} from '../config';


AWS.config.update({
    region: config.AWS_REGION,
    accessKeyId: config.ACCESS_KEY_ID,
    secretAccessKey: config.SECRET_ACCESS_KEY,
});
@Module({
    imports: [
        SqsModule.register({
            consumers: [
                {
                    name: config.TEST_QUEUE, // name of the queue
                    queueUrl: config.TEST_QUEUE_URL, // the url of the queue
                    region: config.AWS_REGION,
                },
            ],
            producers: [],
        }),
    ],
    controllers: [],
    providers: [MessageHandler],
})
export class ConsumerModule { }

Set up your message handler to consume the messages in messageHandler.ts

import { Injectable } from '@nestjs/common';
import { SqsMessageHandler } from '@ssut/nestjs-sqs';
import * as AWS from 'aws-sdk';
import { config } from '../config';


@Injectable()
export class MessageHandler {
    constructor() { }

    // The second argument is the batch flag: false means one message per call.
    @SqsMessageHandler(config.TEST_QUEUE, false)
    async handleMessage(message: AWS.SQS.Message) {
        // This assumes the queue is subscribed to an SNS topic, so the SQS body is
        // an SNS envelope whose Message field carries the published JSON string.
        const envelope = JSON.parse(message.Body) as { Message: string };
        const { data } = JSON.parse(envelope.Message);

        // use the data and consume it the way you want
    }
}
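
The handler above reads `Message` from the parsed body, which matches what SNS writes into a queue when raw message delivery is off. Messages sent straight from the producer are not wrapped that way. The sketch below handles both cases. The `unwrapSqsBody` helper and the `SnsEnvelope` interface are hypothetical names, and the interface lists only a subset of the envelope's fields.

```typescript
// Subset of the JSON envelope SNS writes into the SQS message body
// when raw message delivery is disabled.
interface SnsEnvelope {
  Type: string;
  MessageId: string;
  TopicArn: string;
  Message: string; // the payload published to the topic, as a string
}

// Hypothetical helper: returns the parsed payload whether or not
// the body is wrapped in an SNS envelope.
function unwrapSqsBody<T>(body: string | undefined): T {
  if (!body) {
    throw new Error('SQS message has no body');
  }
  const parsed: unknown = JSON.parse(body);
  const envelope = parsed as Partial<SnsEnvelope>;
  const isSnsEnvelope =
    typeof parsed === 'object' &&
    parsed !== null &&
    envelope.Type === 'Notification' &&
    typeof envelope.Message === 'string';
  // Wrapped: parse the inner Message string. Not wrapped: the body is the payload.
  return (isSnsEnvelope ? JSON.parse(envelope.Message as string) : parsed) as T;
}
```

Inside `handleMessage` this could be used as `const { data } = unwrapSqsBody<{ data: unknown }>(message.Body);`, so the same handler works for queues fed by SNS and for queues fed directly by the producer.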

Lastly, import the consumer module in app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ConsumerModule } from './consumer/consumer.module';
import { ProducerModule } from './producer/producer.module';

@Module({
  imports: [
    ProducerModule,
    ConsumerModule
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Now both the consumer and the producer are available in the main module.

Let's see how to work with SNS topics

Amazon Simple Notification Service (Amazon SNS) is a fully managed messaging service for both application-to-application (A2A) and application-to-person (A2P) communication.

The A2A pub/sub functionality provides topics for high-throughput, push-based, many-to-many messaging between distributed systems, microservices, and event-driven serverless applications. Using Amazon SNS topics, your publisher systems can fanout messages to a large number of subscriber systems, including Amazon SQS queues, AWS Lambda functions, HTTPS endpoints, and Amazon Kinesis Data Firehose, for parallel processing. The A2P functionality enables you to send messages to users at scale via SMS, mobile push, and email.

We will create an SNS service and an SNS module. First, the service:

// Package.
import { SNS } from "aws-sdk";
import { Injectable } from "@nestjs/common";
import { v4 as uuid } from "uuid";

// Internal.
import { AppConfigService, Stage } from "@lib/config";
import { Logger } from "@lib/logger";

// Code.
@Injectable()
export default class AWSSNSService {
  private client: SNS;

  constructor(
    private configService: AppConfigService,
    private readonly logger: Logger
  ) {
    this.client = new SNS({
      region: "us-west-1",
      endpoint:
        configService.stage === Stage.LOCAL
          ? "http://localhost:4566"
          : undefined,
    });
  }

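  public getTopic(type: string): string {
    switch (type) {
      // TBD create enum of all event types
      case "events":
        // Despite its name, this value is passed as TopicArn below,
        // so it must hold the topic's ARN.
        return this.configService.sns.topicname;
      default:
        throw new Error(`No SNS topic configured for event type "${type}"`);
    }
  }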

  async publishMessage(payload: any, type: string) {
    if (this.configService.stage === Stage.LOCAL) {
      return {
        MessageId: uuid(),
        SequenceNumber: 1,
      };
    }

    const params = {
      Message: JSON.stringify(payload),
      TopicArn: this.getTopic(type),
    };

    // Publish the message and wait for the response
    const response = await this.client.publish(params).promise();
    this.logger.log(
      `Message ${params.Message} sent to the topic ${params.TopicArn}`
    );
    this.logger.log("MessageID is " + response.MessageId);

    return {
      MessageId: response.MessageId,
      SequenceNumber: response.SequenceNumber,
    };
  }
}
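
The `getTopic` method above carries a "TBD create enum of all event types" note. One possible shape for that, as a sketch only: the `EventType` enum, the `TopicMap` type and the `resolveTopicArn` function are hypothetical names, and "events" is the only event type that appears in the original service.

```typescript
// Hypothetical event types; only "events" appears in the original service.
enum EventType {
  Events = 'events',
}

// Maps every event type to a topic ARN. Because the type is
// Record<EventType, string>, adding an enum member without a topic
// becomes a compile-time error.
type TopicMap = Record<EventType, string>;

// Look up the topic ARN for an event type coming in as a plain string.
function resolveTopicArn(type: string, topics: TopicMap): string {
  if (!Object.prototype.hasOwnProperty.call(topics, type)) {
    throw new Error(`Unknown SNS event type: ${type}`);
  }
  return topics[type as EventType];
}
```

The service could build the `TopicMap` once from its config and call `resolveTopicArn(type, topics)` where it currently calls `getTopic(type)`, so an unknown type fails loudly before anything is published.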

Now we can create the module for SNS

// Package.
import { Global, Module } from "@nestjs/common";
// Internal.
import { AppConfigModule } from "@lib/config";
import AWSSNSService from "./aws-sns.service";
import { LoggerModule } from "@lib/logger";

// Code.
@Global()
@Module({
  imports: [AppConfigModule, LoggerModule],
  providers: [AWSSNSService],
  exports: [AWSSNSService],
})
export class AWSSNSModule {}

Let's see how to work with S3 buckets

Amazon Simple Storage Service (Amazon S3) is an object storage service that offers industry-leading scalability, data availability, security, and performance. You can use Amazon S3 to store and retrieve any amount of data at any time, from anywhere.

Let's create an S3 service and an S3 module. First, the service:

// Package.
import { S3 } from "aws-sdk";
import { Injectable } from "@nestjs/common";

// Internal.
import { AppConfigService } from "@lib/config";

// Code.
@Injectable()
export default class AWSS3Service {
  private client: S3;

  constructor(private configService: AppConfigService) {
    this.client = new S3({
      region: "eu-central-1",
      credentials: {
        accessKeyId: process.env.AWS_ACCESS_KEY!,
        secretAccessKey: process.env.AWS_SECRET_ACCESS!,
      },
    });
  }

  async upload(file: any, key: string, originalname: string) {
    // TODO: ideally the bucket must be parametrized here
    const params = {
      Bucket: this.configService.aws.s3MediumBucket,
      Key: key,
      Body: file,
    };
    const uploadResponse = await this.client.upload(params).promise();

    const url = await this.getPresignedUrl(key, originalname);
    return { ...uploadResponse, url };
  }

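  async get(bucket: string, key: string) {
    const params = {
      Bucket: bucket,
      Key: key,
    };
    return await this.client.getObject(params).promise();
  }

  // upload() calls this helper, but the post does not show it. A minimal sketch,
  // assuming a time-limited download link that keeps the original file name.
  async getPresignedUrl(key: string, originalname: string) {
    return this.client.getSignedUrlPromise("getObject", {
      Bucket: this.configService.aws.s3MediumBucket,
      Key: key,
      Expires: 3600, // assumed lifetime in seconds
      ResponseContentDisposition: `attachment; filename="${originalname}"`,
    });
  }
}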

Now we can create the module for S3

// Package.
import { Global, Module } from "@nestjs/common";

// Internal.
import { AppConfigModule } from "@lib/config";
import AWSS3Service from "./aws-s3.service";

// Code.
@Global()
@Module({
  imports: [AppConfigModule],
  providers: [AWSS3Service],
  exports: [AWSS3Service],
})
export class AWSS3Module {}

To run this setup locally we need a LocalStack container. Alternatively, we can point at an actual AWS environment by updating the AWS config:

AWS.config.update({
    region: config.AWS_REGION,
    accessKeyId: config.ACCESS_KEY_ID,
    secretAccessKey: config.SECRET_ACCESS_KEY,
});

Or just use a LocalStack container with a docker-compose file like this:

version: "3.9"

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    network_mode: bridge
    ports:
      - "4566:4566"
      - "4571:4571"
      - "${PORT_WEB_UI-8080}:${PORT_WEB_UI-8080}"
    environment:
      # https://github.com/localstack/localstack#configurations
      - SERVICES=dynamodb,sts,ssm,iam,cloudformation,s3,kms,sns
      - DEBUG=1
      - DEFAULT_REGION=us-east-1
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp/mercanis-localstack}:/tmp/mercanis-localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

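This container brings up the services listed in SERVICES (dynamodb, sts, ssm, iam, cloudformation, s3, kms, sns), so we can connect to them in a local AWS environment for testing purposes. Note that sqs is not in that list; depending on your LocalStack version, you may need to add it to SERVICES to run the queue examples locally.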
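
To point the SDK clients at this container instead of real AWS, the client options need a custom endpoint, the same way the SNS service above switches to `http://localhost:4566` for the local stage. Here is a small sketch of a shared helper for that. The `buildClientOptions` function and the `ClientOptions` interface are hypothetical, the interface covers only a subset of the aws-sdk v2 options, and the `test` credentials are dummy placeholder values.

```typescript
// Subset of the aws-sdk v2 client options used in this sketch.
interface ClientOptions {
  region: string;
  endpoint?: string;
  s3ForcePathStyle?: boolean;
  credentials?: { accessKeyId: string; secretAccessKey: string };
}

// Assumed default: the port 4566 mapping from the compose file above.
const LOCALSTACK_ENDPOINT = 'http://localhost:4566';

// Build client options for either LocalStack or real AWS.
function buildClientOptions(region: string, useLocalStack: boolean): ClientOptions {
  if (!useLocalStack) {
    // Real AWS: let the SDK resolve the endpoint and credentials itself.
    return { region };
  }
  return {
    region,
    endpoint: LOCALSTACK_ENDPOINT,
    // Only relevant for S3: address buckets by path instead of by subdomain.
    s3ForcePathStyle: true,
    // Dummy placeholder credentials for local testing.
    credentials: { accessKeyId: 'test', secretAccessKey: 'test' },
  };
}
```

With this in place, the services could create their clients as `new S3(buildClientOptions(region, isLocal))` or `new SNS(buildClientOptions(region, isLocal))`, which keeps the local/remote switch in one place.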
