1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::Arc,
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use chrono::DateTime;
11
use log::{error, info, warn};
12
use serde::Deserialize;
13
use serde_json::{Map, Value};
14
use tokio::time;
15

            
16
use general_mq::{
17
    queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
18
    Queue,
19
};
20

            
21
use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
22
use crate::models::{coremgr_opdata::CoremgrOpData, Model};
23

            
24
#[derive(Clone)]
25
struct DataHandler {
26
    model: Arc<dyn Model>,
27
}
28

            
29
24
#[derive(Deserialize)]
30
#[serde(tag = "kind")]
31
enum RecvDataMsg {
32
    #[serde(rename = "operation")]
33
    Operation { data: CmOpData },
34
}
35

            
36
94
#[derive(Deserialize)]
37
struct CmOpData {
38
    #[serde(rename = "dataId")]
39
    data_id: String,
40
    #[serde(rename = "reqTime")]
41
    req_time: String,
42
    #[serde(rename = "resTime")]
43
    res_time: String,
44
    #[serde(rename = "latencyMs")]
45
    latency_ms: i64,
46
    status: i32,
47
    #[serde(rename = "sourceIp")]
48
    source_ip: String,
49
    method: String,
50
    path: String,
51
    body: Option<Map<String, Value>>,
52
    #[serde(rename = "userId")]
53
    user_id: String,
54
    #[serde(rename = "clientId")]
55
    client_id: String,
56
    #[serde(rename = "errCode")]
57
    err_code: Option<String>,
58
    #[serde(rename = "errMessage")]
59
    err_message: Option<String>,
60
}
61

            
62
/// Name of the queue this module receives data from.
const QUEUE_NAME: &str = "coremgr.data";
63

            
64
/// Create a receive queue to receive data from `coremgr.data` queue.
65
26
pub fn new(
66
26
    model: Arc<dyn Model>,
67
26
    mq_conns: &mut HashMap<String, Connection>,
68
26
    config: &DataMqConfig,
69
26
) -> Result<Queue, Box<dyn StdError>> {
70
26
    let handler = Arc::new(DataHandler { model });
71
26
    match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
72
6
        Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
73
20
        Ok(q) => Ok(q),
74
    }
75
26
}
76

            
77
#[async_trait]
78
impl EventHandler for DataHandler {
79
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
80
        const FN_NAME: &'static str = "DataHandler::on_error";
81
        let queue_name = queue.name();
82
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
83
    }
84

            
85
32
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
86
        const FN_NAME: &'static str = "DataHandler::on_status";
87
32
        let queue_name = queue.name();
88
32

            
89
32
        match status {
90
20
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
91
12
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
92
        }
93
64
    }
94
}
95

            
96
#[async_trait]
97
impl MessageHandler for DataHandler {
98
10
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
99
        const FN_NAME: &'static str = "DataHandler::on_message";
100
10
        let queue_name = queue.name();
101

            
102
10
        let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
103
2
            Err(e) => {
104
2
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
105
2
                warn!(
106
                    "[{}] {} parse JSON error: {}, src: {}",
107
                    FN_NAME, queue_name, e, src_str
108
                );
109
2
                if let Err(e) = msg.ack().await {
110
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
111
2
                }
112
2
                return;
113
            }
114
8
            Ok(msg) => msg,
115
8
        };
116
8
        match data_msg {
117
8
            RecvDataMsg::Operation { data } => {
118
4
                let data = CoremgrOpData {
119
8
                    data_id: data.data_id,
120
8
                    req_time: match DateTime::parse_from_rfc3339(data.req_time.as_str()) {
121
2
                        Err(e) => {
122
2
                            warn!(
123
                                "[{}] {} parse coremgr_opdata req_time \"{}\" error: {}",
124
                                FN_NAME, queue_name, data.req_time, e
125
                            );
126
2
                            if let Err(e) = msg.ack().await {
127
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
128
2
                            }
129
2
                            return;
130
                        }
131
6
                        Ok(req_time) => req_time.into(),
132
6
                    },
133
6
                    res_time: match DateTime::parse_from_rfc3339(data.res_time.as_str()) {
134
2
                        Err(e) => {
135
2
                            warn!(
136
                                "[{}] {} parse coremgr_opdata res_time \"{}\" error: {}",
137
                                FN_NAME, queue_name, data.res_time, e
138
                            );
139
2
                            if let Err(e) = msg.ack().await {
140
                                error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
141
2
                            }
142
2
                            return;
143
                        }
144
4
                        Ok(res_time) => res_time.into(),
145
4
                    },
146
4
                    latency_ms: data.latency_ms,
147
4
                    status: data.status,
148
4
                    source_ip: data.source_ip,
149
4
                    method: data.method,
150
4
                    path: data.path,
151
4
                    body: data.body,
152
4
                    user_id: data.user_id,
153
4
                    client_id: data.client_id,
154
4
                    err_code: data.err_code,
155
4
                    err_message: data.err_message,
156
4
                };
157
4
                let mut is_err = false;
158
8
                if let Err(e) = self.model.coremgr_opdata().add(&data).await {
159
                    error!(
160
                        "[{}] {} add coremgr_opdata error: {}",
161
                        FN_NAME, queue_name, e
162
                    );
163
                    is_err = true;
164
4
                }
165
4
                if is_err {
166
                    time::sleep(Duration::from_secs(1)).await;
167
                    if let Err(e) = msg.nack().await {
168
                        error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
169
                    }
170
                    return;
171
4
                }
172
            }
173
        }
174
4
        if let Err(e) = msg.ack().await {
175
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
176
4
        }
177
20
    }
178
}