NestJS with Kafkajs: A Powerful Combination for Building Scalable Applications
https://github.com/tkssharma/nestjs-kafka-monorepo
Introduction
Kafka is a distributed streaming platform that is widely used for real-time data processing and messaging. NestJS, on the other hand, is a progressive Node.js framework for building scalable and efficient applications. Combining Kafka with NestJS can create a powerful and scalable solution for various use cases.
Integrating Kafka with NestJS
To integrate Kafka with NestJS, we can use the @nestjs/microservices
package. This package provides a convenient way to create microservices that communicate using different protocols, including Kafka.
Creating a Kafka Producer
To create a Kafka producer in NestJS, we can use the @nestjs/microservices
module and the KafkaOptions
interface. Here's an example:
import { KafkaOptions, Transport } from '@nestjs/microservices'; @Injectable() export class KafkaProducerService { private readonly kafkaClient: ClientKafka; constructor() { this.kafkaClient = new ClientKafka({ transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], }, }, }); } async send(message: any) { await this.kafkaClient.emit('topic-name', message); } }
Creating a Kafka Consumer
To create a Kafka consumer in NestJS, we can use the @nestjs/microservices
module and the KafkaOptions
interface. Here's an example:
import { KafkaOptions, Transport } from '@nestjs/microservices'; @Injectable() export class KafkaConsumerService { private readonly kafkaClient: ClientKafka; constructor() { this.kafkaClient = new ClientKafka({ transport: Transport.KAFKA, options: { client: { brokers: ['localhost:9092'], }, }, }); } @EventPattern('topic-name') async handleEvent(message: any) { // Process the message } }
Using Kafka Producer and Consumer
Once you have created the producer and consumer, you can use them to send and receive messages. Here's an example:
@Controller('kafka') export class KafkaController { constructor(private readonly kafkaProducerService: KafkaProducerService) {} @Post() async sendMessage(@Body() message: any) { await this.kafkaProducerService.send(message); return { message: 'Message sent successfully' }; } }
Integrate nestjs with simple kafkajs library
import { Module } from '@nestjs/common'; import { KafkaService } from './kafka.service'; import { AppConfigModule } from '@app/config'; import { KafkaConsumerService } from './kafka.consumer.service'; import { KafkaProducerService } from './kafka.producer.service'; @Module({ imports: [AppConfigModule], providers: [KafkaConsumerService, KafkaProducerService], exports: [KafkaConsumerService, KafkaProducerService], }) export class KafkaModule { }
import { AppConfigService } from '@app/config'; import { Injectable, OnApplicationShutdown } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { ConsumerConfig, ConsumerSubscribeTopic, KafkaMessage } from 'kafkajs'; import { KafkaConsumer } from './consumer.service'; interface KafkajsConsumerOptions { topic: ConsumerSubscribeTopic; config: ConsumerConfig; onMessage: (message: KafkaMessage) => Promise<void>; } export interface IConsumer { connect: () => Promise<void>; disconnect: () => Promise<void>; consume: (message: any) => Promise<void>; } @Injectable() export class KafkaConsumerService implements OnApplicationShutdown { private readonly consumers: IConsumer[] = []; constructor( private readonly configService: AppConfigService, ) { } async consume({ topic, config, onMessage }: KafkajsConsumerOptions) { const consumer = new KafkaConsumer( topic, config, this.configService.kafka.broker ); await consumer.connect(); await consumer.consume(onMessage); this.consumers.push(consumer); } async onApplicationShutdown() { for (const consumer of this.consumers) { await consumer.disconnect(); } } }
Consumer service
import { Consumer, ConsumerConfig, ConsumerSubscribeTopic, Kafka, KafkaMessage, Producer } from "kafkajs"; import { IProducer } from "./kafka.producer.service"; import { Logger } from "@nestjs/common"; import { IConsumer } from "./kafka.consumer.service"; import * as retry from 'async-retry'; export const sleep = (timeout: number) => { return new Promise<void>((resolve) => setTimeout(resolve, timeout)); }; export class KafkaConsumer implements IConsumer { private readonly kafka: Kafka; private readonly consumer: Consumer; private readonly logger: Logger; constructor( private readonly topic: ConsumerSubscribeTopic, config: ConsumerConfig, broker: string) { this.kafka = new Kafka({ brokers: [broker] }) this.consumer = this.kafka.consumer(config); this.logger = new Logger(`${topic}-${config.groupId}`) } async consume(onMessage: (message: KafkaMessage) => Promise<void>) { await this.consumer.subscribe(this.topic) await this.consumer.run({ eachMessage: async ({ message, partition }) => { this.logger.debug(`Processing message partition: ${partition}`); try { await retry(async () => onMessage(message), { retries: 3, onRetry: (error, attempt) => this.logger.error( `Error consuming message, executing retry ${attempt}/3...`, error, ), }); } catch (err) { // handle failure of message // write then to DB table or log them // better write to DATABASE } }, }) } async connect() { try { await this.consumer.connect(); } catch (err) { this.logger.error('Failed to connect to Kafka. trying again ...', err); await sleep(5000); await this.connect(); } } async disconnect() { this.consumer.disconnect() } }
Conclusion
By integrating Kafka with NestJS, you can build scalable and distributed applications that can handle high volumes of data and real-time processing. The @nestjs/microservices
module provides a convenient way to interact with Kafka, making it easy to incorporate into your NestJS projects.
Top comments (1)
i dont understand this implementation, nestjs is doing this under the hood already all you need is the config and a controller as consumer for the topics