1
//! Managers of the message queues for applications and networks.
//!
//! For applications, the [`application::ApplicationMgr`] manages the following kinds of queues:
//! - uldata: uplink data from the broker to the application.
//! - dldata: downlink data from the application to the broker.
//! - dldata-resp: the response of downlink data.
//! - dldata-result: the data process result from the network.
//!
//! For networks, the [`network::NetworkMgr`] manages the following kinds of queues:
//! - uldata: device uplink data from the network to the broker.
//! - dldata: downlink data from the broker to the network.
//! - dldata-result: the data process result from the network.
13

            
14
use std::{
15
    collections::HashMap,
16
    error::Error as StdError,
17
    sync::{Arc, Mutex},
18
};
19

            
20
use serde::{Deserialize, Serialize};
21
use url::Url;
22

            
23
use general_mq::{
24
    connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
25
    AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
26
};
27

            
28
pub mod application;
29
pub mod control;
30
pub mod data;
31
pub mod network;
32

            
33
/// The general connection type with reference counter for upper layer maintenance.
34
#[derive(Clone)]
35
pub enum Connection {
36
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
37
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
38
}
39

            
40
/// Manager status.
///
/// `Debug` and `Eq` are derived so that the status can be logged with `{:?}` and compared with
/// `assert_eq!`/`assert_ne!`.
#[derive(Debug, Eq, PartialEq)]
pub enum MgrStatus {
    /// One or more queues are not connected.
    NotReady,
    /// All queues are connected.
    Ready,
}
48

            
49
/// Detail queue connection status.
50
pub struct MgrMqStatus {
51
    /// For `uldata`.
52
    pub uldata: Status,
53
    /// For `dldata`.
54
    pub dldata: Status,
55
    /// For `dldata-resp`.
56
    pub dldata_resp: Status,
57
    /// For `dldata-result`.
58
    pub dldata_result: Status,
59
    /// For `ctrl`.
60
    pub ctrl: Status,
61
}
62

            
63
/// The options of the application/network manager.
64
624
#[derive(Default, Deserialize, Serialize)]
65
pub struct Options {
66
    /// The associated unit ID of the application/network. Empty for public network.
67
    #[serde(rename = "unitId")]
68
    pub unit_id: String,
69
    /// The associated unit code of the application/network. Empty for public network.
70
    #[serde(rename = "unitCode")]
71
    pub unit_code: String,
72
    /// The associated application/network ID.
73
    pub id: String,
74
    /// The associated application/network code.
75
    pub name: String,
76
    /// AMQP prefetch option.
77
    #[serde(skip_serializing_if = "Option::is_none")]
78
    pub prefetch: Option<u16>,
79
    pub persistent: bool,
80
    /// MQTT shared queue prefix option.
81
    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
82
    pub shared_prefix: Option<String>,
83
}
84

            
85
/// Supported application/network host schemes.
///
/// Constants are implicitly `'static`, so the lifetimes are not spelled out; the type is still
/// `&'static [&'static str]`.
pub const SUPPORT_SCHEMES: &[&str] = &["amqp", "amqps", "mqtt", "mqtts"];
87

            
88
/// The default prefetch value for AMQP queues, used when the `prefetch` option is absent or zero.
const DEF_PREFETCH: u16 = 100;
90

            
91
impl Copy for MgrStatus {}
92

            
93
impl Clone for MgrStatus {
94
    fn clone(&self) -> MgrStatus {
95
        *self
96
    }
97
}
98

            
99
/// Utility function to get the message queue connection instance. A new connection will be created
100
/// if the host does not exist.
101
342
fn get_connection(
102
342
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
103
342
    host_uri: &Url,
104
342
) -> Result<Connection, String> {
105
342
    let uri = host_uri.to_string();
106
342
    let mut mutex = conn_pool.lock().unwrap();
107
342
    if let Some(conn) = mutex.get(&uri) {
108
256
        return Ok(conn.clone());
109
86
    }
110
86

            
111
86
    match host_uri.scheme() {
112
86
        "amqp" | "amqps" => {
113
58
            let opts = AmqpConnectionOptions {
114
58
                uri: host_uri.to_string(),
115
58
                ..Default::default()
116
58
            };
117
58
            let mut conn = AmqpConnection::new(opts)?;
118
58
            let _ = conn.connect();
119
58
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
120
58
            mutex.insert(uri, conn.clone());
121
58
            Ok(conn)
122
        }
123
28
        "mqtt" | "mqtts" => {
124
28
            let opts = MqttConnectionOptions {
125
28
                uri: host_uri.to_string(),
126
28
                ..Default::default()
127
28
            };
128
28
            let mut conn = MqttConnection::new(opts)?;
129
28
            let _ = conn.connect();
130
28
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
131
28
            mutex.insert(uri, conn.clone());
132
28
            Ok(conn)
133
        }
134
        s => Err(format!("unsupport scheme {}", s)),
135
    }
136
342
}
137

            
138
/// Utility function to remove connection from the pool if the reference count meet zero.
139
129
async fn remove_connection(
140
129
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
141
129
    host_uri: &String,
142
129
    count: isize,
143
129
) -> Result<(), Box<dyn StdError + Send + Sync>> {
144
60
    let conn = {
145
129
        let mut mutex = conn_pool.lock().unwrap();
146
129
        match mutex.get(host_uri) {
147
20
            None => return Ok(()),
148
109
            Some(conn) => match conn {
149
77
                Connection::Amqp(_, counter) => {
150
77
                    let mut mutex = counter.lock().unwrap();
151
77
                    *mutex -= count;
152
77
                    if *mutex > 0 {
153
39
                        return Ok(());
154
38
                    }
155
                }
156
32
                Connection::Mqtt(_, counter) => {
157
32
                    let mut mutex = counter.lock().unwrap();
158
32
                    *mutex -= count;
159
32
                    if *mutex > 0 {
160
10
                        return Ok(());
161
22
                    }
162
                }
163
            },
164
        }
165
60
        mutex.remove(host_uri)
166
    };
167
60
    if let Some(conn) = conn {
168
60
        match conn {
169
38
            Connection::Amqp(mut conn, _) => {
170
38
                conn.close().await?;
171
            }
172
22
            Connection::Mqtt(mut conn, _) => {
173
22
                conn.close().await?;
174
            }
175
        }
176
    }
177
60
    Ok(())
178
129
}
179

            
180
/// The utility function for creating application/network control queue with the following name:
181
/// - `[prefix].[unit].[code].ctrl`
182
68
fn new_ctrl_queues(
183
68
    conn: &Connection,
184
68
    opts: &Options,
185
68
    prefix: &str,
186
68
) -> Result<Arc<Mutex<Queue>>, String> {
187
68
    let ctrl: Arc<Mutex<Queue>>;
188
68

            
189
68
    if opts.unit_id.len() == 0 {
190
20
        if opts.unit_code.len() != 0 {
191
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
192
20
        }
193
    } else {
194
48
        if opts.unit_code.len() == 0 {
195
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
196
48
        }
197
    }
198
68
    if opts.id.len() == 0 {
199
        return Err("`id` cannot be empty".to_string());
200
68
    }
201
68
    if opts.name.len() == 0 {
202
        return Err("`name` cannot be empty".to_string());
203
68
    }
204

            
205
68
    let unit = match opts.unit_code.len() {
206
20
        0 => "_",
207
48
        _ => opts.unit_code.as_str(),
208
    };
209

            
210
68
    match conn {
211
48
        Connection::Amqp(conn, _) => {
212
48
            let prefetch = match opts.prefetch {
213
1
                None => DEF_PREFETCH,
214
47
                Some(prefetch) => match prefetch {
215
3
                    0 => DEF_PREFETCH,
216
44
                    _ => prefetch,
217
                },
218
            };
219

            
220
48
            let ctrl_opts = QueueOptions::Amqp(
221
48
                AmqpQueueOptions {
222
48
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
223
48
                    is_recv: false,
224
48
                    reliable: true,
225
48
                    broadcast: false,
226
48
                    prefetch,
227
48
                    ..Default::default()
228
48
                },
229
48
                conn,
230
48
            );
231
48
            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
232
        }
233
20
        Connection::Mqtt(conn, _) => {
234
20
            let ctrl_opts = QueueOptions::Mqtt(
235
20
                MqttQueueOptions {
236
20
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
237
20
                    is_recv: false,
238
20
                    reliable: true,
239
20
                    broadcast: false,
240
20
                    shared_prefix: opts.shared_prefix.clone(),
241
20
                    ..Default::default()
242
20
                },
243
20
                conn,
244
20
            );
245
20
            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
246
        }
247
    }
248

            
249
68
    Ok(ctrl)
250
68
}
251

            
252
/// The utility function for creating application/network data queues. The return tuple contains:
253
/// - `[prefix].[unit].[code].uldata`
254
/// - `[prefix].[unit].[code].dldata`
255
/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
256
/// - `[prefix].[unit].[code].dldata-result`
257
126
fn new_data_queues(
258
126
    conn: &Connection,
259
126
    opts: &Options,
260
126
    prefix: &str,
261
126
    is_network: bool,
262
126
) -> Result<
263
126
    (
264
126
        Arc<Mutex<Queue>>,
265
126
        Arc<Mutex<Queue>>,
266
126
        Option<Arc<Mutex<Queue>>>,
267
126
        Arc<Mutex<Queue>>,
268
126
    ),
