1
//! Manages queues for applications and networks.
//!
//! For applications, the [`application::ApplicationMgr`] manages the following kinds of queues:
//! - uldata: uplink data from the broker to the application.
//! - dldata: downlink data from the application to the broker.
//! - dldata-resp: the response of downlink data.
//! - dldata-result: the data process result from the network.
//!
//! For networks, the [`network::NetworkMgr`] manages the following kinds of queues:
//! - uldata: device uplink data from the network to the broker.
//! - dldata: downlink data from the broker to the network.
//! - dldata-result: the data process result from the network.
//! - ctrl: the control messages from the broker to the network.
14

            
15
use std::{
16
    collections::HashMap,
17
    error::Error as StdError,
18
    sync::{Arc, Mutex},
19
};
20

            
21
use serde::{Deserialize, Serialize};
22
use url::Url;
23

            
24
use general_mq::{
25
    connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
26
    AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
27
};
28

            
29
pub mod application;
30
pub mod network;
31

            
32
/// The general connection type with reference counter for upper layer maintenance.
33
#[derive(Clone)]
34
pub enum Connection {
35
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
36
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
37
}
38

            
39
/// Manager status.
///
/// `Copy` and `Clone` are implemented manually further down in this file, so they are
/// intentionally not part of the derive list here.
#[derive(PartialEq)]
pub enum MgrStatus {
    /// One or more queues are not connected.
    NotReady,
    /// All queues are connected.
    Ready,
}
47

            
48
/// Detail queue connection status.
49
pub struct DataMqStatus {
50
    /// For `uldata`.
51
    pub uldata: Status,
52
    /// For `dldata`.
53
    pub dldata: Status,
54
    /// For `dldata-resp`.
55
    pub dldata_resp: Status,
56
    /// For `dldata-result`.
57
    pub dldata_result: Status,
58
    /// For `ctrl`.
59
    pub ctrl: Status,
60
}
61

            
62
/// The options of the application/network manager.
63
#[derive(Default, Deserialize, Serialize)]
64
pub struct Options {
65
    /// The associated unit ID of the application/network. Empty for public network.
66
    #[serde(rename = "unitId")]
67
    pub unit_id: String,
68
    /// The associated unit code of the application/network. Empty for public network.
69
    #[serde(rename = "unitCode")]
70
    pub unit_code: String,
71
    /// The associated application/network ID.
72
    pub id: String,
73
    /// The associated application/network code.
74
    pub name: String,
75
    /// AMQP prefetch option.
76
    #[serde(skip_serializing_if = "Option::is_none")]
77
    pub prefetch: Option<u16>,
78
    /// AMQP persistent option.
79
    #[serde(skip_serializing_if = "Option::is_none")]
80
    pub persistent: Option<bool>,
81
    /// MQTT shared queue prefix option.
82
    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
83
    pub shared_prefix: Option<String>,
84
}
85

            
86
/// Supported application/network host URI schemes.
pub const SUPPORT_SCHEMES: &[&str] = &["amqp", "amqps", "mqtt", "mqtts"];
88

            
89
/// The default prefetch value for AMQP.
const DEF_PREFETCH: u16 = 100;
/// The default persistent value for AMQP.
const DEF_PERSISTENT: bool = false;
93

            
94
impl Copy for MgrStatus {}
95

            
96
impl Clone for MgrStatus {
97
    fn clone(&self) -> MgrStatus {
98
        *self
99
    }
100
}
101

            
102
/// Utility function to get the message queue connection instance. A new connection will be created
103
/// if the host does not exist.
104
128
fn get_connection(
105
128
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
106
128
    host_uri: &Url,
107
128
) -> Result<Connection, String> {
108
128
    let uri = host_uri.to_string();
109
128
    let mut mutex = conn_pool.lock().unwrap();
110
128
    if let Some(conn) = mutex.get(&uri) {
111
32
        return Ok(conn.clone());
112
96
    }
113
96

            
114
96
    match host_uri.scheme() {
115
96
        "amqp" | "amqps" => {
116
48
            let opts = AmqpConnectionOptions {
117
48
                uri: host_uri.to_string(),
118
48
                ..Default::default()
119
48
            };
120
48
            let mut conn = AmqpConnection::new(opts)?;
121
48
            let _ = conn.connect();
122
48
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
123
48
            mutex.insert(uri, conn.clone());
124
48
            Ok(conn)
125
        }
126
48
        "mqtt" | "mqtts" => {
127
48
            let opts = MqttConnectionOptions {
128
48
                uri: host_uri.to_string(),
129
48
                ..Default::default()
130
48
            };
131
48
            let mut conn = MqttConnection::new(opts)?;
132
48
            let _ = conn.connect();
133
48
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
134
48
            mutex.insert(uri, conn.clone());
135
48
            Ok(conn)
136
        }
137
        s => Err(format!("unsupport scheme {}", s)),
138
    }
139
128
}
140

            
141
/// Utility function to remove connection from the pool if the reference count meet zero.
142
100
async fn remove_connection(
143
100
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
144
100
    host_uri: &String,
145
100
    count: isize,
146
100
) -> Result<(), Box<dyn StdError + Send + Sync>> {
147
88
    let conn = {
148
100
        let mut mutex = conn_pool.lock().unwrap();
149
100
        match mutex.get(host_uri) {
150
            None => return Ok(()),
151
100
            Some(conn) => match conn {
152
50
                Connection::Amqp(_, counter) => {
153
50
                    let mut mutex = counter.lock().unwrap();
154
50
                    *mutex -= count;
155
50
                    if *mutex > 0 {
156
6
                        return Ok(());
157
44
                    }
158
                }
159
50
                Connection::Mqtt(_, counter) => {
160
50
                    let mut mutex = counter.lock().unwrap();
161
50
                    *mutex -= count;
162
50
                    if *mutex > 0 {
163
6
                        return Ok(());
164
44
                    }
165
                }
166
            },
167
        }
168
88
        mutex.remove(host_uri)
169
    };
170
88
    if let Some(conn) = conn {
171
88
        match conn {
172
44
            Connection::Amqp(mut conn, _) => {
173
44
                conn.close().await?;
174
            }
175
44
            Connection::Mqtt(mut conn, _) => {
176
44
                conn.close().await?;
177
            }
178
        }
179
    }
180
88
    Ok(())
181
100
}
182

            
183
/// The utility function for creating application/network queue. The return tuple contains:
184
/// - `[prefix].[unit].[code].uldata`
185
/// - `[prefix].[unit].[code].dldata`
186
/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
187
/// - `[prefix].[unit].[code].dldata-result`
188
/// - `[prefix].[unit].[code].ctrl`
189
128
fn new_data_queues(
190
128
    conn: &Connection,
191
128
    opts: &Options,
192
128
    prefix: &str,
193
128
    is_network: bool,
194
128
) -> Result<
195
128
    (
196
128
        Arc<Mutex<Queue>>,
197
128
        Arc<Mutex<Queue>>,
198
128
        Option<Arc<Mutex<Queue>>>,
199
128
        Arc<Mutex<Queue>>,
200
128
        Option<Arc<Mutex<Queue>>>,
201
128
    ),
