1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    sync::{Arc, Mutex},
6
};
7

            
8
use async_trait::async_trait;
9
use chrono::DateTime;
10
use hex;
11
use log::{error, warn};
12
use serde::{Deserialize, Serialize};
13
use serde_json::{Map, Value};
14
use tokio::task;
15
use url::Url;
16

            
17
use general_mq::{
18
    queue::{
19
        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
20
    },
21
    Queue,
22
};
23
use sylvia_iot_corelib::strings;
24

            
25
use super::{
26
    get_connection, new_ctrl_queues, new_data_queues, remove_connection, Connection, MgrMqStatus,
27
    MgrStatus, Options,
28
};
29

            
30
/// Uplink data from network to broker.
31
304
#[derive(Deserialize)]
32
pub struct UlData {
33
    pub time: String,
34
    #[serde(rename = "networkAddr")]
35
    pub network_addr: String,
36
    pub data: String,
37
    pub extension: Option<Map<String, Value>>,
38
}
39

            
40
/// Downlink data from broker to network.
41
#[derive(Serialize)]
42
pub struct DlData {
43
    #[serde(rename = "dataId")]
44
    pub data_id: String,
45
    #[serde(rename = "pub")]
46
    pub publish: String,
47
    #[serde(rename = "expiresIn")]
48
    pub expires_in: i64,
49
    #[serde(rename = "networkAddr")]
50
    pub network_addr: String,
51
    pub data: String,
52
    #[serde(skip_serializing_if = "Option::is_none")]
53
    pub extension: Option<Map<String, Value>>,
54
}
55

            
56
/// Downlink data result when processing or completing data transfer to the device.
57
87
#[derive(Deserialize)]
58
pub struct DlDataResult {
59
    #[serde(rename = "dataId")]
60
    pub data_id: String,
61
    pub status: i32,
62
    pub message: Option<String>,
63
}
64

            
65
/// The manager for network queues.
66
#[derive(Clone)]
67
pub struct NetworkMgr {
68
    opts: Arc<Options>,
69

            
70
    // Information for delete connection automatically.
71
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
72
    host_uri: String,
73

            
74
    uldata: Arc<Mutex<Queue>>,
75
    dldata: Arc<Mutex<Queue>>,
76
    dldata_result: Arc<Mutex<Queue>>,
77
    ctrl: Arc<Mutex<Queue>>,
78

            
79
    status: Arc<Mutex<MgrStatus>>,
80
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
81
}
82

            
83
/// Event handler trait for the [`NetworkMgr`].
84
#[async_trait]
85
pub trait EventHandler: Send + Sync {
86
    async fn on_status_change(&self, mgr: &NetworkMgr, status: MgrStatus);
87

            
88
    async fn on_uldata(&self, mgr: &NetworkMgr, data: Box<UlData>) -> Result<(), ()>;
89
    async fn on_dldata_result(&self, mgr: &NetworkMgr, data: Box<DlDataResult>) -> Result<(), ()>;
90
}
91

            
92
/// The event handler for [`general_mq::queue::GmqQueue`].
93
struct MgrMqEventHandler {
94
    mgr: NetworkMgr,
95
}
96

            
97
/// Prefix passed to the queue factory functions when creating the network queues.
const QUEUE_PREFIX: &str = "broker.network";
98

            
99
impl NetworkMgr {
100
    /// To create a manager instance.
101
79
    pub fn new(
102
79
        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
103
79
        host_uri: &Url,
104
79
        opts: Options,
105
79
        handler: Arc<dyn EventHandler>,
106
79
    ) -> Result<Self, String> {
107
79
        let conn = get_connection(&conn_pool, host_uri)?;
108

            
109
79
        let (uldata, dldata, _, dldata_result) = new_data_queues(&conn, &opts, QUEUE_PREFIX, true)?;
110
68
        let ctrl = new_ctrl_queues(&conn, &opts, QUEUE_PREFIX)?;
111

            
112
68
        let mgr = NetworkMgr {
113
68
            opts: Arc::new(opts),
114
68
            conn_pool,
115
68
            host_uri: host_uri.to_string(),
116
68
            uldata,
117
68
            dldata,
118
68
            dldata_result,
119
68
            ctrl,
120
68
            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
121
68
            handler: Arc::new(Mutex::new(handler)),
122
68
        };
123
68
        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
124
68
        let mut q = { mgr.uldata.lock().unwrap().clone() };
125
68
        q.set_handler(mq_handler.clone());
126
68
        q.set_msg_handler(mq_handler.clone());
127
68
        if let Err(e) = q.connect() {
128
            return Err(e.to_string());
129
68
        }
130
68
        let mut q = { mgr.dldata.lock().unwrap().clone() };
131
68
        q.set_handler(mq_handler.clone());
132
68
        if let Err(e) = q.connect() {
133
            return Err(e.to_string());
134
68
        }
135
68
        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
136
68
        q.set_handler(mq_handler.clone());
137
68
        q.set_msg_handler(mq_handler.clone());
138
68
        if let Err(e) = q.connect() {
139
            return Err(e.to_string());
140
68
        }
141
68
        let mut q = { mgr.ctrl.lock().unwrap().clone() };
142
68
        q.set_handler(mq_handler.clone());
143
68
        if let Err(e) = q.connect() {
144
            return Err(e.to_string());
145
68
        }
146
68
        match conn {
147
48
            Connection::Amqp(_, counter) => {
148
48
                *counter.lock().unwrap() += 4;
149
48
            }
150
20
            Connection::Mqtt(_, counter) => {
151
20
                *counter.lock().unwrap() += 4;
152
20
            }
153
        }
154
68
        Ok(mgr)
155
79
    }
156

            
157
    /// The associated unit ID of the network.
158
77
    pub fn unit_id(&self) -> &str {
159
77
        self.opts.unit_id.as_str()
160
77
    }
161

            
162
    /// The associated unit code of the network.
163
236
    pub fn unit_code(&self) -> &str {
164
236
        self.opts.unit_code.as_str()
165
236
    }
166

            
167
    /// The network ID.
168
116
    pub fn id(&self) -> &str {
169
116
        self.opts.id.as_str()
170
116
    }
171

            
172
    /// The network code.
173
146
    pub fn name(&self) -> &str {
174
146
        self.opts.name.as_str()
175
146
    }
176

            
177
    /// Manager status.
178
123
    pub fn status(&self) -> MgrStatus {
179
123
        *self.status.lock().unwrap()
180
123
    }
181

            
182
    /// Detail status of each message queue. Please ignore `dldata_resp`.
183
4
    pub fn mq_status(&self) -> MgrMqStatus {
184
4
        MgrMqStatus {
185
4
            uldata: { self.uldata.lock().unwrap().status() },
186
4
            dldata: { self.dldata.lock().unwrap().status() },
187
4
            dldata_resp: QueueStatus::Closed,
188
4
            dldata_result: { self.dldata_result.lock().unwrap().status() },
189
4
            ctrl: { self.ctrl.lock().unwrap().status() },
190
4
        }
191
4
    }
192

            
193
    /// To close the manager queues.
194
80
    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
195
80
        let mut q = { self.uldata.lock().unwrap().clone() };
196
80
        q.close().await?;
197
80
        let mut q = { self.dldata.lock().unwrap().clone() };
198
80
        q.close().await?;
199
80
        let mut q = { self.dldata_result.lock().unwrap().clone() };
200
80
        q.close().await?;
201
80
        let mut q = { self.ctrl.lock().unwrap().clone() };
202
80
        q.close().await?;
203

            
204
80
        remove_connection(&self.conn_pool, &self.host_uri, 4).await
205
80
    }
