1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::Arc,
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::DateTime;
11
use log::{error, info, warn};
12
use serde::Deserialize;
13
use serde_json::{Map, Value};
14
use tokio::time;
15

            
16
use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
17
use crate::models::{
18
    application_dldata::{
19
        ApplicationDlData, UpdateQueryCond as ApplicationDlDataCond,
20
        Updates as ApplicationDlDataUpdate,
21
    },
22
    application_uldata::ApplicationUlData,
23
    network_dldata::{
24
        NetworkDlData, UpdateQueryCond as NetworkDlDataCond, Updates as NetworkDlDataUpdate,
25
    },
26
    network_uldata::NetworkUlData,
27
    Model,
28
};
29
use general_mq::{
30
    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
31
    Queue,
32
};
33

            
34
#[derive(Clone)]
35
struct DataHandler {
36
    model: Arc<dyn Model>,
37
}
38

            
39
#[derive(Deserialize)]
40
#[serde(tag = "kind")]
41
enum RecvDataMsg {
42
    #[serde(rename = "application-uldata")]
43
    AppUlData { data: AppUlData },
44
    #[serde(rename = "application-dldata")]
45
    AppDlData { data: AppDlData },
46
    #[serde(rename = "application-dldata-result")]
47
    AppDlDataResult { data: AppDlDataResult },
48
    #[serde(rename = "network-uldata")]
49
    NetUlData { data: NetUlData },
50
    #[serde(rename = "network-dldata")]
51
    NetDlData { data: NetDlData },
52
    #[serde(rename = "network-dldata-result")]
53
    NetDlDataResult { data: NetDlDataResult },
54
}
55

            
56
#[derive(Deserialize)]
57
struct AppUlData {
58
    #[serde(rename = "dataId")]
59
    data_id: String,
60
    proc: String,
61
    #[serde(rename = "pub")]
62
    publish: String,
63
    #[serde(rename = "unitCode")]
64
    unit_code: Option<String>,
65
    #[serde(rename = "networkCode")]
66
    network_code: String,
67
    #[serde(rename = "networkAddr")]
68
    network_addr: String,
69
    #[serde(rename = "unitId")]
70
    unit_id: String,
71
    #[serde(rename = "deviceId")]
72
    device_id: String,
73
    time: String,
74
    profile: String,
75
    data: String,
76
    extension: Option<Map<String, Value>>,
77
}
78

            
79
#[derive(Deserialize)]
80
struct AppDlData {
81
    #[serde(rename = "dataId")]
82
    data_id: String,
83
    proc: String,
84
    status: i32,
85
    #[serde(rename = "unitId")]
86
    unit_id: String,
87
    #[serde(rename = "deviceId")]
88
    device_id: Option<String>,
89
    #[serde(rename = "networkCode")]
90
    network_code: Option<String>,
91
    #[serde(rename = "networkAddr")]
92
    network_addr: Option<String>,
93
    profile: String,
94
    data: String,
95
    extension: Option<Map<String, Value>>,
96
}
97

            
98
#[derive(Deserialize)]
99
struct AppDlDataResult {
100
    #[serde(rename = "dataId")]
101
    data_id: String,
102
    resp: String,
103
    status: i32,
104
}
105

            
106
#[derive(Deserialize)]
107
struct NetUlData {
108
    #[serde(rename = "dataId")]
109
    data_id: String,
110
    proc: String,
111
    #[serde(rename = "unitCode")]
112
    unit_code: Option<String>,
113
    #[serde(rename = "networkCode")]
114
    network_code: String,
115
    #[serde(rename = "networkAddr")]
116
    network_addr: String,
117
    #[serde(rename = "unitId")]
118
    unit_id: Option<String>,
119
    #[serde(rename = "deviceId")]
120
    device_id: Option<String>,
121
    time: String,
122
    profile: String,
123
    data: String,
124
    extension: Option<Map<String, Value>>,
125
}
126

            
127
#[derive(Deserialize)]
128
struct NetDlData {
129
    #[serde(rename = "dataId")]
130
    data_id: String,
131
    proc: String,
132
    #[serde(rename = "pub")]
133
    publish: String,
134
    status: i32,
135
    #[serde(rename = "unitId")]
136
    unit_id: String,
137
    #[serde(rename = "deviceId")]
138
    device_id: String,
139
    #[serde(rename = "networkCode")]
140
    network_code: String,
141
    #[serde(rename = "networkAddr")]
142
    network_addr: String,
143
    profile: String,
144
    data: String,
145
    extension: Option<Map<String, Value>>,
146
}
147

            
148
#[derive(Deserialize)]
149
struct NetDlDataResult {
150
    #[serde(rename = "dataId")]
151
    data_id: String,
152
    resp: String,
153
    status: i32,
154
}
155

            
156
/// Name of the queue that this module receives data from.
const QUEUE_NAME: &str = "broker.data";
157

            
158
/// Create a receive queue to receive data from `broker.data` queue.
159
76
pub fn new(
160
76
    model: Arc<dyn Model>,
161
76
    mq_conns: &mut HashMap<String, Connection>,
162
76
    config: &DataMqConfig,
163
76
) -> Result<Queue, Box<dyn StdError>> {
164
76
    let handler = Arc::new(DataHandler { model });
165
76
    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
166
12
        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
167
64
        Ok(q) => Ok(q),
168
    }
