1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::{DateTime, Utc};
11
use hex;
12
use serde::{Deserialize, Serialize};
13
use serde_json::{self, Map, Value};
14
use tokio::task;
15
use url::Url;
16

            
17
use general_mq::{
18
    Queue,
19
    queue::{
20
        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
21
    },
22
};
23

            
24
use super::{
25
    Connection, DataMqStatus, MgrStatus, Options, get_connection, new_data_queues,
26
    remove_connection,
27
};
28

            
29
/// Uplink data from broker to application.
30
pub struct UlData {
31
    pub data_id: String,
32
    pub time: DateTime<Utc>,
33
    pub publish: DateTime<Utc>,
34
    pub device_id: String,
35
    pub network_id: String,
36
    pub network_code: String,
37
    pub network_addr: String,
38
    pub is_public: bool,
39
    pub data: Vec<u8>,
40
    pub extension: Option<Map<String, Value>>,
41
}
42

            
43
/// Downlink data from application to broker.
44
pub struct DlData {
45
    pub correlation_id: String,
46
    pub device_id: Option<String>,
47
    pub network_code: Option<String>,
48
    pub network_addr: Option<String>,
49
    pub data: Vec<u8>,
50
    pub extension: Option<Map<String, Value>>,
51
}
52

            
53
/// Downlink data response for [`DlData`].
54
#[derive(Deserialize)]
55
pub struct DlDataResp {
56
    #[serde(rename = "correlationId")]
57
    pub correlation_id: String,
58
    #[serde(rename = "dataId")]
59
    pub data_id: Option<String>,
60
    pub error: Option<String>,
61
    pub message: Option<String>,
62
}
63

            
64
/// Downlink data result when processing or completing data transfer to the device.
65
#[derive(Deserialize)]
66
pub struct DlDataResult {
67
    #[serde(rename = "dataId")]
68
    pub data_id: String,
69
    pub status: i32,
70
    pub message: Option<String>,
71
}
72

            
73
/// The manager for application queues.
74
#[derive(Clone)]
75
pub struct ApplicationMgr {
76
    opts: Arc<Options>,
77

            
78
    // Information for delete connection automatically.
79
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
80
    host_uri: String,
81

            
82
    uldata: Arc<Mutex<Queue>>,
83
    dldata: Arc<Mutex<Queue>>,
84
    dldata_resp: Arc<Mutex<Queue>>,
85
    dldata_result: Arc<Mutex<Queue>>,
86

            
87
    status: Arc<Mutex<MgrStatus>>,
88
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
89
}
90

            
91
/// Event handler trait for the [`ApplicationMgr`].
92
#[async_trait]
93
pub trait EventHandler: Send + Sync {
94
    /// Fired when a queue error occurs. Override to handle MQ-level errors.
95
    async fn on_error(&self, _mgr: &ApplicationMgr, _err: Box<dyn StdError + Send + Sync>) {}
96

            
97
    /// Fired when one of the manager's queues encounters a state change.
98
    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus);
99

            
100
    /// Fired when a [`UlData`] data is received.
101
    ///
102
    /// Return [`Err`] will NACK the data.
103
    /// The data may will be received again depending on the protocol (such as AMQP).
104
    async fn on_uldata(&self, mgr: &ApplicationMgr, data: Box<UlData>) -> Result<(), ()>;
105

            
106
    /// Fired when a [`DlDataResp`] data is received.
107
    ///
108
    /// Return [`Err`] will NACK the data.
109
    /// The data may will be received again depending on the protocol (such as AMQP).
110
    async fn on_dldata_resp(&self, mgr: &ApplicationMgr, data: Box<DlDataResp>) -> Result<(), ()>;
111

            
112
    /// Fired when a [`DlDataResult`] data is received.
113
    ///
114
    /// Return [`Err`] will NACK the data.
115
    /// The data may will be received again depending on the protocol (such as AMQP).
116
    async fn on_dldata_result(
117
        &self,
118
        mgr: &ApplicationMgr,
119
        data: Box<DlDataResult>,
120
    ) -> Result<(), ()>;
