In microservices architectures, AWS SQS and SNS are essential services for connecting and managing communication between distributed systems. In this blog post, we will guide you through setting up AWS SQS and AWS SNS in a NestJS project using TypeScript. You’ll also learn how to use the SNS fan-out pattern to broadcast messages and subscribe to those messages using SQS.
Additionally, you will get the full setup, including SQLite database integration to save processed messages and SQS polling to automatically process and manage messages from the queue.
SNS Fanout Pattern:
The SNS fan-out pattern is a mechanism through which a single message is sent to multiple subscribers. Instead of sending separate messages to each subscriber, a message is published to an SNS topic, and multiple subscribers consume it through SQS or other supported endpoints.
For example, consider a scenario where we have an Order Service, Invoice Service, and Order-Tracking Shipment Service. When a message is published to an SNS topic, such as order-created, by the Order Service, other services can listen to the message through SQS and begin processing it.
- The Invoice Service will generate an invoice for the order.
- The Order-Tracking Shipment Service will initiate order tracking and shipment.
This allows for efficient and decoupled message handling across multiple services.
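The fan-out idea can be sketched without any AWS dependency. The block below is a minimal in-memory model, not the AWS SDK: the Topic and Queue classes and the queue names are hypothetical stand-ins that only show how one publish results in one copy of the message per subscribed queue.

```typescript
// In-memory model of SNS fan-out: one publish, one copy per subscribed queue.
// Topic and Queue are hypothetical stand-ins, not AWS SDK classes.
class Queue {
  readonly name: string;
  readonly messages: string[] = [];

  constructor(name: string) {
    this.name = name;
  }

  enqueue(message: string): void {
    this.messages.push(message);
  }
}

class Topic {
  readonly name: string;
  private readonly subscribers: Queue[] = [];

  constructor(name: string) {
    this.name = name;
  }

  subscribe(queue: Queue): void {
    this.subscribers.push(queue);
  }

  // Delivers the same message to every subscriber and returns the delivery count.
  publish(message: string): number {
    for (const queue of this.subscribers) {
      queue.enqueue(message);
    }
    return this.subscribers.length;
  }
}

const orderCreated = new Topic('order-created');
const invoiceQueue = new Queue('invoice-queue');
const shipmentQueue = new Queue('shipment-queue');
orderCreated.subscribe(invoiceQueue);
orderCreated.subscribe(shipmentQueue);

// The Order Service publishes once; both downstream queues receive a copy.
const delivered = orderCreated.publish(JSON.stringify({ orderId: 'order-1' }));
console.log(delivered, invoiceQueue.messages, shipmentQueue.messages);
```

Each service then reads only from its own queue, so adding a new consumer means adding a subscription rather than changing the publisher.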
Setting Up a NestJS Project with SQS and SNS
Prerequisites
1. Basic understanding of NestJS.
2. An AWS account.
3. Familiarity with SQS (Simple Queue Service) and SNS (Simple Notification Service).
4. Node.js installed on your system.
Step 1: Setting Up NestJS
First, create a new NestJS project:
npx @nestjs/cli new nest-project
Follow the CLI prompts to complete the setup.
Step 2: Install Required Dependencies
1. Navigate to the project directory:
cd nest-project
2. Install the AWS SDK modules for SQS and SNS, along with other necessary packages:
npm install @aws-sdk/client-sns @aws-sdk/client-sqs
3. Install additional dependencies for SQS polling and database support:
npm install sqs-consumer @nestjs/typeorm typeorm sqlite3
4. Install other dependencies
npm install @nestjs/config uuid dotenv
Step 3: Configure Environment Variables
Create a .env file in the root of your project and add your AWS credentials:
AWS_ACCESS_KEY_ID=<your-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-secret-access-key>
AWS_REGION=<your-region>
SNS_TOPIC_ARN=<your-sns-topic-arn>
SQS_QUEUE_URL=<your-sqs-queue-url>
Load these variables globally using NestJS’s ConfigModule by updating the imports in app.module.ts:
@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true, // Makes the configuration available globally
    }),
  ],
  …
})
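A missing variable in .env otherwise only shows up later as a failed AWS call, so it can help to check the values at startup. The following is a minimal sketch; findMissingEnvKeys and assertEnvComplete are hypothetical helper names, and the key list simply mirrors the .env file above.

```typescript
// Keys taken from the .env file shown in this step.
const REQUIRED_ENV_KEYS = [
  'AWS_ACCESS_KEY_ID',
  'AWS_SECRET_ACCESS_KEY',
  'AWS_REGION',
  'SNS_TOPIC_ARN',
  'SQS_QUEUE_URL',
];

// Hypothetical helper: returns the required keys that are absent or blank.
function findMissingEnvKeys(env: Record<string, string | undefined>): string[] {
  return REQUIRED_ENV_KEYS.filter((key) => {
    const value = env[key];
    return value === undefined || value.trim() === '';
  });
}

// Hypothetical helper: throws a readable error if anything is missing.
function assertEnvComplete(env: Record<string, string | undefined>): void {
  const missing = findMissingEnvKeys(env);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
}

// Example with a partially filled configuration object.
console.log(findMissingEnvKeys({ AWS_REGION: 'us-east-1', SQS_QUEUE_URL: ' ' }));
```

One possible place to run such a check is the validate option of ConfigModule.forRoot, which receives the loaded configuration object.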
Step 4: Set Up SQS in AWS
1. Create an SQS Queue:
1. Log in to the AWS Management Console.
2. Navigate to the SQS service.
3. Click on Create Queue.
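4. Choose either a Standard or FIFO queue based on your needs. The producer in Step 7 sets MessageGroupId and MessageDeduplicationId, which are FIFO parameters, so a FIFO queue (its name must end in .fifo) matches the code as written.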
5. Configure the queue settings (e.g., name, visibility timeout).
6. Click Create Queue.
For more details, refer to the Amazon SQS Documentation.
Step 5: Set Up SNS in AWS
1. Create an SNS Topic:
1. Navigate to the SNS service in the AWS Management Console.
2. Click on Create topic.
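3. Choose either a Standard or FIFO topic based on your requirements. The publisher in Step 8 sets MessageGroupId and MessageDeduplicationId, so a FIFO topic (its name must end in .fifo) matches the code as written.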
4. Enter a name for your topic and configure additional settings.
5. Click Create topic.
2. Subscribe the SQS Queue to the SNS Topic:
1. Navigate to your SNS topic.
2. Click Create Subscription.
3. Choose Amazon SQS as the protocol.
4. Enter the ARN of your SQS queue.
5. Click Create Subscription.
6. Add an access policy to the SQS queue that allows the SNS topic to send messages to it (the sqs:SendMessage action).
For more details on SNS, check out the Amazon SNS Documentation.
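The permission mentioned in the last step is a resource policy on the queue. As a sketch of its usual shape, the function below builds a policy document that lets the SNS service send messages to one queue, restricted to one topic. The function name and the example ARNs are placeholders for illustration; substitute the ARNs of your own queue and topic.

```typescript
interface PolicyStatement {
  Sid: string;
  Effect: 'Allow' | 'Deny';
  Principal: { Service: string };
  Action: string;
  Resource: string;
  Condition: { ArnEquals: Record<string, string> };
}

interface QueuePolicy {
  Version: string;
  Statement: PolicyStatement[];
}

// Hypothetical helper: builds an SQS access policy for an SNS subscription.
function buildSnsToSqsPolicy(queueArn: string, topicArn: string): QueuePolicy {
  return {
    Version: '2012-10-17',
    Statement: [
      {
        Sid: 'AllowSnsTopicToSendMessage',
        Effect: 'Allow',
        Principal: { Service: 'sns.amazonaws.com' },
        Action: 'sqs:SendMessage',
        Resource: queueArn,
        // Only the named topic may deliver to the queue.
        Condition: { ArnEquals: { 'aws:SourceArn': topicArn } },
      },
    ],
  };
}

// Placeholder ARNs, not real resources.
const policy = buildSnsToSqsPolicy(
  'arn:aws:sqs:us-east-1:123456789012:orders.fifo',
  'arn:aws:sns:us-east-1:123456789012:order-created.fifo',
);
console.log(JSON.stringify(policy, null, 2));
```

The printed JSON is the kind of document that goes into the queue's access policy editor in the console.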
Step 6: Configure the SQLite Database in NestJS
1. Create the SqsMessage Entity:
Define the SqsMessage entity in sqs/models/sqs-message.ts:
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity({ name: 'sqs_message' })
export class SqsMessage {
  @PrimaryGeneratedColumn()
  id: number;

  @Column()
  messageId: string;

  @Column({ name: 'message_body' })
  messageBody: string;

  @Column({ name: 'processed_at' })
  processedAt: Date;

  @Column({ name: 'message' })
  message: string;

  @Column({ name: 'source_type' })
  sourceType: string;

  @Column({ name: 'deleted_at', nullable: true })
  deletedAt?: Date;
}
2. Create the Data Access Object for SqsMessage:
Create sqs/daos/sqs-message.dao.ts:
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { SqsMessage } from '../models/sqs-message';

@Injectable()
export class SqsMessageDao {
  constructor(private readonly dataSource: DataSource) {}

  // Persists a processed message inside a transaction.
  public async createSqsMessage(message: SqsMessage): Promise<SqsMessage> {
    return await this.dataSource.manager.transaction(async (manager) => {
      return await manager.save(message);
    });
  }

  // Resolves to null when no non-deleted row matches the SQS message ID.
  public async getSqsMessageById(
    messageId: string,
  ): Promise<SqsMessage | null> {
    return await this.dataSource.manager.transaction(async (manager) => {
      return await manager
        .createQueryBuilder()
        .select('s')
        .from(SqsMessage, 's')
        .where('s.deletedAt IS NULL')
        .andWhere('s.messageId = :messageId', { messageId })
        .getOne();
    });
  }
}
3. Create an enum for the message source type in sqs/utils/constants.ts:
export enum MessageSourceType {
  SQS = 'SQS',
  SNS = 'SNS',
}
4. Configure TypeORM:
Update app.module.ts to include the TypeORM configuration:
@Module({
  imports: [
    …
    TypeOrmModule.forRoot({
      type: 'sqlite',
      database: 'sqlite.db',
      entities: [SqsMessage],
      synchronize: true,
    }),
    TypeOrmModule.forFeature([SqsMessage]),
    …
  ],
  …
})
Step 7: Implement SQS in NestJS
1. Create the SQS Producer Service:
Create sqs/sqs.producer.service.ts to send messages to SQS:
import { SendMessageCommand, SQSClient } from '@aws-sdk/client-sqs';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { v4 as uuid } from 'uuid';

@Injectable()
export class SqsProducerService {
  private sqsClient: SQSClient;
  private queueUrl: string;

  constructor(private configService: ConfigService) {
    this.sqsClient = new SQSClient({
      region: this.configService.get('AWS_REGION'),
      credentials: {
        accessKeyId: this.configService.get('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get('AWS_SECRET_ACCESS_KEY'),
      },
    });
    this.queueUrl = this.configService.get('SQS_QUEUE_URL');
  }

  async sendMessage(message: string) {
    try {
      const command = new SendMessageCommand({
        QueueUrl: this.queueUrl,
        MessageBody: message,
        // These two parameters are meant for FIFO queues;
        // a standard queue does not need them.
        MessageGroupId: 'sqs-message-group',
        MessageDeduplicationId: uuid(),
      });
      const response = await this.sqsClient.send(command);
      console.log('SQS message sent: ', message);
      return response;
    } catch (error) {
      console.log('error', error);
      // Rethrow so callers never read MessageId from an undefined response.
      throw error;
    }
  }
}
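The producer above always sends MessageGroupId and MessageDeduplicationId. If the same code should also work against a standard queue, one option is to add those fields only when the queue is FIFO. The sketch below relies on the fact that FIFO queue names, and therefore their URLs, end in .fifo; isFifoQueue and buildSendParams are hypothetical helpers, and the queue URLs are placeholders.

```typescript
interface SendParams {
  QueueUrl: string;
  MessageBody: string;
  MessageGroupId?: string;
  MessageDeduplicationId?: string;
}

// FIFO queue names end with ".fifo", so the queue URL does too.
function isFifoQueue(queueUrl: string): boolean {
  return queueUrl.endsWith('.fifo');
}

// Hypothetical helper: adds the FIFO-only fields only for FIFO queues.
function buildSendParams(
  queueUrl: string,
  body: string,
  groupId: string,
  deduplicationId: string,
): SendParams {
  const params: SendParams = { QueueUrl: queueUrl, MessageBody: body };
  if (isFifoQueue(queueUrl)) {
    params.MessageGroupId = groupId;
    params.MessageDeduplicationId = deduplicationId;
  }
  return params;
}

// Placeholder queue URLs, not real resources.
const fifoParams = buildSendParams(
  'https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo',
  'Test Order Created',
  'sqs-message-group',
  'dedup-1',
);
const standardParams = buildSendParams(
  'https://sqs.us-east-1.amazonaws.com/123456789012/orders',
  'Test Order Created',
  'sqs-message-group',
  'dedup-2',
);
console.log(fifoParams, standardParams);
```

The resulting object has the same field names that SendMessageCommand accepts, so it could be passed to the command constructor in sendMessage.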
2. Create the SQS Consumer Service:
Create sqs/sqs.consumer.service.ts to integrate sqs-consumer for polling messages.
Implement logic to process each message, save it in the SQLite database, handle duplicates by checking for existing records, and delete processed messages from the queue.
/* eslint-disable @typescript-eslint/no-unused-vars */
import { DeleteMessageCommand, Message, SQSClient } from '@aws-sdk/client-sqs';
import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Consumer } from 'sqs-consumer';
import { SqsMessageDao } from './daos/sqs-message.dao';
import { SqsMessage } from './models/sqs-message';
import { MessageSourceType } from './utils/constants';

@Injectable()
export class SqsConsumerService implements OnModuleInit, OnModuleDestroy {
  private sqsClient: SQSClient;
  private queueUrl: string;
  private consumer: Consumer;

  constructor(
    private configService: ConfigService,
    private sqsMessageDao: SqsMessageDao,
  ) {
    this.sqsClient = new SQSClient({
      region: this.configService.get('AWS_REGION'),
      credentials: {
        accessKeyId: this.configService.get('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get('AWS_SECRET_ACCESS_KEY'),
      },
    });
    this.queueUrl = this.configService.get('SQS_QUEUE_URL');
  }

  async deleteConsumedMessage(receiptHandle: string) {
    try {
      const command = new DeleteMessageCommand({
        QueueUrl: this.queueUrl,
        ReceiptHandle: receiptHandle,
      });
      const response = await this.sqsClient.send(command);
      return response;
    } catch (error) {
      console.log('Delete message error', error);
    }
  }

  public async onModuleInit(): Promise<void> {
    this.pollMessages();
  }

  // Distinguishes SNS notifications (a JSON envelope) from messages sent directly to SQS.
  private formatSQSMessage(message: Message) {
    const messageBody = message.Body;
    let sourceType = '';
    let messageText = '';
    try {
      const parsedMessageBody = JSON.parse(messageBody);
      if (parsedMessageBody && parsedMessageBody.Message) {
        messageText = parsedMessageBody.Message;
      }
      if (
        parsedMessageBody?.Type === 'Notification' &&
        parsedMessageBody?.TopicArn
      ) {
        sourceType = MessageSourceType.SNS;
      } else if (messageBody) {
        messageText = messageBody;
        sourceType = MessageSourceType.SQS;
      }
    } catch (error) {
      // The body is not JSON, so it was sent directly to the queue.
      sourceType = MessageSourceType.SQS;
      messageText = messageBody;
    }
    return {
      messageText: messageText,
      sourceType: sourceType,
    };
  }

  private async saveMessageInDb(message: Message) {
    const { messageText, sourceType } = this.formatSQSMessage(message);
    const sqsMessage = new SqsMessage();
    sqsMessage.messageId = message.MessageId;
    sqsMessage.messageBody = message.Body;
    sqsMessage.processedAt = new Date();
    sqsMessage.sourceType = sourceType;
    sqsMessage.message = messageText;
    // Await the insert so a failed save surfaces to the caller.
    await this.sqsMessageDao.createSqsMessage(sqsMessage);
  }

  private async processMessage(message: Message) {
    const sqsMessageFromDb = await this.sqsMessageDao.getSqsMessageById(
      message.MessageId,
    );
    if (!sqsMessageFromDb || !sqsMessageFromDb.processedAt) {
      // process message
      console.log('Received new message:', message.Body);
      await this.saveMessageInDb(message);
    }
    // Delete only after the message has been saved or was already processed.
    await this.deleteConsumedMessage(message.ReceiptHandle);
  }

  private pollMessages = () => {
    this.consumer = Consumer.create({
      queueUrl: this.queueUrl,
      handleMessage: async (message: Message) => {
        // Await so failures reach the processing_error handler below.
        await this.processMessage(message);
      },
      waitTimeSeconds: 1,
      pollingWaitTimeMs: 2000, // configure polling wait time
      shouldDeleteMessages: false, // if set to true, the consumer deletes the messages itself
      sqs: this.sqsClient,
    });
    this.consumer.on('error', (err) => {
      console.error(err.message);
    });
    this.consumer.on('processing_error', (err) => {
      console.error(err.message);
    });
    this.consumer.on('timeout_error', (err) => {
      console.error(err.message);
    });
    this.consumer.start();
  };

  public async onModuleDestroy() {
    this.consumer?.stop({ abort: true });
  }
}
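The formatSQSMessage method above relies on the shape of the body that SNS delivers to SQS: a JSON envelope with fields such as Type, TopicArn, and Message. As a standalone illustration, the sketch below applies the same classification with explicit type checks. parseQueueBody is a hypothetical name, and the topic ARN is a placeholder.

```typescript
type SourceType = 'SQS' | 'SNS';

interface ParsedBody {
  sourceType: SourceType;
  messageText: string;
  topicArn?: string;
}

// Hypothetical helper: classifies a queue message body as SNS or direct SQS.
function parseQueueBody(body: string): ParsedBody {
  try {
    const parsed: unknown = JSON.parse(body);
    if (typeof parsed === 'object' && parsed !== null) {
      const envelope = parsed as Record<string, unknown>;
      if (
        envelope.Type === 'Notification' &&
        typeof envelope.TopicArn === 'string' &&
        typeof envelope.Message === 'string'
      ) {
        // SNS envelope: the published text lives in the Message field.
        return {
          sourceType: 'SNS',
          messageText: envelope.Message,
          topicArn: envelope.TopicArn,
        };
      }
    }
  } catch {
    // Not JSON, so the body was sent to the queue directly.
  }
  return { sourceType: 'SQS', messageText: body };
}

// Placeholder envelope with only the fields this sketch inspects.
const snsBody = JSON.stringify({
  Type: 'Notification',
  TopicArn: 'arn:aws:sns:us-east-1:123456789012:order-created.fifo',
  Message: 'Test Order Created',
});
console.log(parseQueueBody(snsBody));
console.log(parseQueueBody('SQS Test Order Created'));
```

If raw message delivery is enabled on the subscription, SNS delivers the published text without the envelope, and a check like this would classify the message as SQS.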
3. Create the SQS controller:
Create sqs/sqs.controller.ts to send a message to the SQS queue:
import { Controller, Post, Body } from '@nestjs/common';
import { SqsProducerService } from './sqs.producer.service';

@Controller('sqs')
export class SqsController {
  constructor(private sqsProducerService: SqsProducerService) {}

  @Post('publish')
  async publishMessage(@Body() body: { message: string }) {
    const { message } = body;
    const messageResponse = await this.sqsProducerService.sendMessage(message);
    return {
      status: 'Message published',
      message,
      messageId: messageResponse.MessageId,
    };
  }
}
4. Create the SQS Module to manage SQS services:
Create sqs/sqs.module.ts:
import { Module } from '@nestjs/common';
import { SqsProducerService } from './sqs.producer.service';
import { SqsConsumerService } from './sqs.consumer.service';
import { SqsController } from './sqs.controller';
import { SqsMessageDao } from './daos/sqs-message.dao';

@Module({
  imports: [],
  controllers: [SqsController],
  providers: [SqsProducerService, SqsConsumerService, SqsMessageDao],
  exports: [SqsProducerService, SqsConsumerService],
})
export class SqsModule {}
Step 8: Implement SNS in NestJS
1. Create the SNS Publisher Service:
Create sns/sns.publisher.service.ts to send notification messages:
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { v4 as uuid } from 'uuid';

@Injectable()
export class SnsPublisherService {
  private snsClient: SNSClient;
  private topicArn: string;

  constructor(private configService: ConfigService) {
    this.snsClient = new SNSClient({
      region: this.configService.get('AWS_REGION'),
      credentials: {
        accessKeyId: this.configService.get('AWS_ACCESS_KEY_ID'),
        secretAccessKey: this.configService.get('AWS_SECRET_ACCESS_KEY'),
      },
    });
    this.topicArn = this.configService.get('SNS_TOPIC_ARN');
  }

  async publishMessage(message: string) {
    try {
      const command = new PublishCommand({
        TopicArn: this.topicArn,
        Message: message,
        // These two parameters are meant for FIFO topics;
        // a standard topic does not need them.
        MessageGroupId: 'sns-message-group',
        MessageDeduplicationId: uuid(),
      });
      const response = await this.snsClient.send(command);
      console.log('SNS message sent: ', message);
      return response;
    } catch (error) {
      console.log('Error', error);
      // Rethrow so callers never read MessageId from an undefined response.
      throw error;
    }
  }
}
2. Create the SNS Controller:
Create sns/sns.controller.ts to send a message to the SNS topic:
import { Controller, Post, Body } from '@nestjs/common';
import { SnsPublisherService } from './sns.publisher.service';

@Controller('sns')
export class SnsController {
  constructor(private snsService: SnsPublisherService) {}

  @Post('publish')
  async publishMessage(@Body() body: { message: string }) {
    const { message } = body;
    const messageResponse = await this.snsService.publishMessage(message);
    return {
      status: 'Message published',
      message,
      messageId: messageResponse.MessageId,
    };
  }
}
3. Create the SNS Module to manage SNS services:
Create sns/sns.module.ts:
import { Module } from '@nestjs/common';
import { SnsPublisherService } from './sns.publisher.service';
import { SnsController } from './sns.controller';

@Module({
  imports: [],
  controllers: [SnsController],
  providers: [SnsPublisherService],
  exports: [SnsPublisherService],
})
export class SnsModule {}
Step 9: Set Up the App Module to Use the SQS and SNS Modules
Update app.module.ts to use the SQS and SNS modules:
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { SnsModule } from './sns/sns.module';
import { SqsModule } from './sqs/sqs.module';
import { ConfigModule } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { SqsMessage } from './sqs/models/sqs-message';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'sqlite',
      database: 'sqlite.db',
      entities: [SqsMessage],
      synchronize: true, // Automatically sync the database schema (disable in production)
    }),
    TypeOrmModule.forFeature([SqsMessage]),
    SnsModule,
    SqsModule,
    ConfigModule.forRoot({
      isGlobal: true,
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Step 10: Run and Test the Application
1. Start the application using npm start.
2. Send a POST request to http://localhost:3000/sns/publish with the body:
{ "message": "Test Order Created" }
Check the response; it should confirm that the message was published.
You will also see the logs:
SNS message sent: Test Order Created
Received new message: Message object
3. Send a POST request to http://localhost:3000/sqs/publish with the body:
{ "message": "SQS Test Order Created" }
Check the response; it should confirm that the message was published.
You will also see the logs:
SQS message sent: SQS Test Order Created
Received new message: SQS Test Order Created
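The two test requests can also be scripted. The sketch below only builds the request pieces for the endpoints defined in the controllers above; buildPublishRequest is a hypothetical helper, and the default base URL assumes the application listens on port 3000 as in the steps above.

```typescript
type PublishTarget = 'sns' | 'sqs';

interface PublishRequest {
  url: string;
  init: {
    method: string;
    headers: Record<string, string>;
    body: string;
  };
}

// Hypothetical helper: builds the POST request for /sns/publish or /sqs/publish.
function buildPublishRequest(
  target: PublishTarget,
  message: string,
  baseUrl: string = 'http://localhost:3000',
): PublishRequest {
  return {
    url: `${baseUrl}/${target}/publish`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message }),
    },
  };
}

const snsRequest = buildPublishRequest('sns', 'Test Order Created');
const sqsRequest = buildPublishRequest('sqs', 'SQS Test Order Created');
console.log(snsRequest.url, snsRequest.init.body);
console.log(sqsRequest.url, sqsRequest.init.body);

// With the application running, a request could be sent with the global
// fetch available in Node.js 18 and later:
//   const response = await fetch(snsRequest.url, snsRequest.init);
//   console.log(await response.json());
```

Keeping the request construction separate from the network call makes it easy to reuse the same payloads from a script or an automated test.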
You can find the complete AWS SQS and SNS integration codebase for NestJS on our GitHub.
Conclusion
In this blog, you’ve explored how AWS SQS and SNS are integral to microservices architecture, facilitating scalable and efficient message handling. By following the step-by-step guide, you’ve successfully configured SNS fanout and SQS subscriptions within a NestJS application using TypeScript. These integrations enable seamless communication between microservices, ensuring reliable message processing and event-driven architecture. With this knowledge, you can build robust, distributed systems capable of handling real-time data and notifications effectively.
Author's Bio
Chetan Shelake is a Principal Software Engineer at Mobisoft Infotech with 7.5 years of experience in web, mobile, and backend development. Specializing in React, React Native, and Node, he is dedicated to building innovative and user-friendly applications. Chetan is passionate about delivering scalable solutions and staying ahead in emerging technologies.