1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::coremgr_opdata::{
13
    CoremgrOpData, CoremgrOpDataModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    #[serde(rename = "reqTime")]
37
    pub req_time: DateTime,
38
    #[serde(rename = "resTime")]
39
    pub res_time: DateTime,
40
    #[serde(rename = "latencyMs")]
41
    pub latency_ms: i64,
42
    pub status: i32,
43
    #[serde(rename = "sourceIp")]
44
    pub source_ip: String,
45
    pub method: String,
46
    pub path: String,
47
    #[serde(skip_serializing_if = "Option::is_none")]
48
    pub body: Option<Document>,
49
    #[serde(rename = "userId")]
50
    pub user_id: String,
51
    #[serde(rename = "clientId")]
52
    pub client_id: String,
53
    #[serde(rename = "errCode", skip_serializing_if = "Option::is_none")]
54
    pub err_code: Option<String>,
55
    #[serde(rename = "errMessage", skip_serializing_if = "Option::is_none")]
56
    pub err_message: Option<String>,
57
}
58

            
59
/// Name of the MongoDB collection that stores operation data.
const COL_NAME: &str = "coremgrOpData";
60

            
61
impl Model {
62
    /// To create the model instance with a database connection.
63
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
64
7
        let model = Model { conn };
65
7
        model.init().await?;
66
7
        Ok(model)
67
7
    }
68
}
69

            
70
#[async_trait]
71
impl CoremgrOpDataModel for Model {
72
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
73
12
        let indexes = vec![
74
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
75
12
            doc! {"name": "userId_1", "key": {"userId": 1}},
76
12
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
77
12
            doc! {"name": "reqTime_1", "key": {"reqTime": 1}, "expireAfterSeconds": EXPIRES},
78
12
            doc! {"name": "resTime_1", "key": {"resTime": 1}},
79
12
            doc! {"name": "latencyMs_1", "key": {"latencyMs": 1}},
80
12
        ];
81
12
        let command = doc! {
82
12
            "createIndexes": COL_NAME,
83
12
            "indexes": indexes,
84
12
        };
85
12
        self.conn.run_command(command).await?;
86
12
        Ok(())
87
24
    }
88

            
89
18
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
18
        let filter = get_list_query_filter(cond);
91
18
        let count = self
92
18
            .conn
93
18
            .collection::<Schema>(COL_NAME)
94
18
            .count_documents(filter)
95
18
            .await?;
96
18
        Ok(count)
97
36
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
65
    ) -> Result<(Vec<CoremgrOpData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
65
        let mut cursor = match cursor {
105
            None => {
106
52
                let filter = get_list_query_filter(opts.cond);
107
52
                Box::new(DbCursor::new(
108
52
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
109
52
                        .await?,
110
                ))
111
            }
112
13
            Some(cursor) => cursor,
113
        };
114

            
115
65
        let mut count: u64 = 0;
116
65
        let mut list = Vec::new();
117
1125
        while let Some(item) = cursor.try_next().await? {
118
1073
            list.push(item);
119
1073
            if let Some(cursor_max) = opts.cursor_max {
120
1011
                count += 1;
121
1011
                if count >= cursor_max {
122
13
                    return Ok((list, Some(cursor)));
123
998
                }
124
62
            }
125
        }
126
52
        Ok((list, None))
127
130
    }
128

            
129
459
    async fn add(&self, data: &CoremgrOpData) -> Result<(), Box<dyn StdError>> {
130
459
        let item = Schema {
131
459
            data_id: data.data_id.clone(),
132
459
            req_time: data.req_time.into(),
133
459
            res_time: data.res_time.into(),
134
459
            latency_ms: data.latency_ms,
135
459
            status: data.status,
136
459
            source_ip: data.source_ip.clone(),
137
459
            method: data.method.clone(),
138
459
            path: data.path.clone(),
139
459
            body: match data.body.as_ref() {
140
248
                None => None,
141
211
                Some(body) => Some(bson::to_document(body)?),
142
            },
143
459
            user_id: data.user_id.clone(),
144
459
            client_id: data.client_id.clone(),
145
459
            err_code: data.err_code.clone(),
146
459
            err_message: data.err_message.clone(),
147
459
        };
148
459
        self.conn
149
459
            .collection::<Schema>(COL_NAME)
150
459
            .insert_one(item)
151
459
            .await?;
152
458
        Ok(())
153
918
    }
154

            
155
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
156
5
        let filter = get_query_filter(cond);
157
5
        self.conn
158
5
            .collection::<Schema>(COL_NAME)
159
5
            .delete_many(filter)
