Source: amqp-queue.js

'use strict';

const { EventEmitter } = require('events');

const amqplib = require('amqplib/callback_api');
const async = require('async');

const { AmqpConnection } = require('./amqp-connection');
const { DataTypes, Errors, Events, QueuePattern, Status } = require('./constants');

const DEF_RECONN = 1000;

const amqplibConsts = {
  Fanout: 'fanout',
};

/**
 * Queue error event.
 *
 * @event AmqpQueue#error
 * @type {Error}
 */

/**
 * Queue status event.
 *
 * @event AmqpQueue#status
 * @type {Status}
 */

/**
 * The message that contains payload and meta data for acknowledgement.
 *
 * @typedef {Object} AmqpMessage
 * @property {Buffer} payload The message payload.
 * @property {amqplib.Message} meta The meta data that is used for acknowledgement. You should not
 *           access this property.
 */

/**
 * Message handler.
 *
 * @callback AmqpQueueMsgHandler
 * @param {AmqpQueue} queue The relative queue that receives the incoming message.
 * @param {AmqpMessage} msg The incoming message.
 */

/**
 * Manages an AMQP queue.
 *
 * @class AmqpQueue
 * @fires AmqpQueue#error
 * @fires AmqpQueue#status
 */
class AmqpQueue extends EventEmitter {
  /**
   * @constructor
   * @param {Object} opts The queue options.
   *   @param {string} opts.name The queue name that is used to map an AMQP queue (unicast) or an
   *          exchange (broadcast). The pattern is `^[a-z0-9_-]+([\.]{1}[a-z0-9_-]+)*$`.
   *   @param {boolean} opts.isRecv `true` for the receiver and `false` for the sender.
   *   @param {boolean} opts.reliable Reliable by selecting the confirm channel (for publish).
   *   @param {boolean} opts.broadcast `true` for broadcast and `false` for unicast.
   *   @param {number} [opts.reconnectMillis=1000] Time in milliseconds from disconnection to
   *          reconnection. `0` is accepted and treated like "not given" (the default is used).
   *   @param {number} [opts.prefetch] REQUIRED when `isRecv=true`. The QoS of the receiver queue.
   *          This value MUST be a positive value between 1 to 65535.
   *   @param {boolean} [opts.persistent=false] Use persistent delivery mode.
   * @param {AmqpConnection} conn The AMQP connection.
   * @throws {Error} Wrong arguments.
   */
  constructor(opts, conn) {
    super();

    if (!opts || typeof opts !== DataTypes.Object || Array.isArray(opts)) {
      throw Error('`opts` is not an object');
    } else if (!(conn instanceof AmqpConnection)) {
      throw Error('`conn` is not a `AmqpConnection` object');
    } else if (!QueuePattern.test(opts.name)) {
      throw Error('`name` is not match pattern `^[a-z0-9_-]+([\\.]{1}[a-z0-9_-]+)*$`');
    } else if (typeof opts.isRecv !== DataTypes.Boolean) {
      throw Error('`isRecv` is not boolean');
    } else if (typeof opts.reliable !== DataTypes.Boolean) {
      throw Error('`reliable` is not boolean');
    } else if (typeof opts.broadcast !== DataTypes.Boolean) {
      throw Error('`broadcast` is not boolean');
    } else if (
      opts.isRecv &&
      (!Number.isInteger(opts.prefetch) || opts.prefetch <= 0 || opts.prefetch > 65535)
    ) {
      throw Error('`prefetch` must be a positive integer between 1 to 65535');
    }
    if (
      opts.reconnectMillis !== undefined &&
      (!Number.isInteger(opts.reconnectMillis) || opts.reconnectMillis < 0)
    ) {
      throw Error('`reconnectMillis` must be a positive integer');
    }
    if (opts.persistent !== undefined && typeof opts.persistent !== DataTypes.Boolean) {
      throw Error('`persistent` is not boolean');
    }

    // Keep a private copy so later mutation of the caller's object cannot affect the queue.
    this.#opts = {
      name: opts.name,
      isRecv: opts.isRecv,
      reliable: opts.reliable,
      broadcast: opts.broadcast,
      // `||` on purpose: both `undefined` and `0` fall back to the default delay.
      reconnectMillis: opts.reconnectMillis || DEF_RECONN,
      prefetch: opts.prefetch,
      persistent: opts.persistent || false,
    };
    this.#status = Status.Closed;
    this.#conn = conn;
    this.#connProcessing = false;
    this.#channel = null;
    this.#msgHandler = null;

