Source: mq/application.js

'use strict';

const { EventEmitter } = require('events');
const { URL } = require('url');

const async = require('async');

const { AmqpQueue } = require('general-mq/lib/amqp-queue');
const { DataTypes, Status, Events } = require('general-mq/lib/constants');
const { MqttQueue } = require('general-mq/lib/mqtt-queue');
const {
  Connection,
  DataMqStatus,
  Options,
  getConnection,
  removeConnection,
  newDataQueues,
} = require('./lib');
const { MgrStatus, Errors } = require('./constants');

/**
 * Uplink data from broker to application.
 *
 * @typedef {Object} AppUlData
 * @property {string} dataId
 * @property {Date} time
 * @property {Date} pub
 * @property {string} deviceId
 * @property {string} networkId
 * @property {string} networkCode
 * @property {string} networkAddr
 * @property {boolean} isPublic
 * @property {Buffer} data
 * @property {Object} [extension]
 */

/**
 * Downlink data from application to broker.
 *
 * @typedef {Object} AppDlData
 * @property {string} correlationId
 * @property {string} [deviceId]
 * @property {string} [networkCode]
 * @property {string} [networkAddr]
 * @property {Buffer} data
 * @property {Object} [extension]
 */

/**
 * Downlink data response for `AppDlData`.
 *
 * @typedef {Object} AppDlDataResp
 * @property {string} correlationId
 * @property {string} [dataId]
 * @property {string} [error]
 * @property {string} [message]
 */

/**
 * Downlink data result when processing or completing data transfer to the device.
 *
 * @typedef {Object} AppDlDataResult
 * @property {string} dataId
 * @property {number} status
 * @property {string} [message]
 */

/**
 * Connection status event.
 *
 * @event ApplicationMgr#status
 * @type {Status}
 */

/**
 * @typedef {Object} AppMgrMsgHandlers
 * @property {OnAppUlData} onUlData
 * @property {OnAppDlDataResp} onDlDataResp
 * @property {OnAppDlDataResult} onDlDataResult
 */

/**
 * @callback OnAppUlData
 * @param {ApplicationMgr} mgr
 * @param {AppUlData} data
 * @param {function} callback
 *   @param {?Error} callback.err Use error to NACK the message.
 */

/**
 * @callback OnAppDlDataResp
 * @param {ApplicationMgr} mgr
 * @param {AppDlDataResp} data
 * @param {function} callback
 *   @param {?Error} callback.err Use error to NACK the message.
 */

/**
 * @callback OnAppDlDataResult
 * @param {ApplicationMgr} mgr
 * @param {AppDlDataResult} data
 * @param {function} callback
 *   @param {?Error} callback.err Use error to NACK the message.
 */

/**
 * The manager for application queues.
 *
 * @class ApplicationMgr
 * @fires ApplicationMgr#status
 */
class ApplicationMgr extends EventEmitter {
  /**
   * @param {Map<string, Connection>} connPool
   * @param {URL} hostUri
   * @param {Options} opts
   * @param {AppMgrMsgHandlers} handler
   * @throws {Error} Wrong arguments.
   */
  constructor(connPool, hostUri, opts, handler) {
    super();

    if (!connPool || !(connPool instanceof Map)) {
      throw Error('`connPool` is not a Connection pool map');
    } else if (!hostUri || !(hostUri instanceof URL) || !URL.canParse(hostUri.toString())) {
      throw Error('`hostUri` is not a valid URI');
    } else if (!opts || typeof opts !== DataTypes.Object || Array.isArray(opts)) {
      throw Error('`opts` is not an object');
    } else if (!opts.unitId) {
      throw Error('`opts.unitId` cannot be empty for application');
    } else if (!handler || typeof handler !== DataTypes.Object || Array.isArray(handler)) {
      throw Error('`handler` is not an object');
    } else if (
      typeof handler.onUlData !== DataTypes.Function ||
      typeof handler.onDlDataResp !== DataTypes.Function ||
      typeof handler.onDlDataResult !== DataTypes.Function
    ) {
      throw Error('`onUlData`, `onDlDataResp` and `onDlDataResult` must be functions');
    }

    const conn = getConnection(connPool, hostUri);
    const queues = newDataQueues(conn, opts, QUEUE_PREFIX, false);
    conn.conn.connect();

    this.#opts = { ...opts };
    this.#connPool = connPool;
    this.#hostUri = hostUri.toString();
    this.#uldata = queues.uldata;
    this.#dldata = queues.dldata;
    this.#dldataResp = queues.dldataResp;
    this.#dldataResult = queues.dldataResult;
    this.#status = MgrStatus.NotReady;
    this.#mgrMsgHandler = handler;

    this.#uldata.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#uldata.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#uldata.connect();

    this.#dldata.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#dldata.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#dldata.connect();

    this.#dldataResp.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#dldataResp.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#dldataResp.connect();

    this.#dldataResult.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#dldataResult.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#dldataResult.connect();

    conn.count += 4;
  }

  /**
   * @returns {string} The associated unit ID of the application.
   */
  unitId() {
    return this.#opts.unitId;
  }

  /**
   * @returns {string} The associated unit code of the application.
   */
  unitCode() {
    return this.#opts.unitCode;
  }

  /**
   * @returns {string} The application ID.
   */
  id() {
    return this.#opts.id;
  }

  /**
   * @returns {string} The application code.
   */
  name() {
    return this.#opts.name;
  }

  /**
   * @returns {MgrStatus} Manager status.
   */
  status() {
    return this.#status;
  }