121
}
122

            
123
/// The event handler for [`general_mq::queue::GmqQueue`].
124
struct MgrMqEventHandler {
125
    mgr: ApplicationMgr,
126
}
127

            
128
#[derive(Deserialize)]
129
struct UlDataInner {
130
    #[serde(rename = "dataId")]
131
    data_id: String,
132
    time: String,
133
    #[serde(rename = "pub")]
134
    publish: String,
135
    #[serde(rename = "deviceId")]
136
    device_id: String,
137
    #[serde(rename = "networkId")]
138
    network_id: String,
139
    #[serde(rename = "networkCode")]
140
    network_code: String,
141
    #[serde(rename = "networkAddr")]
142
    network_addr: String,
143
    #[serde(rename = "isPublic")]
144
    is_public: bool,
145
    data: String,
146
    extension: Option<Map<String, Value>>,
147
}
148

            
149
#[derive(Clone, Serialize)]
150
struct DlDataInner<'a> {
151
    #[serde(rename = "correlationId")]
152
    correlation_id: &'a String,
153
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
154
    device_id: Option<&'a String>,
155
    #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
156
    network_code: Option<&'a String>,
157
    #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
158
    network_addr: Option<&'a String>,
159
    data: String,
160
    #[serde(skip_serializing_if = "Option::is_none")]
161
    extension: &'a Option<Map<String, Value>>,
162
}
163

            
164
/// Prefix handed to `new_data_queues()` when creating the application queues.
const QUEUE_PREFIX: &str = "broker.application";
/// Error message for an empty `DlData::correlation_id`.
const ERR_PARAM_CORR_ID: &str = "the `correlation_id` must be a non-empty string";
/// Error message for a `DlData` without a usable target.
const ERR_PARAM_DEV: &str = "one of `device_id` or [`network_code`, `network_addr`] pair must be provided with non-empty string";
167

            
168
impl ApplicationMgr {
169
    /// To create a manager instance.
170
68
    pub fn new(
171
68
        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
172
68
        host_uri: &Url,
173
68
        opts: Options,
174
68
        handler: Arc<dyn EventHandler>,
175
68
    ) -> Result<Self, String> {
176
68
        if opts.unit_id.len() == 0 {
177
8
            return Err("`unit_id` cannot be empty for application".to_string());
178
60
        }
179

            
180
60
        let conn = get_connection(&conn_pool, host_uri)?;
181

            
182
48
        let (uldata, dldata, dldata_resp, dldata_result, _) =
183
60
            new_data_queues(&conn, &opts, QUEUE_PREFIX, false)?;
184

            
185
48
        let mgr = ApplicationMgr {
186
48
            opts: Arc::new(opts),
187
48
            conn_pool,
188
48
            host_uri: host_uri.to_string(),
189
48
            uldata,
190
48
            dldata,
191
48
            dldata_resp: dldata_resp.unwrap(),
192
48
            dldata_result,
193
48
            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
194
48
            handler: Arc::new(Mutex::new(handler)),
195
48
        };
196
48
        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
197
48
        let mut q = { mgr.uldata.lock().unwrap().clone() };
198
48
        q.set_handler(mq_handler.clone());
199
48
        q.set_msg_handler(mq_handler.clone());
200
48
        if let Err(e) = q.connect() {
201
            return Err(e.to_string());
202
48
        }
203
48
        let mut q = { mgr.dldata.lock().unwrap().clone() };
204
48
        q.set_handler(mq_handler.clone());
205
48
        if let Err(e) = q.connect() {
206
            return Err(e.to_string());
207
48
        }
208
48
        let mut q = { mgr.dldata_resp.lock().unwrap().clone() };
209
48
        q.set_handler(mq_handler.clone());
210
48
        q.set_msg_handler(mq_handler.clone());
211
48
        if let Err(e) = q.connect() {
212
            return Err(e.to_string());
213
48
        }
214
48
        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
215
48
        q.set_handler(mq_handler.clone());
216
48
        q.set_msg_handler(mq_handler.clone());
217
48
        if let Err(e) = q.connect() {
218
            return Err(e.to_string());
219
48
        }
220
48
        match conn {
221
24
            Connection::Amqp(_, counter) => {
222
24
                *counter.lock().unwrap() += 4;
223
24
            }
224
24
            Connection::Mqtt(_, counter) => {
225
24
                *counter.lock().unwrap() += 4;
226
24
            }
227
        }
228
48
        Ok(mgr)
229
68
    }
230

            
231
    /// The associated unit ID of the application.
232
4
    pub fn unit_id(&self) -> &str {
233
4
        self.opts.unit_id.as_str()
234
4
    }
235

            
236
    /// The associated unit code of the application.
237
4
    pub fn unit_code(&self) -> &str {
238
4
        self.opts.unit_code.as_str()
239
4
    }
240

            
241
    /// The application ID.
242
4
    pub fn id(&self) -> &str {
243
4
        self.opts.id.as_str()
244
4
    }
245

            
246
    /// The application code.
247
4
    pub fn name(&self) -> &str {
248
4
        self.opts.name.as_str()
249
4
    }
250

            
251
    /// Manager status.
252
90
    pub fn status(&self) -> MgrStatus {
253
90
        *self.status.lock().unwrap()
254
90
    }
255

            
256
    /// Detail status of each message queue. Please ignore `ctrl`.
257
4
    pub fn mq_status(&self) -> DataMqStatus {
258
4
        DataMqStatus {
259
4
            uldata: { self.uldata.lock().unwrap().status() },
260
4
            dldata: { self.dldata.lock().unwrap().status() },
261
4
            dldata_resp: { self.dldata_resp.lock().unwrap().status() },
262
4
            dldata_result: { self.dldata_result.lock().unwrap().status() },
263
4
            ctrl: QueueStatus::Closed,
264
4
        }
265
4
    }
266

            
267
    /// To close the manager queues.
268
    /// The underlying connection will be closed when there are no queues use it.
269
48
    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
270
48
        let mut q = { self.uldata.lock().unwrap().clone() };
271
48
        q.close().await?;
272
48
        let mut q = { self.dldata.lock().unwrap().clone() };
273
48
        q.close().await?;
274
48
        let mut q = { self.dldata_resp.lock().unwrap().clone() };
275
48
        q.close().await?;
276
48
        let mut q = { self.dldata_result.lock().unwrap().clone() };
277
48
        q.close().await?;
278

            
279
48
        remove_connection(&self.conn_pool, &self.host_uri, 4).await
280
48
    }
