1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::{DateTime, Utc};
11
use hex;
12
use serde::{Deserialize, Serialize};
13
use serde_json::{Map, Value};
14
use tokio::task;
15
use url::Url;
16

            
17
use general_mq::{
18
    queue::{
19
        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
20
    },
21
    Queue,
22
};
23

            
24
use crate::util::strings;
25

            
26
use super::{
27
    get_connection, new_data_queues, remove_connection, Connection, DataMqStatus, MgrStatus,
28
    Options,
29
};
30

            
31
/// Uplink data from network to broker.
32
pub struct UlData {
33
    pub time: DateTime<Utc>,
34
    pub network_addr: String,
35
    pub data: Vec<u8>,
36
    pub extension: Option<Map<String, Value>>,
37
}
38

            
39
/// Downlink data from broker to network.
40
pub struct DlData {
41
    pub data_id: String,
42
    pub publish: DateTime<Utc>,
43
    pub expires_in: i64,
44
    pub network_addr: String,
45
    pub data: Vec<u8>,
46
    pub extension: Option<Map<String, Value>>,
47
}
48

            
49
/// Downlink data result when processing or completing data transfer to the device.
50
#[derive(Clone, Serialize)]
51
pub struct DlDataResult {
52
    #[serde(rename = "dataId")]
53
    pub data_id: String,
54
    pub status: i32,
55
    #[serde(skip_serializing_if = "Option::is_none")]
56
    pub message: Option<String>,
57
}
58

            
59
/// Network control message from broker to network.
60
#[derive(Clone, Deserialize)]
61
#[serde(tag = "operation")]
62
pub enum NetworkCtrlMsg {
63
    #[serde(rename = "add-device")]
64
    AddDevice {
65
        time: DateTime<Utc>,
66
        new: CtrlAddDevice,
67
    },
68
    #[serde(rename = "add-device-bulk")]
69
    AddDeviceBulk {
70
        time: DateTime<Utc>,
71
        new: CtrlAddDeviceBulk,
72
    },
73
    #[serde(rename = "add-device-range")]
74
    AddDeviceRange {
75
        time: DateTime<Utc>,
76
        new: CtrlAddDeviceRange,
77
    },
78
    #[serde(rename = "del-device")]
79
    DelDevice {
80
        time: DateTime<Utc>,
81
        new: CtrlDelDevice,
82
    },
83
    #[serde(rename = "del-device-bulk")]
84
    DelDeviceBulk {
85
        time: DateTime<Utc>,
86
        new: CtrlDelDeviceBulk,
87
    },
88
    #[serde(rename = "del-device-range")]
89
    DelDeviceRange {
90
        time: DateTime<Utc>,
91
        new: CtrlDelDeviceRange,
92
    },
93
}
94

            
95
#[derive(Clone, Deserialize)]
96
pub struct CtrlAddDevice {
97
    #[serde(rename = "networkAddr")]
98
    pub network_addr: String,
99
}
100

            
101
#[derive(Clone, Deserialize)]
102
pub struct CtrlAddDeviceBulk {
103
    #[serde(rename = "networkAddrs")]
104
    pub network_addrs: Vec<String>,
105
}
106

            
107
#[derive(Clone, Deserialize)]
108
pub struct CtrlAddDeviceRange {
109
    #[serde(rename = "startAddr")]
110
    pub start_addr: String,
111
    #[serde(rename = "endAddr")]
112
    pub end_addr: String,
113
}
114

            
115
#[derive(Clone, Deserialize)]
116
pub struct CtrlDelDevice {
117
    #[serde(rename = "networkAddr")]
118
    pub network_addr: String,
119
}
120

            
121
#[derive(Clone, Deserialize)]
122
pub struct CtrlDelDeviceBulk {
123
    #[serde(rename = "networkAddrs")]
124
    pub network_addrs: Vec<String>,
125
}
126

            
127
#[derive(Clone, Deserialize)]
128
pub struct CtrlDelDeviceRange {
129
    #[serde(rename = "startAddr")]
130
    pub start_addr: String,
131
    #[serde(rename = "endAddr")]
132
    pub end_addr: String,
133
}
134

            
135
/// The manager for network queues.
136
#[derive(Clone)]
137
pub struct NetworkMgr {
138
    opts: Arc<Options>,
139

            
140
    // Information for delete connection automatically.
141
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
142
    host_uri: String,
143

            
144
    uldata: Arc<Mutex<Queue>>,
145
    dldata: Arc<Mutex<Queue>>,
146
    dldata_result: Arc<Mutex<Queue>>,
147
    ctrl: Arc<Mutex<Queue>>,
148

            
149
    status: Arc<Mutex<MgrStatus>>,
150
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
151
}
152

            
153
/// Event handler trait for the [`NetworkMgr`].
154
#[async_trait]
155
pub trait EventHandler: Send + Sync {
156
    /// Fired when one of the manager's queues encounters a state change.
157
    async fn on_status_change(&self, mgr: &NetworkMgr, status: MgrStatus);
158

            
159
    /// Fired when a [`DlData`] data is received.
160
    ///
161
    /// Return [`Err`] will NACK the data.
162
    /// The data may will be received again depending on the protocol (such as AMQP).
163
    async fn on_dldata(&self, mgr: &NetworkMgr, data: Box<DlData>) -> Result<(), ()>;
164

            
165
    /// Fired when a [`NetworkCtrlMsg`] data is received.
166
    ///
167
    /// Return [`Err`] will NACK the data.
168
    /// The data may will be received again depending on the protocol (such as AMQP).
169
    async fn on_ctrl(&self, mgr: &NetworkMgr, data: Box<NetworkCtrlMsg>) -> Result<(), ()>;
170
}
171

            
172
/// The event handler for [`general_mq::queue::GmqQueue`].
173
struct MgrMqEventHandler {
174
    mgr: NetworkMgr,
175
}
176

            
177
#[derive(Serialize)]
178
struct UlDataInner<'a> {
179
    time: String,
