1
//! Manages queues for applications and networks.
2
//!
3
//! For applications, the [`application::ApplicationMgr`] manages the following kinds of queues:
4
//! - uldata: uplink data from the broker to the application.
5
//! - dldata: downlink data from the application to the broker.
6
//! - dldata-resp: the response of downlink data.
7
//! - dldata-result: the data process result from the network.
8
//!
9
//! For networks, the [`network::NetworkMgr`] manages the following kinds of queues:
10
//! - uldata: device uplink data from the network to the broker.
11
//! - dldata: downlink data from the broker to the network.
12
//! - dldata-result: the data process result from the network.
13
//! - ctrl: the control messages from the broker to the network.
14

            
15
use std::{
16
    collections::HashMap,
17
    error::Error as StdError,
18
    sync::{Arc, Mutex},
19
};
20

            
21
use serde::{Deserialize, Serialize};
22
use url::Url;
23

            
24
use general_mq::{
25
    connection::GmqConnection, queue::Status, AmqpConnection, AmqpConnectionOptions,
26
    AmqpQueueOptions, MqttConnection, MqttConnectionOptions, MqttQueueOptions, Queue, QueueOptions,
27
};
28

            
29
pub mod application;
30
pub mod network;
31

            
32
/// The general connection type with reference counter for upper layer maintenance.
33
#[derive(Clone)]
34
pub enum Connection {
35
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
36
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
37
}
38

            
39
/// Readiness of a manager.
#[derive(PartialEq)]
pub enum MgrStatus {
    /// At least one queue is not connected.
    NotReady,
    /// Every queue is connected.
    Ready,
}
47

            
48
/// Detail queue connection status.
49
pub struct DataMqStatus {
50
    /// For `uldata`.
51
    pub uldata: Status,
52
    /// For `dldata`.
53
    pub dldata: Status,
54
    /// For `dldata-resp`.
55
    pub dldata_resp: Status,
56
    /// For `dldata-result`.
57
    pub dldata_result: Status,
58
    /// For `ctrl`.
59
    pub ctrl: Status,
60
}
61

            
62
/// The options of the application/network manager.
63
#[derive(Default, Deserialize, Serialize)]
64
pub struct Options {
65
    /// The associated unit ID of the application/network. Empty for public network.
66
    #[serde(rename = "unitId")]
67
    pub unit_id: String,
68
    /// The associated unit code of the application/network. Empty for public network.
69
    #[serde(rename = "unitCode")]
70
    pub unit_code: String,
71
    /// The associated application/network ID.
72
    pub id: String,
73
    /// The associated application/network code.
74
    pub name: String,
75
    /// AMQP prefetch option.
76
    #[serde(skip_serializing_if = "Option::is_none")]
77
    pub prefetch: Option<u16>,
78
    /// AMQP persistent option.
79
    #[serde(skip_serializing_if = "Option::is_none")]
80
    pub persistent: Option<bool>,
81
    /// MQTT shared queue prefix option.
82
    #[serde(rename = "sharedPrefix", skip_serializing_if = "Option::is_none")]
83
    pub shared_prefix: Option<String>,
84
}
85

            
86
/// URI schemes accepted for application/network hosts.
pub const SUPPORT_SCHEMES: &[&str] = &["amqp", "amqps", "mqtt", "mqtts"];
88

            
89
/// AMQP prefetch count used when the options give none (or give zero).
const DEF_PREFETCH: u16 = 100;
/// AMQP persistent flag used when the options give none.
const DEF_PERSISTENT: bool = false;
93

            
94
impl Copy for MgrStatus {}
95

            
96
impl Clone for MgrStatus {
97
    fn clone(&self) -> MgrStatus {
98
        *self
99
    }
100
}
101

            
102
/// Utility function to get the message queue connection instance. A new connection will be created
103
/// if the host does not exist.
104
64
fn get_connection(
105
64
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
106
64
    host_uri: &Url,
107
64
) -> Result<Connection, String> {
108
64
    let uri = host_uri.to_string();
109
64
    let mut mutex = conn_pool.lock().unwrap();
110
64
    if let Some(conn) = mutex.get(&uri) {
111
16
        return Ok(conn.clone());
112
48
    }
113
48

            
114
48
    match host_uri.scheme() {
115
48
        "amqp" | "amqps" => {
116
24
            let opts = AmqpConnectionOptions {
117
24
                uri: host_uri.to_string(),
118
24
                ..Default::default()
119
24
            };
120
24
            let mut conn = AmqpConnection::new(opts)?;
121
24
            let _ = conn.connect();
122
24
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
123
24
            mutex.insert(uri, conn.clone());
124
24
            Ok(conn)
125
        }
126
24
        "mqtt" | "mqtts" => {
127
24
            let opts = MqttConnectionOptions {
128
24
                uri: host_uri.to_string(),
129
24
                ..Default::default()
130
24
            };
131
24
            let mut conn = MqttConnection::new(opts)?;
132
24
            let _ = conn.connect();
133
24
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
134
24
            mutex.insert(uri, conn.clone());
135
24
            Ok(conn)
136
        }
137
        s => Err(format!("unsupport scheme {}", s)),
138
    }
139
64
}
140

            
141
/// Utility function to remove connection from the pool if the reference count meet zero.
142
50
async fn remove_connection(
143
50
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
144
50
    host_uri: &String,
145
50
    count: isize,
146
50
) -> Result<(), Box<dyn StdError + Send + Sync>> {
147
44
    let conn = {
148
50
        let mut mutex = conn_pool.lock().unwrap();
149
50
        match mutex.get(host_uri) {
150
            None => return Ok(()),
151
50
            Some(conn) => match conn {
152
25
                Connection::Amqp(_, counter) => {
153
25
                    let mut mutex = counter.lock().unwrap();
154
25
                    *mutex -= count;
155
25
                    if *mutex > 0 {
156
3
                        return Ok(());
157
22
                    }
158
                }
159
25
                Connection::Mqtt(_, counter) => {
160
25
                    let mut mutex = counter.lock().unwrap();
161
25
                    *mutex -= count;
162
25
                    if *mutex > 0 {
163
3
                        return Ok(());
164
22
                    }
165
                }
166
            },
167
        }
168
44
        mutex.remove(host_uri)
169
    };
170
44
    if let Some(conn) = conn {
171
44
        match conn {
172
22
            Connection::Amqp(mut conn, _) => {
173
36
                conn.close().await?;
174
            }
175
22
            Connection::Mqtt(mut conn, _) => {
176
22
                conn.close().await?;
177
            }
178
        }
179
    }
180
44
    Ok(())
181
50
}
182

            
183
/// The utility function for creating application/network queue. The return tuple contains:
184
/// - `[prefix].[unit].[code].uldata`
185
/// - `[prefix].[unit].[code].dldata`
186
/// - `[prefix].[unit].[code].dldata-resp`: `Some` for applications and `None` for networks.
187
/// - `[prefix].[unit].[code].dldata-result`
188
/// - `[prefix].[unit].[code].ctrl`
189
64
fn new_data_queues(
190
64
    conn: &Connection,
191
64
    opts: &Options,
192
64
    prefix: &str,
193
64
    is_network: bool,
194
64
) -> Result<
195
64
    (
196
64
        Arc<Mutex<Queue>>,
197
64
        Arc<Mutex<Queue>>,
198
64
        Option<Arc<Mutex<Queue>>>,
199
64
        Arc<Mutex<Queue>>,
200
64
        Option<Arc<Mutex<Queue>>>,
201
64
    ),
