1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::Arc,
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::DateTime;
11
use log::{error, info, warn};
12
use serde::Deserialize;
13
use serde_json::{Map, Value};
14
use tokio::time;
15

            
16
use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
17
use crate::models::{
18
    application_dldata::{
19
        ApplicationDlData, UpdateQueryCond as ApplicationDlDataCond,
20
        Updates as ApplicationDlDataUpdate,
21
    },
22
    application_uldata::ApplicationUlData,
23
    network_dldata::{
24
        NetworkDlData, UpdateQueryCond as NetworkDlDataCond, Updates as NetworkDlDataUpdate,
25
    },
26
    network_uldata::NetworkUlData,
27
    Model,
28
};
29
use general_mq::{
30
    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
31
    Queue,
32
};
33

            
34
#[derive(Clone)]
35
struct DataHandler {
36
    model: Arc<dyn Model>,
37
}
38

            
39
#[derive(Deserialize)]
40
#[serde(tag = "kind")]
41
enum RecvDataMsg {
42
    #[serde(rename = "application-uldata")]
43
    AppUlData { data: AppUlData },
44
    #[serde(rename = "application-dldata")]
45
    AppDlData { data: AppDlData },
46
    #[serde(rename = "application-dldata-result")]
47
    AppDlDataResult { data: AppDlDataResult },
48
    #[serde(rename = "network-uldata")]
49
    NetUlData { data: NetUlData },
50
    #[serde(rename = "network-dldata")]
51
    NetDlData { data: NetDlData },
52
    #[serde(rename = "network-dldata-result")]
53
    NetDlDataResult { data: NetDlDataResult },
54
}
55

            
56
#[derive(Deserialize)]
57
struct AppUlData {
58
    #[serde(rename = "dataId")]
59
    data_id: String,
60
    proc: String,
61
    #[serde(rename = "pub")]
62
    publish: String,
63
    #[serde(rename = "unitCode")]
64
    unit_code: Option<String>,
65
    #[serde(rename = "networkCode")]
66
    network_code: String,
67
    #[serde(rename = "networkAddr")]
68
    network_addr: String,
69
    #[serde(rename = "unitId")]
70
    unit_id: String,
71
    #[serde(rename = "deviceId")]
72
    device_id: String,
73
    time: String,
74
    profile: String,
75
    data: String,
76
    extension: Option<Map<String, Value>>,
77
}
78

            
79
#[derive(Deserialize)]
80
struct AppDlData {
81
    #[serde(rename = "dataId")]
82
    data_id: String,
83
    proc: String,
84
    status: i32,
85
    #[serde(rename = "unitId")]
86
    unit_id: String,
87
    #[serde(rename = "deviceId")]
88
    device_id: Option<String>,
89
    #[serde(rename = "networkCode")]
90
    network_code: Option<String>,
91
    #[serde(rename = "networkAddr")]
92
    network_addr: Option<String>,
93
    profile: String,
94
    data: String,
95
    extension: Option<Map<String, Value>>,
96
}
97

            
98
#[derive(Deserialize)]
99
struct AppDlDataResult {
100
    #[serde(rename = "dataId")]
101
    data_id: String,
102
    resp: String,
103
    status: i32,
104
}
105

            
106
#[derive(Deserialize)]
107
struct NetUlData {
108
    #[serde(rename = "dataId")]
109
    data_id: String,
110
    proc: String,
111
    #[serde(rename = "unitCode")]
112
    unit_code: Option<String>,
113
    #[serde(rename = "networkCode")]
114
    network_code: String,
115
    #[serde(rename = "networkAddr")]
116
    network_addr: String,
117
    #[serde(rename = "unitId")]
118
    unit_id: Option<String>,
119
    #[serde(rename = "deviceId")]
120
    device_id: Option<String>,
121
    time: String,
122
    profile: String,
123
    data: String,
124
    extension: Option<Map<String, Value>>,
125
}
126

            
127
#[derive(Deserialize)]
128
struct NetDlData {
129
    #[serde(rename = "dataId")]
130
    data_id: String,
131
    proc: String,
132
    #[serde(rename = "pub")]
133
    publish: String,
134
    status: i32,
135
    #[serde(rename = "unitId")]
136
    unit_id: String,
137
    #[serde(rename = "deviceId")]
138
    device_id: String,
139
    #[serde(rename = "networkCode")]
140
    network_code: String,
141
    #[serde(rename = "networkAddr")]
142
    network_addr: String,
143
    profile: String,
144
    data: String,
145
    extension: Option<Map<String, Value>>,
146
}
147

            
148
#[derive(Deserialize)]
149
struct NetDlDataResult {
150
    #[serde(rename = "dataId")]
151
    data_id: String,
152
    resp: String,
153
    status: i32,
154
}
155

            
156
/// Name of the queue that data messages are received from.
const QUEUE_NAME: &str = "broker.data";
157

            
158
/// Create a receive queue to receive data from `broker.data` queue.
159
38
pub fn new(
160
38
    model: Arc<dyn Model>,
161
38
    mq_conns: &mut HashMap<String, Connection>,
162
38
    config: &DataMqConfig,
163
38
) -> Result<Queue, Box<dyn StdError>> {
164
38
    let handler = Arc::new(DataHandler { model });
165
38
    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
166
6
        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
167
32
        Ok(q) => Ok(q),
168
    }
