DEV Community

Abhinav
Abhinav

Posted on

Using Observables in NestJS Microservices 🚀

NestJS is a progressive Node.js framework that heavily embraces Observables to handle asynchronous tasks. Observables are particularly useful in NestJS Microservices for:

Inter-Service Communication (using Kafka, Redis, RabbitMQ, etc.)

Streaming Data (WebSockets, gRPC, etc.)

Handling Long-running Tasks (e.g., background jobs)

Let’s dive into real-world examples of how Observables can be leveraged in NestJS Microservices. 🔥


1️⃣ Observables in NestJS Microservices

NestJS uses RxJS Observables as a core part of its design for handling async operations. The framework provides built-in support for Microservices and encourages the use of Observables for request-response patterns.

Example: NestJS Microservices Setup

Let’s say we have two microservices:

  • Orders Service (publishes an event when an order is placed)
  • Inventory Service (listens and updates stock levels)

Orders Microservice (Publisher)

import { Controller, Post } from '@nestjs/common'; import { Client, ClientProxy } from '@nestjs/microservices'; import { Observable } from 'rxjs'; @Controller('orders') export class OrdersController { @Client({ transport: Transport.REDIS, options: { host: 'localhost', port: 6379 } }) private client: ClientProxy; @Post('create') createOrder(): Observable<string> { return this.client.send('order_created', { productId: 1, quantity: 2 }); } } 
Enter fullscreen mode Exit fullscreen mode

🔹 Here, client.send() returns an Observable that emits a response when the Inventory Service processes the event.


Inventory Microservice (Listener)

import { Controller } from '@nestjs/common'; import { MessagePattern } from '@nestjs/microservices'; @Controller() export class InventoryController { @MessagePattern('order_created') handleOrderCreated(data: { productId: number; quantity: number }): Observable<string> { console.log('Updating inventory for:', data); return new Observable((subscriber) => { // Simulate processing setTimeout(() => { subscriber.next('Inventory Updated ✅'); subscriber.complete(); }, 2000); }); } } 
Enter fullscreen mode Exit fullscreen mode

🔹 This service listens for order_created messages and responds with an Observable.

Response in Orders Service

createOrder().subscribe((response) => console.log(response)); 
Enter fullscreen mode Exit fullscreen mode

🟢 Output:

Updating inventory for: { productId: 1, quantity: 2 } Inventory Updated ✅ 
Enter fullscreen mode Exit fullscreen mode

2️⃣ Observables in HTTP Services

If you're building a REST API inside a microservice, Observables can be used with HTTP clients like Axios (wrapped in from() to convert Promises to Observables).

Example: Fetching Data from Another Microservice

import { Injectable } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { Observable, map } from 'rxjs'; @Injectable() export class ProductService { constructor(private httpService: HttpService) {} getProductDetails(productId: number): Observable<any> { return this.httpService.get(`http://inventory-service/products/${productId}`).pipe( map((response) => response.data) // Transform response ); } } 
Enter fullscreen mode Exit fullscreen mode

🔹 This helps us keep the reactive approach even for HTTP calls.


3️⃣ Streaming Data Using Observables

NestJS supports WebSockets & gRPC, both of which work well with Observables.

Example: Real-time Stock Updates using WebSockets

1️⃣ Gateway (WebSocket Server)

import { WebSocketGateway, SubscribeMessage, WebSocketServer } from '@nestjs/websockets'; import { Observable, interval, map } from 'rxjs'; @WebSocketGateway() export class StockGateway { @WebSocketServer() server; @SubscribeMessage('stock_updates') stockUpdates(): Observable<{ stock: number }> { return interval(2000).pipe( map(() => ({ stock: Math.floor(Math.random() * 100) })) // Random stock value ); } } 
Enter fullscreen mode Exit fullscreen mode

2️⃣ Client (WebSocket Frontend)

const socket = io('http://localhost:3000'); socket.emit('stock_updates'); socket.on('stock_updates', (data) => { console.log('Live Stock:', data); }); 
Enter fullscreen mode Exit fullscreen mode

🔹 The stock updates stream continuously using an Observable interval.


4️⃣ Handling Long-running Tasks with Observables

In some cases, we might need to process large data asynchronously. Instead of blocking the request, we can return an Observable that emits data progressively.

Example: Streaming Large Report Data

import { Controller, Get } from '@nestjs/common'; import { Observable, interval, take, map } from 'rxjs'; @Controller('reports') export class ReportsController { @Get('generate') generateReport(): Observable<string> { return interval(1000).pipe( take(5), // Emit 5 values (simulate processing) map((count) => `Processing chunk ${count + 1}...`) ); } } 
Enter fullscreen mode Exit fullscreen mode

🟢 Client Output (after hitting /reports/generate):

Processing chunk 1... Processing chunk 2... Processing chunk 3... Processing chunk 4... Processing chunk 5... 
Enter fullscreen mode Exit fullscreen mode

🔹 This prevents blocking and streams responses incrementally.


Why Use Observables in NestJS? 🤔

Better async handling – Unlike Promises, Observables allow multiple values over time.

Reactive programming – Works great with real-time updates (WebSockets, Kafka, etc.).

Powerful Operatorsmap(), filter(), mergeMap() make async transformations easier.

Built-in NestJS Support – Microservices, WebSockets, and gRPC all use Observables by default.


Conclusion 🎯

Observables in NestJS shine when working with Microservices, real-time applications, and long-running processes. They provide a powerful way to handle asynchronous tasks efficiently, making your app more reactive and scalable.

🔥 If you’re building NestJS Microservices, start using Observables today! 🚀


Top comments (0)