160
5
            .await?;
161
5
        Ok(())
162
10
    }
163
}
164

            
165
impl DbCursor {
166
    /// To create the cursor instance with a collection cursor.
167
52
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
168
52
        DbCursor { cursor, offset: 0 }
169
52
    }
170
}
171

            
172
#[async_trait]
173
impl Cursor for DbCursor {
174
1125
    async fn try_next(&mut self) -> Result<Option<CoremgrOpData>, Box<dyn StdError>> {
175
1125
        if let Some(item) = self.cursor.try_next().await? {
176
1073
            self.offset += 1;
177
1073
            return Ok(Some(CoremgrOpData {
178
1073
                data_id: item.data_id,
179
1073
                req_time: item.req_time.into(),
180
1073
                res_time: item.res_time.into(),
181
1073
                latency_ms: item.latency_ms,
182
1073
                status: item.status,
183
1073
                source_ip: item.source_ip,
184
1073
                method: item.method,
185
1073
                path: item.path,
186
1073
                body: match item.body {
187
823
                    None => None,
188
250
                    Some(body) => bson::from_document(body)?,
189
                },
190
1073
                user_id: item.user_id,
191
1073
                client_id: item.client_id,
192
1073
                err_code: item.err_code,
193
1073
                err_message: item.err_message,
194
            }));
195
52
        }
196
52
        Ok(None)
197
2250
    }
198

            
199
5
    fn offset(&self) -> u64 {
200
5
        self.offset
201
5
    }
202
}
203

            
204
/// Transforms query conditions to the MongoDB document.
205
5
fn get_query_filter(cond: &QueryCond) -> Document {
206
5
    let mut filter = Document::new();
207
5
    if let Some(value) = cond.user_id {
208
3
        filter.insert("userId", value);
209
3
    }
210
5
    if let Some(value) = cond.client_id {
211
1
        filter.insert("clientId", value);
212
4
    }
213
5
    let mut time_doc = Document::new();
214
5
    if let Some(value) = cond.req_gte {
215
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
216
4
    }
217
5
    if let Some(value) = cond.req_lte {
218
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
219
4
    }
220
5
    if time_doc.len() > 0 {
221
1
        filter.insert("reqTime", time_doc);
222
4
    }
223
5
    filter
224
5
}
225

            
226
/// Transforms query conditions to the MongoDB document.
227
70
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
228
70
    let mut filter = Document::new();
229
70
    if let Some(value) = cond.user_id {
230
24
        filter.insert("userId", value);
231
46
    }
232
70
    if let Some(value) = cond.client_id {
233
4
        filter.insert("clientId", value);
234
66
    }
235
70
    let mut time_doc = Document::new();
236
70
    if let Some(value) = cond.req_gte {
237
13
        time_doc.insert("$gte", Bson::DateTime(value.into()));
238
57
    }
239
70
    if let Some(value) = cond.req_lte {
240
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
241
68
    }
242
70
    if time_doc.len() > 0 {
243
13
        filter.insert("reqTime", time_doc);
244
57
    }
245
70
    time_doc = Document::new();
246
70
    if let Some(value) = cond.res_gte {
247
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
248
68
    }
249
70
    if let Some(value) = cond.res_lte {
250
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
251
68
    }
252
70
    if time_doc.len() > 0 {
253
2
        filter.insert("resTime", time_doc);
254
68
    }
255
70
    filter
256
70
}
257

            
258
/// Transforms model options to the options.
259
52
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
260
52
where
261
52
    T: Send + Sync,
262
52
{
263
52
    if let Some(offset) = opts.offset {
264
12
        find = find.skip(offset);
265
40
    }
266
52
    if let Some(limit) = opts.limit {
267
38
        if limit > 0 {
268
35
            find = find.limit(limit as i64);
269
35
        }
270
14
    }
271
52
    if let Some(sort_list) = opts.sort.as_ref() {
272
46
        if sort_list.len() > 0 {
273
46
            let mut sort_opts = Document::new();
274
46
            for cond in sort_list.iter() {
275
46
                let key = match cond.key {
276
38
                    SortKey::ReqTime => "reqTime",
277
4
                    SortKey::ResTime => "resTime",
278
4
                    SortKey::Latency => "latencyMs",
279
                };
280
46
                if cond.asc {
281
27
                    sort_opts.insert(key.to_string(), 1);
282
27
                } else {
283
19
                    sort_opts.insert(key.to_string(), -1);
284
19
                }
285
            }
286
46
            find = find.sort(sort_opts);
287
        }
288
6
    }
289
52
    find
290
52
}