281

            
282
    /// Send downlink data [`DlData`] to the broker.
283
24
    pub fn send_dldata(&self, data: &DlData) -> Result<(), Box<dyn StdError>> {
284
24
        if data.correlation_id.len() == 0 {
285
4
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_CORR_ID.to_string());
286
4
            return Err(Box::new(err));
287
20
        }
288
20
        if data.device_id.is_none() {
289
12
            if data.network_code.is_none() || data.network_addr.is_none() {
290
4
                let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
291
4
                return Err(Box::new(err));
292
8
            } else if data.network_code.as_ref().unwrap().len() == 0
293
4
                || data.network_addr.as_ref().unwrap().len() == 0
294
            {
295
4
                let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
296
4
                return Err(Box::new(err));
297
4
            }
298
8
        } else if data.device_id.as_ref().unwrap().len() == 0 {
299
4
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
300
4
            return Err(Box::new(err));
301
4
        }
302

            
303
8
        let payload = serde_json::to_vec(&DlDataInner {
304
8
            correlation_id: &data.correlation_id,
305
8
            device_id: data.device_id.as_ref(),
306
8
            network_code: data.network_code.as_ref(),
307
8
            network_addr: data.network_addr.as_ref(),
308
8
            data: hex::encode(&data.data),
309
8
            extension: &data.extension,
310
8
        })?;
311
8
        let queue = { (*self.dldata.lock().unwrap()).clone() };
312
8
        task::spawn(async move {
313
8
            let _ = queue.send_msg(payload).await;
314
8
        });
315
8
        Ok(())
316
24
    }
317
}
318

            
319
#[async_trait]
320
impl QueueEventHandler for MgrMqEventHandler {
321
    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
322
        let handler = { self.mgr.handler.lock().unwrap().clone() };
323
        handler.on_error(&self.mgr, err).await;
324
    }
325

            
326
348
    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
327
        let status = match { self.mgr.uldata.lock().unwrap().status() } == QueueStatus::Connected