169
38
}
170

            
171
#[async_trait]
172
impl EventHandler for DataHandler {
173
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
174
        const FN_NAME: &'static str = "DataHandler::on_error";
175
        let queue_name = queue.name();
176
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
177
    }
178

            
179
56
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
180
        const FN_NAME: &'static str = "DataHandler::on_status";
181
56
        let queue_name = queue.name();
182
56

            
183
56
        match status {
184
32
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
185
24
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
186
        }
187
112
    }
188
}
189

            
190
#[async_trait]
191
impl MessageHandler for DataHandler {
192
68
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
193
        const FN_NAME: &'static str = "DataHandler::on_message";
194
68
        let queue_name = queue.name();
195

            
196
68
        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
197
8
            Err(e) => {
198
8
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
199
8
                warn!(
200
                    "[{}] {} parse JSON error: {}, src: {}",
201
                    FN_NAME, queue_name, e, src_str
202
                );
203
8
                if let Err(e) = msg.ack().await {
204
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
205
8
                }
206
8
                return;
207
            }
208
60
            Ok(msg) => msg,
209
60
        };
210
60
        match data_msg {
211
6
            RecvDataMsg::AppDlData { data } => {
212
4
                let data = ApplicationDlData {
213
6
                    data_id: data.data_id,
214
6
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
215
2
                        Err(e) => {
216
2
                            warn!(
217
                                "[{}] {} parse application_dldata proc \"{}\" error: {}",
218
                                FN_NAME, queue_name, data.proc, e
219
                            );
220
2
                            if let Err(e) = msg.ack().await {
221
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
222
2
                            }
223
2
                            return;
224
                        }
225
4
                        Ok(proc) => proc.into(),
226
4
                    },
227
4
                    resp: None,
228
4
                    status: data.status,
229
4
                    unit_id: data.unit_id,
230
4
                    device_id: data.device_id,
231
4
                    network_code: data.network_code,
232
4
                    network_addr: data.network_addr,
233
4
                    profile: data.profile,
234
4
                    data: data.data,
235
4
                    extension: data.extension,
236
4
                };
237
4
                let mut is_err = false;
238
4
                if let Err(e) = self.model.application_dldata().add(&data).await {
239
                    error!(
240
                        "[{}] {} add application_dldata error: {}",
241
                        FN_NAME, queue_name, e
242
                    );
243
                    is_err = true;
244
4
                }
245
4
                if is_err {
246
                    time::sleep(Duration::from_secs(1)).await;
247
                    if let Err(e) = msg.nack().await {
248
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
249
                    }
250
                    return;
251
4
                }
252
            }
253
20
            RecvDataMsg::AppDlDataResult { data } => {
254
20
                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
255
20
                time::sleep(Duration::from_secs(1)).await;
256

            
257
20
                let cond = ApplicationDlDataCond {
258
20
                    data_id: data.data_id.as_str(),
259
20
                };
260
14
                let updates = ApplicationDlDataUpdate {
261
20
                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
262
6
                        Err(e) => {
263
6
                            warn!(
264
                                "[{}] {} parse application_dldata resp \"{}\" error: {}",
265
                                FN_NAME, queue_name, data.resp, e
266
                            );
267
6
                            if let Err(e) = msg.ack().await {
268
5
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
269
1
                            }
270
6
                            return;
271
                        }
272
14
                        Ok(resp) => resp.into(),
273
14
                    },
274
14
                    status: data.status,
275
14
                };
