1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::{DateTime, Utc};
11
use hex;
12
use serde::{Deserialize, Serialize};
13
use serde_json::{self, Map, Value};
14
use tokio::task;
15
use url::Url;
16

            
17
use general_mq::{
18
    queue::{
19
        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
20
    },
21
    Queue,
22
};
23

            
24
use super::{
25
    get_connection, new_data_queues, remove_connection, Connection, DataMqStatus, MgrStatus,
26
    Options,
27
};
28

            
29
/// Uplink data from broker to application.
30
pub struct UlData {
31
    pub data_id: String,
32
    pub time: DateTime<Utc>,
33
    pub publish: DateTime<Utc>,
34
    pub device_id: String,
35
    pub network_id: String,
36
    pub network_code: String,
37
    pub network_addr: String,
38
    pub is_public: bool,
39
    pub data: Vec<u8>,
40
    pub extension: Option<Map<String, Value>>,
41
}
42

            
43
/// Downlink data from application to broker.
44
pub struct DlData {
45
    pub correlation_id: String,
46
    pub device_id: Option<String>,
47
    pub network_code: Option<String>,
48
    pub network_addr: Option<String>,
49
    pub data: Vec<u8>,
50
    pub extension: Option<Map<String, Value>>,
51
}
52

            
53
/// Downlink data response for [`DlData`].
54
#[derive(Deserialize)]
55
pub struct DlDataResp {
56
    #[serde(rename = "correlationId")]
57
    pub correlation_id: String,
58
    #[serde(rename = "dataId")]
59
    pub data_id: Option<String>,
60
    pub error: Option<String>,
61
    pub message: Option<String>,
62
}
63

            
64
/// Downlink data result when processing or completing data transfer to the device.
65
#[derive(Deserialize)]
66
pub struct DlDataResult {
67
    #[serde(rename = "dataId")]
68
    pub data_id: String,
69
    pub status: i32,
70
    pub message: Option<String>,
71
}
72

            
73
/// The manager for application queues.
74
#[derive(Clone)]
75
pub struct ApplicationMgr {
76
    opts: Arc<Options>,
77

            
78
    // Information for delete connection automatically.
79
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
80
    host_uri: String,
81

            
82
    uldata: Arc<Mutex<Queue>>,
83
    dldata: Arc<Mutex<Queue>>,
84
    dldata_resp: Arc<Mutex<Queue>>,
85
    dldata_result: Arc<Mutex<Queue>>,
86

            
87
    status: Arc<Mutex<MgrStatus>>,
88
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
89
}
90

            
91
/// Event handler trait for the [`ApplicationMgr`].
92
#[async_trait]
93
pub trait EventHandler: Send + Sync {
94
    /// Fired when one of the manager's queues encounters a state change.
95
    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus);
96

            
97
    /// Fired when a [`UlData`] data is received.
98
    ///
99
    /// Return [`Err`] will NACK the data.
100
    /// The data may will be received again depending on the protocol (such as AMQP).
101
    async fn on_uldata(&self, mgr: &ApplicationMgr, data: Box<UlData>) -> Result<(), ()>;
102

            
103
    /// Fired when a [`DlDataResp`] data is received.
104
    ///
105
    /// Return [`Err`] will NACK the data.
106
    /// The data may will be received again depending on the protocol (such as AMQP).
107
    async fn on_dldata_resp(&self, mgr: &ApplicationMgr, data: Box<DlDataResp>) -> Result<(), ()>;
108

            
109
    /// Fired when a [`DlDataResult`] data is received.
110
    ///
111
    /// Return [`Err`] will NACK the data.
112
    /// The data may will be received again depending on the protocol (such as AMQP).
113
    async fn on_dldata_result(
114
        &self,
115
        mgr: &ApplicationMgr,
116
        data: Box<DlDataResult>,
117
    ) -> Result<(), ()>;