206

            
207
    /// Send downlink data to the network.
208
16
    pub fn send_dldata(&self, data: &DlData) -> Result<(), Box<dyn StdError>> {
209
16
        let payload = serde_json::to_vec(data)?;
210
16
        let queue = { (*self.dldata.lock().unwrap()).clone() };
211
16
        task::spawn(async move {
212
16
            let _ = queue.send_msg(payload).await;
213
16
        });
214
16
        Ok(())
215
16
    }
216

            
217
    /// Send control data to the network.
218
93
    pub async fn send_ctrl(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
219
93
        let queue = { (*self.ctrl.lock().unwrap()).clone() };
220
93
        queue.send_msg(payload).await
221
93
    }
222
}
223

            
224
#[async_trait]
225
impl QueueEventHandler for MgrMqEventHandler {
226
    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
227

            
228
426
    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
229
426
        let uldata_status = { self.mgr.uldata.lock().unwrap().status() };
230
426
        let dldata_status = { self.mgr.dldata.lock().unwrap().status() };
231
426
        let dldata_result_status = { self.mgr.dldata_result.lock().unwrap().status() };
232
426
        let ctrl_status = { self.mgr.ctrl.lock().unwrap().status() };
233

            
234
426
        let status = match uldata_status == QueueStatus::Connected
235
107
            && dldata_status == QueueStatus::Connected
236
93
            && dldata_result_status == QueueStatus::Connected
237
61
            && ctrl_status == QueueStatus::Connected
238
        {
239
379
            false => MgrStatus::NotReady,
240
47
            true => MgrStatus::Ready,
241
        };
242

            
243
426
        let mut changed = false;
244
426
        {
245
426
            let mut mutex = self.mgr.status.lock().unwrap();
246
426
            if *mutex != status {
247
74
                *mutex = status;
248
74
                changed = true;
249
352
            }
250
        }
251
426
        if changed {
252
74
            let handler = { self.mgr.handler.lock().unwrap().clone() };
253
74
            handler.on_status_change(&self.mgr, status).await;
254
352
        }
255
852
    }
