1
//! To management queues for applications and networks.
2
//!
3
//! For applications, the [`application::ApplicationMgr`] manages the following kind of queues:
4
//! - uldata: uplink data from the broker to the application.
5
//! - dldata: downlink data from the application to the broker.
6
//! - dldata-resp: the response of downlink data.
7
//! - dldata-result: the data process result from the network.
8
//!
9
//! For networks, the [`network::NetworkMgr`] manages the following kind of queues:
10
//! - uldata: device uplink data from the network to the broker.
11
//! - dldata: downlink data from the broker to the network.
12
//! - dldata-result: the data process result from the network.
13

            
14
use std::{
15
    collections::HashMap,
16
    error::Error as StdError,
17
    sync::{Arc, Mutex},
18
};
19

            
20
use serde::{Deserialize, Serialize};
21
use url::Url;
22

            
23
use general_mq::{
24
    connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
25
    AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
26
};
27

            
28
pub mod application;
29
pub mod control;
30
pub mod data;
31
pub mod network;
32

            
33
/// The general connection type with reference counter for upper layer maintenance.
34
#[derive(Clone)]
35
pub enum Connection {
36
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
37
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
38
}
39

            
40
/// Manager status.
41
#[derive(PartialEq)]
42
pub enum MgrStatus {
43
    /// One or more queues are not connected.
44
    NotReady,
45
    /// All queues are connected.
46
    Ready,
47
}
48

            
49
/// Detail queue connection status.
50
pub struct MgrMqStatus {
51
    /// For `uldata`.
52
    pub uldata: Status,
53
    /// For `dldata`.
54
    pub dldata: Status,
55
    /// For `dldata-resp`.
56
    pub dldata_resp: Status,
57
    /// For `dldata-result`.
58
    pub dldata_result: Status,
59
    /// For `ctrl`.
60
    pub ctrl: Status,
61
}
62

            
63
/// The options of the application/network manager.
64
#[derive(Default, Deserialize, Serialize)]
65
pub struct Options {
66
    /// The associated unit ID of the application/network. Empty for public network.
67
    #[serde(rename = "unitId")]
68
    pub unit_id: String,
69
    /// The associated unit code of the application/network. Empty for public network.
70
    #[serde(rename = "unitCode")]
71
    pub unit_code: String,
72
    /// The associated application/network ID.
73
    pub id: String,
74
    /// The associated application/network code.
75
    pub name: String,
76
    /// AMQP prefetch option.
77
    #[serde(skip_serializing_if = "Option::is_none")]
78
    pub prefetch: Option<u16>,
79
    pub persistent: bool,
80
    /// MQTT shared queue prefix option.
81
    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
82
    pub shared_prefix: Option<String>,
83
}
84

            
85
/// Support application/network host schemes.
86
pub const SUPPORT_SCHEMES: &'static [&'static str] = &["amqp", "amqps", "mqtt", "mqtts"];
87

            
88
/// The default prefetch value for AMQP.
89
const DEF_PREFETCH: u16 = 100;
90

            
91
impl Copy for MgrStatus {}
92

            
93
impl Clone for MgrStatus {
94
    fn clone(&self) -> MgrStatus {
95
        *self
96
    }
97
}
98

            
99
/// Utility function to get the message queue connection instance. A new connection will be created
100
/// if the host does not exist.
101
684
fn get_connection(
102
684
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
103
684
    host_uri: &Url,
104
684
) -> Result<Connection, String> {
105
684
    let uri = host_uri.to_string();
106
684
    let mut mutex = conn_pool.lock().unwrap();
107
684
    if let Some(conn) = mutex.get(&uri) {
108
518
        return Ok(conn.clone());
109
166
    }
110
166

            
111
166
    match host_uri.scheme() {
112
166
        "amqp" | "amqps" => {
113
110
            let opts = AmqpConnectionOptions {
114
110
                uri: host_uri.to_string(),
115
110
                ..Default::default()
116
110
            };
117
110
            let mut conn = AmqpConnection::new(opts)?;
118
110
            let _ = conn.connect();
119
110
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
120
110
            mutex.insert(uri, conn.clone());
121
110
            Ok(conn)
122
        }
123
56
        "mqtt" | "mqtts" => {
124
56
            let opts = MqttConnectionOptions {
125
56
                uri: host_uri.to_string(),
126
56
                ..Default::default()
127
56
            };
128
56
            let mut conn = MqttConnection::new(opts)?;
129
56
            let _ = conn.connect();
130
56
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
131
56
            mutex.insert(uri, conn.clone());
132
56
            Ok(conn)
133
        }
134
        s => Err(format!("unsupport scheme {}", s)),
135
    }
136
684
}
137

            
138
/// Utility function to remove connection from the pool if the reference count meet zero.
139
258
async fn remove_connection(
140
258
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
141
258
    host_uri: &String,
142
258
    count: isize,
143
258
) -> Result<(), Box<dyn StdError + Send + Sync>> {
144
114
    let conn = {
145
258
        let mut mutex = conn_pool.lock().unwrap();
146
258
        match mutex.get(host_uri) {
147
40
            None => return Ok(()),
148
218
            Some(conn) => match conn {
149
154
                Connection::Amqp(_, counter) => {
150
154
                    let mut mutex = counter.lock().unwrap();
151
154
                    *mutex -= count;
152
154
                    if *mutex > 0 {
153
84
                        return Ok(());
154
70
                    }
155
                }
156
64
                Connection::Mqtt(_, counter) => {
157
64
                    let mut mutex = counter.lock().unwrap();
158
64
                    *mutex -= count;
159
64
                    if *mutex > 0 {
160
20
                        return Ok(());
161
44
                    }
162
                }
163
            },
164
        }
165
114
        mutex.remove(host_uri)
166
    };
167
114
    if let Some(conn) = conn {
168
114
        match conn {
169
70
            Connection::Amqp(mut conn, _) => {
170
70
                conn.close().await?;
171
            }
172
44
            Connection::Mqtt(mut conn, _) => {
173
44
                conn.close().await?;
174
            }
175
        }
176
    }
177
114
    Ok(())
178
258
}
179

            
180
/// The utility function for creating application/network control queue with the following name:
181
/// - `[prefix].[unit].[code].ctrl`
182
136
fn new_ctrl_queues(
183
136
    conn: &Connection,
184
136
    opts: &Options,
185
136
    prefix: &str,
186
136
) -> Result<Arc<Mutex<Queue>>, String> {
187
136
    let ctrl: Arc<Mutex<Queue>>;
188
136

            
189
136
    if opts.unit_id.len() == 0 {
190
40
        if opts.unit_code.len() != 0 {
191
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
192
40
        }
193
    } else {
194
96
        if opts.unit_code.len() == 0 {
195
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
196
96
        }
197
    }
198
136
    if opts.id.len() == 0 {
199
        return Err("`id` cannot be empty".to_string());
200
136
    }
201
136
    if opts.name.len() == 0 {
202
        return Err("`name` cannot be empty".to_string());
203
136
    }
204

            
205
136
    let unit = match opts.unit_code.len() {
206
40
        0 => "_",
207
96
        _ => opts.unit_code.as_str(),
208
    };
209

            
210
136
    match conn {
211
96
        Connection::Amqp(conn, _) => {
212
96
            let prefetch = match opts.prefetch {
213
2
                None => DEF_PREFETCH,
214
94
                Some(prefetch) => match prefetch {
215
6
                    0 => DEF_PREFETCH,
216
88
                    _ => prefetch,
217
                },
218
            };
219

            
220
96
            let ctrl_opts = QueueOptions::Amqp(
221
96
                AmqpQueueOptions {
222
96
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
223
96
                    is_recv: false,
224
96
                    reliable: true,
225
96
                    broadcast: false,
226
96
                    prefetch,
227
96
                    ..Default::default()
228
96
                },
229
96
                conn,
230
96
            );
231
96
            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
232
        }
233
40
        Connection::Mqtt(conn, _) => {
234
40
            let ctrl_opts = QueueOptions::Mqtt(
235
40
                MqttQueueOptions {
236
40
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
237
40
                    is_recv: false,
238
40
                    reliable: true,
239
40
                    broadcast: false,
240
40
                    shared_prefix: opts.shared_prefix.clone(),
241
40
                    ..Default::default()
242
40
                },
243
40
                conn,
244
40
            );
245
40
            ctrl = Arc::new(Mutex::new(Queue::new(ctrl_opts)?));
246
        }
247
    }
248

            
249
136
    Ok(ctrl)
250
136
}
251

            
252
/// The utility function for creating application/network data queues. The return tuple contains:
253
/// - `[prefix].[unit].[code].uldata`
254
/// - `[prefix].[unit].[code].dldata`
255
/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
256
/// - `[prefix].[unit].[code].dldata-result`
257
252
fn new_data_queues(
258
252
    conn: &Connection,
259
252
    opts: &Options,
260
252
    prefix: &str,
261
252
    is_network: bool,
262
252
) -> Result<
263
252
    (
264
252
        Arc<Mutex<Queue>>,
265
252
        Arc<Mutex<Queue>>,
266
252
        Option<Arc<Mutex<Queue>>>,
267
252
        Arc<Mutex<Queue>>,
268
252
    ),
