1
//! Manages queues for applications and networks.
2
//!
3
//! For applications, the [`application::ApplicationMgr`] manages the following kinds of queues:
4
//! - uldata: uplink data from the broker to the application.
5
//! - dldata: downlink data from the application to the broker.
6
//! - dldata-resp: the response of downlink data.
7
//! - dldata-result: the data process result from the network.
8
//!
9
//! For networks, the [`network::NetworkMgr`] manages the following kinds of queues:
10
//! - uldata: device uplink data from the network to the broker.
11
//! - dldata: downlink data from the broker to the network.
12
//! - dldata-result: the data process result from the network.
13
//! - ctrl: the control messages from the broker to the network.
14

            
15
use std::{
16
    collections::HashMap,
17
    error::Error as StdError,
18
    sync::{Arc, Mutex},
19
};
20

            
21
use serde::{Deserialize, Serialize};
22
use url::Url;
23

            
24
use general_mq::{
25
    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
26
    MqttQueueOptions, Queue, QueueOptions, connection::GmqConnection, queue::Status,
27
};
28

            
29
pub mod application;
30
pub mod network;
31

            
32
/// The general connection type with reference counter for upper layer maintenance.
///
/// Each variant pairs a protocol-specific connection with a shared `isize` counter. The counter
/// lives behind an `Arc`, so every clone of a `Connection` observes and updates the same value.
#[derive(Clone)]
pub enum Connection {
    /// An AMQP connection together with its shared reference counter.
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
    /// An MQTT connection together with its shared reference counter.
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
}
38

            
39
/// Manager status.
///
/// A two-state summary of the queues owned by a manager. The type is `Copy` and comparable, and
/// implements `Debug` so it can be logged and used in assertions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MgrStatus {
    /// One or more queues are not connected.
    NotReady,
    /// All queues are connected.
    Ready,
}
47

            
48
/// Detail queue connection status.
///
/// Holds one [`Status`] value per queue kind described in the module documentation.
// NOTE(review): `dldata-resp` queues are only created for applications and `ctrl` queues only for
// networks; which value the managers report for the queue they do not own is not visible here --
// confirm against the manager implementations.
pub struct DataMqStatus {
    /// For `uldata`.
    pub uldata: Status,
    /// For `dldata`.
    pub dldata: Status,
    /// For `dldata-resp`.
    pub dldata_resp: Status,
    /// For `dldata-result`.
    pub dldata_result: Status,
    /// For `ctrl`.
    pub ctrl: Status,
}
61

            
62
/// The options of the application/network manager.
63
#[derive(Default, Deserialize, Serialize)]
64
pub struct Options {
65
    /// The associated unit ID of the application/network. Empty for public network.
66
    #[serde(rename = "unitId")]
67
    pub unit_id: String,
68
    /// The associated unit code of the application/network. Empty for public network.
69
    #[serde(rename = "unitCode")]
70
    pub unit_code: String,
71
    /// The associated application/network ID.
72
    pub id: String,
73
    /// The associated application/network code.
74
    pub name: String,
75
    /// AMQP prefetch option.
76
    #[serde(skip_serializing_if = "Option::is_none")]
77
    pub prefetch: Option<u16>,
78
    /// AMQP persistent option.
79
    #[serde(skip_serializing_if = "Option::is_none")]
80
    pub persistent: Option<bool>,
81
    /// MQTT shared queue prefix option.
82
    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
83
    pub shared_prefix: Option<String>,
84
}
85

            
86
/// Support application/network host schemes.
// `'static` is implied for references inside a `const`, so it is not spelled out.
pub const SUPPORT_SCHEMES: &[&str] = &["amqp", "amqps", "mqtt", "mqtts"];