118
}
119

            
120
/// The event handler for [`general_mq::queue::GmqQueue`].
121
struct MgrMqEventHandler {
122
    mgr: ApplicationMgr,
123
}
124

            
125
#[derive(Deserialize)]
126
struct UlDataInner {
127
    #[serde(rename = "dataId")]
128
    data_id: String,
129
    time: String,
130
    #[serde(rename = "pub")]
131
    publish: String,
132
    #[serde(rename = "deviceId")]
133
    device_id: String,
134
    #[serde(rename = "networkId")]
135
    network_id: String,
136
    #[serde(rename = "networkCode")]
137
    network_code: String,
138
    #[serde(rename = "networkAddr")]
139
    network_addr: String,
140
    #[serde(rename = "isPublic")]
141
    is_public: bool,
142
    data: String,
143
    extension: Option<Map<String, Value>>,
144
}
145

            
146
#[derive(Clone, Serialize)]
147
struct DlDataInner<'a> {
148
    #[serde(rename = "correlationId")]
149
    correlation_id: &'a String,
150
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
151
    device_id: Option<&'a String>,
152
    #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
153
    network_code: Option<&'a String>,
154
    #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
155
    network_addr: Option<&'a String>,
156
    data: String,
157
    #[serde(skip_serializing_if = "Option::is_none")]
158
    extension: &'a Option<Map<String, Value>>,
159
}
160

            
161
const QUEUE_PREFIX: &'static str = "broker.application";
162
const ERR_PARAM_CORR_ID: &'static str = "the `correlation_id` must be a non-empty string";
163
const ERR_PARAM_DEV: &'static str =
164
    "one of `device_id` or [`network_code`, `network_addr`] pair must be provided with non-empty string";
165

            
166
impl ApplicationMgr {
167
    /// To create a manager instance.
168
34
    pub fn new(
169
34
        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
170
34
        host_uri: &Url,
171
34
        opts: Options,
172
34
        handler: Arc<dyn EventHandler>,
173
34
    ) -> Result<Self, String> {
174
34
        if opts.unit_id.len() == 0 {
175
4
            return Err("`unit_id` cannot be empty for application".to_string());
176
30
        }
177

            
178
30
        let conn = get_connection(&conn_pool, host_uri)?;
179

            
180
24
        let (uldata, dldata, dldata_resp, dldata_result, _) =
181
30
            new_data_queues(&conn, &opts, QUEUE_PREFIX, false)?;
182

            
183
24
        let mgr = ApplicationMgr {
184
24
            opts: Arc::new(opts),
185
24
            conn_pool,
186
24
            host_uri: host_uri.to_string(),
187
24
            uldata,
188
24
            dldata,
189
24
            dldata_resp: dldata_resp.unwrap(),
190
24
            dldata_result,
191
24
            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
192
24
            handler: Arc::new(Mutex::new(handler)),
193
24
        };
194
24
        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
195
24
        let mut q = { mgr.uldata.lock().unwrap().clone() };
196
24
        q.set_handler(mq_handler.clone());
197
24
        q.set_msg_handler(mq_handler.clone());
198
24
        if let Err(e) = q.connect() {
199
            return Err(e.to_string());
200
24
        }
201
24
        let mut q = { mgr.dldata.lock().unwrap().clone() };
202
24
        q.set_handler(mq_handler.clone());
203
24
        if let Err(e) = q.connect() {
204
            return Err(e.to_string());
205
24
        }
206
24
        let mut q = { mgr.dldata_resp.lock().unwrap().clone() };
207
24
        q.set_handler(mq_handler.clone());
208
24
        q.set_msg_handler(mq_handler.clone());
209
24
        if let Err(e) = q.connect() {
210
            return Err(e.to_string());
211
24
        }
212
24
        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
213
24
        q.set_handler(mq_handler.clone());
214
24
        q.set_msg_handler(mq_handler.clone());
215
24
        if let Err(e) = q.connect() {
216
            return Err(e.to_string());
217
24
        }
218
24
        match conn {
219
12
            Connection::Amqp(_, counter) => {
220
12
                *counter.lock().unwrap() += 4;
221
12
            }
222
12
            Connection::Mqtt(_, counter) => {
223
12
                *counter.lock().unwrap() += 4;
224
12
            }
225
        }
226
24
        Ok(mgr)
227
34
    }
228

            
229
    /// The associated unit ID of the application.
230
2
    pub fn unit_id(&self) -> &str {
231
2
        self.opts.unit_id.as_str()
232
2
    }
233

            
234
    /// The associated unit code of the application.
235
2
    pub fn unit_code(&self) -> &str {
236
2
        self.opts.unit_code.as_str()
237
2
    }
238

            
239
    /// The application ID.
240
2
    pub fn id(&self) -> &str {
241
2
        self.opts.id.as_str()
242
2
    }
243

            
244
    /// The application code.
245
2
    pub fn name(&self) -> &str {
246
2
        self.opts.name.as_str()
247
2
    }
248

            
249
    /// Manager status.
250
108
    pub fn status(&self) -> MgrStatus {
251
108
        *self.status.lock().unwrap()
252
108
    }
253

            
254
    /// Detail status of each message queue. Please ignore `ctrl`.
255
2
    pub fn mq_status(&self) -> DataMqStatus {
256
2
        DataMqStatus {
257
2
            uldata: { self.uldata.lock().unwrap().status() },
258
2
            dldata: { self.dldata.lock().unwrap().status() },
259
2
            dldata_resp: { self.dldata_resp.lock().unwrap().status() },
260
2
            dldata_result: { self.dldata_result.lock().unwrap().status() },
261
2
            ctrl: QueueStatus::Closed,
262
2
        }
263
2
    }
264

            
265
    /// To close the manager queues.
266
    /// The underlying connection will be closed when there are no queues use it.
267
24
    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
268
24
        let mut q = { self.uldata.lock().unwrap().clone() };
269
24
        q.close().await?;
270
24
        let mut q = { self.dldata.lock().unwrap().clone() };
271
24
        q.close().await?;
272
24
        let mut q = { self.dldata_resp.lock().unwrap().clone() };
273
24
        q.close().await?;
274
24
        let mut q = { self.dldata_result.lock().unwrap().clone() };
275
24
        q.close().await?;
276

            
277
24
        remove_connection(&self.conn_pool, &self.host_uri, 4).await
278
24
    }