269
126
    String,
270
126
> {
271
126
    let uldata: Arc<Mutex<Queue>>;
272
126
    let dldata: Arc<Mutex<Queue>>;
273
126
    let dldata_resp: Option<Arc<Mutex<Queue>>>;
274
126
    let dldata_result: Arc<Mutex<Queue>>;
275
126

            
276
126
    if opts.unit_id.len() == 0 {
277
25
        if opts.unit_code.len() != 0 {
278
2
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
279
23
        }
280
    } else {
281
101
        if opts.unit_code.len() == 0 {
282
4
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
283
97
        }
284
    }
285
120
    if opts.id.len() == 0 {
286
7
        return Err("`id` cannot be empty".to_string());
287
113
    }
288
113
    if opts.name.len() == 0 {
289
4
        return Err("`name` cannot be empty".to_string());
290
109
    }
291

            
292
109
    let unit = match opts.unit_code.len() {
293
20
        0 => "_",
294
89
        _ => opts.unit_code.as_str(),
295
    };
296

            
297
109
    match conn {
298
77
        Connection::Amqp(conn, _) => {
299
77
            let prefetch = match opts.prefetch {
300
2
                None => DEF_PREFETCH,
301
75
                Some(prefetch) => match prefetch {
302
5
                    0 => DEF_PREFETCH,
303
70
                    _ => prefetch,
304
                },
305
            };
306

            
307
77
            let uldata_opts = QueueOptions::Amqp(
308
77
                AmqpQueueOptions {
309
77
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
310
77
                    is_recv: is_network,
311
77
                    reliable: true,
312
77
                    persistent: opts.persistent,
313
77
                    broadcast: false,
314
77
                    prefetch,
315
77
                    ..Default::default()
316
77
                },
317
77
                conn,
318
77
            );
319
77
            let dldata_opts = QueueOptions::Amqp(
320
77
                AmqpQueueOptions {
321
77
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
322
77
                    is_recv: !is_network,
323
77
                    reliable: true,
324
77
                    persistent: opts.persistent,
325
77
                    broadcast: false,
326
77
                    prefetch,
327
77
                    ..Default::default()
328
77
                },
329
77
                conn,
330
77
            );
331
77
            let dldata_resp_opts = QueueOptions::Amqp(
332
77
                AmqpQueueOptions {
333
77
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
334
77
                    is_recv: is_network,
335
77
                    reliable: true,
336
77
                    persistent: opts.persistent,
337
77
                    broadcast: false,
338
77
                    prefetch,
339
77
                    ..Default::default()
340
77
                },
341
77
                conn,
342
77
            );
343
77
            let dldata_result_opts = QueueOptions::Amqp(
344
77
                AmqpQueueOptions {
345
77
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
346
77
                    is_recv: is_network,
347
77
                    reliable: true,
348
77
                    persistent: opts.persistent,
349
77
                    broadcast: false,
350
77
                    prefetch,
351
77
                    ..Default::default()
352
77
                },
353
77
                conn,
354
77
            );
355
77
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
356
77
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
357
77
            dldata_resp = match is_network {
358
29
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
359
48
                true => None,
360
            };
361
77
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
362
        }
363
32
        Connection::Mqtt(conn, _) => {
364
32
            let uldata_opts = QueueOptions::Mqtt(
365
32
                MqttQueueOptions {
366
32
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
367
32
                    is_recv: is_network,
368
32
                    reliable: true,
369
32
                    broadcast: false,
370
32
                    shared_prefix: opts.shared_prefix.clone(),
371
32
                    ..Default::default()
372
32
                },
373
32
                conn,
374
32
            );
375
32
            let dldata_opts = QueueOptions::Mqtt(
376
32
                MqttQueueOptions {
377
32
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
378
32
                    is_recv: !is_network,
379
32
                    reliable: true,
380
32
                    broadcast: false,
381
32
                    shared_prefix: opts.shared_prefix.clone(),
382
32
                    ..Default::default()
383
32
                },
384
32
                conn,
385
32
            );
386
32
            let dldata_resp_opts = QueueOptions::Mqtt(
387
32
                MqttQueueOptions {
388
32
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
389
32
                    is_recv: is_network,
390
32
                    reliable: true,
391
32
                    broadcast: false,
392
32
                    shared_prefix: opts.shared_prefix.clone(),
393
32
                    ..Default::default()
394
32
                },
395
32
                conn,
396
32
            );
397
32
            let dldata_result_opts = QueueOptions::Mqtt(
398
32
                MqttQueueOptions {
399
32
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
400
32
                    is_recv: is_network,
401
32
                    reliable: true,
402
32
                    broadcast: false,
403
32
                    shared_prefix: opts.shared_prefix.clone(),
404
32
                    ..Default::default()
405
32
                },
406
32
                conn,
407
32
            );
408
32
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
409
32
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
410
32
            dldata_resp = match is_network {
411
12
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
412
20
                true => None,
413
            };
414
32
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
415
        }
416
    }
417

            
418
109
    Ok((uldata, dldata, dldata_resp, dldata_result))
419
126
}