How to Use Task Queues in Node.js

This tutorial explains and demonstrates queuing systems. Queues are often used to process long-running tasks such as email newsletter delivery.

It’s not always practical to execute a task the moment it’s requested.

Consider an email newsletter administration system. After writing a newsletter, an administrator hits a big red “SEND NOW” button. The application could send every email immediately and show a “completed” response. That would work for a dozen messages, but how long would it take for 1,000 subscribers or more? The browser request would time out before the process completed.

Another example: a user can upload any number of photographs to a gallery application. The system resizes and sharpens each image for alternative dimensions. This process could run on upload, but it would incur a delay for every image.

It’s more effective to decouple tasks in these situations. The user receives an instant response but task processing occurs in the background. Other applications or servers handle tasks and schedule re-attempts on failure. The user can receive alerts or examine logs to determine progress.

Queues

A queue is a data structure which holds a collection of items:

  • Any process can send (or enqueue) an item at any time, such as “send newsletter X to recipient Y”.
  • Any process can receive (or dequeue) the item at the front of the queue, which is the item that’s been in the queue for longest.

Queues are a first-in-first-out (FIFO) structure. The first item added to the queue will be the first out.

Implementing a Basic JavaScript Queue

You can create a queue using a JavaScript array. The push() method adds an item to the end of an Array while the shift() method removes and returns an item from the start:

const queue = [];

queue.push( 'item 1' );
queue.push( 'item 2' );

console.log( queue.shift() ); // item 1
console.log( queue.shift() ); // item 2
console.log( queue.shift() ); // undefined

Individual array elements can hold any data. You can push strings, numbers, Booleans, other arrays, or objects.
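For instance, each item could be an object describing a task. The following is a minimal sketch; the task types and property names are made up for illustration:

```javascript
// a queue of task objects (property names are illustrative assumptions)
const tasks = [];

tasks.push({ type: 'email', to: 'a@test.com', subject: 'Newsletter 1' });
tasks.push({ type: 'resize', file: 'photo.jpg', width: 800 });

// items are processed in the order they were added
const processed = [];
let task;

while ((task = tasks.shift()) !== undefined) {
  processed.push( task.type );
}

console.log( processed ); // [ 'email', 'resize' ]
```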

You can use an ES6 class to define any number of separate queues:

class Queue {

  constructor() { this.q = []; }
  send( item )  { this.q.push( item ); }
  receive()     { return this.q.shift(); }

}

// define two queues
const q1 = new Queue();
const q2 = new Queue();

q1.send('item 1');
q2.send('item 2');

console.log( q1.receive() ); // item 1
console.log( q1.receive() ); // undefined
console.log( q2.receive() ); // item 2

These simple examples may be useful for less critical client-side code such as queuing UI updates so processing occurs in a single DOM update. localStorage or IndexedDB can offer a level of data persistence if necessary.
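As a rough sketch of that persistence idea, a queue could save its array whenever it changes. PersistentQueue is a hypothetical class written for this example. It assumes items can be converted to JSON and that the storage object provides getItem() and setItem() methods like localStorage. A small in-memory stand-in is used here so the code also runs in Node.js:

```javascript
// Minimal sketch: a queue which saves its contents to a storage object.
// PersistentQueue is a hypothetical name used for illustration only.
class PersistentQueue {

  // storage must provide getItem(key) and setItem(key, value)
  constructor( key, storage ) {
    this.key = key;
    this.storage = storage;
    this.q = JSON.parse( storage.getItem( key ) || '[]' );
  }

  // write the whole queue back to storage
  save() { this.storage.setItem( this.key, JSON.stringify( this.q ) ); }

  send( item ) { this.q.push( item ); this.save(); }

  receive() {
    const item = this.q.shift();
    this.save();
    return item;
  }

}

// in-memory stand-in for localStorage
const store = new Map();
const fakeStorage = {
  getItem: key => store.has( key ) ? store.get( key ) : null,
  setItem: ( key, value ) => store.set( key, String( value ) )
};

const pq1 = new PersistentQueue( 'updates', fakeStorage );
pq1.send( 'item 1' );
pq1.send( 'item 2' );

// a new instance (such as after a page reload) restores the saved items
const pq2 = new PersistentQueue( 'updates', fakeStorage );
console.log( pq2.receive() ); // item 1
console.log( pq2.receive() ); // item 2
```

In a browser, you could pass localStorage in place of fakeStorage.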

Queuing Platforms

In-memory queues are less practical for complex server applications:

  1. Two or more separate applications can’t (easily) access the same queue.
  2. Queue data disappears when the application terminates.

Purpose-built message-broker software provides more robust queuing. Platforms vary, but offer features such as:

  • data persistence in a choice of databases with replication, sharding, and clustering options
  • a range of access protocols, often including HTTP and WebSockets
  • any number of separate queues
  • delayed messaging, where message processing can occur at a later time
  • transaction-like support, where a message is re-queued when processing isn’t confirmed
  • publish-subscribe patterns, where applications receive an event when a new item appears on a queue
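Two of these features, delayed messaging and re-queuing of unconfirmed messages, can be illustrated with a small in-memory sketch. BrokerSketch and its methods are hypothetical names and don’t match the API of any real broker. Timestamps are passed explicitly (in milliseconds) so the behavior is easy to follow:

```javascript
// Minimal in-memory sketch of delayed delivery and re-queuing.
// BrokerSketch and its method names are hypothetical, not a real broker API.
class BrokerSketch {

  constructor() { this.items = []; this.nextId = 1; }

  // add an item which becomes available after delaySecs seconds
  send( data, delaySecs = 0, now = Date.now() ) {
    const item = { id: this.nextId++, data, availableAt: now + delaySecs * 1000 };
    this.items.push( item );
    return item.id;
  }

  // fetch the first available item in insertion order and hide it
  // for lockSecs seconds rather than deleting it
  receive( lockSecs = 30, now = Date.now() ) {
    const item = this.items.find( i => i.availableAt <= now );
    if (!item) return undefined;
    item.availableAt = now + lockSecs * 1000;
    return { id: item.id, data: item.data };
  }

  // confirm successful processing: only now is the item removed
  confirm( id ) {
    this.items = this.items.filter( i => i.id !== id );
  }

}

const broker = new BrokerSketch();

broker.send( 'send now', 0, 0 );     // available at 0ms
broker.send( 'send later', 60, 0 );  // available at 60,000ms

const first = broker.receive( 30, 0 );
console.log( first.data );               // send now
console.log( broker.receive( 30, 0 ) );  // undefined

// 'send now' was never confirmed so it reappears once the lock expires
const retry = broker.receive( 30, 30000 );
console.log( retry.data );               // send now
broker.confirm( retry.id );

console.log( broker.receive( 30, 60000 ).data ); // send later
```

A real broker stores this state in a database so it survives restarts and can be shared between processes.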

Message-broker software includes Redis, RabbitMQ, Apache ActiveMQ, and Gearman. Cloud messaging services include Amazon SQS, Azure Service Bus, and Google Pub/Sub.

These may be viable options for enterprise-level applications. Yet they could be overkill if you have simpler requirements and already use a database.

Use MongoDB as a Message Broker

It’s possible to develop a sophisticated queuing system in a couple of hundred lines of Node.js code.

The queue-mongodb module described here uses MongoDB for data storage, but the same concepts could be applied to any SQL or NoSQL database. The code is available on GitHub and npm.

Quick Start

Make sure you have Node.js 14 or above installed, then create a new project folder such as queue-test. Add a new package.json file:

{
  "name": "queue-test",
  "version": "1.0.0",
  "description": "Queue test",
  "type": "module",
  "scripts": {
    "send": "node ./send.js",
    "receive": "node ./receive.js"
  }
}

Note: "type": "module" configures the project to use ES6 modules. The "scripts" will send and receive queued items.

Install the queue-mongodb module:

npm install @craigbuckler/queue-mongodb

Then create a .env file with your MongoDB database connection credentials. For example:

QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue

Note: this creates a queue collection (QUEUE_DB_COLL) in the qdb database (QUEUE_DB_NAME). You can use an existing database, but make sure the collection doesn’t clash with another.

Database read/write access must be granted to the user root (QUEUE_DB_USER) with password mysecret (QUEUE_DB_PASS). Set both values blank if no authentication is required.
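To show how settings like these relate to a database connection, the sketch below builds a standard MongoDB connection URI from them. How queue-mongodb reads the values internally isn’t covered here, so mongoUri() is a hypothetical helper rather than part of the module:

```javascript
// Sketch: build a standard MongoDB connection URI from settings like
// those in the .env file. mongoUri() is a hypothetical helper and is
// not part of the queue-mongodb module.
function mongoUri( env ) {

  const host = env.QUEUE_DB_HOST || 'localhost';
  const port = env.QUEUE_DB_PORT || '27017';
  const user = env.QUEUE_DB_USER || '';
  const pass = env.QUEUE_DB_PASS || '';

  // credentials are omitted when the user name is blank
  // and encoded so characters such as @ or : are safe in a URI
  const auth = user
    ? `${ encodeURIComponent( user ) }:${ encodeURIComponent( pass ) }@`
    : '';

  return `mongodb://${ auth }${ host }:${ port }/`;

}

console.log( mongoUri({
  QUEUE_DB_HOST: 'localhost',
  QUEUE_DB_PORT: '27017',
  QUEUE_DB_USER: 'root',
  QUEUE_DB_PASS: 'mysecret'
}) );
// mongodb://root:mysecret@localhost:27017/
```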

Start a MongoDB database if it’s not already running. Those with Docker and Docker Compose can create a new docker-compose.yml file:

version: '3'

services:

  queuedb:
    environment:
      - MONGO_INITDB_ROOT_USERNAME=$QUEUE_DB_USER
      - MONGO_INITDB_ROOT_PASSWORD=$QUEUE_DB_PASS
    image: mongo:4.4-bionic
    container_name: queuedb
    volumes:
      - queuedata:/data/db
    ports:
      - "$QUEUE_DB_PORT:$QUEUE_DB_PORT"
    restart: always

volumes:
  queuedata:

Then run docker-compose up to download and start MongoDB with a persistent data volume.

Docker is available for Linux, macOS, and Windows 10. See the Docker installation instructions.

Create a new send.js file to add a randomly generated email message to a queue named news:

// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';

// initialize queue named 'news'
const newsQ = new Queue('news');

// random name
const name = String.fromCharCode(65 + Math.random() * 26).repeat(1 + Math.random() * 10);

// add object to queue
const send = await newsQ.send({
  name:     name,
  email:    `${ name.toLowerCase() }@test.com`,
  date:     new Date(),
  message:  `Hey there, ${ name }!`
});

console.log('send', send);

// get number of items remaining in queue
console.log('items queued:', await newsQ.count());

// close connection and quit
await newsQ.close();

Execute it with npm run send and you’ll see output such as this:

send {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 1

The .send() method returns a qItem object containing:

  1. the MongoDB document _id
  2. the date/time the item was originally queued, and
  3. a copy of the message data

Run the script any number of times to add further items to the queue. The items queued count will increment on every run.

Now create a new receive.js file to retrieve messages from the same queue:

// Queue module
import { Queue } from '@craigbuckler/queue-mongodb';

// initialize queue named 'news'
const newsQ = new Queue('news');

let qItem;

do {

  qItem = await newsQ.receive();

  if (qItem) {

    console.log('\nreceive', qItem);

    // ... process qItem.data ...
    // ... to send email ...

  }

} while (qItem);

// number of items remaining in queue
console.log('items queued:', await newsQ.count());

await newsQ.close();

Run npm run receive to fetch and process queued items:

receive {
  _id: 607d692563bd6d05bb459931,
  sent: 2021-04-19T11:27:33.000Z,
  data: {
    name: 'AAA',
    email: 'aaa@test.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
items queued: 0

No email is sent in this example, but that could be implemented using Nodemailer or another suitable module.

If processing fails — perhaps because the mail server is down — an item can be re-queued with this:

newsQ.send( qItem.data, 600 );

The second argument (600) is optional and can be a number of seconds or a future date. This command re-queues the item so it becomes available again after 600 seconds (ten minutes) have elapsed.
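The receive loop and the re-queue call can be combined into a single function. This is a minimal sketch, not part of the module: processQueue() and handler are hypothetical names, and queue is assumed to be any object with asynchronous receive() and send( data, delay ) methods like those used above. It also assumes receive() won’t return a delayed item until it’s due. Otherwise, a failing item would be retried immediately and the loop would never end.

```javascript
// Sketch: process every queued item, re-queuing failures after a delay.
// processQueue() and handler are hypothetical names for illustration.
async function processQueue( queue, handler, retryDelay = 600 ) {

  let qItem, done = 0, failed = 0;

  do {

    qItem = await queue.receive();

    if (qItem) {

      try {
        // handler does the real work, such as sending an email
        await handler( qItem.data );
        done++;
      }
      catch (err) {
        // processing failed: make the item available again
        // after retryDelay seconds
        await queue.send( qItem.data, retryDelay );
        failed++;
      }

    }

  } while (qItem);

  // number of items processed and re-queued
  return { done, failed };

}
```

In receive.js, this could be called with await processQueue( newsQ, sendEmail ), where sendEmail is your own asynchronous function which throws an error when delivery fails.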

This is a simple example, but any application can send data to any number of queues. Another process, perhaps started as a cron job, can receive and process items when necessary.