202
128
    String,
203
128
> {
204
128
    let uldata: Arc<Mutex<Queue>>;
205
128
    let dldata: Arc<Mutex<Queue>>;
206
128
    let dldata_resp: Option<Arc<Mutex<Queue>>>;
207
128
    let dldata_result: Arc<Mutex<Queue>>;
208
128
    let ctrl: Option<Arc<Mutex<Queue>>>;
209
128

            
210
128
    if opts.unit_id.len() == 0 {
211
8
        if opts.unit_code.len() != 0 {
212
4
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
213
4
        }
214
    } else {
215
120
        if opts.unit_code.len() == 0 {
216
8
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
217
112
        }
218
    }
219
116
    if opts.id.len() == 0 {
220
8
        return Err("`id` cannot be empty".to_string());
221
108
    }
222
108
    if opts.name.len() == 0 {
223
8
        return Err("`name` cannot be empty".to_string());
224
100
    }
225

            
226
100
    let unit = match opts.unit_code.len() {
227
4
        0 => "_",
228
96
        _ => opts.unit_code.as_str(),
229
    };
230

            
231
100
    match conn {
232
50
        Connection::Amqp(conn, _) => {
233
50
            let prefetch = match opts.prefetch {
234
36
                None => DEF_PREFETCH,
235
14
                Some(prefetch) => match prefetch {
236
10
                    0 => DEF_PREFETCH,
237
4
                    _ => prefetch,
238
                },
239
            };
240
50
            let persistent = match opts.persistent {
241
42
                None => DEF_PERSISTENT,
242
8
                Some(persistent) => persistent,
243
            };
244

            
245
50
            let uldata_opts = QueueOptions::Amqp(
246
50
                AmqpQueueOptions {
247
50
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
248
50
                    is_recv: !is_network,
249
50
                    reliable: true,
250
50
                    persistent,
251
50
                    broadcast: false,
252
50
                    prefetch,
253
50
                    ..Default::default()
254
50
                },
255
50
                conn,
256
50
            );
257
50
            let dldata_opts = QueueOptions::Amqp(
258
50
                AmqpQueueOptions {
259
50
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
260
50
                    is_recv: is_network,
261
50
                    reliable: true,
262
50
                    broadcast: false,
263
50
                    prefetch,
264
50
                    ..Default::default()
265
50
                },
266
50
                conn,
267
50
            );
268
50
            let dldata_resp_opts = QueueOptions::Amqp(
269
50
                AmqpQueueOptions {
270
50
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
271
50
                    is_recv: !is_network,
272
50
                    reliable: true,
273
50
                    broadcast: false,
274
50
                    prefetch,
275
50
                    ..Default::default()
276
50
                },
277
50
                conn,
278
50
            );
279
50
            let dldata_result_opts = QueueOptions::Amqp(
280
50
                AmqpQueueOptions {
281
50
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
282
50
                    is_recv: !is_network,
283
50
                    reliable: true,
284
50
                    broadcast: false,
285
50
                    prefetch,
286
50
                    ..Default::default()
287
50
                },
288
50
                conn,
289
50
            );
290
50
            let ctrl_opts = QueueOptions::Amqp(
291
50
                AmqpQueueOptions {
292
50
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
293
50
                    is_recv: true,
294
50
                    reliable: true,
295
50
                    broadcast: false,
296
50
                    prefetch,
297
50
                    ..Default::default()
298
50
                },
299
50
                conn,
300
50
            );
301
50
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
302
50
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
303
50
            dldata_resp = match is_network {
304
24
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
305
26
                true => None,
306
            };
307
50
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
308
50
            ctrl = match is_network {
309
24
                false => None,
310
26
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
311
            };
312
        }
313
50
        Connection::Mqtt(conn, _) => {
314
50
            let uldata_opts = QueueOptions::Mqtt(
315
50
                MqttQueueOptions {
316
50
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
317
50
                    is_recv: !is_network,
318
50
                    reliable: true,
319
50
                    broadcast: false,
320
50
                    shared_prefix: opts.shared_prefix.clone(),
321
50
                    ..Default::default()
322
50
                },
323
50
                conn,
324
50
            );
325
50
            let dldata_opts = QueueOptions::Mqtt(
326
50
                MqttQueueOptions {
327
50
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
328
50
                    is_recv: is_network,
329
50
                    reliable: true,
330
50
                    broadcast: false,
331
50
                    shared_prefix: opts.shared_prefix.clone(),
332
50
                    ..Default::default()
333
50
                },
334
50
                conn,
335
50
            );
336
50
            let dldata_resp_opts = QueueOptions::Mqtt(
337
50
                MqttQueueOptions {
338
50
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
339
50
                    is_recv: !is_network,
340
50
                    reliable: true,
341
50
                    broadcast: false,
342
50
                    shared_prefix: opts.shared_prefix.clone(),
343
50
                    ..Default::default()
344
50
                },
345
50
                conn,
346
50
            );
347
50
            let dldata_result_opts = QueueOptions::Mqtt(
348
50
                MqttQueueOptions {
349
50
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
350
50
                    is_recv: !is_network,
351
50
                    reliable: true,
352
50
                    broadcast: false,
353
50
                    shared_prefix: opts.shared_prefix.clone(),
354
50
                    ..Default::default()
355
50
                },
356
50
                conn,
357
50
            );
358
50
            let ctrl_opts = QueueOptions::Mqtt(
359
50
                MqttQueueOptions {
360
50
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
361
50
                    is_recv: true,
362
50
                    reliable: true,
363
50
                    broadcast: false,
364
50
                    shared_prefix: opts.shared_prefix.clone(),
365
50
                    ..Default::default()
366
50
                },
367
50
                conn,
368
50
            );
369
50
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
370
50
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
371
50
            dldata_resp = match is_network {
372
24
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
373
26
                true => None,
374
            };
375
50
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
376
50
            ctrl = match is_network {
377
24
                false => None,
378
26
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
379
            };
380
        }
381
    }
382

            
383
100
    Ok((uldata, dldata, dldata_resp, dldata_result, ctrl))
384
128
}