1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    bson::{self, doc, Bson, DateTime, Document},
7
    options::FindOptions,
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::coremgr_opdata::{
13
    CoremgrOpData, CoremgrOpDataModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
25102
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    #[serde(rename = "reqTime")]
37
    pub req_time: DateTime,
38
    #[serde(rename = "resTime")]
39
    pub res_time: DateTime,
40
    #[serde(rename = "latencyMs")]
41
    pub latency_ms: i64,
42
    pub status: i32,
43
    #[serde(rename = "sourceIp")]
44
    pub source_ip: String,
45
    pub method: String,
46
    pub path: String,
47
    #[serde(skip_serializing_if = "Option::is_none")]
48
    pub body: Option<Document>,
49
    #[serde(rename = "userId")]
50
    pub user_id: String,
51
    #[serde(rename = "clientId")]
52
    pub client_id: String,
53
    #[serde(rename = "errCode", skip_serializing_if = "Option::is_none")]
54
    pub err_code: Option<String>,
55
    #[serde(rename = "errMessage", skip_serializing_if = "Option::is_none")]
56
    pub err_message: Option<String>,
57
}
58

            
59
const COL_NAME: &'static str = "coremgrOpData";
60

            
61
impl Model {
62
    /// To create the model instance with a database connection.
63
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
64
7
        let model = Model { conn };
65
14
        model.init().await?;
66
7
        Ok(model)
67
7
    }
68
}
69

            
70
#[async_trait]
71
impl CoremgrOpDataModel for Model {
72
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
73
12
        let indexes = vec![
74
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
75
12
            doc! {"name": "userId_1", "key": {"userId": 1}},
76
12
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
77
12
            doc! {"name": "reqTime_1", "key": {"reqTime": 1}, "expireAfterSeconds": EXPIRES},
78
12
            doc! {"name": "resTime_1", "key": {"resTime": 1}},
79
12
            doc! {"name": "latencyMs_1", "key": {"latencyMs": 1}},
80
12
        ];
81
12
        let command = doc! {
82
12
            "createIndexes": COL_NAME,
83
12
            "indexes": indexes,
84
12
        };
85
24
        self.conn.run_command(command, None).await?;
86
12
        Ok(())
87
24
    }
88

            
89
18
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
18
        let filter = get_list_query_filter(cond);
91
18
        let count = self
92
18
            .conn
93
18
            .collection::<Schema>(COL_NAME)
94
18
            .count_documents(filter, None)
95
36
            .await?;
96
18
        Ok(count)
97
36
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
65
    ) -> Result<(Vec<CoremgrOpData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
65
        let mut cursor = match cursor {
105
            None => {
106
52
                let filter = get_list_query_filter(opts.cond);
107
52
                let options = get_find_options(opts);
108
52
                Box::new(DbCursor::new(
109
52
                    self.conn
110
52
                        .collection::<Schema>(COL_NAME)
111
52
                        .find(filter, options)
112
104
                        .await?,
113
                ))
114
            }
115
13
            Some(cursor) => cursor,
116
        };
117

            
118
65
        let mut count: u64 = 0;
119
65
        let mut list = Vec::new();
120
1125
        while let Some(item) = cursor.try_next().await? {
121
1073
            list.push(item);
122
1073
            if let Some(cursor_max) = opts.cursor_max {
123
1011
                count += 1;
124
1011
                if count >= cursor_max {
125
13
                    return Ok((list, Some(cursor)));
126
998
                }
127
62
            }
128
        }
129
52
        Ok((list, None))
130
130
    }
131

            
132
459
    async fn add(&self, data: &CoremgrOpData) -> Result<(), Box<dyn StdError>> {
133
459
        let item = Schema {
134
459
            data_id: data.data_id.clone(),
135
459
            req_time: data.req_time.into(),
136
459
            res_time: data.res_time.into(),
137
459
            latency_ms: data.latency_ms,
138
459
            status: data.status,
139
459
            source_ip: data.source_ip.clone(),
140
459
            method: data.method.clone(),
141
459
            path: data.path.clone(),
142
459
            body: match data.body.as_ref() {
143
248
                None => None,
144
211
                Some(body) => Some(bson::to_document(body)?),
145
            },
146
459
            user_id: data.user_id.clone(),
147
459
            client_id: data.client_id.clone(),
148
459
            err_code: data.err_code.clone(),
149
459
            err_message: data.err_message.clone(),
150
459
        };
151
459
        self.conn
152
459
            .collection::<Schema>(COL_NAME)
153
459
            .insert_one(item, None)
154
919
            .await?;
155
458
        Ok(())
156
918
    }
157

            
158
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
159
5
        let filter = get_query_filter(cond);
160
5
        self.conn
