1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    sync::{Arc, Mutex},
6
};
7

            
8
use async_trait::async_trait;
9
use hex;
10
use log::{error, warn};
11
use serde::{Deserialize, Serialize};
12
use serde_json::{self, Map, Value};
13
use tokio::task;
14
use url::Url;
15

            
16
use general_mq::{
17
    queue::{
18
        EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status as QueueStatus,
19
    },
20
    Queue,
21
};
22
use sylvia_iot_corelib::{err, strings};
23

            
24
use super::{
25
    get_connection, new_data_queues, remove_connection, Connection, MgrMqStatus, MgrStatus, Options,
26
};
27

            
28
/// Uplink data from broker to application.
29
#[derive(Serialize)]
30
pub struct UlData {
31
    #[serde(rename = "dataId")]
32
    pub data_id: String,
33
    pub time: String,
34
    #[serde(rename = "pub")]
35
    pub publish: String,
36
    #[serde(rename = "deviceId")]
37
    pub device_id: String,
38
    #[serde(rename = "networkId")]
39
    pub network_id: String,
40
    #[serde(rename = "networkCode")]
41
    pub network_code: String,
42
    #[serde(rename = "networkAddr")]
43
    pub network_addr: String,
44
    #[serde(rename = "isPublic")]
45
    pub is_public: bool,
46
    pub profile: String,
47
    pub data: String,
48
    #[serde(skip_serializing_if = "Option::is_none")]
49
    pub extension: Option<Map<String, Value>>,
50
}
51

            
52
/// Downlink data from application to broker.
53
#[derive(Deserialize)]
54
pub struct DlData {
55
    #[serde(rename = "correlationId")]
56
    pub correlation_id: String,
57
    #[serde(rename = "deviceId")]
58
    pub device_id: Option<String>,
59
    #[serde(rename = "networkCode")]
60
    pub network_code: Option<String>,
61
    #[serde(rename = "networkAddr")]
62
    pub network_addr: Option<String>,
63
    pub data: String,
64
    pub extension: Option<Map<String, Value>>,
65
}
66

            
67
/// Downlink data response for [`DlData`].
68
#[derive(Default, Serialize)]
69
pub struct DlDataResp {
70
    #[serde(rename = "correlationId")]
71
    pub correlation_id: String,
72
    #[serde(rename = "dataId", skip_serializing_if = "Option::is_none")]
73
    pub data_id: Option<String>,
74
    #[serde(skip_serializing_if = "Option::is_none")]
75
    pub error: Option<String>,
76
    #[serde(skip_serializing_if = "Option::is_none")]
77
    pub message: Option<String>,
78
}
79

            
80
/// Downlink data result when processing or completing data transfer to the device.
81
#[derive(Serialize)]
82
pub struct DlDataResult {
83
    #[serde(rename = "dataId")]
84
    pub data_id: String,
85
    pub status: i32,
86
    #[serde(skip_serializing_if = "Option::is_none")]
87
    pub message: Option<String>,
88
}
89

            
90
/// The manager for application queues.
91
#[derive(Clone)]
92
pub struct ApplicationMgr {
93
    opts: Arc<Options>,
94

            
95
    // Information for delete connection automatically.
96
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
97
    host_uri: String,
98

            
99
    uldata: Arc<Mutex<Queue>>,
100
    dldata: Arc<Mutex<Queue>>,
101
    dldata_resp: Arc<Mutex<Queue>>,
102
    dldata_result: Arc<Mutex<Queue>>,
103

            
104
    status: Arc<Mutex<MgrStatus>>,
105
    handler: Arc<Mutex<Arc<dyn EventHandler>>>,
106
}
107

            
108
/// Event handler trait for the [`ApplicationMgr`].
109
#[async_trait]
110
pub trait EventHandler: Send + Sync {
111
    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus);
112

            
113
    async fn on_dldata(
114
        &self,
115
        mgr: &ApplicationMgr,
116
        data: Box<DlData>,
117
    ) -> Result<Box<DlDataResp>, ()>;