180
    #[serde(rename = "networkAddr")]
181
    network_addr: &'a String,
182
    data: String,
183
    #[serde(skip_serializing_if = "Option::is_none")]
184
    extension: &'a Option<Map<String, Value>>,
185
}
186

            
187
/// Downlink data from broker to network.
188
#[derive(Deserialize)]
189
struct DlDataInner {
190
    #[serde(rename = "dataId")]
191
    data_id: String,
192
    #[serde(rename = "pub")]
193
    publish: String,
194
    #[serde(rename = "expiresIn")]
195
    expires_in: i64,
196
    #[serde(rename = "networkAddr")]
197
    network_addr: String,
198
    data: String,
199
    #[serde(skip_serializing_if = "Option::is_none")]
200
    extension: Option<Map<String, Value>>,
201
}
202

            
203
const QUEUE_PREFIX: &'static str = "broker.network";
204
const ERR_PARAM_DEV: &'static str = "the `network_addr` must be a non-empty string";
205
const ERR_PARAM_DATA_ID: &'static str = "the `data_id` must be a non-empty string";
206

            
207
impl NetworkMgr {
208
    /// To create a manager instance.
209
34
    pub fn new(
210
34
        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
211
34
        host_uri: &Url,
212
34
        opts: Options,
213
34
        handler: Arc<dyn EventHandler>,
214
34
    ) -> Result<Self, String> {
215
34
        let conn = get_connection(&conn_pool, host_uri)?;
216

            
217
26
        let (uldata, dldata, _, dldata_result, ctrl) =
218
34
            new_data_queues(&conn, &opts, QUEUE_PREFIX, true)?;
219

            
220
26
        let mgr = NetworkMgr {
221
26
            opts: Arc::new(opts),
222
26
            conn_pool,
223
26
            host_uri: host_uri.to_string(),
224
26
            uldata,
225
26
            dldata,
226
26
            dldata_result,
227
26
            ctrl: ctrl.unwrap(),
228
26
            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
229
26
            handler: Arc::new(Mutex::new(handler)),
230
26
        };
231
26
        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
232
26
        let mut q = { mgr.uldata.lock().unwrap().clone() };
233
26
        q.set_handler(mq_handler.clone());
234
26
        if let Err(e) = q.connect() {
235
            return Err(e.to_string());
236
26
        }
237
26
        let mut q = { mgr.dldata.lock().unwrap().clone() };
238
26
        q.set_handler(mq_handler.clone());
239
26
        q.set_msg_handler(mq_handler.clone());
240
26
        if let Err(e) = q.connect() {
241
            return Err(e.to_string());
242
26
        }
243
26
        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
244
26
        q.set_handler(mq_handler.clone());
245
26
        if let Err(e) = q.connect() {
246
            return Err(e.to_string());
247
26
        }
248
26
        let mut q = { mgr.ctrl.lock().unwrap().clone() };
249
26
        q.set_handler(mq_handler.clone());
250
26
        q.set_msg_handler(mq_handler.clone());
251
26
        if let Err(e) = q.connect() {
252
            return Err(e.to_string());
253
26
        }
254
26
        match conn {
255
13
            Connection::Amqp(_, counter) => {
256
13
                *counter.lock().unwrap() += 4;
257
13
            }
258
13
            Connection::Mqtt(_, counter) => {
259
13
                *counter.lock().unwrap() += 4;
260
13
            }
261
        }
262
26
        Ok(mgr)
263
34
    }
264

            
265
    /// The associated unit ID of the network.
266
2
    pub fn unit_id(&self) -> &str {
267
2
        self.opts.unit_id.as_str()
268
2
    }
269

            
270
    /// The associated unit code of the network.
271
2
    pub fn unit_code(&self) -> &str {
272
2
        self.opts.unit_code.as_str()
273
2
    }
274

            
275
    /// The network ID.
276
2
    pub fn id(&self) -> &str {
277
2
        self.opts.id.as_str()
278
2
    }
279

            
280
    /// The network code.
281
2
    pub fn name(&self) -> &str {
282
2
        self.opts.name.as_str()
283
2
    }
284

            
285
    /// Manager status.
286
110
    pub fn status(&self) -> MgrStatus {
287
110
        *self.status.lock().unwrap()
288
110
    }
289

            
290
    /// Detail status of each message queue. Please ignore `dldata_resp`.
291
4
    pub fn mq_status(&self) -> DataMqStatus {
292
4
        DataMqStatus {
293
4
            uldata: { self.uldata.lock().unwrap().status() },
294
4
            dldata: { self.dldata.lock().unwrap().status() },
295
4
            dldata_resp: QueueStatus::Closed,
296
4
            dldata_result: { self.dldata_result.lock().unwrap().status() },
297
4
            ctrl: { self.ctrl.lock().unwrap().status() },
298
4
        }
299
4
    }
300

            
301
    /// To close the manager queues.
302
    /// The underlying connection will be closed when there are no queues use it.
303
26
    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
304
26
        let mut q = { self.uldata.lock().unwrap().clone() };
305
26
        q.close().await?;
306
26
        let mut q = { self.dldata.lock().unwrap().clone() };
307
26
        q.close().await?;
308
26
        let mut q = { self.dldata_result.lock().unwrap().clone() };
309
26
        q.close().await?;
310
26
        let mut q = { self.ctrl.lock().unwrap().clone() };
311
26
        q.close().await?;
312

            
313
26
        remove_connection(&self.conn_pool, &self.host_uri, 4).await
314
26
    }