161
5
            .collection::<Schema>(COL_NAME)
162
5
            .delete_many(filter, None)
163
10
            .await?;
164
5
        Ok(())
165
10
    }
166
}
167

            
168
impl DbCursor {
169
    /// To create the cursor instance with a collection cursor.
170
52
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
171
52
        DbCursor { cursor, offset: 0 }
172
52
    }
173
}
174

            
175
#[async_trait]
176
impl Cursor for DbCursor {
177
1125
    async fn try_next(&mut self) -> Result<Option<CoremgrOpData>, Box<dyn StdError>> {
178
1125
        if let Some(item) = self.cursor.try_next().await? {
179
1073
            self.offset += 1;
180
1073
            return Ok(Some(CoremgrOpData {
181
1073
                data_id: item.data_id,
182
1073
                req_time: item.req_time.into(),
183
1073
                res_time: item.res_time.into(),
184
1073
                latency_ms: item.latency_ms,
185
1073
                status: item.status,
186
1073
                source_ip: item.source_ip,
187
1073
                method: item.method,
188
1073
                path: item.path,
189
1073
                body: match item.body {
190
823
                    None => None,
191
250
                    Some(body) => bson::from_document(body)?,
192
                },
193
1073
                user_id: item.user_id,
194
1073
                client_id: item.client_id,
195
1073
                err_code: item.err_code,
196
1073
                err_message: item.err_message,
197
            }));
198
52
        }
199
52
        Ok(None)
200
2250
    }
201

            
202
5
    fn offset(&self) -> u64 {
203
5
        self.offset
204
5
    }
205
}
206

            
207
/// Transforms query conditions to the MongoDB document.
208
5
fn get_query_filter(cond: &QueryCond) -> Document {
209
5
    let mut filter = Document::new();
210
5
    if let Some(value) = cond.user_id {
211
3
        filter.insert("userId", value);
212
3
    }
213
5
    if let Some(value) = cond.client_id {
214
1
        filter.insert("clientId", value);
215
4
    }
216
5
    let mut time_doc = Document::new();
217
5
    if let Some(value) = cond.req_gte {
218
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
219
4
    }
220
5
    if let Some(value) = cond.req_lte {
221
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
222
4
    }
223
5
    if time_doc.len() > 0 {
224
1
        filter.insert("reqTime", time_doc);
225
4
    }
226
5
    filter
227
5
}
228

            
229
/// Transforms query conditions to the MongoDB document.
230
70
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
231
70
    let mut filter = Document::new();
232
70
    if let Some(value) = cond.user_id {
233
24
        filter.insert("userId", value);
234
46
    }
235
70
    if let Some(value) = cond.client_id {
236
4
        filter.insert("clientId", value);
237
66
    }
238
70
    let mut time_doc = Document::new();
239
70
    if let Some(value) = cond.req_gte {
240
13
        time_doc.insert("$gte", Bson::DateTime(value.into()));
241
57
    }
242
70
    if let Some(value) = cond.req_lte {
243
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
244
68
    }
245
70
    if time_doc.len() > 0 {
246
13
        filter.insert("reqTime", time_doc);
247
57
    }
248
70
    time_doc = Document::new();
249
70
    if let Some(value) = cond.res_gte {
250
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
251
68
    }
252
70
    if let Some(value) = cond.res_lte {
253
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
254
68
    }
255
70
    if time_doc.len() > 0 {
256
2
        filter.insert("resTime", time_doc);
257
68
    }
258
70
    filter
259
70
}
260

            
261
/// Transforms model options to the options.
262
52
fn get_find_options(opts: &ListOptions) -> FindOptions {
263
52
    let mut options = FindOptions::builder().build();
264
52
    if let Some(offset) = opts.offset {
265
12
        options.skip = Some(offset);
266
40
    }
267
52
    if let Some(limit) = opts.limit {
268
38
        if limit > 0 {
269
35
            options.limit = Some(limit as i64);
270
35
        }
271
14
    }
272
52
    if let Some(sort_list) = opts.sort.as_ref() {
273
46
        if sort_list.len() > 0 {
274
46
            let mut sort_opts = Document::new();
275
46
            for cond in sort_list.iter() {
276
46
                let key = match cond.key {
277
38
                    SortKey::ReqTime => "reqTime",
278
4
                    SortKey::ResTime => "resTime",
279
4
                    SortKey::Latency => "latencyMs",
280
                };
281
46
                if cond.asc {
282
27
                    sort_opts.insert(key.to_string(), 1);
283
27
                } else {
284
19
                    sort_opts.insert(key.to_string(), -1);
285
19
                }
286
            }
287
46
            options.sort = Some(sort_opts);
288
        }
289
6
    }
290
52
    options
291
52
}