/// The default prefetch value for AMQP.
const DEF_PREFETCH: u16 = 100;
/// The default persistent value for AMQP.
const DEF_PERSISTENT: bool = false;
93

            
94
/// Utility function to get the message queue connection instance. A new connection will be created
95
/// if the host does not exist.
96
128
fn get_connection(
97
128
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
98
128
    host_uri: &Url,
99
128
) -> Result<Connection, String> {
100
128
    let uri = host_uri.to_string();
101
128
    let mut mutex = conn_pool.lock().unwrap();
102
128
    if let Some(conn) = mutex.get(&uri) {
103
32
        return Ok(conn.clone());
104
96
    }
105

            
106
96
    match host_uri.scheme() {
107
96
        "amqp" | "amqps" => {
108
48
            let opts = AmqpConnectionOptions {
109
48
                uri: host_uri.to_string(),
110
48
                ..Default::default()
111
48
            };
112
48
            let mut conn = AmqpConnection::new(opts)?;
113
48
            let _ = conn.connect();
114
48
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
115
48
            mutex.insert(uri, conn.clone());
116
48
            Ok(conn)
117
        }
118
48
        "mqtt" | "mqtts" => {
119
48
            let opts = MqttConnectionOptions {
120
48
                uri: host_uri.to_string(),
121
48
                ..Default::default()
122
48
            };
123
48
            let mut conn = MqttConnection::new(opts)?;
124
48
            let _ = conn.connect();
125
48
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
126
48
            mutex.insert(uri, conn.clone());
127
48
            Ok(conn)
128
        }
129
        s => Err(format!("unsupport scheme {}", s)),
130
    }
131
128
}
132

            
133
/// Utility function to remove connection from the pool if the reference count meet zero.
134
100
async fn remove_connection(
135
100
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
136
100
    host_uri: &String,
137
100
    count: isize,
138
100
) -> Result<(), Box<dyn StdError + Send + Sync>> {
139
88
    let conn = {
140
100
        let mut mutex = conn_pool.lock().unwrap();
141
100
        match mutex.get(host_uri) {
142
            None => return Ok(()),
143
100
            Some(conn) => match conn {
144
50
                Connection::Amqp(_, counter) => {
145
50
                    let mut mutex = counter.lock().unwrap();
146
50
                    *mutex -= count;
147
50
                    if *mutex > 0 {
148
6
                        return Ok(());
149
44
                    }
150
                }
151
50
                Connection::Mqtt(_, counter) => {
152
50
                    let mut mutex = counter.lock().unwrap();
153
50
                    *mutex -= count;
154
50
                    if *mutex > 0 {
155
6
                        return Ok(());
156
44
                    }
157
                }
158
            },
159
        }
160
88
        mutex.remove(host_uri)
161
    };
162
88
    if let Some(conn) = conn {
163
88
        match conn {
164
44
            Connection::Amqp(mut conn, _) => {
165
44
                conn.close().await?;
166
            }
167
44
            Connection::Mqtt(mut conn, _) => {
168
44
                conn.close().await?;
169
            }
170
        }
171
    }
172
88
    Ok(())
173
100
}
174

            
175
/// The utility function for creating application/network queue. The return tuple contains:
176
/// - `[prefix].[unit].[code].uldata`
177
/// - `[prefix].[unit].[code].dldata`
178
/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
179
/// - `[prefix].[unit].[code].dldata-result`
180
/// - `[prefix].[unit].[code].ctrl`
181
128
fn new_data_queues(
182
128
    conn: &Connection,
183
128
    opts: &Options,
184
128
    prefix: &str,
185
128
    is_network: bool,
186
128
) -> Result<
187
128
    (
188
128
        Arc<Mutex<Queue>>,
189
128
        Arc<Mutex<Queue>>,
190
128
        Option<Arc<Mutex<Queue>>>,
191
128
        Arc<Mutex<Queue>>,
192
128
        Option<Arc<Mutex<Queue>>>,
193
128
    ),
194
128
    String,
