Source: amqp-queue.js

'use strict';

const { EventEmitter } = require('events');

const amqplib = require('amqplib');

const { AmqpConnection } = require('./amqp-connection');
const { DataTypes, Errors, Events, QueuePattern, Status } = require('./constants');
const { SdkError } = require('./lib');

// Fallback reconnect delay in milliseconds, used when `opts.reconnectMillis` is not provided (or
// is falsy) in the `AmqpQueue` constructor.
const DEF_RECONN = 1000;

// String constants passed to amqplib. `Fanout` is the exchange type given to `assertExchange()`
// when declaring the exchange of a broadcast queue.
const amqplibConsts = {
  Fanout: 'fanout',
};

/**
 * Queue error event.
 *
 * @event AmqpQueue#error
 * @type {Error}
 */

/**
 * Queue status event.
 *
 * @event AmqpQueue#status
 * @type {Status}
 */

/**
 * The message that contains payload and meta data for acknowledgement.
 *
 * @typedef {Object} AmqpMessage
 * @property {Buffer} payload The message payload.
 * @property {amqplib.Message} meta The meta data that is used for acknowledgement. You should not
 *           access this property.
 */

/**
 * Message handler.
 *
 * @callback AmqpQueueMsgHandler
 * @param {AmqpQueue} queue The queue that received the incoming message.
 * @param {AmqpMessage} msg The incoming message.
 */

/**
 * Manages an AMQP queue.
 *
 * @class AmqpQueue
 * @fires AmqpQueue#error
 * @fires AmqpQueue#status
 */
class AmqpQueue extends EventEmitter {
  /** Normalized copy of the options given to the constructor. */
  #opts;
  /** @type {Status} */
  #status;
  /** @type {AmqpConnection} */
  #conn;
  /**
   * `true` while an `#innerConnect` attempt is in flight. It stays set until the attempt has
   * completely finished so that only one attempt runs at a time.
   *
   * @type {boolean}
   */
  #connProcessing;
  /** @type {?(amqplib.Channel|amqplib.ConfirmChannel)} */
  #channel;
  /** @type {?AmqpQueueMsgHandler} */
  #msgHandler;

  /**
   * @constructor
   * @param {Object} opts The queue options.
   *   @param {string} opts.name The queue name that is used to map a AMQP queue (unicast) or an
   *          exchange (broadcast). The pattern is `^[a-z0-9_-]+([\.]{1}[a-z0-9_-]+)*$`.
   *   @param {boolean} opts.isRecv `true` for the receiver and `false` for the sender.
   *   @param {boolean} opts.reliable Reliable by selecting the confirm channel (for publish).
   *   @param {boolean} opts.broadcast `true` for broadcast and `false` for unicast.
   *   @param {number} [opts.reconnectMillis=1000] Time in milliseconds from disconnection to
   *          reconnection. `0` selects the default value.
   *   @param {number} [opts.prefetch] REQUIRED when `isRecv=true`. The QoS of the receiver queue.
   *          This value MUST be a positive value between 1 to 65535.
   *   @param {boolean} [opts.persistent=false] Use persistent delivery mode.
   * @param {AmqpConnection} conn The AMQP connection.
   * @throws {Error} Wrong arguments.
   */
  constructor(opts, conn) {
    super();

    if (!opts || typeof opts !== DataTypes.Object || Array.isArray(opts)) {
      throw Error('`opts` is not an object');
    } else if (!(conn instanceof AmqpConnection)) {
      throw Error('`conn` is not a `AmqpConnection` object');
    } else if (!QueuePattern.test(opts.name)) {
      throw Error('`name` is not match pattern `^[a-z0-9_-]+([\\.]{1}[a-z0-9_-]+)*$`');
    } else if (typeof opts.isRecv !== DataTypes.Boolean) {
      throw Error('`isRecv` is not boolean');
    } else if (typeof opts.reliable !== DataTypes.Boolean) {
      throw Error('`reliable` is not boolean');
    } else if (typeof opts.broadcast !== DataTypes.Boolean) {
      throw Error('`broadcast` is not boolean');
    } else if (
      opts.isRecv &&
      (!Number.isInteger(opts.prefetch) || opts.prefetch <= 0 || opts.prefetch > 65535)
    ) {
      throw Error('`prefetch` must be a positive integer between 1 to 65535');
    }
    if (
      opts.reconnectMillis !== undefined &&
      (!Number.isInteger(opts.reconnectMillis) || opts.reconnectMillis < 0)
    ) {
      throw Error('`reconnectMillis` must be a positive integer');
    }
    if (opts.persistent !== undefined && typeof opts.persistent !== DataTypes.Boolean) {
      throw Error('`persistent` is not boolean');
    }

    this.#opts = {
      name: opts.name,
      isRecv: opts.isRecv,
      reliable: opts.reliable,
      broadcast: opts.broadcast,
      // `0` passes the validation above and intentionally falls back to the default.
      reconnectMillis: opts.reconnectMillis || DEF_RECONN,
      prefetch: opts.prefetch,
      persistent: opts.persistent || false,
    };
    this.#status = Status.Closed;
    this.#conn = conn;
    this.#connProcessing = false;
    this.#channel = null;
    this.#msgHandler = null;