    // Follow the connection: drop the channel when it goes away, re-create it when it is back.
    this.#conn.on(Events.Status, (status) => this.#onConnStatusChanged(status));
  }

  /**
   * To get the queue name.
   *
   * @returns {string}
   */
  name() {
    return this.#opts.name;
  }

  /**
   * Is the queue a receiver.
   *
   * @returns {boolean}
   */
  isRecv() {
    return this.#opts.isRecv;
  }

  /**
   * To get the queue status.
   *
   * @returns {Status}
   */
  status() {
    return this.#status;
  }

  /**
   * Set the message handler.
   *
   * @param {AmqpQueueMsgHandler} handler The handler. It must be a function.
   * @throws {Error} Wrong arguments.
   */
  setMsgHandler(handler) {
    if (typeof handler !== DataTypes.Function) {
      throw Error('the handler is not a function');
    }

    this.#msgHandler = handler;
  }

  /**
   * To connect to the message queue. The `AmqpQueue` will report status with Status.
   * Does nothing unless the queue is currently closed or closing.
   *
   * @throws {Error} Wrong usage (a receiver queue without a message handler).
   */
  connect() {
    if (this.#opts.isRecv && !this.#msgHandler) {
      throw Error(Errors.NoMsgHandler);
    }

    if (this.#status !== Status.Closed && this.#status !== Status.Closing) {
      return;
    }

    this.#status = Status.Connecting;
    this.emit(Events.Status, Status.Connecting);

    this.#innerConnect();
  }

  /**
   * To close the queue. You can use a callback function to get the result or listen events.
   *
   * @param {function} [callback] Always invoked asynchronously (on the next tick).
   *   @param {?Error} callback.err
   */
  close(callback) {
    const cb = typeof callback === DataTypes.Function ? callback : null;
    const notify = (err) => {
      if (cb) {
        process.nextTick(() => {
          cb(err);
        });
      }
    };

    if (this.#status === Status.Closing || this.#status === Status.Closed) {
      return void notify(null);
    }
    if (!this.#channel) {
      // No channel to release (e.g. still connecting): the queue is closed right away.
      // Returning here is required even without a callback; there is nothing to call `close` on.
      this.#status = Status.Closed;
      this.emit(Events.Status, Status.Closed);
      return void notify(null);
    }

    this.#status = Status.Closing;
    this.emit(Events.Status, Status.Closing);
    this.#channel.close((err) => {
      // The channel `close` event may already have cleared the reference.
      if (this.#channel) {
        this.#channel.removeAllListeners();
        this.#channel = null;
      }
      this.#status = Status.Closed;
      this.emit(Events.Status, Status.Closed);
      notify(err);
    });
  }

  /**
   * To send a message (for senders only).
   *
   * @param {Buffer} payload The raw data to be sent.
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  sendMsg(payload, callback) {
    if (!(payload instanceof Buffer)) {
      throw Error('`payload` is not a Buffer');
    } else if (typeof callback !== DataTypes.Function) {
      throw Error('`callback` is not a function');
    } else if (this.#status !== Status.Connected) {
      return void process.nextTick(() => {
        callback(Error(Errors.NotConnected));
      });
    }
    if (this.#opts.isRecv) {
      return void process.nextTick(() => {
        callback(Error(Errors.QueueIsReceiver));
      });
    }

    // Broadcast publishes to the fanout exchange; unicast uses the default exchange and routes
    // by queue name.
    const exchange = this.#opts.broadcast ? this.#opts.name : '';
    const routingKey = this.#opts.broadcast ? '' : this.#opts.name;
    const publishOpts = {
      persistent: this.#opts.persistent,
    };

    if (!this.#opts.reliable) {
      this.#channel.publish(exchange, routingKey, payload, publishOpts);
      // Use `setTimeout` instead of `nextTick` because nextTick may causes too much events to hang
      // AMQP packets transmission.
      return void setTimeout(() => {
        callback(null);
      }, 1);
    }

    // Reliable mode: add `mandatory` on top of (not instead of) the delivery mode and wait for
    // the confirmation.
    publishOpts.mandatory = true;
    this.#channel.publish(exchange, routingKey, payload, publishOpts, (err) => {
      if (err) {
        return void callback(err);
      }
      callback(null);
    });
  }

  /**
   * Use this if the message is processed successfully.
   *
   * @param {AmqpMessage} msg
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  ack(msg, callback) {
    AmqpQueue.#checkSettleArgs(msg, callback);

    // Without a channel there is nothing to acknowledge on; the call is a no-op.
    const channel = this.#channel;
    if (channel) {
      channel.ack(msg.meta);
    }
    process.nextTick(() => {
      callback(null);
    });
  }

  /**
   * To requeue the message and the broker will send the message in the future.
   *
   * @param {AmqpMessage} msg
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  nack(msg, callback) {
    AmqpQueue.#checkSettleArgs(msg, callback);

    // Without a channel there is nothing to reject on; the call is a no-op.
    const channel = this.#channel;
    if (channel) {
      channel.nack(msg.meta);
    }
    process.nextTick(() => {
      callback(null);
    });
  }

  /**
   * Validates the arguments shared by `ack()` and `nack()`.
   *
   * @param {*} msg
   * @param {*} callback
   * @throws {Error} Wrong arguments.
   */
  static #checkSettleArgs(msg, callback) {
    if (!msg || typeof msg !== DataTypes.Object || Array.isArray(msg)) {
      throw Error('`msg` is not an object');
    } else if (typeof callback !== DataTypes.Function) {
      throw Error('`callback` is not a function');
    }
  }

  /**
   * Retries `#innerConnect` after the configured reconnect delay.
   */
  #scheduleReconnect() {
    setTimeout(() => {
      this.#innerConnect();
    }, this.#opts.reconnectMillis);
  }

  /**
   * Creates the channel, declares the queue/exchange and (for receivers) starts consuming.
   * Runs only while the queue is in `Connecting` status and no other attempt is in progress;
   * retries after `reconnectMillis` when the connection is not ready or any step fails.
   */
  #innerConnect() {
    if (this.#status !== Status.Connecting || this.#connProcessing) {
      return;
    }

    const rawConn =
      this.#conn.status() === Status.Connected ? this.#conn.getRawConnection() : null;
    if (!rawConn) {
      return void this.#scheduleReconnect();
    }
    this.#connProcessing = true;

    let channel;
    async.waterfall(
      [
        // Create a channel (a confirm channel for reliable queues).
        (cb) => {
          const onCreated = (err, ch) => {
            if (err) {
              return void cb(err);
            }
            channel = ch;
            cb(null);
          };
          if (this.#opts.reliable) {
            rawConn.createConfirmChannel(onCreated);
          } else {
            rawConn.createChannel(onCreated);
          }
        },
        // Declare resources for unicast or broadcast and pass on the queue name to consume.
        (cb) => {
          if (this.#opts.broadcast) {
            return void this.#createBroadcast(channel, (err, qname) => {
              cb(err, qname);
            });
          }
          this.#createUnicast(channel, (err) => {
            if (err) {
              return void cb(err);
            }
            cb(null, this.#opts.name);
          });
        },
        // Set prefetch and consume the incoming messages.
        (qname, cb) => {
          if (!this.#opts.isRecv) {
            return void cb(null);
          }

          channel.prefetch(this.#opts.prefetch);
          channel.consume(
            qname,
            (msg) => this.#innerOnMessage(msg),
            {},
            (err) => {
              cb(err);
            }
          );
        },
      ],
      (err) => {
        this.#connProcessing = false;
        if (err) {
          this.emit(Events.Error, err);
          return void this.#scheduleReconnect();
        }

        if (this.#status !== Status.Connecting) {
          // `close()` was called while the channel was being set up. Do not resurrect the queue;
          // release the channel that was just created instead.
          try {
            channel.close(() => {});
          } catch (_err) {
            // Best effort: the channel is already gone, so there is nothing left to release.
          }
          return;
        }

        channel.on('close', () => this.#onClose());
        channel.on('drain', () => this.#onDrain());
        channel.on('error', (e) => this.#onError(e));
        channel.on('return', (m) => this.#onReturn(m));
        this.#channel = channel;
        this.#status = Status.Connected;
        this.emit(Events.Status, Status.Connected);
      }
    );
  }

  /**
   * Forwards a raw amqplib message to the user's message handler.
   *
   * @param {?amqplib.Message} msg The raw message from the amqplib library. amqplib passes `null`
   *        when the broker cancels the consumer; such notifications are ignored.
   */
  #innerOnMessage(msg) {
    if (!msg) {
      return;
    }

    const handler = this.#msgHandler;
    if (handler) {
      handler(this, {
        payload: msg.content,
        meta: msg,
      });
    }
  }

  /**
   * To create resources for the broadcast queue.
   *
   * @param {amqplib.Channel} channel
   * @param {function} callback
   *   @param {?Error} callback.err
   *   @param {string} callback.qname The name of the temporary queue (empty for senders).
   */
  #createBroadcast(channel, callback) {
    const exchange = this.#opts.name;

    async.waterfall(
      [
        // Declare the fanout exchange.
        (cb) => {
          channel.assertExchange(exchange, amqplibConsts.Fanout, { durable: false }, (err) => {
            cb(err);
          });
        },
        // Declare a temporary queue and bind the queue name to the exchange.
        (cb) => {
          if (!this.#opts.isRecv) {
            return void cb(null, '');
          }

          channel.assertQueue('', { exclusive: true }, (err, q) => {
            if (err) {
              return void cb(err);
            }
            // The server-generated name is reported in the `queue` field of the reply.
            const qname = q.queue;
            channel.bindQueue(qname, exchange, '', {}, (bindErr) => {
              cb(bindErr, qname);
            });
          });
        },
      ],
      (err, qname) => {
        callback(err, qname);
      }
    );
  }

  /**
   * To create resources for the unicast queue.
   *
   * @param {amqplib.Channel} channel
   * @param {function} callback
   *   @param {?Error} callback.err
   */
  #createUnicast(channel, callback) {
    channel.assertQueue(this.#opts.name, { durable: true }, (err) => {
      callback(err);
    });
  }

  /**
   * To handle `AmqpConnection` status change events.
   *
   * @param {Status} status The latest status of the relative `AmqpConnection`.
   */
  #onConnStatusChanged(status) {
    switch (status) {
      case Status.Closed:
      case Status.Closing:
      case Status.Connecting:
      case Status.Disconnected:
        this.#onClose();
        return;
      case Status.Connected:
        return void this.#innerConnect();
      default:
        return;
    }
  }

  /**
   * Drops the current channel and, unless the queue is closing, closed or already connecting,
   * switches to `Connecting` and schedules a reconnection.
   */
  #onClose() {
    if (this.#channel) {
      this.#channel.removeAllListeners();
      this.#channel = null;
    }

    const idleOrBusy =
      this.#status === Status.Closing ||
      this.#status === Status.Closed ||
      this.#status === Status.Connecting;
    if (idleOrBusy) {
      return;
    }

    this.#status = Status.Connecting;
    this.emit(Events.Status, Status.Connecting);
    this.#scheduleReconnect();
  }

  /** Channel `drain` event; intentionally ignored. */
  #onDrain() {}

  /** Channel `error` event; intentionally ignored. */
  #onError(_err) {}

  /** Channel `return` event (unroutable mandatory message); intentionally ignored. */
  #onReturn(_msg) {}

  /** Validated copy of the constructor options. */
  #opts;
  /** @type {Status} */
  #status;
  /** @type {AmqpConnection} */
  #conn;
  /**
   * Processing `#innerConnect`.
   *
   * @type {boolean}
   */
  #connProcessing;
  /** @type {?amqplib.Channel} */
  #channel;
  /** @type {?AmqpQueueMsgHandler} */
  #msgHandler;
}

// Public API of this module.
module.exports = {
  AmqpQueue,
};