279

            
280
    /// Send downlink data [`DlData`] to the broker.
281
12
    pub fn send_dldata(&self, data: &DlData) -> Result<(), Box<dyn StdError>> {
282
12
        if data.correlation_id.len() == 0 {
283
2
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_CORR_ID.to_string());
284
2
            return Err(Box::new(err));
285
10
        }
286
10
        if data.device_id.is_none() {
287
6
            if data.network_code.is_none() || data.network_addr.is_none() {
288
2
                let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
289
2
                return Err(Box::new(err));
290
4
            } else if data.network_code.as_ref().unwrap().len() == 0
291
2
                || data.network_addr.as_ref().unwrap().len() == 0
292
            {
293
2
                let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
294
2
                return Err(Box::new(err));
295
2
            }
296
4
        } else if data.device_id.as_ref().unwrap().len() == 0 {
297
2
            let err = IoError::new(ErrorKind::InvalidInput, ERR_PARAM_DEV.to_string());
298
2
            return Err(Box::new(err));
299
2
        }
300

            
301
4
        let payload = serde_json::to_vec(&DlDataInner {
302
4
            correlation_id: &data.correlation_id,
303
4
            device_id: data.device_id.as_ref(),
304
4
            network_code: data.network_code.as_ref(),
305
4
            network_addr: data.network_addr.as_ref(),
306
4
            data: hex::encode(&data.data),
307
4
            extension: &data.extension,
308
4
        })?;
309
4
        let queue = { (*self.dldata.lock().unwrap()).clone() };
310
4
        task::spawn(async move {
311
4
            let _ = queue.send_msg(payload).await;
312
4
        });
313
4
        Ok(())
314
12
    }
315
}
316

            
317
#[async_trait]
318
impl QueueEventHandler for MgrMqEventHandler {
319
    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
320

            
321
174
    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
322
174
        let status = match { self.mgr.uldata.lock().unwrap().status() } == QueueStatus::Connected
323
69
            && { self.mgr.dldata.lock().unwrap().status() } == QueueStatus::Connected
324
57
            && { self.mgr.dldata_resp.lock().unwrap().status() } == QueueStatus::Connected
325
38
            && { self.mgr.dldata_result.lock().unwrap().status() } == QueueStatus::Connected
326
        {
327
155
            false => MgrStatus::NotReady,
328
19
            true => MgrStatus::Ready,
329
        };
330

            
331
174
        let mut changed = false;
332
174
        {
333
174
            let mut mutex = self.mgr.status.lock().unwrap();
334
174
            if *mutex != status {
335
38
                *mutex = status;
336
38
                changed = true;
337
136
            }
338
        }
339
174
        if changed {
340
38
            let handler = { self.mgr.handler.lock().unwrap().clone() };
341
38
            handler.on_status_change(&self.mgr, status).await;
342
136
        }
343
348
    }