315

            
316
    /// Send uplink data to the broker.
317
6
    pub fn send_uldata(&self, data: &UlData) -> Result<(), Box<dyn StdError>> {
318
6
        if data.network_addr.len() == 0 {
319
2
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
320
2
            return Err(Box::new(err));
321
4
        }
322
4

            
323
4
        let msg_data = UlDataInner {
324
4
            time: strings::time_str(&data.time),
325
4
            network_addr: &data.network_addr,
326
4
            data: hex::encode(&data.data),
327
4
            extension: &data.extension,
328
4
        };
329
4
        let payload = serde_json::to_vec(&msg_data)?;
330
4
        let queue = { (*self.uldata.lock().unwrap()).clone() };
331
4
        task::spawn(async move {
332
4
            let _ = queue.send_msg(payload).await;
333
4
        });
334
4
        Ok(())
335
6
    }
336

            
337
    /// Send downlink result data to the broker.
338
6
    pub fn send_dldata_result(&self, data: &DlDataResult) -> Result<(), Box<dyn StdError>> {
339
6
        if data.data_id.len() == 0 {
340
2
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DATA_ID.to_string());
341
2
            return Err(Box::new(err));
342
4
        }
343

            
344
4
        let payload = serde_json::to_vec(&data)?;
345
4
        let queue = { (*self.dldata_result.lock().unwrap()).clone() };
