Hello, in this article , we are going to implement a message queue in node.js using BullMQ library which is built on the top of redis.
We will implement two message queues. One for adding refund task for a particular order. On successful completion of refund task, we will initiate notification task to notify user about refund completion. For notification task, we will use another queue.
Step 1: Setting up Project
Make a new folder "messaging_queue" and initialize it with npm init and add dependencies.
mkdir messaging_queue cd messaging_queue npm init npm i express bullmq -D Step 2: Implementation of Queue
Firstly, create a refundQueue.js file to write the code for implementing a refundQueue and a function to add refund tasks to refundQueue.
const { Queue } =require("bullmq") const refundQueue = new Queue("refund-queue",{ connection:{ host:"127.0.0.1", port:"6379" } }) async function addRefundTask(order){ const response = await refundQueue.add(`refund-to-${order.id}`,{ amount:order.amount, id:order.id, user_id:order.user_id }) console.log("Job added to refundQueue with id: ",response.id) } exports.addRefundTask=addRefundTask Now, create a notificationQueue.js file to write the code for implementing a notificationQueue and a function to add notification tasks to notificationQueue.
const { Queue } =require("bullmq") const notificationQueue = new Queue("notification-queue",{ connection:{ host:"127.0.0.1", port:"6379" } }) async function addNotificationTask(user_id,order_id){ const response = await notificationQueue.add(`notification-to-${user_id}`,{ order_id:order_id, user_id:user_id }) console.log("Job added to notificationQueue with id: ",response.id) } exports.addNotificationTask=addNotificationTask Step 3: Refund and Notification Process
Now, create two files refundProcess.js and notificationProcess.js to write the code for implementation of refund and notification process.
refundProcess.js
const refundComplete=()=>new Promise((res,rej)=>setTimeout(()=>res(),4*1000)); async function refundProcess(id,amount,user_id){ console.log(`Refund for order ${id} has started`); console.log(`Refund Amount: ${amount}`) console.log(`User ID: ${user_id}`) await refundComplete() console.log("Refund Completed Successfully!") } exports.refundProcess=refundProcess notificationProcess.js
const notificationComplete=()=>new Promise((res,rej)=>setTimeout(()=>res(),2*1000)); async function notificationProcess(user_id,order_id){ console.log(`Notify user ${user_id} about refund for order ${order_id}`) await notificationComplete() console.log("Notification Sent Successfully!") } exports.notificationProcess=notificationProcess Step 4: Implementation of Worker
Now, create a new file worker.js to implement refundWorker and notificationWorker.
const { Worker } =require("bullmq") const { refundProcess } =require("./refundProcess") const { notificationProcess } =require("./notificationProcess") const refundWorker = new Worker("refund-queue", async(job)=>{ console.log(`Refund Job ${job.id} started`) await refundProcess(job.data.id,job.data.amount,job.data.user_id) }) const notificationWorker = new Worker("notification-queue", async(job)=>{ console.log(`Notification Job ${job.id} started`) await notificationProcess(job.data.user_id,job.data.order_id) }) exports.refundWorker= refundWorker exports.notificationWorker=notificationWorker Step 5: Express Server
Now, in index.js file , write code for implementation of express server.
const express=require('express') const { addRefundTask } = require("./refundQueue") const { refundWorker } = require("./worker") const { addNotificationTask } = require('./notificationQueue') const { notificationWorker }=require('./worker') async function init(){ const app=express() const PORT = 8000; const order1={ id:"order1", amount:4000, user_id:"user1" } const order2={ id:"order2", amount:10000, user_id:"user2" } app.listen(PORT,()=>{ console.log("Server running at port 8000") }) await addRefundTask(order1); await addRefundTask(order2) refundWorker.on('completed', async(job) => { console.log(`Refund Job ${job.id} has completed!`); await addNotificationTask(job.data.user_id,job.data.id); }); refundWorker.on('failed', (job, err) => { console.log(`Refund Job ${job.id} has failed with ${err.message}`); }); notificationWorker.on('completed', (job) => { console.log(`Notification Job ${job.id} has completed!`); }); notificationWorker.on('failed', (job, err) => { console.log(`Notification Job ${job.id} has failed with ${err.message}`); }); } init() Here, we will add two orders to refundQueue.
We are using two event listeners 'completed' and 'failed' for both refundWorker and notificationWorker. On successful completion of refund task, a notification task is added to notificationQueue.
Step 6: Docker setup
We need a redis server running in local computer to run the code of BullMQ. So, we will use docker for that. Ensure docker is installed in your system. And, create a docker-compose.yml file.
version : '3.4' services: redis: container_name : redis-server image : redis ports: - 6379:6379 stdin_open : true Now, start the redis container using following command
docker compose up -d Now, we can run our express server
node index.js Here, is the output
Thank you for reading the article !

Top comments (1)
As is github code Compile/Run Suggestions:
As is github code Run Results:
BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker and QueueEvents without providing explicitly a connection or connection options is deprecated. This behaviour will be removed in the next major release