1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::coremgr_opdata::{
13
    CoremgrOpData, CoremgrOpDataModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    #[serde(rename = "reqTime")]
37
    pub req_time: DateTime,
38
    #[serde(rename = "resTime")]
39
    pub res_time: DateTime,
40
    #[serde(rename = "latencyMs")]
41
    pub latency_ms: i64,
42
    pub status: i32,
43
    #[serde(rename = "sourceIp")]
44
    pub source_ip: String,
45
    pub method: String,
46
    pub path: String,
47
    #[serde(skip_serializing_if = "Option::is_none")]
48
    pub body: Option<Document>,
49
    #[serde(rename = "userId")]
50
    pub user_id: String,
51
    #[serde(rename = "clientId")]
52
    pub client_id: String,
53
    #[serde(rename = "errCode", skip_serializing_if = "Option::is_none")]
54
    pub err_code: Option<String>,
55
    #[serde(rename = "errMessage", skip_serializing_if = "Option::is_none")]
56
    pub err_message: Option<String>,
57
}
58

            
59
/// Name of the MongoDB collection this model reads from and writes to.
const COL_NAME: &str = "coremgrOpData";
60

            
61
impl Model {
62
    /// To create the model instance with a database connection.
63
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
64
14
        let model = Model { conn };
65
14
        model.init().await?;
66
14
        Ok(model)
67
14
    }
68
}
69

            
70
#[async_trait]
71
impl CoremgrOpDataModel for Model {
72
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
73
24
        let indexes = vec![
74
24
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
75
24
            doc! {"name": "userId_1", "key": {"userId": 1}},
76
24
            doc! {"name": "clientId_1", "key": {"clientId": 1}},
77
24
            doc! {"name": "reqTime_1", "key": {"reqTime": 1}, "expireAfterSeconds": EXPIRES},
78
24
            doc! {"name": "resTime_1", "key": {"resTime": 1}},
79
24
            doc! {"name": "latencyMs_1", "key": {"latencyMs": 1}},
80
24
        ];
81
24
        let command = doc! {
82
24
            "createIndexes": COL_NAME,
83
24
            "indexes": indexes,
84
24
        };
85
24
        self.conn.run_command(command).await?;
86
24
        Ok(())
87
48
    }
88

            
89
36
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
36
        let filter = get_list_query_filter(cond);
91
36
        let count = self
92
36
            .conn
93
36
            .collection::<Schema>(COL_NAME)
94
36
            .count_documents(filter)
95
36
            .await?;
96
36
        Ok(count)
97
72
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
130
    ) -> Result<(Vec<CoremgrOpData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
130
        let mut cursor = match cursor {
105
            None => {
106
104
                let filter = get_list_query_filter(opts.cond);
107
104
                Box::new(DbCursor::new(
108
104
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
109
104
                        .await?,
110
                ))
111
            }
112
26
            Some(cursor) => cursor,
113
        };
114

            
115
130
        let mut count: u64 = 0;
116
130
        let mut list = Vec::new();
117
2250
        while let Some(item) = cursor.try_next().await? {
118
2146
            list.push(item);
119
2146
            if let Some(cursor_max) = opts.cursor_max {
120
2022
                count += 1;
121
2022
                if count >= cursor_max {
122
26
                    return Ok((list, Some(cursor)));
123
1996
                }
124
124
            }
125
        }
126
104
        Ok((list, None))
127
260
    }
128

            
129
918
    async fn add(&self, data: &CoremgrOpData) -> Result<(), Box<dyn StdError>> {
130
918
        let item = Schema {
131
918
            data_id: data.data_id.clone(),
132
918
            req_time: data.req_time.into(),
133
918
            res_time: data.res_time.into(),
134
918
            latency_ms: data.latency_ms,
135
918
            status: data.status,
136
918
            source_ip: data.source_ip.clone(),
137
918
            method: data.method.clone(),
138
918
            path: data.path.clone(),
139
918
            body: match data.body.as_ref() {
140
496
                None => None,
141
422
                Some(body) => Some(bson::to_document(body)?),
142
            },
143
918
            user_id: data.user_id.clone(),
144
918
            client_id: data.client_id.clone(),
145
918
            err_code: data.err_code.clone(),
146
918
            err_message: data.err_message.clone(),
147
918
        };
148
918
        self.conn
149
918
            .collection::<Schema>(COL_NAME)
150
918
            .insert_one(item)
151
918
            .await?;
152
916
        Ok(())
153
1836
    }
154

            
155
10
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
156
10
        let filter = get_query_filter(cond);
157
10
        self.conn
158
10
            .collection::<Schema>(COL_NAME)
159
10
            .delete_many(filter)
160
10
            .await?;
161
10
        Ok(())