118
}
119

            
120
/// The event handler for [`general_mq::queue::GmqQueue`].
121
struct MgrMqEventHandler {
122
    mgr: ApplicationMgr,
123
}
124

            
125
/// The prefix passed to `new_data_queues` when creating the application queues.
const QUEUE_PREFIX: &str = "broker.application";
126

            
127
impl ApplicationMgr {
128
    /// To create a manager instance.
129
108
    pub fn new(
130
108
        conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
131
108
        host_uri: &Url,
132
108
        opts: Options,
133
108
        handler: Arc<dyn EventHandler>,
134
108
    ) -> Result<Self, String> {
135
108
        if opts.unit_id.len() == 0 {
136
14
            return Err("`unit_id` cannot be empty for application".to_string());
137
94
        }
138

            
139
94
        let conn = get_connection(&conn_pool, host_uri)?;
140

            
141
82
        let (uldata, dldata, dldata_resp, dldata_result) =
142
94
            new_data_queues(&conn, &opts, QUEUE_PREFIX, false)?;
143

            
144
82
        let mgr = ApplicationMgr {
145
82
            opts: Arc::new(opts),
146
82
            conn_pool,
147
82
            host_uri: host_uri.to_string(),
148
82
            uldata,
149
82
            dldata,
150
82
            dldata_resp: dldata_resp.unwrap(),
151
82
            dldata_result,
152
82
            status: Arc::new(Mutex::new(MgrStatus::NotReady)),
153
82
            handler: Arc::new(Mutex::new(handler)),
154
82
        };
155
82
        let mq_handler = Arc::new(MgrMqEventHandler { mgr: mgr.clone() });
156
82
        let mut q = { mgr.uldata.lock().unwrap().clone() };
157
82
        q.set_handler(mq_handler.clone());
158
82
        if let Err(e) = q.connect() {
159
            return Err(e.to_string());
160
82
        }
161
82
        let mut q = { mgr.dldata.lock().unwrap().clone() };
162
82
        q.set_handler(mq_handler.clone());
163
82
        q.set_msg_handler(mq_handler.clone());
164
82
        if let Err(e) = q.connect() {
165
            return Err(e.to_string());
166
82
        }
167
82
        let mut q = { mgr.dldata_resp.lock().unwrap().clone() };
168
82
        q.set_handler(mq_handler.clone());
169
82
        if let Err(e) = q.connect() {
170
            return Err(e.to_string());
171
82
        }
172
82
        let mut q = { mgr.dldata_result.lock().unwrap().clone() };
173
82
        q.set_handler(mq_handler.clone());
174
82
        if let Err(e) = q.connect() {
175
            return Err(e.to_string());
176
82
        }
177
82
        match conn {
178
58
            Connection::Amqp(_, counter) => {
179
58
                *counter.lock().unwrap() += 4;
180
58
            }
181
24
            Connection::Mqtt(_, counter) => {
182
24
                *counter.lock().unwrap() += 4;
183
24
            }
184
        }
185
82
        Ok(mgr)
186
108
    }
187

            
188
    /// The associated unit ID of the application.
189
136
    pub fn unit_id(&self) -> &str {
190
136
        self.opts.unit_id.as_str()
191
136
    }
192

            
193
    /// The associated unit code of the application.
194
54
    pub fn unit_code(&self) -> &str {
195
54
        self.opts.unit_code.as_str()
196
54
    }
197

            
198
    /// The application ID.
199
28
    pub fn id(&self) -> &str {
200
28
        self.opts.id.as_str()
201
28
    }
202

            
203
    /// The application code.
204
28
    pub fn name(&self) -> &str {
205
28
        self.opts.name.as_str()
206
28
    }
207

            
208
    /// Manager status.
209
238
    pub fn status(&self) -> MgrStatus {
210
238
        *self.status.lock().unwrap()
211
238
    }
212

            
213
    /// Detail status of each message queue.
214
8
    pub fn mq_status(&self) -> MgrMqStatus {
215
8
        MgrMqStatus {
216
8
            uldata: { self.uldata.lock().unwrap().status() },
217
8
            dldata: { self.dldata.lock().unwrap().status() },
218
8
            dldata_resp: { self.dldata_resp.lock().unwrap().status() },
219
8
            dldata_result: { self.dldata_result.lock().unwrap().status() },
220
8
            ctrl: QueueStatus::Closed,
221
8
        }
222
8
    }
223

            
224
    /// To close the manager queues.
225
98
    pub async fn close(&self) -> Result<(), Box<dyn StdError + Send + Sync>> {
226
98
        let mut q = { self.uldata.lock().unwrap().clone() };
227
98
        q.close().await?;
228
98
        let mut q = { self.dldata.lock().unwrap().clone() };
229
98
        q.close().await?;
230
98
        let mut q = { self.dldata_resp.lock().unwrap().clone() };
231
98
        q.close().await?;
232
98
        let mut q = { self.dldata_result.lock().unwrap().clone() };
233
98
        q.close().await?;
234

            
235
98
        remove_connection(&self.conn_pool, &self.host_uri, 4).await
236
98
    }
237

            
238
    /// Send uplink data to the application.
239
84
    pub fn send_uldata(&self, data: &UlData) -> Result<(), Box<dyn StdError>> {
240
84
        let payload = serde_json::to_vec(data)?;
241
84
        let queue = { (*self.uldata.lock().unwrap()).clone() };
242
84
        task::spawn(async move {
243
84
            let _ = queue.send_msg(payload).await;
244
84
        });
245
84
        Ok(())
246
84
    }
247

            
248
    /// Send downlink response for a downlink data to the application.
249
68
    pub async fn send_dldata_resp(
250
68
        &self,
251
68
        data: &DlDataResp,
252
68
    ) -> Result<(), Box<dyn StdError + Send + Sync>> {
253
68
        let payload = serde_json::to_vec(data)?;
254
68
        let queue = { (*self.dldata_resp.lock().unwrap()).clone() };
255
68
        queue.send_msg(payload).await
256
68
    }
257

            
258
    /// Send the downlink data process result to the application.
259
44
    pub async fn send_dldata_result(
260
44
        &self,
261
44
        data: &DlDataResult,
262
44
    ) -> Result<(), Box<dyn StdError + Send + Sync>> {
263
44
        let payload = serde_json::to_vec(data)?;
264
44
        let queue = { (*self.dldata_result.lock().unwrap()).clone() };
265
44
        queue.send_msg(payload).await
266
44
    }
267
}
268

            
269
#[async_trait]
270
impl QueueEventHandler for MgrMqEventHandler {
271
    async fn on_error(&self, _queue: Arc<dyn GmqQueue>, _err: Box<dyn StdError + Send + Sync>) {}
272

            
273
502
    async fn on_status(&self, _queue: Arc<dyn GmqQueue>, _status: QueueStatus) {
274
502
        let status = match { self.mgr.uldata.lock().unwrap().status() } == QueueStatus::Connected
275
138
            && { self.mgr.dldata.lock().unwrap().status() } == QueueStatus::Connected
276
80
            && { self.mgr.dldata_resp.lock().unwrap().status() } == QueueStatus::Connected
277
64
            && { self.mgr.dldata_result.lock().unwrap().status() } == QueueStatus::Connected
278
        {
279
454
            false => MgrStatus::NotReady,
280
48
            true => MgrStatus::Ready,
281
        };
282

            
283
502
        let mut changed = false;
284
502
        {
285
502
            let mut mutex = self.mgr.status.lock().unwrap();
286
502
            if *mutex != status {
287
84
                *mutex = status;
288
84
                changed = true;
289
418
            }
290
        }
291
502
        if changed {
292
84
            let handler = { self.mgr.handler.lock().unwrap().clone() };
293
84
            handler.on_status_change(&self.mgr, status).await;
294
418
        }
295
1004
    }
296
}
297

            
298
#[async_trait]
299
impl MessageHandler for MgrMqEventHandler {
300
    // Validate and decode data.
301
72
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
302
        const FN_NAME: &'static str = "ApplicationMgr.on_message";
303

            
304
72
        let queue_name = queue.name();
305
72
        if queue_name.cmp(self.mgr.dldata.lock().unwrap().name()) == Ordering::Equal {
306
72
            let data = match parse_dldata_msg(msg.payload()) {
307
32
                Err(resp) => {
308
32
                    warn!("[{}] invalid format from {}", FN_NAME, queue_name);
309
32
                    if let Err(e) = msg.ack().await {
310
                        error!("[{}] ACK message error: {}", FN_NAME, e);
311
32
                    }
312
32
                    if let Err(e) = self.mgr.send_dldata_resp(&resp).await {
313
                        error!("[{}] send response error: {}", FN_NAME, e);
314
32
                    }
315
32
                    return;
316
                }
317
40
                Ok(data) => data,
318
40
            };
319
40
            let handler = { self.mgr.handler.lock().unwrap().clone() };
320
40
            match handler.on_dldata(&self.mgr, Box::new(data)).await {
321
                Err(_) => {
322
4
                    if let Err(e) = msg.nack().await {
323
                        error!("[{}] NACK message error: {}", FN_NAME, e);
324
4
                    }
325
                }
326
36
                Ok(resp) => {
327
36
                    if let Err(e) = msg.ack().await {
328
                        error!("[{}] ACK message error: {}", FN_NAME, e);
329
36
                    }
330
36
                    if let Err(e) = self.mgr.send_dldata_resp(resp.as_ref()).await {
331
                        error!("[{}] send response error: {}", FN_NAME, e);
332
36
                    }
333
                }
334
            }
335
        }
336
144
    }
