Source: mq/network.js

'use strict';

const { EventEmitter } = require('events');
const { URL } = require('url');

const async = require('async');

const { AmqpQueue } = require('general-mq/lib/amqp-queue');
const { DataTypes, Status, Events } = require('general-mq/lib/constants');
const { MqttQueue } = require('general-mq/lib/mqtt-queue');
const {
  Connection,
  DataMqStatus,
  CtrlAddDevice,
  CtrlAddDeviceBulk,
  CtrlAddDeviceRange,
  CtrlDelDevice,
  CtrlDelDeviceBulk,
  CtrlDelDeviceRange,
  Options,
  getConnection,
  newDataQueues,
  removeConnection,
} = require('./lib');
const { MgrStatus } = require('./constants');

/**
 * Uplink data from network to broker.
 *
 * @typedef {Object} NetUlData
 * @property {Date} time
 * @property {string} networkAddr
 * @property {Buffer} data
 * @property {Object} [extension]
 */

/**
 * Downlink data from broker to network.
 *
 * @typedef {Object} NetDlData
 * @property {string} dataId
 * @property {Date} pub
 * @property {number} expiresIn
 * @property {string} networkAddr
 * @property {Buffer} data
 * @property {Object} [extension]
 */

/**
 * Downlink data result when processing or completing data transfer to the device.
 *
 * @typedef {Object} NetDlDataResult
 * @property {string} dataId
 * @property {number} status
 * @property {string} [message]
 */

/**
 * Network control message from broker to network.
 *
 * @typedef {Object} NetCtrlMsg
 * @property {string} operation
 * @property {Date} time
 * @property {CtrlAddDevice|CtrlAddDeviceBulk|CtrlAddDeviceRange|CtrlDelDevice|CtrlDelDeviceBulk|CtrlDelDeviceRange} new
 */

/**
 * Connection status event.
 *
 * @event NetworkMgr#status
 * @type {Status}
 */

/**
 * @typedef {Object} NetMgrMsgHandlers
 * @property {OnNetDlData} onDlData
 * @property {OnNetCtrl} onCtrl
 */

/**
 * @callback OnNetDlData
 * @param {NetworkMgr} mgr
 * @param {NetDlData} data
 * @param {function} callback
 *   @param {?Error} callback.err Use error to NACK the message.
 */

/**
 * @callback OnNetCtrl
 * @param {NetworkMgr} mgr
 * @param {NetCtrlMsg} data
 * @param {function} callback
 *   @param {?Error} callback.err Use error to NACK the message.
 */

/**
 * The manager for network queues.
 *
 * @class NetworkMgr
 * @fires NetworkMgr#status
 */
class NetworkMgr extends EventEmitter {
  /**
   * @param {Map<string, Connection>} connPool
   * @param {URL} hostUri
   * @param {Options} opts
   * @param {NetMgrMsgHandlers} handler
   * @throws {Error} Wrong arguments.
   */
  constructor(connPool, hostUri, opts, handler) {
    super();

    if (!connPool || !(connPool instanceof Map)) {
      throw Error('`connPool` is not a Connection pool map');
    } else if (!hostUri || !(hostUri instanceof URL) || !URL.canParse(hostUri.toString())) {
      throw Error('`hostUri` is not a valid URI');
    } else if (!opts || typeof opts !== DataTypes.Object || Array.isArray(opts)) {
      throw Error('`opts` is not an object');
    } else if (!handler || typeof handler !== DataTypes.Object || Array.isArray(handler)) {
      throw Error('`handler` is not an object');
    } else if (
      typeof handler.onDlData !== DataTypes.Function ||
      typeof handler.onCtrl !== DataTypes.Function
    ) {
      throw Error('`onDlData` and `onCtrl` must be functions');
    }

    const conn = getConnection(connPool, hostUri);
    const queues = newDataQueues(conn, opts, QUEUE_PREFIX, true);
    conn.conn.connect();

    this.#opts = { ...opts };
    this.#connPool = connPool;
    this.#hostUri = hostUri.toString();
    this.#uldata = queues.uldata;
    this.#dldata = queues.dldata;
    this.#dldataResult = queues.dldataResult;
    this.#ctrl = queues.ctrl;
    this.#status = MgrStatus.NotReady;
    this.#mgrMsgHandler = handler;

    this.#uldata.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#uldata.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#uldata.connect();

    this.#dldata.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#dldata.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#dldata.connect();

    this.#dldataResult.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#dldataResult.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#dldataResult.connect();

    this.#ctrl.on(Events.Status, this.#gmqStatusHandler.bind(this));
    this.#ctrl.setMsgHandler(this.#gmqMsgHandler.bind(this));
    this.#ctrl.connect();

    conn.count += 4;
  }

  /**
   * @returns {string} The associated unit ID of the network.
   */
  unitId() {
    return this.#opts.unitId;
  }

  /**
   * @returns {string} The associated unit code of the network.
   */
  unitCode() {
    return this.#opts.unitCode;
  }

  /**
   * @returns {string} The network ID.
   */
  id() {
    return this.#opts.id;
  }

  /**
   * @returns {string} The network code.
   */
  name() {
    return this.#opts.name;
  }

  /**
   * @returns {MgrStatus} Manager status.
   */
  status() {
    return this.#status;
  }

  /**
   * @returns {DataMqStatus} Manager status.
   */
  mqStatus() {
    return {
      uldata: this.#uldata.status(),
      dldata: this.#dldata.status(),
      dldataResp: Status.Closed,
      dldataResult: this.#dldataResult.status(),
      ctrl: this.#ctrl.status(),
    };
  }

