1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::Arc,
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::DateTime;
11
use log::{error, info, warn};
12
use serde::Deserialize;
13
use serde_json::{Map, Value};
14
use tokio::time;
15

            
16
use general_mq::{
17
    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
18
    Queue,
19
};
20

            
21
use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
22
use crate::models::{coremgr_opdata::CoremgrOpData, Model};
23

            
24
#[derive(Clone)]
25
struct DataHandler {
26
    model: Arc<dyn Model>,
27
}
28

            
29
#[derive(Deserialize)]
30
#[serde(tag = "kind")]
31
enum RecvDataMsg {
32
    #[serde(rename = "operation")]
33
    Operation { data: CmOpData },
34
}
35

            
36
#[derive(Deserialize)]
37
struct CmOpData {
38
    #[serde(rename = "dataId")]
39
    data_id: String,
40
    #[serde(rename = "reqTime")]
41
    req_time: String,
42
    #[serde(rename = "resTime")]
43
    res_time: String,
44
    #[serde(rename = "latencyMs")]
45
    latency_ms: i64,
46
    status: i32,
47
    #[serde(rename = "sourceIp")]
48
    source_ip: String,
49
    method: String,
50
    path: String,
51
    body: Option<Map<String, Value>>,
52
    #[serde(rename = "userId")]
53
    user_id: String,
54
    #[serde(rename = "clientId")]
55
    client_id: String,
56
    #[serde(rename = "errCode")]
57
    err_code: Option<String>,
58
    #[serde(rename = "errMessage")]
59
    err_message: Option<String>,
60
}
61

            
62
const QUEUE_NAME: &'static str = "coremgr.data";
63

            
64
/// Create a receive queue to receive data from `coremgr.data` queue.
65
52
pub fn new(
66
52
    model: Arc<dyn Model>,
67
52
    mq_conns: &mut HashMap<String, Connection>,
68
52
    config: &DataMqConfig,
69
52
) -> Result<Queue, Box<dyn StdError>> {
70
52
    let handler = Arc::new(DataHandler { model });
71
52
    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
72
12
        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
73
40
        Ok(q) => Ok(q),
74
    }
75
52
}
76

            
77
#[async_trait]
78
impl EventHandler for DataHandler {
79
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
80
        const FN_NAME: &'static str = "DataHandler::on_error";
81
        let queue_name = queue.name();
82
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
83
    }
84

            
85
64
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
86
        const FN_NAME: &'static str = "DataHandler::on_status";
87
64
        let queue_name = queue.name();
88
64

            
89
64
        match status {
90
40
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
91
24
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
92
        }
93
128
    }
94
}
95

            
96
#[async_trait]
97
impl MessageHandler for DataHandler {
98
20
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
99
        const FN_NAME: &'static str = "DataHandler::on_message";
100
20
        let queue_name = queue.name();
101

            
102
20
        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
103
4
            Err(e) => {
104
4
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
105
4
                warn!(
106
                    "[{}] {} parse JSON error: {}, src: {}",
107
                    FN_NAME, queue_name, e, src_str
108
                );
109
4
                if let Err(e) = msg.ack().await {
110
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
111
4
                }
112
4
                return;
113
            }
114
16
            Ok(msg) => msg,
115
16
        };
116
16
        match data_msg {
117
16
            RecvDataMsg::Operation { data } => {
118
8
                let data = CoremgrOpData {
119
16
                    data_id: data.data_id,
120
16
                    req_time: match DateTime::parse_from_rfc3339(data.req_time.as_str()) {
121
4
                        Err(e) => {
122
4
                            warn!(
123
                                "[{}] {} parse coremgr_opdata req_time \"{}\" error: {}",
124
                                FN_NAME, queue_name, data.req_time, e
125
                            );
126
4
                            if let Err(e) = msg.ack().await {
127
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
128
4
                            }
129
4
                            return;
130
                        }
131
12
                        Ok(req_time) => req_time.into(),
132
12
                    },
133
12
                    res_time: match DateTime::parse_from_rfc3339(data.res_time.as_str()) {
134
4
                        Err(e) => {
135
4
                            warn!(
136
                                "[{}] {} parse coremgr_opdata res_time \"{}\" error: {}",
137
                                FN_NAME, queue_name, data.res_time, e
138
                            );
139
4
                            if let Err(e) = msg.ack().await {
140
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
141
4
                            }
142
4
                            return;
143
                        }
144
8
                        Ok(res_time) => res_time.into(),
145
8
                    },
146
8
                    latency_ms: data.latency_ms,
147
8
                    status: data.status,
148
8
                    source_ip: data.source_ip,
149
8
                    method: data.method,
150
8
                    path: data.path,
151
8
                    body: data.body,
152
8
                    user_id: data.user_id,
153
8
                    client_id: data.client_id,
154
8
                    err_code: data.err_code,
155
8
                    err_message: data.err_message,
156
8
                };
157
8
                let mut is_err = false;
158
8
                if let Err(e) = self.model.coremgr_opdata().add(&data).await {
159
                    error!(
160
                        "[{}] {} add coremgr_opdata error: {}",
161
                        FN_NAME, queue_name, e
162
                    );
163
                    is_err = true;
164
8
                }
165
8
                if is_err {
166
                    time::sleep(Duration::from_secs(1)).await;
167
                    if let Err(e) = msg.nack().await {
168
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
169
                    }
170
                    return;
171
8
                }
172
            }
173
        }
174
8
        if let Err(e) = msg.ack().await {
175
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
176
8
        }
177
40
    }
178
}