346
4
        task::spawn(async move {
347
4
            let _ = queue.send_msg(payload).await;
348
4
        });
349
4
        Ok(())
350
6
    }
351
}
352

            
353
#[async_trait]
354
impl QueueEventHandler for MgrMqEventHandler {
355
    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
356

            
357
190
    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
358
190
        let uldata_status = { self.mgr.uldata.lock().unwrap().status() };
359
190
        let dldata_status = { self.mgr.dldata.lock().unwrap().status() };
360
190
        let dldata_result_status = { self.mgr.dldata_result.lock().unwrap().status() };
361
190
        let ctrl_status = { self.mgr.ctrl.lock().unwrap().status() };
362

            
363
190
        let status = match uldata_status == QueueStatus::Connected
364
84
            && dldata_status == QueueStatus::Connected
365
54
            && dldata_result_status == QueueStatus::Connected
366
43
            && ctrl_status == QueueStatus::Connected
367
        {
368
169
            false => MgrStatus::NotReady,
369
21
            true => MgrStatus::Ready,
370
        };
371

            
372
190
        let mut changed = false;
373
190
        {
374
190
            let mut mutex = self.mgr.status.lock().unwrap();
375
190
            if *mutex != status {
376
42
                *mutex = status;
377
42
                changed = true;
378
148
            }
379
        }
380
190
        if changed {
381
42
            let handler = { self.mgr.handler.lock().unwrap().clone() };
382
42
            handler.on_status_change(&self.mgr, status).await;
383
148
        }
384
380
    }
385
}
386

            
387
#[async_trait]
388
impl MessageHandler for MgrMqEventHandler {
389
    // Validate and decode data.
390
27
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
391
        const _FN_NAME: &'static str = "NetworkMgr.on_message";
392

            
393
27
        let queue_name = queue.name();
394
27
        if queue_name.cmp(self.mgr.dldata.lock().unwrap().name()) == Ordering::Equal {
395
11
            let data = match serde_json::from_slice::<DlDataInner>(msg.payload()) {
396
                Err(_) => {
397
2
                    let _ = msg.ack().await;
398
2
                    return;
399
                }
400
9
                Ok(data) => {
401
9
                    let data_bytes = match data.data.len() {
402
                        0 => vec![],
403
9
                        _ => match hex::decode(data.data.as_str()) {
404
                            Err(_) => {
405
                                let _ = msg.ack().await;
406
                                return;
407
                            }
408
9
                            Ok(data) => data,
409
                        },
410
                    };
411
9
                    let publish = match DateTime::parse_from_rfc3339(data.publish.as_str()) {
412
                        Err(_) => {
413
2
                            let _ = msg.ack().await;
414
2
                            return;
415
                        }
416
7
                        Ok(publish) => publish.into(),
417
7
                    };
418
7
                    DlData {
419
7
                        data_id: data.data_id,
420
7
                        publish,
421
7
                        expires_in: data.expires_in,
422
7
                        network_addr: data.network_addr,
423
7
                        data: data_bytes,
424
7
                        extension: data.extension,
425
7
                    }
426
7
                }
427
7
            };
428
7
            let handler = { self.mgr.handler.lock().unwrap().clone() };
429
7
            let _ = match handler.on_dldata(&self.mgr, Box::new(data)).await {
430
2
                Err(_) => msg.nack().await,
431
5
                Ok(_) => msg.ack().await,
432
            };
433
16
        } else if queue_name.cmp(self.mgr.ctrl.lock().unwrap().name()) == Ordering::Equal {
434
16
            let data = match serde_json::from_slice::<NetworkCtrlMsg>(msg.payload()) {
435
                Err(_) => {
436
4
                    let _ = msg.ack().await;
437
4
                    return;
438
                }
439
12
                Ok(data) => data,
440
12
            };
441
12
            let handler = { self.mgr.handler.lock().unwrap().clone() };
442
12
            let _ = match handler.on_ctrl(&self.mgr, Box::new(data)).await {
443
                Err(_) => msg.nack().await,
444
12
                Ok(_) => msg.ack().await,
445
            };
446
        }
447
54
    }
448
}