'use strict';
const { EventEmitter } = require('events');
const { URL } = require('url');
const mqtt = require('mqtt');
const randomstring = require('randomstring');
const { DataTypes, Events, Status, QueuePattern } = require('./constants');
// Broker URI used when `opts.uri` is not given.
const DEF_URI = 'mqtt://localhost';
// Connection timeout in milliseconds used when `opts.connectTimeoutMillis` is not given (or is 0).
const DEF_CONN_TIMEOUT = 3000;
// Reconnect interval in milliseconds used when `opts.reconnectMillis` is not given (or is 0).
const DEF_RECONN = 1000;
/**
* Connection error event.
*
* @event MqttConnection#error
* @type {Error}
*/
/**
* Connection status event.
*
* @event MqttConnection#status
* @type {Status}
*/
/**
* Packet handler object.
*
* @typedef {Object} PacketHandler
* @property {string} topic The relative topic of the queue (with shared prefix or not).
* @property {number} qos 0 for best-effort and 1 for reliable.
* @property {function} handler
* @param {Buffer} handler.payload
*/
/**
* Manages an MQTT connection.
*
* @class MqttConnection
* @fires MqttConnection#error
* @fires MqttConnection#status
*/
class MqttConnection extends EventEmitter {
  /** Normalized connection options (defaults already applied). */
  #opts;
  /** @type {Status} */
  #status;
  /** @type {?mqtt.MqttClient} */
  #conn;
  /** @type {Map<string, PacketHandler>} */
  #packetHandlers;