337
}
338

            
339
/// Parses downlink data from the application and responds error for wrong format data.
340
72
fn parse_dldata_msg(msg: &[u8]) -> Result<DlData, DlDataResp> {
341
72
    let mut data = match serde_json::from_slice::<DlData>(msg) {
342
        Err(_) => {
343
4
            return Err(DlDataResp {
344
4
                correlation_id: "".to_string(),
345
4
                error: Some(err::E_PARAM.to_string()),
346
4
                message: Some("invalid format".to_string()),
347
4
                ..Default::default()
348
4
            });
349
        }
350
68
        Ok(data) => data,
351
68
    };
352
68

            
353
68
    if data.correlation_id.len() == 0 {
354
4
        return Err(DlDataResp {
355
4
            correlation_id: data.correlation_id.clone(),
356
4
            error: Some(err::E_PARAM.to_string()),
357
4
            message: Some("invalid `correlationId`".to_string()),
358
4
            ..Default::default()
359
4
        });
360
64
    }
361
64
    match data.device_id.as_ref() {
362
        None => {
363
42
            match data.network_code.as_ref() {
364
                None => {
365
4
                    return Err(DlDataResp {
366
4
                        correlation_id: data.correlation_id.clone(),
367
4
                        error: Some(err::E_PARAM.to_string()),
368
4
                        message: Some("missing `networkCode`".to_string()),
369
4
                        ..Default::default()
370
4
                    });
371
                }
372
38
                Some(code) => {
373
38
                    let code = code.to_lowercase();
374
38
                    match strings::is_code(code.as_str()) {
375
                        false => {
376
4
                            return Err(DlDataResp {
377
4
                                correlation_id: data.correlation_id.clone(),
378
4
                                error: Some(err::E_PARAM.to_string()),
379
4
                                message: Some("invalid `networkCode`".to_string()),
380
4
                                ..Default::default()
381
4
                            });
382
                        }
383
                        true => {
384
34
                            data.network_code = Some(code);
385
34
                            ()
386
34
                        }
387
34
                    }
388
34
                }
389
34
            }
390
34
            match data.network_addr.as_ref() {
391
                None => {
392
4
                    return Err(DlDataResp {
393
4
                        correlation_id: data.correlation_id.clone(),
394
4
                        error: Some(err::E_PARAM.to_string()),
395
4
                        message: Some("missing `networkAddr`".to_string()),
396
4
                        ..Default::default()
397
4
                    });
398
                }
399
30
                Some(addr) => match addr.len() {
400
                    0 => {
401
4
                        return Err(DlDataResp {
402
4
                            correlation_id: data.correlation_id.clone(),
403
4
                            error: Some(err::E_PARAM.to_string()),
404
4
                            message: Some("invalid `networkAddr`".to_string()),
405
4
                            ..Default::default()
406
4
                        });
407
                    }
408
                    _ => {
409
26
                        data.network_addr = Some(addr.to_lowercase());
410
26
                        ()
411
                    }
412
                },
413
            }
414
        }
415
22
        Some(device_id) => match device_id.len() {
416
            0 => {
417
4
                return Err(DlDataResp {
418
4
                    correlation_id: data.correlation_id.clone(),
419
4
                    error: Some(err::E_PARAM.to_string()),
420
4
                    message: Some("invalid `deviceId`".to_string()),
421
4
                    ..Default::default()
422
4
                });
423
            }
424
18
            _ => (),
425
        },
426
    }
427
44
    if data.data.len() > 0 {
428
44
        if let Err(_) = hex::decode(data.data.as_str()) {
429
4
            return Err(DlDataResp {
430
4
                correlation_id: data.correlation_id.clone(),
431
4
                error: Some(err::E_PARAM.to_string()),
432
4
                message: Some("invalid `data`".to_string()),
433
4
                ..Default::default()
434
4
            });
435
40
        }
436
40
        data.data = data.data.to_lowercase();
437
    }
438
40
    Ok(data)
439
72
}