276
14
                let mut is_err = false;
277
14
                if let Err(e) = self
278
14
                    .model
279
14
                    .application_dldata()
280
14
                    .update(&cond, &updates)
281
14
                    .await
282
                {
283
                    error!(
284
                        "[{}] {} update application_dldata error: {}",
285
                        FN_NAME, queue_name, e
286
                    );
287
                    is_err = true;
288
14
                }
289
14
                if is_err {
290
                    time::sleep(Duration::from_secs(1)).await;
291
                    if let Err(e) = msg.nack().await {
292
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
293
                    }
294
                    return;
295
14
                }
296
            }
297
10
            RecvDataMsg::AppUlData { data } => {
298
4
                let data = ApplicationUlData {
299
10
                    data_id: data.data_id,
300
10
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
301
2
                        Err(e) => {
302
2
                            warn!(
303
                                "[{}] {} parse application_uldata proc \"{}\" error: {}",
304
                                FN_NAME, queue_name, data.proc, e
305
                            );
306
2
                            if let Err(e) = msg.ack().await {
307
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
308
2
                            }
309
2
                            return;
310
                        }
311
8
                        Ok(proc) => proc.into(),
312
8
                    },
313
8
                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
314
2
                        Err(e) => {
315
2
                            warn!(
316
                                "[{}] {} parse application_uldata publish \"{}\" error: {}",
317
                                FN_NAME, queue_name, data.publish, e
318
                            );
319
2
                            if let Err(e) = msg.ack().await {
320
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
321
2
                            }
322
2
                            return;
323
                        }
324
6
                        Ok(publish) => publish.into(),
325
6
                    },
326
6
                    unit_code: data.unit_code,
327
6
                    network_code: data.network_code,
328
6
                    network_addr: data.network_addr,
329
6
                    unit_id: data.unit_id,
330
6
                    device_id: data.device_id,
331
6
                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
332
2
                        Err(e) => {
333
2
                            warn!(
334
                                "[{}] {} parse application_uldata time \"{}\" error: {}",
335
                                FN_NAME, queue_name, data.time, e
336
                            );
337
2
                            if let Err(e) = msg.ack().await {
338
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
339
2
                            }
340
2
                            return;
341
                        }
342
4
                        Ok(time) => time.into(),
343
4
                    },
344
4
                    profile: data.profile,
345
4
                    data: data.data,
346
4
                    extension: data.extension,
347
4
                };
348
4
                let mut is_err = false;
349
4
                if let Err(e) = self.model.application_uldata().add(&data).await {
350
                    error!(
351
                        "[{}] {} add application_uldata error: {}",
352
                        FN_NAME, queue_name, e
353
                    );
354
                    is_err = true;
355
4
                }
356
4
                if is_err {
357
                    time::sleep(Duration::from_secs(1)).await;
358
                    if let Err(e) = msg.nack().await {
359
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
360
                    }
361
                    return;
362
4
                }
363
            }
364
8
            RecvDataMsg::NetDlData { data } => {
365
4
                let data = NetworkDlData {
366
8
                    data_id: data.data_id,
367
8
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
368
2
                        Err(e) => {
369
2
                            warn!(
370
                                "[{}] {} parse network_dldata proc \"{}\" error: {}",
371
                                FN_NAME, queue_name, data.proc, e
372
                            );
373
2
                            if let Err(e) = msg.ack().await {
374
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
375
2
                            }
376
2
                            return;
377
                        }
378
6
                        Ok(proc) => proc.into(),
379
6
                    },
380
6
                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
381
2
                        Err(e) => {
382
2
                            warn!(
383
                                "[{}] {} parse network_dldata publish \"{}\" error: {}",
384
                                FN_NAME, queue_name, data.publish, e
385
                            );
386
2
                            if let Err(e) = msg.ack().await {
387
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
388
2
                            }
389
2
                            return;
390
                        }
391
4
                        Ok(publish) => publish.into(),
392
4
                    },