    // Follow the connection: drop the channel when it goes away, reconnect when it comes back.
    this.#conn.on(Events.Status, (status) => this.#onConnStatusChanged(status));
  }

  /**
   * To get the queue name.
   *
   * @returns {string}
   */
  name() {
    return this.#opts.name;
  }

  /**
   * Is the queue a receiver.
   *
   * @returns {boolean}
   */
  isRecv() {
    return this.#opts.isRecv;
  }

  /**
   * To get the queue status.
   *
   * @returns {Status}
   */
  status() {
    return this.#status;
  }

  /**
   * Set the message handler. Pass `null` (or `undefined`) to remove the current handler.
   *
   * @param {?AmqpQueueMsgHandler} handler
   * @throws {Error} Wrong arguments.
   */
  setMsgHandler(handler) {
    if (handler != null && typeof handler !== DataTypes.Function) {
      throw Error('the handler is not a function');
    }

    this.#msgHandler = handler ?? null;
  }

  /**
   * To connect to the message queue. The `AmqpQueue` will report status with Status.
   *
   * This only has an effect when the queue is closed or closing; otherwise it returns silently.
   *
   * @throws {Error} Wrong usage (a receiver queue without a message handler).
   */
  connect() {
    if (this.#opts.isRecv && !this.#msgHandler) {
      throw Error(Errors.NoMsgHandler);
    }

    if (this.#status !== Status.Closed && this.#status !== Status.Closing) {
      return;
    }

    this.#status = Status.Connecting;
    this.emit(Events.Status, Status.Connecting);

    this.#innerConnect();
  }

  /**
   * To close the queue. You can use `await` to get the result or listen events.
   *
   * The queue always ends up in the closed status, even if closing the channel fails.
   *
   * @async
   * @returns {Promise<void>}
   * @throws {SdkError} Closing the underlying channel failed.
   */
  async close() {
    if (this.#status === Status.Closing || this.#status === Status.Closed) {
      return;
    } else if (!this.#channel) {
      this.#status = Status.Closed;
      this.emit(Events.Status, Status.Closed);
      return;
    }

    this.#status = Status.Closing;
    this.emit(Events.Status, Status.Closing);

    // `try`/`catch` (instead of `.catch()`) also captures a synchronous throw from `close()`, so
    // the queue can never get stuck in the closing status.
    let closeErr = null;
    try {
      await this.#channel.close();
    } catch (err) {
      closeErr = err;
    }
    // The channel `close` event may already have cleared `#channel` while awaiting.
    this.#dropChannel();
    this.#status = Status.Closed;
    this.emit(Events.Status, Status.Closed);
    if (closeErr) {
      throw new SdkError(closeErr.message);
    }
  }

  /**
   * To send a message (for senders only).
   *
   * The `persistent` option of the queue is applied to both reliable and unreliable publishing.
   *
   * @async
   * @param {Buffer} payload The raw data to be sent.
   * @returns {Promise<void>}
   * @throws {Error} Wrong arguments.
   * @throws {SdkError}
   */
  async sendMsg(payload) {
    if (!(payload instanceof Buffer)) {
      throw Error('`payload` is not a Buffer');
    } else if (this.#status !== Status.Connected) {
      throw new SdkError(Errors.NotConnected);
    } else if (this.#opts.isRecv) {
      throw new SdkError(Errors.QueueIsReceiver);
    }

    // Broadcast publishes to the fanout exchange; unicast publishes to the default exchange with
    // the queue name as the routing key.
    const exchange = this.#opts.broadcast ? this.#opts.name : '';
    const routingKey = this.#opts.broadcast ? '' : this.#opts.name;
    const options = { persistent: this.#opts.persistent };
    if (this.#opts.reliable) {
      // Confirm channel: resolve/reject when the broker confirms the message.
      await new Promise((resolve, reject) => {
        this.#channel.publish(
          exchange,
          routingKey,
          payload,
          { ...options, mandatory: true },
          (err) => {
            if (err) {
              return reject(new SdkError(err.message));
            }
            resolve();
          }
        );
      });
    } else {
      this.#channel.publish(exchange, routingKey, payload, options);
      // Use `setTimeout` instead of `nextTick` because nextTick may causes too much events to hang
      // AMQP packets transmission.
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }

  /**
   * Use this if the message is processed successfully. Does nothing when there is no channel.
   *
   * @async
   * @param {AmqpMessage} msg
   * @returns {Promise<void>}
   * @throws {Error} Wrong usage.
   */
  async ack(msg) {
    if (!msg || typeof msg !== DataTypes.Object || Array.isArray(msg)) {
      throw Error('`msg` is not an object');
    }

    if (this.#channel) {
      this.#channel.ack(msg.meta);
      await new Promise((resolve) => process.nextTick(resolve));
    }
  }

  /**
   * To requeue the message and the broker will send the message in the future. Does nothing when
   * there is no channel.
   *
   * @async
   * @param {AmqpMessage} msg
   * @returns {Promise<void>}
   * @throws {Error} Wrong usage.
   */
  async nack(msg) {
    if (!msg || typeof msg !== DataTypes.Object || Array.isArray(msg)) {
      throw Error('`msg` is not an object');
    }

    if (this.#channel) {
      this.#channel.nack(msg.meta);
      await new Promise((resolve) => process.nextTick(resolve));
    }
  }

  /**
   * One connection attempt: create a channel, declare the resources, start consuming (receivers)
   * and switch to the connected status. Any failure schedules another attempt after
   * `reconnectMillis`.
   *
   * @async
   * @returns {Promise<void>}
   */
  async #innerConnect() {
    if (this.#status !== Status.Connecting || this.#connProcessing) {
      return;
    }

    const rawConn =
      this.#conn.status() === Status.Connected ? this.#conn.getRawConnection() : null;
    if (!rawConn) {
      return void this.#scheduleReconnect();
    }

    // Held until the attempt is completely finished so that a retry timer or a connection status
    // event cannot start a second, concurrent attempt.
    this.#connProcessing = true;
    let channel = null;
    try {
      // Create a channel.
      channel = this.#opts.reliable
        ? await rawConn.createConfirmChannel()
        : await rawConn.createChannel();

      // Listen right away: an `error` event without a listener would be thrown by the emitter.
      channel.on('close', () => this.#onClose());
      channel.on('drain', () => this.#onDrain());
      channel.on('error', (err) => this.#onError(err));
      channel.on('return', (msg) => this.#onReturn(msg));

      // Declare resources for unicast or broadcast.
      let qname = this.#opts.name;
      if (this.#opts.broadcast) {
        qname = await this.#createBroadcast(channel);
      } else {
        await this.#createUnicast(channel);
      }

      // Publish the channel before consuming so the handler can `ack()`/`nack()` immediately.
      this.#channel = channel;

      // Set prefetch and consume the incoming messages.
      if (this.#opts.isRecv) {
        await channel.prefetch(this.#opts.prefetch);
        await channel.consume(qname, (msg) => this.#innerOnMessage(msg), {});
      }

      // `#onClose` clears `#channel` if the channel or the connection went away while awaiting.
      if (this.#status === Status.Connecting && this.#channel !== channel) {
        throw new SdkError('the channel is closed while connecting');
      }
    } catch (err) {
      this.#connProcessing = false;
      if (channel) {
        this.#discardChannel(channel);
      }
      if (this.#status !== Status.Connecting) {
        // The queue was closed meanwhile: nothing to report and nothing to retry.
        return;
      }
      // Schedule first so that a throwing `error` listener cannot stop the retry loop.
      this.#scheduleReconnect();
      this.emit(Events.Error, err);
      return;
    }
    this.#connProcessing = false;

    if (this.#status !== Status.Connecting) {
      // `close()` was called during the attempt; do not resurrect the queue.
      this.#discardChannel(channel);
      return;
    }
    this.#status = Status.Connected;
    this.emit(Events.Status, Status.Connected);
  }

  /**
   * Message handler. Wraps the raw message and passes it to the user handler, if any.
   *
   * @param {?amqplib.Message} msg The raw message from the amqplib library. amqplib passes `null`
   *        when the consumer is cancelled by the broker.
   */
  #innerOnMessage(msg) {
    if (!msg) {
      return;
    }

    const handler = this.#msgHandler;
    if (handler) {
      handler(this, {
        payload: msg.content,
        meta: msg,
      });
    }
  }

  /**
   * To create resources for the broadcast queue.
   *
   * @async
   * @param {amqplib.Channel} channel
   * @returns {Promise<string>} The name of the temporary queue (empty string for senders).
   * @throws {SdkError}
   */
  async #createBroadcast(channel) {
    try {
      // Declare the fanout exchange.
      await channel.assertExchange(this.#opts.name, amqplibConsts.Fanout, { durable: false });

      // Only receivers need a queue.
      if (!this.#opts.isRecv) {
        return '';
      }

      // Declare a temporary queue and bind it to the exchange. The broker-generated name is
      // returned in the `queue` field of the reply.
      const { queue } = await channel.assertQueue('', { exclusive: true });
      await channel.bindQueue(queue, this.#opts.name, '', {});
      return queue;
    } catch (err) {
      throw new SdkError(err.message);
    }
  }

  /**
   * To create resources for the unicast queue.
   *
   * @async
   * @param {amqplib.Channel} channel
   * @returns {Promise<void>}
   * @throws {SdkError}
   */
  async #createUnicast(channel) {
    try {
      await channel.assertQueue(this.#opts.name, { durable: true });
    } catch (err) {
      throw new SdkError(err.message);
    }
  }

  /**
   * To handle `AmqpConnection` status change events.
   *
   * @param {Status} status The latest status of the relative `AmqpConnection`.
   */
  #onConnStatusChanged(status) {
    switch (status) {
      case Status.Closed:
      case Status.Closing:
      case Status.Connecting:
      case Status.Disconnected:
        this.#onClose();
        return;
      case Status.Connected:
        return void this.#innerConnect();
      default:
        return;
    }
  }

  /**
   * Called when the channel is closed or the connection is no longer usable. Drops the channel
   * and, if the queue was connected, switches to the connecting status and schedules a reconnect.
   */
  #onClose() {
    this.#dropChannel();

    if (
      this.#status === Status.Closing ||
      this.#status === Status.Closed ||
      this.#status === Status.Connecting
    ) {
      return;
    }
    this.#status = Status.Connecting;
    this.emit(Events.Status, Status.Connecting);
    this.#scheduleReconnect();
  }

  /** Channel `drain` event handler. Intentionally a no-op. */
  #onDrain() {}

  /**
   * Channel `error` event handler. Intentionally a no-op; it exists so that the `error` event
   * always has a listener.
   */
  #onError(_err) {}

  /** Channel `return` event handler. Intentionally a no-op. */
  #onReturn(_msg) {}

  /** Runs `#innerConnect` again after `reconnectMillis` milliseconds. */
  #scheduleReconnect() {
    setTimeout(() => this.#innerConnect(), this.#opts.reconnectMillis);
  }

  /** Detaches the listeners of the current channel (if any) and forgets the channel. */
  #dropChannel() {
    if (this.#channel) {
      this.#channel.removeAllListeners();
      this.#channel = null;
    }
  }

  /**
   * Releases a channel created by a connection attempt that did not complete: detaches the
   * listeners and closes the channel on a best-effort basis so that it is not leaked.
   *
   * @param {amqplib.Channel|amqplib.ConfirmChannel} channel
   */
  #discardChannel(channel) {
    if (this.#channel === channel) {
      this.#channel = null;
    }
    channel.removeAllListeners();
    // Keep a listener so that a late `error` event is not thrown by the emitter.
    channel.on('error', () => {});
    // The channel may already be closed; both a synchronous throw and a rejection are ignored.
    Promise.resolve()
      .then(() => channel.close())
      .catch(() => {});
  }
}

// Public API of this module: only the `AmqpQueue` class is exported.
module.exports = {
  AmqpQueue,
};