195
128
> {
196
    let uldata: Arc<Mutex<Queue>>;
197
    let dldata: Arc<Mutex<Queue>>;
198
    let dldata_resp: Option<Arc<Mutex<Queue>>>;
199
    let dldata_result: Arc<Mutex<Queue>>;
200
    let ctrl: Option<Arc<Mutex<Queue>>>;
201

            
202
128
    if opts.unit_id.len() == 0 {
203
8
        if opts.unit_code.len() != 0 {
204
4
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
205
4
        }
206
    } else {
207
120
        if opts.unit_code.len() == 0 {
208
8
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
209
112
        }
210
    }
211
116
    if opts.id.len() == 0 {
212
8
        return Err("`id` cannot be empty".to_string());
213
108
    }
214
108
    if opts.name.len() == 0 {
215
8
        return Err("`name` cannot be empty".to_string());
216
100
    }
217

            
218
100
    let unit = match opts.unit_code.len() {
219
4
        0 => "_",
220
96
        _ => opts.unit_code.as_str(),
221
    };
222

            
223
100
    match conn {
224
50
        Connection::Amqp(conn, _) => {
225
50
            let prefetch = match opts.prefetch {
226
36
                None => DEF_PREFETCH,
227
14
                Some(prefetch) => match prefetch {
228
10
                    0 => DEF_PREFETCH,
229
4
                    _ => prefetch,
230
                },
231
            };
232
50
            let persistent = match opts.persistent {
233
42
                None => DEF_PERSISTENT,
234
8
                Some(persistent) => persistent,
235
            };
236

            
237
50
            let uldata_opts = QueueOptions::Amqp(
238
50
                AmqpQueueOptions {
239
50
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
240
50
                    is_recv: !is_network,
241
50
                    reliable: true,
242
50
                    persistent,
243
50
                    broadcast: false,
244
50
                    prefetch,
245
50
                    ..Default::default()
246
50
                },
247
50
                conn,
248
50
            );
249
50
            let dldata_opts = QueueOptions::Amqp(
250
50
                AmqpQueueOptions {
251
50
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
252
50
                    is_recv: is_network,
253
50
                    reliable: true,
254
50
                    persistent,
255
50
                    broadcast: false,
256
50
                    prefetch,
257
50
                    ..Default::default()
258
50
                },
259
50
                conn,
260
50
            );
261
50
            let dldata_resp_opts = QueueOptions::Amqp(
262
50
                AmqpQueueOptions {
263
50
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
264
50
                    is_recv: !is_network,
265
50
                    reliable: true,
266
50
                    persistent,
267
50
                    broadcast: false,
268
50
                    prefetch,
269
50
                    ..Default::default()
270
50
                },
271
50
                conn,
272
50
            );
273
50
            let dldata_result_opts = QueueOptions::Amqp(
274
50
                AmqpQueueOptions {
275
50
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
276
50
                    is_recv: !is_network,
277
50
                    reliable: true,
278
50
                    persistent,
279
50
                    broadcast: false,
280
50
                    prefetch,
281
50
                    ..Default::default()
282
50
                },
283
50
                conn,
284
50
            );
285
50
            let ctrl_opts = QueueOptions::Amqp(
286
50
                AmqpQueueOptions {
287
50
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
288
50
                    is_recv: true,
289
50
                    reliable: true,
290
50
                    persistent,
291
50
                    broadcast: false,
292
50
                    prefetch,
293
50
                    ..Default::default()
294
50
                },
295
50
                conn,
296
50
            );
297
50
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
298
50
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
299
50
            dldata_resp = match is_network {
300
24
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
301
26
                true => None,
302
            };
303
50
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
304
50
            ctrl = match is_network {
305
24
                false => None,
306
26
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
307
            };
308
        }
309
50
        Connection::Mqtt(conn, _) => {
310
50
            let uldata_opts = QueueOptions::Mqtt(
311
50
                MqttQueueOptions {
312
50
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
313
50
                    is_recv: !is_network,
314
50
                    reliable: true,
315
50
                    broadcast: false,
316
50
                    shared_prefix: opts.shared_prefix.clone(),
317
50
                    ..Default::default()
318
50
                },
319
50
                conn,
320
50
            );
321
50
            let dldata_opts = QueueOptions::Mqtt(
322
50
                MqttQueueOptions {
323
50
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
324
50
                    is_recv: is_network,
325
50
                    reliable: true,
326
50
                    broadcast: false,
327
50
                    shared_prefix: opts.shared_prefix.clone(),
328
50
                    ..Default::default()
329
50
                },
330
50
                conn,
331
50
            );
332
50
            let dldata_resp_opts = QueueOptions::Mqtt(
333
50
                MqttQueueOptions {
334
50
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
335
50
                    is_recv: !is_network,
336
50
                    reliable: true,
337
50
                    broadcast: false,
338
50
                    shared_prefix: opts.shared_prefix.clone(),
339
50
                    ..Default::default()
340
50
                },
341
50
                conn,
342
50
            );
343
50
            let dldata_result_opts = QueueOptions::Mqtt(
344
50
                MqttQueueOptions {
345
50
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
346
50
                    is_recv: !is_network,
347
50
                    reliable: true,
348
50
                    broadcast: false,
349
50
                    shared_prefix: opts.shared_prefix.clone(),
350
50
                    ..Default::default()
351
50
                },
352
50
                conn,
353
50
            );
354
50
            let ctrl_opts = QueueOptions::Mqtt(
355
50
                MqttQueueOptions {
356
50
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
357
50
                    is_recv: true,
358
50
                    reliable: true,
359
50
                    broadcast: false,
360
50
                    shared_prefix: opts.shared_prefix.clone(),
361
50
                    ..Default::default()
362
50
                },
363
50
                conn,
364
50
            );
365
50
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
366
50
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
367
50
            dldata_resp = match is_network {
368
24
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
369
26
                true => None,
370
            };
371
50
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
372
50
            ctrl = match is_network {
373
24
                false => None,
374
26
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
375
            };
376
        }
377
    }
378

            
379
100
    Ok((uldata, dldata, dldata_resp, dldata_result, ctrl))
380
128
}