393
4
                    resp: None,
394
4
                    status: data.status,
395
4
                    unit_id: data.unit_id,
396
4
                    device_id: data.device_id,
397
4
                    network_code: data.network_code,
398
4
                    network_addr: data.network_addr,
399
4
                    profile: data.profile,
400
4
                    data: data.data,
401
4
                    extension: data.extension,
402
4
                };
403
4
                let mut is_err = false;
404
4
                if let Err(e) = self.model.network_dldata().add(&data).await {
405
                    error!(
406
                        "[{}] {} add network_dldata error: {}",
407
                        FN_NAME, queue_name, e
408
                    );
409
                    is_err = true;
410
4
                }
411
4
                if is_err {
412
                    time::sleep(Duration::from_secs(1)).await;
413
                    if let Err(e) = msg.nack().await {
414
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
415
                    }
416
                    return;
417
4
                }
418
            }
419
8
            RecvDataMsg::NetDlDataResult { data } => {
420
8
                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
421
8
                time::sleep(Duration::from_secs(1)).await;
422

            
423
8
                let cond = NetworkDlDataCond {
424
8
                    data_id: data.data_id.as_str(),
425
8
                };
426
6
                let updates = NetworkDlDataUpdate {
427
8
                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
428
2
                        Err(e) => {
429
2
                            warn!(
430
                                "[{}] {} parse network_dldata resp \"{}\" error: {}",
431
                                FN_NAME, queue_name, data.resp, e
432
                            );
433
2
                            if let Err(e) = msg.ack().await {
434
1
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
435
1
                            }
436
2
                            return;
437
                        }
438
6
                        Ok(resp) => resp.into(),
439
6
                    },
440
6
                    status: data.status,
441
6
                };
442
6
                let mut is_err = false;
443
6
                if let Err(e) = self.model.network_dldata().update(&cond, &updates).await {
444
                    error!(
445
                        "[{}] {} update network_dldata error: {}",
446
                        FN_NAME, queue_name, e
447
                    );
448
                    is_err = true;
449
6
                }
450
6
                if is_err {
451
                    time::sleep(Duration::from_secs(1)).await;
452
                    if let Err(e) = msg.nack().await {
453
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
454
                    }
455
                    return;
456
6
                }
457
            }
458
8
            RecvDataMsg::NetUlData { data } => {
459
4
                let data = NetworkUlData {
460
8
                    data_id: data.data_id,
461
8
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
462
2
                        Err(e) => {
463
2
                            warn!(
464
                                "[{}] {} parse network_uldata proc \"{}\" error: {}",
465
                                FN_NAME, queue_name, data.proc, e
466
                            );
467
2
                            if let Err(e) = msg.ack().await {
468
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
469
2
                            }
470
2
                            return;
471
                        }
472
6
                        Ok(proc) => proc.into(),
473
6
                    },
474
6
                    unit_code: data.unit_code,
475
6
                    network_code: data.network_code,
476
6
                    network_addr: data.network_addr,
477
6
                    unit_id: data.unit_id,
478
6
                    device_id: data.device_id,
479
6
                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
480
2
                        Err(e) => {
481
2
                            warn!(
482
                                "[{}] {} parse network_uldata time \"{}\" error: {}",
483
                                FN_NAME, queue_name, data.time, e
484
                            );
485
2
                            if let Err(e) = msg.ack().await {
486
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
487
2
                            }
488
2
                            return;
489
                        }
490
4
                        Ok(time) => time.into(),
491
4
                    },
492
4
                    profile: data.profile,
493
4
                    data: data.data,
494
4
                    extension: data.extension,
495
4
                };
496
4
                let mut is_err = false;
497
4
                if let Err(e) = self.model.network_uldata().add(&data).await {
498
                    error!(
499
                        "[{}] {} add network_uldata error: {}",
500
                        FN_NAME, queue_name, e
501
                    );
502
                    is_err = true;
503
4
                }
504
4
                if is_err {
505
                    time::sleep(Duration::from_secs(1)).await;
506
                    if let Err(e) = msg.nack().await {
507
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
508
                    }
509
                    return;
510
4
                }
511
            }
512
        }
513
36
        if let Err(e) = msg.ack().await {
514
16
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
515
20
        }
516
136
    }
517
}