  /**
   * To close the manager queues.
   * The underlying connection will be closed when there are no queues use it.
   *
   * @param {function} callback
   *   @param {?Error} callback.err
   */
  close(callback) {
    const self = this;

    async.waterfall(
      [
        function (cb) {
          self.#uldata.removeAllListeners();
          self.#uldata.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#dldata.removeAllListeners();
          self.#dldata.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#dldataResult.removeAllListeners();
          self.#dldataResult.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          self.#ctrl.removeAllListeners();
          self.#ctrl.close((err) => {
            cb(err || null);
          });
        },
        function (cb) {
          removeConnection(self.#connPool, self.#hostUri, 4, (err) => {
            cb(err || null);
          });
        },
      ],
      callback
    );
  }

  /**
   * Send uplink data to the broker.
   *
   * @param {NetUlData} data
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  sendUlData(data, callback) {
    if (!data || typeof data !== DataTypes.Object || Array.isArray(data)) {
      throw Error('`data` is not an object');
    } else if (!data.time || !(data.time instanceof Date) || isNaN(data.time.getTime())) {
      throw Error('`data.time` is not a Date');
    } else if (!data.networkAddr || typeof data.networkAddr !== DataTypes.String) {
      throw Error('`data.networkAddr` is not a string');
    } else if (!(data.data instanceof Buffer)) {
      throw Error('`data.data` is not a Buffer');
    } else if (
      data.extension !== undefined &&
      (!data.extension ||
        typeof data.extension !== DataTypes.Object ||
        Array.isArray(data.extension))
    ) {
      throw Error('`data.extension` is not an object');
    }

    const uldata = {
      time: data.time.toISOString(),
      networkAddr: data.networkAddr,
      data: data.data.toString('hex'),
      extension: data.extension || undefined,
    };
    const payload = Buffer.from(JSON.stringify(uldata));
    this.#uldata.sendMsg(payload, (err) => {
      callback(err || null);
    });
  }

  /**
   * Send downlink result data to the broker.
   *
   * @param {NetDlDataResult} data
   * @param {function} callback
   *   @param {?Error} callback.err
   * @throws {Error} Wrong arguments.
   */
  sendDlDataResult(data, callback) {
    if (!data || typeof data !== DataTypes.Object || Array.isArray(data)) {
      throw Error('`data` is not an object');
    } else if (!data.dataId || typeof data.dataId !== DataTypes.String) {
      throw Error('`data.dataId` is not a string');
    } else if (!Number.isInteger(data.status)) {
      throw Error('`data.status` is not an integer');
    } else if (data.message !== undefined && typeof data.message !== DataTypes.String) {
      throw Error('`data.message` is not a string');
    }

    const dldataResult = {
      dataId: data.dataId,
      status: data.status,
      message: data.message || undefined,
    };
    const payload = Buffer.from(JSON.stringify(dldataResult));
    this.#dldataResult.sendMsg(payload, (err) => {
      callback(err || null);
    });
  }

  /**
   * The handler for the gmq.Queue#status events.
   */
  #gmqStatusHandler(_queue, _status) {
    let status;
    if (
      this.#uldata.status() === Status.Connected &&
      this.#dldata.status() === Status.Connected &&
      this.#dldataResult.status() === Status.Connected &&
      this.#ctrl.status() === Status.Connected
    ) {
      status = MgrStatus.Ready;
    } else {
      status = MgrStatus.NotReady;
    }

    if (this.#status === status) {
      return;
    }
    this.#status = status;
    this.emit(Events.Status, status);
  }

  /**
   * The message handler for the gmq.Queue.
   */
  #gmqMsgHandler(queue, msg) {
    let data;
    try {
      data = JSON.parse(msg.payload.toString());
    } catch (e) {
      queue.ack(msg, (_err) => {});
      return;
    }

    const self = this;
    if (queue.name() === this.#dldata.name()) {
      data.pub = new Date(data.pub);
      data.data = Buffer.from(data.data, 'hex');
      this.#mgrMsgHandler.onDlData(this, data, (err) => {
        if (err) {
          self.#dldata.nack(msg, (_err) => {});
        } else {
          self.#dldata.ack(msg, (_err) => {});
        }
      });
    } else if (queue.name() === this.#ctrl.name()) {
      data.time = new Date(data.time);
      this.#mgrMsgHandler.onCtrl(this, data, (err) => {
        if (err) {
          self.#ctrl.nack(msg, (_err) => {});
        } else {
          self.#ctrl.ack(msg, (_err) => {});
        }
      });
    } else {
      return;
    }
  }

  /** @type {Options} */
  #opts;
  /**
   * Information for delete connection automatically.
   *
   * @type {Map<string, Connection>}
   */
  #connPool;
  /**
   * Information for delete connection automatically.
   *
   * @type {string}
   */
  #hostUri;
  /** @type {AmqpQueue|MqttQueue} */
  #uldata;
  /** @type {AmqpQueue|MqttQueue} */
  #dldata;
  /** @type {AmqpQueue|MqttQueue} */
  #dldataResult;
  /** @type {AmqpQueue|MqttQueue} */
  #ctrl;
  /** @type {Status} */
  #status;
  /** @type {NetMgrMsgHandlers} */
  #mgrMsgHandler;
}

const QUEUE_PREFIX = 'broker.network';

module.exports = {
  NetworkMgr,
};