  /**
   * @constructor
   * @param {Object} [opts] The connection options.
   * @param {string} [opts.uri='mqtt://localhost'] Connection URI. Use
   *        `mqtt|mqtts://username:password@host:port/vhost` format.
   * @param {number} [opts.connectTimeoutMillis=3000] Connection timeout in milliseconds.
   *        `0` selects the default.
   * @param {number} [opts.reconnectMillis=1000] Time in milliseconds from disconnection to
   *        reconnection. `0` selects the default.
   * @param {boolean} [opts.insecure=false] Allow insecure TLS connection.
   * @param {string} [opts.clientId] Client identifier. Default uses a random client identifier.
   * @param {boolean} [opts.cleanSession=true] Clean session flag. This is NOT stable.
   * @throws {Error} Wrong arguments.
   */
  constructor(opts) {
    super();

    // `undefined` means "all defaults"; anything else must be a plain (non-array) object.
    const options = opts === undefined ? {} : opts;
    if (!options || typeof options !== DataTypes.Object || Array.isArray(options)) {
      throw new Error('`opts` is not an object');
    }

    if (options.uri !== undefined) {
      if (!options.uri || typeof options.uri !== DataTypes.String) {
        throw new Error('`uri` is not a string');
      }
      if (!URL.canParse(options.uri)) {
        throw new Error('`uri` is not a valid URI');
      }
      const { protocol } = new URL(options.uri);
      if (protocol !== 'mqtt:' && protocol !== 'mqtts:') {
        throw new Error('`uri` scheme only support `mqtt` and `mqtts`');
      }
    }
    if (
      options.connectTimeoutMillis !== undefined &&
      (!Number.isInteger(options.connectTimeoutMillis) || options.connectTimeoutMillis < 0)
    ) {
      throw new Error('`connectTimeoutMillis` is not a positive integer');
    }
    if (
      options.reconnectMillis !== undefined &&
      (!Number.isInteger(options.reconnectMillis) || options.reconnectMillis < 0)
    ) {
      throw new Error('`reconnectMillis` is not a positive integer');
    }
    if (options.insecure !== undefined && typeof options.insecure !== DataTypes.Boolean) {
      throw new Error('`insecure` is not a boolean');
    }
    if (
      options.clientId !== undefined &&
      (typeof options.clientId !== DataTypes.String ||
        options.clientId.length <= 0 ||
        options.clientId.length > 23)
    ) {
      throw new Error('`clientId` is not a string with length 1~23');
    }
    if (
      options.cleanSession !== undefined &&
      typeof options.cleanSession !== DataTypes.Boolean
    ) {
      throw new Error('`cleanSession` is not a boolean');
    }

    // `||` is intentional for the two time options: a validated `0` falls back to the default.
    this.#opts = {
      uri: options.uri || DEF_URI,
      connectTimeoutMillis: options.connectTimeoutMillis || DEF_CONN_TIMEOUT,
      reconnectMillis: options.reconnectMillis || DEF_RECONN,
      insecure: options.insecure || false,
      clientId: options.clientId || `general-mq-${randomstring.generate(12)}`,
      cleanSession: options.cleanSession !== false,
    };
    this.#status = Status.Closed;
    this.#conn = null;
    this.#packetHandlers = new Map();
  }

  /**
   * To get the connection status.
   *
   * @returns {Status}
   */
  status() {
    return this.#status;
  }

  /**
   * To connect to the message broker. The `MqttConnection` will report status with Status.
   *
   * Does nothing unless the current status is `Closed` or `Closing`.
   */
  connect() {
    if (this.#status !== Status.Closed && this.#status !== Status.Closing) {
      return;
    }
    this.#setStatus(Status.Connecting);
    this.#innerConnect();
  }

  /**
   * To close the connection. You can use a callback function to get the result or listen events.
   *
   * @param {function} [callback]
   * @param {?Error} callback.err
   */
  close(callback) {
    const done = typeof callback === DataTypes.Function ? callback : null;

    // Work on the client that exists right now: `connect()` may install a new one before the
    // asynchronous `end()` below completes, and that new client must not be torn down here.
    const conn = this.#conn;
    if (!conn) {
      if (done) {
        process.nextTick(() => {
          done(null);
        });
      }
      return;
    }

    this.#setStatus(Status.Closing);
    conn.end((err) => {
      conn.removeAllListeners();
      // Only report `Closed` when the client being ended is still the current one.
      if (this.#conn === conn) {
        this.#conn = null;
        this.#setStatus(Status.Closed);
      }
      if (done) {
        process.nextTick(() => {
          done(err);
        });
      }
    });
  }

  /**
   * To get the raw MQTT connection instance for topic subscription.
   *
   * @private
   * @returns {?mqtt.MqttClient} The connection instance.
   */
  getRawConnection() {
    return this.#conn;
  }

  /**
   * To add a packet handler for `MqttQueue`. Handlers are stored by queue name; a later
   * registration with the same name replaces the earlier one.
   *
   * @private
   * @param {string} name The queue name.
   * @param {string} topic The relative topic. Must end with `name`.
   * @param {boolean} reliable The queue is reliable.
   * @param {function} handler The packet handler.
   * @param {Buffer} handler.payload The packet payload.
   * @throws {Error} Wrong arguments.
   */
  addPacketHandler(name, topic, reliable, handler) {
    if (!QueuePattern.test(name)) {
      throw new Error('`name` is not a valid queue name');
    }
    if (!topic || typeof topic !== DataTypes.String || !topic.endsWith(name)) {
      throw new Error('`topic` is not a valid topic');
    }
    if (typeof reliable !== DataTypes.Boolean) {
      throw new Error('`reliable` is not a boolean');
    }
    if (typeof handler !== DataTypes.Function) {
      throw new Error('`handler` is not a function');
    }
    this.#packetHandlers.set(name, {
      topic,
      qos: reliable ? 1 : 0,
      handler,
    });
  }

  /**
   * To remove a packet handler.
   *
   * @private
   * @param {string} name The queue name.
   * @throws {Error} Wrong arguments.
   */
  removePacketHandler(name) {
    if (!QueuePattern.test(name)) {
      throw new Error('`name` is not a valid queue name');
    }
    this.#packetHandlers.delete(name);
  }

  /**
   * Stores the new status and then emits it as a status event.
   *
   * @param {Status} status
   */
  #setStatus(status) {
    this.#status = status;
    this.emit(Events.Status, status);
  }

  /**
   * Creates the underlying client and wires its events to this instance.
   *
   * Every listener captures the client it was registered on, so events that arrive from a
   * client that has since been replaced can be recognized and ignored.
   */
  #innerConnect() {
    const urlInfo = new URL(this.#opts.uri);
    const opts = {
      clientId: this.#opts.clientId,
      reconnectPeriod: this.#opts.reconnectMillis,
      connectTimeout: this.#opts.connectTimeoutMillis,
      clean: this.#opts.cleanSession,
      username: urlInfo.username,
      password: urlInfo.password,
    };
    if (this.#opts.insecure) {
      opts.rejectUnauthorized = false;
    }

    const conn = mqtt.connect(this.#opts.uri, opts);
    this.#conn = conn;
    conn.on('close', () => this.#onClose(conn));
    conn.on('connect', () => this.#onConnect(conn));
    conn.on('error', (err) => this.#onError(err));
    conn.on('message', (topic, message, packet) => this.#onMessage(topic, message, packet));
    conn.on('offline', () => this.#onClose(conn));
    conn.on('reconnect', () => this.#onClose(conn));
  }

  /**
   * Handles `close`, `offline` and `reconnect` from the client: the link is down (or being
   * re-established), so report `Connecting` unless a close is in progress or finished.
   *
   * The client is deliberately kept. It was created with a non-zero `reconnectPeriod`, so it
   * reconnects by itself; replacing it without calling `end()` would leave the old client
   * retrying in the background.
   *
   * @param {mqtt.MqttClient} conn The client that emitted the event.
   */
  #onClose(conn) {
    if (conn !== this.#conn) {
      return;
    }
    if (this.#status !== Status.Closing && this.#status !== Status.Closed) {
      this.#setStatus(Status.Connecting);
    }
  }

  /**
   * Handles `connect` from the client.
   *
   * @param {mqtt.MqttClient} conn The client that emitted the event.
   */
  #onConnect(conn) {
    if (conn === this.#conn) {
      this.#setStatus(Status.Connected);
    }
  }

  /**
   * Forwards client errors as error events until the connection is closed.
   *
   * @param {Error} err
   */
  #onError(err) {
    if (this.#status !== Status.Closed) {
      this.emit(Events.Error, err);
    }
  }

  /**
   * Dispatches an incoming message to the handler registered under the message topic.
   *
   * @param {string} topic
   * @param {Buffer} message
   * @param {Object} _packet Unused.
   */
  #onMessage(topic, message, _packet) {
    const entry = this.#packetHandlers.get(topic);
    if (entry) {
      entry.handler(message);
    }
  }
}
// Public API of this module.
module.exports = {
MqttConnection,
};