169
76
}
170

            
171
#[async_trait]
172
impl EventHandler for DataHandler {
173
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
174
        const FN_NAME: &'static str = "DataHandler::on_error";
175
        let queue_name = queue.name();
176
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
177
    }
178

            
179
112
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
180
        const FN_NAME: &'static str = "DataHandler::on_status";
181
112
        let queue_name = queue.name();
182
112

            
183
112
        match status {
184
64
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
185
48
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
186
        }
187
224
    }
188
}
189

            
190
#[async_trait]
191
impl MessageHandler for DataHandler {
192
136
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
193
        const FN_NAME: &'static str = "DataHandler::on_message";
194
136
        let queue_name = queue.name();
195

            
196
136
        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
197
16
            Err(e) => {
198
16
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
199
16
                warn!(
200
                    "[{}] {} parse JSON error: {}, src: {}",
201
                    FN_NAME, queue_name, e, src_str
202
                );
203
16
                if let Err(e) = msg.ack().await {
204
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
205
16
                }
206
16
                return;
207
            }
208
120
            Ok(msg) => msg,
209
120
        };
210
120
        match data_msg {
211
12
            RecvDataMsg::AppDlData { data } => {
212
8
                let data = ApplicationDlData {
213
12
                    data_id: data.data_id,
214
12
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
215
4
                        Err(e) => {
216
4
                            warn!(
217
                                "[{}] {} parse application_dldata proc \"{}\" error: {}",
218
                                FN_NAME, queue_name, data.proc, e
219
                            );
220
4
                            if let Err(e) = msg.ack().await {
221
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
222
4
                            }
223
4
                            return;
224
                        }
225
8
                        Ok(proc) => proc.into(),
226
8
                    },
227
8
                    resp: None,
228
8
                    status: data.status,
229
8
                    unit_id: data.unit_id,
230
8
                    device_id: data.device_id,
231
8
                    network_code: data.network_code,
232
8
                    network_addr: data.network_addr,
233
8
                    profile: data.profile,
234
8
                    data: data.data,
235
8
                    extension: data.extension,
236
8
                };
237
8
                let mut is_err = false;
238
8
                if let Err(e) = self.model.application_dldata().add(&data).await {
239
                    error!(
240
                        "[{}] {} add application_dldata error: {}",
241
                        FN_NAME, queue_name, e
242
                    );
243
                    is_err = true;
244
8
                }
245
8
                if is_err {
246
                    time::sleep(Duration::from_secs(1)).await;
247
                    if let Err(e) = msg.nack().await {
248
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
249
                    }
250
                    return;
251
8
                }
252
            }
253
40
            RecvDataMsg::AppDlDataResult { data } => {
254
40
                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
255
40
                time::sleep(Duration::from_secs(1)).await;
256

            
257
40
                let cond = ApplicationDlDataCond {
258
40
                    data_id: data.data_id.as_str(),
259
40
                };
260
28
                let updates = ApplicationDlDataUpdate {
261
40
                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
262
12
                        Err(e) => {
263
12
                            warn!(
264
                                "[{}] {} parse application_dldata resp \"{}\" error: {}",
265
                                FN_NAME, queue_name, data.resp, e
266
                            );
267
12
                            if let Err(e) = msg.ack().await {
268
10
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
269
2
                            }
270
12
                            return;
271
                        }
272
28
                        Ok(resp) => resp.into(),
273
28
                    },
274
28
                    status: data.status,
275
28
                };