202
64
    String,
203
64
> {
204
64
    let uldata: Arc<Mutex<Queue>>;
205
64
    let dldata: Arc<Mutex<Queue>>;
206
64
    let dldata_resp: Option<Arc<Mutex<Queue>>>;
207
64
    let dldata_result: Arc<Mutex<Queue>>;
208
64
    let ctrl: Option<Arc<Mutex<Queue>>>;
209
64

            
210
64
    if opts.unit_id.len() == 0 {
211
4
        if opts.unit_code.len() != 0 {
212
2
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
213
2
        }
214
    } else {
215
60
        if opts.unit_code.len() == 0 {
216
4
            return Err("unit_id and unit_code must both empty or non-empty".to_string());
217
56
        }
218
    }
219
58
    if opts.id.len() == 0 {
220
4
        return Err("`id` cannot be empty".to_string());
221
54
    }
222
54
    if opts.name.len() == 0 {
223
4
        return Err("`name` cannot be empty".to_string());
224
50
    }
225

            
226
50
    let unit = match opts.unit_code.len() {
227
2
        0 => "_",
228
48
        _ => opts.unit_code.as_str(),
229
    };
230

            
231
50
    match conn {
232
25
        Connection::Amqp(conn, _) => {
233
25
            let prefetch = match opts.prefetch {
234
18
                None => DEF_PREFETCH,
235
7
                Some(prefetch) => match prefetch {
236
5
                    0 => DEF_PREFETCH,
237
2
                    _ => prefetch,
238
                },
239
            };
240
25
            let persistent = match opts.persistent {
241
21
                None => DEF_PERSISTENT,
242
4
                Some(persistent) => persistent,
243
            };
244

            
245
25
            let uldata_opts = QueueOptions::Amqp(
246
25
                AmqpQueueOptions {
247
25
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
248
25
                    is_recv: !is_network,
249
25
                    reliable: true,
250
25
                    persistent,
251
25
                    broadcast: false,
252
25
                    prefetch,
253
25
                    ..Default::default()
254
25
                },
255
25
                conn,
256
25
            );
257
25
            let dldata_opts = QueueOptions::Amqp(
258
25
                AmqpQueueOptions {
259
25
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
260
25
                    is_recv: is_network,
261
25
                    reliable: true,
262
25
                    broadcast: false,
263
25
                    prefetch,
264
25
                    ..Default::default()
265
25
                },
266
25
                conn,
267
25
            );
268
25
            let dldata_resp_opts = QueueOptions::Amqp(
269
25
                AmqpQueueOptions {
270
25
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
271
25
                    is_recv: !is_network,
272
25
                    reliable: true,
273
25
                    broadcast: false,
274
25
                    prefetch,
275
25
                    ..Default::default()
276
25
                },
277
25
                conn,
278
25
            );
279
25
            let dldata_result_opts = QueueOptions::Amqp(
280
25
                AmqpQueueOptions {
281
25
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
282
25
                    is_recv: !is_network,
283
25
                    reliable: true,
284
25
                    broadcast: false,
285
25
                    prefetch,
286
25
                    ..Default::default()
287
25
                },
288
25
                conn,
289
25
            );
290
25
            let ctrl_opts = QueueOptions::Amqp(
291
25
                AmqpQueueOptions {
292
25
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
293
25
                    is_recv: true,
294
25
                    reliable: true,
295
25
                    broadcast: false,
296
25
                    prefetch,
297
25
                    ..Default::default()
298
25
                },
299
25
                conn,
300
25
            );
301
25
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
302
25
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
303
25
            dldata_resp = match is_network {
304
12
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
305
13
                true => None,
306
            };
307
25
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
308
25
            ctrl = match is_network {
309
12
                false => None,
310
13
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
311
            };
312
        }
313
25
        Connection::Mqtt(conn, _) => {
314
25
            let uldata_opts = QueueOptions::Mqtt(
315
25
                MqttQueueOptions {
316
25
                    name: format!("{}.{}.{}.uldata", prefix, unit, opts.name.as_str()),
317
25
                    is_recv: !is_network,
318
25
                    reliable: true,
319
25
                    broadcast: false,
320
25
                    shared_prefix: opts.shared_prefix.clone(),
321
25
                    ..Default::default()
322
25
                },
323
25
                conn,
324
25
            );
325
25
            let dldata_opts = QueueOptions::Mqtt(
326
25
                MqttQueueOptions {
327
25
                    name: format!("{}.{}.{}.dldata", prefix, unit, opts.name.as_str()),
328
25
                    is_recv: is_network,
329
25
                    reliable: true,
330
25
                    broadcast: false,
331
25
                    shared_prefix: opts.shared_prefix.clone(),
332
25
                    ..Default::default()
333
25
                },
334
25
                conn,
335
25
            );
336
25
            let dldata_resp_opts = QueueOptions::Mqtt(
337
25
                MqttQueueOptions {
338
25
                    name: format!("{}.{}.{}.dldata-resp", prefix, unit, opts.name.as_str()),
339
25
                    is_recv: !is_network,
340
25
                    reliable: true,
341
25
                    broadcast: false,
342
25
                    shared_prefix: opts.shared_prefix.clone(),
343
25
                    ..Default::default()
344
25
                },
345
25
                conn,
346
25
            );
347
25
            let dldata_result_opts = QueueOptions::Mqtt(
348
25
                MqttQueueOptions {
349
25
                    name: format!("{}.{}.{}.dldata-result", prefix, unit, opts.name.as_str()),
350
25
                    is_recv: !is_network,
351
25
                    reliable: true,
352
25
                    broadcast: false,
353
25
                    shared_prefix: opts.shared_prefix.clone(),
354
25
                    ..Default::default()
355
25
                },
356
25
                conn,
357
25
            );
358
25
            let ctrl_opts = QueueOptions::Mqtt(
359
25
                MqttQueueOptions {
360
25
                    name: format!("{}.{}.{}.ctrl", prefix, unit, opts.name.as_str()),
361
25
                    is_recv: true,
362
25
                    reliable: true,
363
25
                    broadcast: false,
364
25
                    shared_prefix: opts.shared_prefix.clone(),
365
25
                    ..Default::default()
366
25
                },
367
25
                conn,
368
25
            );
369
25
            uldata = Arc::new(Mutex::new(Queue::new(uldata_opts)?));
370
25
            dldata = Arc::new(Mutex::new(Queue::new(dldata_opts)?));
371
25
            dldata_resp = match is_network {
372
12
                false => Some(Arc::new(Mutex::new(Queue::new(dldata_resp_opts)?))),
373
13
                true => None,
374
            };
375
25
            dldata_result = Arc::new(Mutex::new(Queue::new(dldata_result_opts)?));
376
25
            ctrl = match is_network {
377
12
                false => None,
378
13
                true => Some(Arc::new(Mutex::new(Queue::new(ctrl_opts)?))),
379
            };
380
        }
381
    }
382

            
383
50
    Ok((uldata, dldata, dldata_resp, dldata_result, ctrl))
384
64
}