NestJS is a progressive Node.js framework that heavily embraces Observables to handle asynchronous tasks. Observables are particularly useful in NestJS Microservices for:
✅ Inter-Service Communication (using Kafka, Redis, RabbitMQ, etc.)
✅ Streaming Data (WebSockets, gRPC, etc.)
✅ Handling Long-running Tasks (e.g., background jobs)
Let’s dive into real-world examples of how Observables can be leveraged in NestJS Microservices. 🔥
1️⃣ Observables in NestJS Microservices
NestJS uses RxJS Observables as a core part of its design for handling async operations. The framework provides built-in support for Microservices and encourages the use of Observables for request-response patterns.
Example: NestJS Microservices Setup
Let’s say we have two microservices:
- Orders Service (publishes an event when an order is placed)
- Inventory Service (listens and updates stock levels)
Orders Microservice (Publisher)
import { Controller, Post } from '@nestjs/common'; import { Client, ClientProxy } from '@nestjs/microservices'; import { Observable } from 'rxjs'; @Controller('orders') export class OrdersController { @Client({ transport: Transport.REDIS, options: { host: 'localhost', port: 6379 } }) private client: ClientProxy; @Post('create') createOrder(): Observable<string> { return this.client.send('order_created', { productId: 1, quantity: 2 }); } }
🔹 Here, client.send()
returns an Observable that emits a response when the Inventory Service processes the event.
Inventory Microservice (Listener)
import { Controller } from '@nestjs/common'; import { MessagePattern } from '@nestjs/microservices'; @Controller() export class InventoryController { @MessagePattern('order_created') handleOrderCreated(data: { productId: number; quantity: number }): Observable<string> { console.log('Updating inventory for:', data); return new Observable((subscriber) => { // Simulate processing setTimeout(() => { subscriber.next('Inventory Updated ✅'); subscriber.complete(); }, 2000); }); } }
🔹 This service listens for order_created
messages and responds with an Observable.
Response in Orders Service
createOrder().subscribe((response) => console.log(response));
🟢 Output:
Updating inventory for: { productId: 1, quantity: 2 } Inventory Updated ✅
2️⃣ Observables in HTTP Services
If you're building a REST API inside a microservice, Observables can be used with HTTP clients like Axios (wrapped in from()
to convert Promises to Observables).
Example: Fetching Data from Another Microservice
import { Injectable } from '@nestjs/common'; import { HttpService } from '@nestjs/axios'; import { Observable, map } from 'rxjs'; @Injectable() export class ProductService { constructor(private httpService: HttpService) {} getProductDetails(productId: number): Observable<any> { return this.httpService.get(`http://inventory-service/products/${productId}`).pipe( map((response) => response.data) // Transform response ); } }
🔹 This helps us keep the reactive approach even for HTTP calls.
3️⃣ Streaming Data Using Observables
NestJS supports WebSockets & gRPC, both of which work well with Observables.
Example: Real-time Stock Updates using WebSockets
1️⃣ Gateway (WebSocket Server)
import { WebSocketGateway, SubscribeMessage, WebSocketServer } from '@nestjs/websockets'; import { Observable, interval, map } from 'rxjs'; @WebSocketGateway() export class StockGateway { @WebSocketServer() server; @SubscribeMessage('stock_updates') stockUpdates(): Observable<{ stock: number }> { return interval(2000).pipe( map(() => ({ stock: Math.floor(Math.random() * 100) })) // Random stock value ); } }
2️⃣ Client (WebSocket Frontend)
const socket = io('http://localhost:3000'); socket.emit('stock_updates'); socket.on('stock_updates', (data) => { console.log('Live Stock:', data); });
🔹 The stock updates stream continuously using an Observable interval.
4️⃣ Handling Long-running Tasks with Observables
In some cases, we might need to process large data asynchronously. Instead of blocking the request, we can return an Observable that emits data progressively.
Example: Streaming Large Report Data
import { Controller, Get } from '@nestjs/common'; import { Observable, interval, take, map } from 'rxjs'; @Controller('reports') export class ReportsController { @Get('generate') generateReport(): Observable<string> { return interval(1000).pipe( take(5), // Emit 5 values (simulate processing) map((count) => `Processing chunk ${count + 1}...`) ); } }
🟢 Client Output (after hitting /reports/generate
):
Processing chunk 1... Processing chunk 2... Processing chunk 3... Processing chunk 4... Processing chunk 5...
🔹 This prevents blocking and streams responses incrementally.
Why Use Observables in NestJS? 🤔
✔ Better async handling – Unlike Promises, Observables allow multiple values over time.
✔ Reactive programming – Works great with real-time updates (WebSockets, Kafka, etc.).
✔ Powerful Operators – map()
, filter()
, mergeMap()
make async transformations easier.
✔ Built-in NestJS Support – Microservices, WebSockets, and gRPC all use Observables by default.
Conclusion 🎯
Observables in NestJS shine when working with Microservices, real-time applications, and long-running processes. They provide a powerful way to handle asynchronous tasks efficiently, making your app more reactive and scalable.
🔥 If you’re building NestJS Microservices, start using Observables today! 🚀
Top comments (0)