256
}
257

            
258
#[async_trait]
259
impl MessageHandler for MgrMqEventHandler {
260
    // Validate and decode data.
261
105
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
262
        const FN_NAME: &'static str = "NetworkMgr.on_message";
263

            
264
105
        let queue_name = queue.name();
265
105
        if queue_name.cmp(self.mgr.uldata.lock().unwrap().name()) == Ordering::Equal {
266
76
            let data = match serde_json::from_slice::<UlData>(msg.payload()) {
267
                Err(_) => {
268
2
                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
269
2
                    if let Err(e) = msg.ack().await {
270
                        error!("[{}] ACK message error: {}", FN_NAME, e);
271
2
                    }
272
2
                    return;
273
                }
274
74
                Ok(mut data) => {
275
74
                    let time = match DateTime::parse_from_rfc3339(data.time.as_str()) {
276
2
                        Err(e) => {
277
2
                            warn!(
278
                                "[{}] invalid time format from {}: {}",
279
                                FN_NAME, queue_name, e
280
                            );
281
2
                            if let Err(e) = msg.ack().await {
282
                                error!("[{}] ACK message error: {}", FN_NAME, e);
283
2
                            }
284
2
                            return;
285
                        }
286
72
                        Ok(time) => time.into(),
287
72
                    };
288
72
                    data.time = strings::time_str(&time);
289
72
                    if data.network_addr.len() == 0 {
290
2
                        warn!(
291
                            "[{}] invalid network_addr format from {}",
292
                            FN_NAME, queue_name,
293
                        );
294
2
                        if let Err(e) = msg.ack().await {
295
                            error!("[{}] ACK message error: {}", FN_NAME, e);
296
2
                        }
297
2
                        return;
298
70
                    }
299
70
                    data.network_addr = data.network_addr.to_lowercase();
300
70
                    if data.data.len() > 0 {
301
70
                        if let Err(_) = hex::decode(data.data.as_str()) {
302
2
                            warn!("[{}] invalid data format from {}", FN_NAME, queue_name);
303
2
                            if let Err(e) = msg.ack().await {
304
                                error!("[{}] ACK message error: {}", FN_NAME, e);
305
2
                            }
306
2
                            return;
307
68
                        }
308
68
                        data.data = data.data.to_lowercase();
309
                    }
310
68
                    data
311
68
                }
312
68
            };
313
68
            let handler = { self.mgr.handler.lock().unwrap().clone() };
314
305
            match handler.on_uldata(&self.mgr, Box::new(data)).await {
315
                Err(_) => {
316
2
                    if let Err(e) = msg.nack().await {
317
                        error!("[{}] NACK message error: {}", FN_NAME, e);
318
2
                    }
319
                }
320
                Ok(_) => {
321
66
                    if let Err(e) = msg.ack().await {
322
                        error!("[{}] ACK message error: {}", FN_NAME, e);
323
66
                    }
324
                }
325
            }
326
29
        } else if queue_name.cmp(self.mgr.dldata_result.lock().unwrap().name()) == Ordering::Equal {
327
29
            let data = match serde_json::from_slice::<DlDataResult>(msg.payload()) {
328
                Err(_) => {
329
2
                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
330
2
                    if let Err(e) = msg.ack().await {
331
                        error!("[{}] ACK message error: {}", FN_NAME, e);
332
2
                    }
333
2
                    return;
334
                }
335
27
                Ok(data) => {
336
27
                    if data.data_id.len() == 0 {
337
2
                        warn!("[{}] invalid data_id format from {}", FN_NAME, queue_name);
338
2
                        if let Err(e) = msg.ack().await {
339
                            error!("[{}] ACK message error: {}", FN_NAME, e);
340
2
                        }
341
2
                        return;
342
25
                    }
343
25
                    if let Some(message) = data.message.as_ref() {
344
4
                        if message.len() == 0 {
345
2
                            warn!("[{}] invalid message format from {}", FN_NAME, queue_name);
346
2
                            if let Err(e) = msg.ack().await {
347
                                error!("[{}] ACK message error: {}", FN_NAME, e);
348
2
                            }
349
2
                            return;
350
2
                        }
351
21
                    }
352
23
                    data
353
23
                }
354
23
            };
355
23
            let handler = { self.mgr.handler.lock().unwrap().clone() };
356
58
            match handler.on_dldata_result(&self.mgr, Box::new(data)).await {
357
                Err(_) => {
358
2
                    if let Err(e) = msg.nack().await {
359
                        error!("[{}] NACK message error: {}", FN_NAME, e);
360
2
                    }
361
                }
362
                Ok(_) => {
363
21
                    if let Err(e) = msg.ack().await {
364
                        error!("[{}] ACK message error: {}", FN_NAME, e);
365
21
                    }
366
                }
367
            }
368
        }
369
210
    }
370
}