276
28
                let mut is_err = false;
277
28
                if let Err(e) = self
278
28
                    .model
279
28
                    .application_dldata()
280
28
                    .update(&cond, &updates)
281
28
                    .await
282
                {
283
                    error!(
284
                        "[{}] {} update application_dldata error: {}",
285
                        FN_NAME, queue_name, e
286
                    );
287
                    is_err = true;
288
28
                }
289
28
                if is_err {
290
                    time::sleep(Duration::from_secs(1)).await;
291
                    if let Err(e) = msg.nack().await {
292
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
293
                    }
294
                    return;
295
28
                }
296
            }
297
20
            RecvDataMsg::AppUlData { data } => {
298
8
                let data = ApplicationUlData {
299
20
                    data_id: data.data_id,
300
20
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
301
4
                        Err(e) => {
302
4
                            warn!(
303
                                "[{}] {} parse application_uldata proc \"{}\" error: {}",
304
                                FN_NAME, queue_name, data.proc, e
305
                            );
306
4
                            if let Err(e) = msg.ack().await {
307
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
308
4
                            }
309
4
                            return;
310
                        }
311
16
                        Ok(proc) => proc.into(),
312
16
                    },
313
16
                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
314
4
                        Err(e) => {
315
4
                            warn!(
316
                                "[{}] {} parse application_uldata publish \"{}\" error: {}",
317
                                FN_NAME, queue_name, data.publish, e
318
                            );
319
4
                            if let Err(e) = msg.ack().await {
320
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
321
4
                            }
322
4
                            return;
323
                        }
324
12
                        Ok(publish) => publish.into(),
325
12
                    },
326
12
                    unit_code: data.unit_code,
327
12
                    network_code: data.network_code,
328
12
                    network_addr: data.network_addr,
329
12
                    unit_id: data.unit_id,
330
12
                    device_id: data.device_id,
331
12
                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
332
4
                        Err(e) => {
333
4
                            warn!(
334
                                "[{}] {} parse application_uldata time \"{}\" error: {}",
335
                                FN_NAME, queue_name, data.time, e
336
                            );
337
4
                            if let Err(e) = msg.ack().await {
338
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
339
4
                            }
340
4
                            return;
341
                        }
342
8
                        Ok(time) => time.into(),
343
8
                    },
344
8
                    profile: data.profile,
345
8
                    data: data.data,
346
8
                    extension: data.extension,
347
8
                };
348
8
                let mut is_err = false;
349
8
                if let Err(e) = self.model.application_uldata().add(&data).await {
350
                    error!(
351
                        "[{}] {} add application_uldata error: {}",
352
                        FN_NAME, queue_name, e
353
                    );
354
                    is_err = true;
355
8
                }
356
8
                if is_err {
357
                    time::sleep(Duration::from_secs(1)).await;
358
                    if let Err(e) = msg.nack().await {
359
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
360
                    }
361
                    return;
362
8
                }
363
            }
364
16
            RecvDataMsg::NetDlData { data } => {
365
8
                let data = NetworkDlData {
366
16
                    data_id: data.data_id,
367
16
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
368
4
                        Err(e) => {
369
4
                            warn!(
370
                                "[{}] {} parse network_dldata proc \"{}\" error: {}",
371
                                FN_NAME, queue_name, data.proc, e
372
                            );
373
4
                            if let Err(e) = msg.ack().await {
374
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
375
4
                            }
376
4
                            return;
377
                        }
378
12
                        Ok(proc) => proc.into(),
379
12
                    },
380
12
                    publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
381
4
                        Err(e) => {
382
4
                            warn!(
383
                                "[{}] {} parse network_dldata publish \"{}\" error: {}",
384
                                FN_NAME, queue_name, data.publish, e
385
                            );
386
4
                            if let Err(e) = msg.ack().await {
387
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
388
4
                            }
389
4
                            return;
390
                        }
391
8
                        Ok(publish) => publish.into(),
392
8
                    },
393
8
                    resp: None,
394
8
                    status: data.status,
