'use strict';
const { URL } = require('url');
const gmq = require('general-mq');
const { AmqpConnection } = require('general-mq/lib/amqp-connection');
const { AmqpQueue } = require('general-mq/lib/amqp-queue');
const { DataTypes } = require('general-mq/lib/constants');
const { MqttConnection } = require('general-mq/lib/mqtt-connection');
const { MqttQueue } = require('general-mq/lib/mqtt-queue');
const { Status } = require('./constants');
/**
* Detail queue connection status.
*
* @typedef {Object} DataMqStatus
* @property {Status} uldata
* @property {Status} dldata
* @property {Status} dldataResp
* @property {Status} dldataResult
* @property {Status} ctrl
*/
/**
* The options of the application/network manager.
*
* @typedef {Object} Options
* @property {string} [unitId] The associated unit ID of the application/network. Empty or undefined
* for public network.
* @property {string} [unitCode] The associated unit code of the application/network. Empty or
* undefined for public network.
* @property {string} id The associated application/network ID.
* @property {string} name The associated application/network code.
* @property {number} [prefetch=100] AMQP prefetch option.
* @property {boolean} [persistent=false] AMQP persistent option.
* @property {string} [sharedPrefix] MQTT shared queue prefix option.
*/
/**
* @typedef {Object} CtrlAddDevice
* @property {string} networkAddr
*/
/**
* @typedef {Object} CtrlAddDeviceBulk
* @property {string[]} networkAddrs
*/
/**
* @typedef {Object} CtrlAddDeviceRange
* @property {string} startAddr
* @property {string} endAddr
*/
/**
* @typedef {Object} CtrlDelDevice
* @property {string} networkAddr
*/
/**
* @typedef {Object} CtrlDelDeviceBulk
* @property {string[]} networkAddrs
*/
/**
* @typedef {Object} CtrlDelDeviceRange
* @property {string} startAddr
* @property {string} endAddr
*/
/**
* @private
* @typedef {Object} DataMqQueues
* @property {AmqpQueue|MqttQueue} uldata
* @property {AmqpQueue|MqttQueue} dldata
* @property {?AmqpQueue|?MqttQueue} dldataResp (for application only)
* @property {AmqpQueue|MqttQueue} dldataResult
* @property {?AmqpQueue|?MqttQueue} ctrl (for network only)
*/
/**
* The connection object with reference count for pool management.
*
* @class Connection
*/
class Connection {
/**
* @param {AmqpConnection|MqttConnection} conn
* @throws {Error} Wrong host scheme.
*/
constructor(conn) {
if (!(conn instanceof AmqpConnection) && !(conn instanceof MqttConnection)) {
throw Error('invalid `conn`');
}
this.conn = conn;
this.count = 0;
}
/** @type {AmqpConnection|MqttConnection} */
conn;
/**
* Reference count.
*
* @type {number}
*/
count;
}
const DEF_PREFETCH = 100;
const DEF_PERSISTENT = false;
/**
* Utility function to get the message queue connection instance. A new connection will be created
* if the host does not exist.
*
* @private
* @param {Map<string, Connection>} connPool
* @param {URL} hostUri
* @returns {Connection}
* @throws {Error} Wrong host scheme.
*/
function getConnection(connPool, hostUri) {
const uri = hostUri.toString();
let conn = connPool.get(uri);
if (conn) {
return conn;
}
let engine;
switch (hostUri.protocol) {
case 'amqp:':
case 'amqps:':
engine = gmq.amqp;
break;
case 'mqtt:':
case 'mqtts:':
engine = gmq.mqtt;
break;
default:
throw Error(`unsupport scheme ${hostUri.protocol}`);
}
const c = new engine.Connection({ uri });
conn = new Connection(c);
connPool.set(uri, conn);
return conn;
}
/**
* Utility function to remove connection from the pool if the reference count meet zero.
*
* @private
* @param {Map<string, Connection>} connPool
* @param {string} hostUri
* @param {number} count
* @param {function} callback
* @param {?Error} callback.err
*/
function removeConnection(connPool, hostUri, count, callback) {
const conn = connPool.get(hostUri);
if (!conn) {
return void process.nextTick(() => {
callback(null);
});
}
conn.count -= count;
if (conn.count <= 0) {
connPool.delete(hostUri);
}
conn.conn.removeAllListeners();
conn.conn.close((err) => {
callback(err || null);
});
}
/**
* The utility function for creating application/network queue. The return object contains:
* - `[prefix].[unit].[code].uldata`
* - `[prefix].[unit].[code].dldata`
* - `[prefix].[unit].[code].dldata-resp`
* - `[prefix].[unit].[code].dldata-result`
* - `[prefix].[unit].[code].ctrl`
*
* @private
* @param {Connection} conn
* @param {Options} opts
* @param {string} prefix
* @param {bool} isNetwork
* @returns {DataMqQueues}
* @throws {Error} Wrong parameters.
*/
function newDataQueues(conn, opts, prefix, isNetwork) {
if (!(conn instanceof Connection)) {
throw Error('`conn` is not a Connection');
} else if (!opts || typeof opts !== DataTypes.Object || Array.isArray(opts)) {
throw Error('`opts` is not an object');
} else if (!prefix || typeof prefix !== DataTypes.String) {
throw Error('`prefix` is not a string');
} else if (typeof isNetwork !== DataTypes.Boolean) {
throw Error('`isNetwork` is not a boolean');
} else if (opts.unitId !== undefined && typeof opts.unitId !== DataTypes.String) {
throw Error('`opts.unitId` is not a string');
} else if (opts.unitCode !== undefined && typeof opts.unitCode !== DataTypes.String) {
throw Error('`opts.unitCode` is not a string');
} else if (!opts.id || typeof opts.id !== DataTypes.String) {
throw Error('`opts.id` is not a non-empty string');
} else if (!opts.name || typeof opts.name !== DataTypes.String) {
throw Error('`opts.name` is not a non-empty string');
} else if (
opts.prefetch !== undefined &&
(!Number.isInteger(opts.prefetch) || opts.prefetch < 0 || opts.prefetch > 65535)
) {
throw Error('`opts.prefetch` is not an integer between 1 and 65535');
} else if (opts.persistent !== undefined && typeof opts.persistent !== DataTypes.Boolean) {
throw Error('`opts.persistent` is not a boolean');
} else if (opts.sharedPrefix !== undefined && typeof opts.sharedPrefix !== DataTypes.String) {
throw Error('`opts.sharedPrefix` is not a string');
}
if ((opts.unitId && !opts.unitCode) || (!opts.unitId && opts.unitCode)) {
throw Error('`opts.unitId` and `opts.unitCode` must both empty or non-empty');
}
const engine = conn.conn instanceof MqttConnection ? gmq.mqtt : gmq.amqp;
const qNamePrefix = `${prefix}.${opts.unitCode || '_'}.${opts.name}`;
let qOpts = {
name: `${qNamePrefix}.uldata`,
isRecv: !isNetwork,
reliable: true,
broadcast: false,
prefetch: opts.prefetch || DEF_PREFETCH,
persistent: opts.persistent || DEF_PERSISTENT,
sharedPrefix: opts.sharedPrefix,
};
const uldata = new engine.Queue(qOpts, conn.conn);
qOpts.name = `${qNamePrefix}.dldata`;
qOpts.isRecv = isNetwork;
const dldata = new engine.Queue(qOpts, conn.conn);
qOpts.name = `${qNamePrefix}.dldata-resp`;
qOpts.isRecv = !isNetwork;
const dldataResp = isNetwork ? null : new engine.Queue(qOpts, conn.conn);
qOpts.name = `${qNamePrefix}.dldata-result`;
qOpts.isRecv = !isNetwork;
const dldataResult = new engine.Queue(qOpts, conn.conn);
qOpts.name = `${qNamePrefix}.ctrl`;
qOpts.isRecv = true;
const ctrl = isNetwork ? new engine.Queue(qOpts, conn.conn) : null;
return {
uldata,
dldata,
dldataResp,
dldataResult,
ctrl,
};
}
module.exports = {
Connection,
getConnection,
removeConnection,
newDataQueues,
};