344
}
345

            
346
#[async_trait]
347
impl MessageHandler for MgrMqEventHandler {
348
    // Validate and decode data.
349
33
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
350
        const _FN_NAME: &'static str = "ApplicationMgr.on_message";
351

            
352
33
        let queue_name = queue.name();
353
33
        if queue_name.cmp(self.mgr.uldata.lock().unwrap().name()) == Ordering::Equal {
354
15
            let data = match serde_json::from_slice::<UlDataInner>(msg.payload()) {
355
                Err(_) => {
356
2
                    let _ = msg.ack().await;
357
2
                    return;
358
                }
359
13
                Ok(data) => {
360
13
                    let data_bytes = match data.data.len() {
361
2
                        0 => vec![],
362
11
                        _ => match hex::decode(data.data.as_str()) {
363
                            Err(_) => {
364
2
                                let _ = msg.ack().await;
365
2
                                return;
366
                            }
367
9
                            Ok(data) => data,
368
                        },
369
                    };
370
11
                    let time = match DateTime::parse_from_rfc3339(data.time.as_str()) {
371
                        Err(_) => {
372
2
                            let _ = msg.ack().await;
373
2
                            return;
374
                        }
375
9
                        Ok(time) => time.into(),
376
                    };
377
9
                    let publish = match DateTime::parse_from_rfc3339(data.publish.as_str()) {
378
                        Err(_) => {
379
2
                            let _ = msg.ack().await;
380
2
                            return;
381
                        }
382
7
                        Ok(publish) => publish.into(),
383
7
                    };
384
7
                    UlData {
385
7
                        data_id: data.data_id,
386
7
                        time,
387
7
                        publish,
388
7
                        device_id: data.device_id,
389
7
                        network_id: data.network_id,
390
7
                        network_code: data.network_code,
391
7
                        network_addr: data.network_addr,
392
7
                        is_public: data.is_public,
393
7
                        data: data_bytes,
394
7
                        extension: data.extension,
395
7
                    }
396
7
                }
397
7
            };
398
7
            let handler = { self.mgr.handler.lock().unwrap().clone() };
399
7
            let _ = match handler.on_uldata(&self.mgr, Box::new(data)).await {
400
2
                Err(_) => msg.nack().await,
401
5
                Ok(_) => msg.ack().await,
402
            };
403
18
        } else if queue_name.cmp(self.mgr.dldata_resp.lock().unwrap().name()) == Ordering::Equal {
404
9
            let data = match serde_json::from_slice::<DlDataResp>(msg.payload()) {
405
                Err(_) => {
406
2
                    let _ = msg.ack().await;
407
2
                    return;
408
                }
409
7
                Ok(data) => data,
410
7
            };
411
7
            let handler = { self.mgr.handler.lock().unwrap().clone() };
412
7
            let _ = match handler.on_dldata_resp(&self.mgr, Box::new(data)).await {
413
2
                Err(_) => msg.nack().await,
414
5
                Ok(_) => msg.ack().await,
415
            };
416
9
        } else if queue_name.cmp(self.mgr.dldata_result.lock().unwrap().name()) == Ordering::Equal {
417
9
            let data = match serde_json::from_slice::<DlDataResult>(msg.payload()) {
418
                Err(_) => {
419
2
                    let _ = msg.ack().await;
420
2
                    return;
421
                }
422
7
                Ok(data) => data,
423
7
            };
424
7
            let handler = { self.mgr.handler.lock().unwrap().clone() };
425
7
            let _ = match handler.on_dldata_result(&self.mgr, Box::new(data)).await {
426
2
                Err(_) => msg.nack().await,
427
5
                Ok(_) => msg.ack().await,
428
            };
429
        }
430
66
    }
431
}