162
20
    }
163
}
164

            
165
impl DbCursor {
166
    /// To create the cursor instance with a collection cursor.
167
104
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
168
104
        DbCursor { cursor, offset: 0 }
169
104
    }
170
}
171

            
172
#[async_trait]
173
impl Cursor for DbCursor {
174
2250
    async fn try_next(&mut self) -> Result<Option<CoremgrOpData>, Box<dyn StdError>> {
175
2250
        if let Some(item) = self.cursor.try_next().await? {
176
2146
            self.offset += 1;
177
2146
            return Ok(Some(CoremgrOpData {
178
2146
                data_id: item.data_id,
179
2146
                req_time: item.req_time.into(),
180
2146
                res_time: item.res_time.into(),
181
2146
                latency_ms: item.latency_ms,
182
2146
                status: item.status,
183
2146
                source_ip: item.source_ip,
184
2146
                method: item.method,
185
2146
                path: item.path,
186
2146
                body: match item.body {
187
1646
                    None => None,
188
500
                    Some(body) => bson::from_document(body)?,
189
                },
190
2146
                user_id: item.user_id,
191
2146
                client_id: item.client_id,
192
2146
                err_code: item.err_code,
193
2146
                err_message: item.err_message,
194
            }));
195
104
        }
196
104
        Ok(None)
197
4500
    }
198

            
199
10
    fn offset(&self) -> u64 {
200
10
        self.offset
201
10
    }
202
}
203

            
204
/// Transforms query conditions to the MongoDB document.
205
10
fn get_query_filter(cond: &QueryCond) -> Document {
206
10
    let mut filter = Document::new();
207
10
    if let Some(value) = cond.user_id {
208
6
        filter.insert("userId", value);
209
6
    }
210
10
    if let Some(value) = cond.client_id {
211
2
        filter.insert("clientId", value);
212
8
    }
213
10
    let mut time_doc = Document::new();
214
10
    if let Some(value) = cond.req_gte {
215
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
216
8
    }
217
10
    if let Some(value) = cond.req_lte {
218
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
219
8
    }
220
10
    if time_doc.len() > 0 {
221
2
        filter.insert("reqTime", time_doc);
222
8
    }
223
10
    filter
224
10
}
225

            
226
/// Transforms query conditions to the MongoDB document.
227
140
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
228
140
    let mut filter = Document::new();
229
140
    if let Some(value) = cond.user_id {
230
48
        filter.insert("userId", value);
231
92
    }
232
140
    if let Some(value) = cond.client_id {
233
8
        filter.insert("clientId", value);
234
132
    }
235
140
    let mut time_doc = Document::new();
236
140
    if let Some(value) = cond.req_gte {
237
26
        time_doc.insert("$gte", Bson::DateTime(value.into()));
238
114
    }
239
140
    if let Some(value) = cond.req_lte {
240
4
        time_doc.insert("$lte", Bson::DateTime(value.into()));
241
136
    }
242
140
    if time_doc.len() > 0 {
243
26
        filter.insert("reqTime", time_doc);
244
114
    }
245
140
    time_doc = Document::new();
246
140
    if let Some(value) = cond.res_gte {
247
4
        time_doc.insert("$gte", Bson::DateTime(value.into()));
248
136
    }
249
140
    if let Some(value) = cond.res_lte {
250
4
        time_doc.insert("$lte", Bson::DateTime(value.into()));
251
136
    }
252
140
    if time_doc.len() > 0 {
253
4
        filter.insert("resTime", time_doc);
254
136
    }
255
140
    filter
256
140
}
257

            
258
/// Transforms model options to the options.
259
104
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
260
104
where
261
104
    T: Send + Sync,
262
104
{
263
104
    if let Some(offset) = opts.offset {
264
24
        find = find.skip(offset);
265
80
    }
266
104
    if let Some(limit) = opts.limit {
267
76
        if limit > 0 {
268
70
            find = find.limit(limit as i64);
269
70
        }
270
28
    }
271
104
    if let Some(sort_list) = opts.sort.as_ref() {
272
92
        if sort_list.len() > 0 {
273
92
            let mut sort_opts = Document::new();
274
92
            for cond in sort_list.iter() {
275
92
                let key = match cond.key {
276
76
                    SortKey::ReqTime => "reqTime",
277
8
                    SortKey::ResTime => "resTime",
278
8
                    SortKey::Latency => "latencyMs",
279
                };
280
92
                if cond.asc {
281
54
                    sort_opts.insert(key.to_string(), 1);
282
54
                } else {
283
38
                    sort_opts.insert(key.to_string(), -1);
284
38
                }
285
            }
286
92
            find = find.sort(sort_opts);
287
        }
288
12
    }
289
104
    find
290
104
}