395
8
                    unit_id: data.unit_id,
396
8
                    device_id: data.device_id,
397
8
                    network_code: data.network_code,
398
8
                    network_addr: data.network_addr,
399
8
                    profile: data.profile,
400
8
                    data: data.data,
401
8
                    extension: data.extension,
402
8
                };
403
8
                let mut is_err = false;
404
8
                if let Err(e) = self.model.network_dldata().add(&data).await {
405
                    error!(
406
                        "[{}] {} add network_dldata error: {}",
407
                        FN_NAME, queue_name, e
408
                    );
409
                    is_err = true;
410
8
                }
411
8
                if is_err {
412
                    time::sleep(Duration::from_secs(1)).await;
413
                    if let Err(e) = msg.nack().await {
414
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
415
                    }
416
                    return;
417
8
                }
418
            }
419
16
            RecvDataMsg::NetDlDataResult { data } => {
420
16
                // FIXME: wait 1 second to wait for the associated dldata has been written in DB.
421
16
                time::sleep(Duration::from_secs(1)).await;
422

            
423
16
                let cond = NetworkDlDataCond {
424
16
                    data_id: data.data_id.as_str(),
425
16
                };
426
12
                let updates = NetworkDlDataUpdate {
427
16
                    resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
428
4
                        Err(e) => {
429
4
                            warn!(
430
                                "[{}] {} parse network_dldata resp \"{}\" error: {}",
431
                                FN_NAME, queue_name, data.resp, e
432
                            );
433
4
                            if let Err(e) = msg.ack().await {
434
2
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
435
2
                            }
436
4
                            return;
437
                        }
438
12
                        Ok(resp) => resp.into(),
439
12
                    },
440
12
                    status: data.status,
441
12
                };
442
12
                let mut is_err = false;
443
12
                if let Err(e) = self.model.network_dldata().update(&cond, &updates).await {
444
                    error!(
445
                        "[{}] {} update network_dldata error: {}",
446
                        FN_NAME, queue_name, e
447
                    );
448
                    is_err = true;
449
12
                }
450
12
                if is_err {
451
                    time::sleep(Duration::from_secs(1)).await;
452
                    if let Err(e) = msg.nack().await {
453
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
454
                    }
455
                    return;
456
12
                }
457
            }
458
16
            RecvDataMsg::NetUlData { data } => {
459
8
                let data = NetworkUlData {
460
16
                    data_id: data.data_id,
461
16
                    proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
462
4
                        Err(e) => {
463
4
                            warn!(
464
                                "[{}] {} parse network_uldata proc \"{}\" error: {}",
465
                                FN_NAME, queue_name, data.proc, e
466
                            );
467
4
                            if let Err(e) = msg.ack().await {
468
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
469
4
                            }
470
4
                            return;
471
                        }
472
12
                        Ok(proc) => proc.into(),
473
12
                    },
474
12
                    unit_code: data.unit_code,
475
12
                    network_code: data.network_code,
476
12
                    network_addr: data.network_addr,
477
12
                    unit_id: data.unit_id,
478
12
                    device_id: data.device_id,
479
12
                    time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
480
4
                        Err(e) => {
481
4
                            warn!(
482
                                "[{}] {} parse network_uldata time \"{}\" error: {}",
483
                                FN_NAME, queue_name, data.time, e
484
                            );
485
4
                            if let Err(e) = msg.ack().await {
486
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
487
4
                            }
488
4
                            return;
489
                        }
490
8
                        Ok(time) => time.into(),
491
8
                    },
492
8
                    profile: data.profile,
493
8
                    data: data.data,
494
8
                    extension: data.extension,
495
8
                };
496
8
                let mut is_err = false;
497
8
                if let Err(e) = self.model.network_uldata().add(&data).await {
498
                    error!(
499
                        "[{}] {} add network_uldata error: {}",
500
                        FN_NAME, queue_name, e
501
                    );
502
                    is_err = true;
503
8
                }
504
8
                if is_err {
505
                    time::sleep(Duration::from_secs(1)).await;
506
                    if let Err(e) = msg.nack().await {
507
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
508
                    }
509
                    return;
510
8
                }
511
            }
512
        }
513
72
        if let Err(e) = msg.ack().await {
514
32
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
515
40
        }
516
272
    }
517
}