328
            && { self.mgr.dldata.lock().unwrap().status() } == QueueStatus::Connected
329
            && { self.mgr.dldata_resp.lock().unwrap().status() } == QueueStatus::Connected
330
            && { self.mgr.dldata_result.lock().unwrap().status() } == QueueStatus::Connected
331
        {
332
            false => MgrStatus::NotReady,
333
            true => MgrStatus::Ready,
334
        };
335

            
336
        let mut changed = false;
337
        {
338
            let mut mutex = self.mgr.status.lock().unwrap();
339
            if *mutex != status {
340
                *mutex = status;
341
                changed = true;
342
            }
343
        }
344
        if changed {
345
            let handler = { self.mgr.handler.lock().unwrap().clone() };
346
            handler.on_status_change(&self.mgr, status).await;
347
        }
348
348
    }
349
}
350

            
351
#[async_trait]
352
impl MessageHandler for MgrMqEventHandler {
353
    // Validate and decode data.
354
66
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
355
        const _FN_NAME: &'static str = "ApplicationMgr.on_message";
356

            
357
        let queue_name = queue.name();
358
        if queue_name.cmp(self.mgr.uldata.lock().unwrap().name()) == Ordering::Equal {
359
            let data = match serde_json::from_slice::<UlDataInner>(msg.payload()) {
360
                Err(_) => {
361
                    let _ = msg.ack().await;
362
                    return;
363
                }
364
                Ok(data) => {
365
                    let data_bytes = match data.data.len() {
366
                        0 => vec![],
367
                        _ => match hex::decode(data.data.as_str()) {
368
                            Err(_) => {
369
                                let _ = msg.ack().await;
370
                                return;
371
                            }
372
                            Ok(data) => data,
373
                        },
374
                    };
375
                    let time = match DateTime::parse_from_rfc3339(data.time.as_str()) {
376
                        Err(_) => {
377
                            let _ = msg.ack().await;
378
                            return;
379
                        }
380
                        Ok(time) => time.into(),
381
                    };
382
                    let publish = match DateTime::parse_from_rfc3339(data.publish.as_str()) {
383
                        Err(_) => {
384
                            let _ = msg.ack().await;
385
                            return;
386
                        }
387
                        Ok(publish) => publish.into(),
388
                    };
389
                    UlData {
390
                        data_id: data.data_id,
391
                        time,
392
                        publish,
393
                        device_id: data.device_id,
394
                        network_id: data.network_id,
395
                        network_code: data.network_code,
396
                        network_addr: data.network_addr,
397
                        is_public: data.is_public,
398
                        data: data_bytes,
399
                        extension: data.extension,
400
                    }
401
                }
402
            };
403
            let handler = { self.mgr.handler.lock().unwrap().clone() };
404
            let _ = match handler.on_uldata(&self.mgr, Box::new(data)).await {
405
                Err(_) => msg.nack().await,
406
                Ok(_) => msg.ack().await,
407
            };
408
        } else if queue_name.cmp(self.mgr.dldata_resp.lock().unwrap().name()) == Ordering::Equal {
409
            let data = match serde_json::from_slice::<DlDataResp>(msg.payload()) {
410
                Err(_) => {
411
                    let _ = msg.ack().await;
412
                    return;
413
                }
414
                Ok(data) => data,
415
            };
416
            let handler = { self.mgr.handler.lock().unwrap().clone() };
417
            let _ = match handler.on_dldata_resp(&self.mgr, Box::new(data)).await {
418
                Err(_) => msg.nack().await,
419
                Ok(_) => msg.ack().await,
420
            };
421
        } else if queue_name.cmp(self.mgr.dldata_result.lock().unwrap().name()) == Ordering::Equal {
422
            let data = match serde_json::from_slice::<DlDataResult>(msg.payload()) {
423
                Err(_) => {
424
                    let _ = msg.ack().await;
425
                    return;
426
                }
427
                Ok(data) => data,
428
            };
429
            let handler = { self.mgr.handler.lock().unwrap().clone() };
430
            let _ = match handler.on_dldata_result(&self.mgr, Box::new(data)).await {
431
                Err(_) => msg.nack().await,
432
                Ok(_) => msg.ack().await,
433
            };
434
        }
435
66
    }
436
}