269
252
    String,
270
252
> {
271
252
    let uldata: Arc<Mutex<Queue>>;
272
252
    let dldata: Arc<Mutex<Queue>>;
273
252
    let dldata_resp: Option<Arc<Mutex<Queue>>>;
274
252
    let dldata_result: Arc<Mutex<Queue>>;
275
252

            
276
252
    if opts.unit_id.len() == 0 {
277
50
        if opts.unit_code.len() != 0 {
278
4
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
279
46
        }
280
    } else {
281
202
        if opts.unit_code.len() == 0 {
282
8
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
283
194
        }
284
    }
285
240
    if opts.id.len() == 0 {
286
14
        return Err("`id` cannot be empty".to_string());
287
226
    }
288
226
    if opts.name.len() == 0 {
289
8
        return Err("`name` cannot be empty".to_string());
290
218
    }
291

            
292
218
    let unit = match opts.unit_code.len() {
293
40
        0 => "_",
294
178
        _ => opts.unit_code.as_str(),
295
    };
296

            
297
218
    match conn {
298
154
        Connection::Amqp(conn, _) => {
299
154
            let prefetch = match opts.prefetch {
300
4
                None => DEF_PREFETCH,
301
150
                Some(prefetch) => match prefetch {
302
10
                    0 => DEF_PREFETCH,
303
140
                    _ => prefetch,
304
                },
305
            };
306

            
307
154
            let uldata_opts = QueueOptions::Amqp(
308
154
                AmqpQueueOptions {
309
154
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
310
154
                    is_recv: is_network,
311
154
                    reliable: true,
312
154
                    persistent: opts.persistent,
313
154
                    broadcast: false,
314
154
                    prefetch,
315
154
                    ..Default::default()
316
154
                },
317
154
                conn,
318
154
            );
319
154
            let dldata_opts = QueueOptions::Amqp(
320
154
                AmqpQueueOptions {
321
154
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
322
154
                    is_recv: !is_network,
323
154
                    reliable: true,
324
154
                    persistent: opts.persistent,
325
154
                    broadcast: false,
326
154
                    prefetch,
327
154
                    ..Default::default()
328
154
                },
329
154
                conn,
330
154
            );
331
154
            let dldata_resp_opts = QueueOptions::Amqp(
332
154
                AmqpQueueOptions {
333
154
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
334
154
                    is_recv: is_network,
335
154
                    reliable: true,
336
154
                    persistent: opts.persistent,
337
154
                    broadcast: false,
338
154
                    prefetch,
339
154
                    ..Default::default()
340
154
                },
341
154
                conn,
342
154
            );
343
154
            let dldata_result_opts = QueueOptions::Amqp(
344
154
                AmqpQueueOptions {
345
154
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
346
154
                    is_recv: is_network,
347
154
                    reliable: true,
348
154
                    persistent: opts.persistent,
349
154
                    broadcast: false,
350
154
                    prefetch,
351
154
                    ..Default::default()
352
154
                },
353
154
                conn,
354
154
            );
355
154
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
356
154
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
357
154
            dldata_resp = match is_network {
358
58
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
359
96
                true => None,
360
            };
361
154
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
362
        }
363
64
        Connection::Mqtt(conn, _) => {
364
64
            let uldata_opts = QueueOptions::Mqtt(
365
64
                MqttQueueOptions {
366
64
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
367
64
                    is_recv: is_network,
368
64
                    reliable: true,
369
64
                    broadcast: false,
370
64
                    shared_prefix: opts.shared_prefix.clone(),
371
64
                    ..Default::default()
372
64
                },
373
64
                conn,
374
64
            );
375
64
            let dldata_opts = QueueOptions::Mqtt(
376
64
                MqttQueueOptions {
377
64
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
378
64
                    is_recv: !is_network,
379
64
                    reliable: true,
380
64
                    broadcast: false,
381
64
                    shared_prefix: opts.shared_prefix.clone(),
382
64
                    ..Default::default()
383
64
                },
384
64
                conn,
385
64
            );
386
64
            let dldata_resp_opts = QueueOptions::Mqtt(
387
64
                MqttQueueOptions {
388
64
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
389
64
                    is_recv: is_network,
390
64
                    reliable: true,
391
64
                    broadcast: false,
392
64
                    shared_prefix: opts.shared_prefix.clone(),
393
64
                    ..Default::default()
394
64
                },
395
64
                conn,
396
64
            );
397
64
            let dldata_result_opts = QueueOptions::Mqtt(
398
64
                MqttQueueOptions {
399
64
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
400
64
                    is_recv: is_network,
401
64
                    reliable: true,
402
64
                    broadcast: false,
403
64
                    shared_prefix: opts.shared_prefix.clone(),
404
64
                    ..Default::default()
405
64
                },
406
64
                conn,
407
64
            );
408
64
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
409
64
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
410
64
            dldata_resp = match is_network {
411
24
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
412
40
                true => None,
413
            };
414
64
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
415
        }
416
    }
417

            
418
218
    Ok((uldata, dldata, dldata_resp, dldata_result))
419
252
}