  /**
   * @returns {DataMqStatus} Manager status.
   */
  mqStatus() {
    return {
      uldata: this.#uldata.status(),
      dldata: this.#dldata.status(),
      dldataResp: this.#dldataResp.status(),
      dldataResult: this.#dldataResult.status(),
      ctrl: Status.Closed,
    };
  }

  /**
   * To close the manager queues.
   * The underlying connection will be closed when there are no queues use it.
   *
   * @param {function} callback
   *   @param {?Error} callback.err
   */
  close(callback) {
    const self = this;

    async.waterfall(
      [
        function (cb) {
          self.#uldata.removeAllListeners();
          self.#uldata.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#dldata.removeAllListeners();
          self.#dldata.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#dldataResp.removeAllListeners();
          self.#dldataResp.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#dldataResult.removeAllListeners();
          self.#dldataResult.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          removeConnection(self.#connPool, self.#hostUri, 4, (err) => {
            cb(err || null);
          });
        },
      ],
      callback
    );
  }

  /**
   * Send downlink data `AppDlData` to the broker.
   *
   * @param {AppDlData} data
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  sendDlData(data, callback) {
    if (!data || typeof data !== DataTypes.Object || Array.isArray(data)) {
      throw Error('`data` is not an object');
    } else if (!data.correlationId || typeof data.correlationId !== DataTypes.String) {
      throw Error(ErrParamCorrId);
    } else if (data.deviceId !== undefined && typeof data.deviceId !== DataTypes.String) {
      throw Error('`data.deviceId` is not a string');
    } else if (data.networkCode !== undefined && typeof data.networkCode !== DataTypes.String) {
      throw Error('`data.networkCode` is not a string');
    } else if (data.networkAddr !== undefined && typeof data.networkAddr !== DataTypes.String) {
      throw Error('`data.networkAddr` is not a string');
    } else if (!(data.data instanceof Buffer)) {
      throw Error('`data.data` is not a Buffer');
    } else if (
      data.extension !== undefined &&
      (!data.extension ||
        typeof data.extension !== DataTypes.Object ||
        Array.isArray(data.extension))
    ) {
      throw Error('`data.extension` is not an object');
    }
    if (!data.deviceId) {
      if ((data.networkCode && !data.networkAddr) || (!data.networkCode && data.networkAddr)) {
        throw Error(Errors.ErrParamDev);
      }
    }

    const dldata = {
      correlationId: data.correlationId,
      deviceId: data.deviceId || undefined,
      networkCode: data.networkCode || undefined,
      networkAddr: data.networkAddr || undefined,
      data: data.data.toString('hex'),
      extension: data.extension || undefined,
    };
    const payload = Buffer.from(JSON.stringify(dldata));
    this.#dldata.sendMsg(payload, (err) => {
      callback(err || null);
    });
  }

  /**
   * The handler for the gmq.Queue#status events.
   */
  #gmqStatusHandler(_queue, _status) {
    let status;
    if (
      this.#uldata.status() === Status.Connected &&
      this.#dldata.status() === Status.Connected &&
      this.#dldataResp.status() === Status.Connected &&
      this.#dldataResult.status() === Status.Connected
    ) {
      status = MgrStatus.Ready;
    } else {
      status = MgrStatus.NotReady;
    }

    if (this.#status === status) {
      return;
    }
    this.#status = status;
    this.emit(Events.Status, status);
  }

  /**
   * The message handler for the gmq.Queue.
   */
  #gmqMsgHandler(queue, msg) {
    let data;
    try {
      data = JSON.parse(msg.payload.toString());
    } catch (e) {
      queue.ack(msg, (_err) => {});
      return;
    }

    const self = this;
    if (queue.name() === this.#uldata.name()) {
      data.time = new Date(data.time);
      data.pub = new Date(data.pub);
      data.data = Buffer.from(data.data, 'hex');
      this.#mgrMsgHandler.onUlData(this, data, (err) => {
        if (err) {
          self.#uldata.nack(msg, (_err) => {});
        } else {
          self.#uldata.ack(msg, (_err) => {});
        }
      });
    } else if (queue.name() === this.#dldataResp.name()) {
      this.#mgrMsgHandler.onDlDataResp(this, data, (err) => {
        if (err) {
          self.#dldataResp.nack(msg, (_err) => {});
        } else {
          self.#dldataResp.ack(msg, (_err) => {});
        }
      });
    } else if (queue.name() === this.#dldataResult.name()) {
      this.#mgrMsgHandler.onDlDataResult(this, data, (err) => {
        if (err) {
          self.#dldataResult.nack(msg, (_err) => {});
        } else {
          self.#dldataResult.ack(msg, (_err) => {});
        }
      });
    } else {
      return;
    }
  }

  /** @type {Options} */
  #opts;
  /**
   * Information for delete connection automatically.
   *
   * @type {Map<string, Connection>}
   */
  #connPool;
  /**
   * Information for delete connection automatically.
   *
   * @type {string}
   */
  #hostUri;
  /** @type {AmqpQueue|MqttQueue} */
  #uldata;
  /** @type {AmqpQueue|MqttQueue} */
  #dldata;
  /** @type {AmqpQueue|MqttQueue} */
  #dldataResp;
  /** @type {AmqpQueue|MqttQueue} */
  #dldataResult;
  /** @type {Status} */
  #status;
  /** @type {AppMgrMsgHandlers} */
  #mgrMsgHandler;
}

const QUEUE_PREFIX = 'broker.application';

module.exports = {
  ApplicationMgr,
};