Skip to content

void666/in-memory-queue

Repository files navigation

in-memory-queue

npm version npm downloads coverage status build status

Properties :

  • Concurrent read and writes
  • Retry mechanism
  • Parallel and series processing of jobs based on consumer priority definition
  • Completely in-memory
  • Multiple consumer support
  • Topic wise job distribution
  • Supports consumer tasks as promises.
  • Auto start polling of queue, on first message.

Interface :

setQueueConfiguration : Takes two parameters , size and maxRetry, both positive integers. Returns promise.

const imqueue = require('in-memory-queue'); imqueue.setQueueConfiguration(100, 2); 

createTopic : Creates a topic (string based pattern). Takes one parameter as string. Returns promise.

const imqueue = require('in-memory-queue'); const Promise = require('bluebird'); const topic = 'topic1'; imqueue.setQueueConfiguration(100, 2); return imqueue.createTopic(topic).then((result)=>{ if(result.topic === topic && result.success){ console.log(`Topic got created {topic}`); } return; }); 

createMessage :Creates a Message, takes two parameters, topic and stringified JSON. Returns promise.

const imqueue = require('in-memory-queue'); const Promise = require('bluebird'); const topic = 'topic1'; const jsonString = {code: 'json1'}; imqueue.setQueueConfiguration(100, 2); return imqueue.createTopic(topic).then((result) => { return imqueue.createMessage(topic, JSON.stringify(jsonString)) .then((result) => { let msg = result.message; console.log(`Message topic ${msg.getId()}`); console.log(`Message topic ${msg.getTopic()}`); console.log(`Message created timestamp ${msg.getCreated()}`); console.log(`Message allowed retries ${msg.getAllowedRetries()}`); console.log(`Message value ${msg.getValue()}`); console.log(`Message processed ${msg.getProcessed()}`); }); }); 

createConsumer : Creates a Consumer for a given topic. Accepts valid topic, priority(positive integer), handler (promisified function , with a message parameter);

const imqueue = require('in-memory-queue'); const Promise = require('bluebird'); imqueue.createConsumer('topic1', 1, function (msg) { console.log(`Handler task executing ${msg.getValue()}`); return Promise.resolve(); }).then((consumer)=> { console.log(`Consumer id ${consumer.getId()}`); console.log(`Consumer topic ${consumer.getTopic()}`); console.log(`Consumer priority ${consumer.getPriority()}`); }); imqueue.createConsumer('topic1', 1, function (msg) { console.log(`Handler task executing ${msg.getValue()}`); return Promise.resolve(); }).then((consumer)=> { console.log(`Consumer id ${consumer.getId()}`); console.log(`Consumer topic ${consumer.getTopic()}`); console.log(`Consumer priority ${consumer.getPriority()}`); }); imqueue.createConsumer('topic1', 2, function (msg) { console.log(`Handler task executing ${msg.getValue()}`); return Promise.resolve(); }).then((consumer)=> { console.log(`Consumer id ${consumer.getId()}`); console.log(`Consumer topic ${consumer.getTopic()}`); console.log(`Consumer priority ${consumer.getPriority()}`); }); 

Note

The order of execution of each consumer depends on the increasing order of priority. Consumer handlers with priority 1 will be executed in parallel and then the consumer handler for priority 2 will be executed.

getProcessedMessages : Returns list of all messages processed so far. Returns promise.

const imqueue = require('in-memory-queue'); const Promise = require('bluebird'); const topic = 'topic1'; const jsonString = {code: 'json1'}; imqueue.setQueueConfiguration(100, 2); return imqueue.createTopic(topic).then((result) => { return imqueue.createMessage(topic, JSON.stringify(jsonString)) .then((result) => { let msg = result.message; console.log(`Message topic ${msg.getId()}`); console.log(`Message topic ${msg.getTopic()}`); console.log(`Message created timestamp ${msg.getCreated()}`); console.log(`Message allowed retries ${msg.getAllowedRetries()}`); console.log(`Message value ${msg.getValue()}`); console.log(`Message processed ${msg.getProcessed()}`); return Promise.resolve(); }).then(() => { return imqueue.getProcessedMessages().then((result) => { let message = result[0]; console.log(`Message processed ${message.getProcessed()}`); console.log(`Message Processing details ${message.getProcessingDetails()}`); }); }); }); 

getStatus : gets the status of the present queue. Returns promise and prints the status in queue log

const imqueue = require('in-memory-queue'); imqueue.setQueueConfiguration(100, 2); return imqueue.getStatus() .then((result)=> { console.log(`Queue size : ${result.size}`); console.log(`Message in queue : ${result.messages_in_queue}`); // returns the current number of message in queue console.log(`Queue max retrys : ${result.retries}`); }); 

Installation :

  • Method 1 (Update):

    • do npm install in-memory-queue
  • Method 2 (Direct clone):

    • add a referrence to in-memory-queue as follows in package.json:
    "dependencies": { "in-memory-queue": "git+https://github.com/void666/in-memory-queue.git"} 
    • do npm install

Test :

  • do npm test

About

Simple in-memory job queue

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published