1
use std::{collections::HashMap, error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    bson::{self, doc, Bson, DateTime, Document, Regex},
7
    options::FindOptions,
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::user::{
13
    Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, Updates, User, UserModel,
14
};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// Cursor instance.
23
struct DbCursor {
24
    /// The associated collection cursor.
25
    cursor: MongoDbCursor<Schema>,
26
    /// (Useless) only for Cursor trait implementation.
27
    offset: u64,
28
}
29

            
30
/// MongoDB schema.
31
22050
#[derive(Deserialize, Serialize)]
32
struct Schema {
33
    #[serde(rename = "userId")]
34
    user_id: String,
35
    account: String,
36
    #[serde(rename = "createdAt")]
37
    created_at: DateTime,
38
    #[serde(rename = "modifiedAt")]
39
    modified_at: DateTime,
40
    #[serde(rename = "verifiedAt")]
41
    verified_at: Option<DateTime>,
42
    #[serde(rename = "expiredAt")]
43
    expired_at: Option<DateTime>,
44
    #[serde(rename = "disabledAt")]
45
    disabled_at: Option<DateTime>,
46
    roles: HashMap<String, bool>,
47
    password: String,
48
    salt: String,
49
    name: String,
50
    info: Document,
51
}
52

            
53
/// Name of the MongoDB collection that stores users.
const COL_NAME: &str = "user";
54

            
55
impl Model {
56
    /// To create the model instance with a database connection.
57
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
58
7
        let model = Model { conn };
59
14
        model.init().await?;
60
7
        Ok(model)
61
7
    }
62
}
63

            
64
#[async_trait]
65
impl UserModel for Model {
66
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
67
12
        let indexes = vec![
68
12
            doc! {"name": "userId_1", "key": {"userId": 1}, "unique": true},
69
12
            doc! {"name": "account_1", "key": {"account": 1}, "unique": true},
70
12
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
71
12
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
72
12
            doc! {"name": "verifiedAt_1", "key": {"verifiedAt": 1}},
73
12
            doc! {"name": "expiredAt_1", "key": {"expiredAt": 1}},
74
12
            doc! {"name": "disabledAt_1", "key": {"disabledAt": 1}},
75
12
            doc! {"name": "name_1", "key": {"name": 1}},
76
12
        ];
77
12
        let command = doc! {
78
12
            "createIndexes": COL_NAME,
79
12
            "indexes": indexes,
80
12
        };
81
24
        self.conn.run_command(command, None).await?;
82
12
        Ok(())
83
12
    }
84

            
85
21
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
86
21
        let filter = get_list_query_filter(cond);
87
21
        let count = self
88
21
            .conn
89
21
            .collection::<Schema>(COL_NAME)
90
21
            .count_documents(filter, None)
91
42
            .await?;
92
21
        Ok(count)
93
21
    }
94

            
95
    async fn list(
96
        &self,
97
        opts: &ListOptions,
98
        cursor: Option<Box<dyn Cursor>>,
99
83
    ) -> Result<(Vec<User>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
100
83
        let mut cursor = match cursor {
101
83
            None => {
102
83
                let filter = get_list_query_filter(opts.cond);
103
72
                let options = get_find_options(opts);
104
72
                Box::new(DbCursor::new(
105
72
                    self.conn
106
72
                        .collection::<Schema>(COL_NAME)
107
72
                        .find(filter, options)
108
144
                        .await?,
109
83
                ))
110
83
            }
111
83
            Some(cursor) => cursor,
112
83
        };
113
83

            
114
83
        let mut count: u64 = 0;
115
83
        let mut list = Vec::new();
116
1208
        while let Some(item) = cursor.try_next().await? {
117
1136
            list.push(item);
118
1136
            if let Some(cursor_max) = opts.cursor_max {
119
1032
                count += 1;
120
1032
                if count >= cursor_max {
121
83
                    return Ok((list, Some(cursor)));
122
1021
                }
123
104
            }
124
83
        }
125
83
        Ok((list, None))
126
83
    }
127

            
128
450
    async fn get(&self, cond: &QueryCond) -> Result<Option<User>, Box<dyn StdError>> {
129
450
        let filter = get_query_filter(cond);
130
450
        let mut cursor = self
131
450
            .conn
132
450
            .collection::<Schema>(COL_NAME)
133
450
            .find(filter, None)
134
900
            .await?;
135
450
        if let Some(user) = cursor.try_next().await? {
136
450
            return Ok(Some(User {
137
450
                user_id: user.user_id,
138
439
                account: user.account,
139
439
                created_at: user.created_at.into(),
140
439
                modified_at: user.modified_at.into(),
141
439
                verified_at: match user.verified_at {
142
450
                    None => None,
143
450
                    Some(value) => Some(value.into()),
144
450
                },
145
450
                expired_at: match user.expired_at {
146
450
                    None => None,
147
450
                    Some(value) => Some(value.into()),
148
450
                },
149
450
                disabled_at: match user.disabled_at {
150
450
                    None => None,
151
450
                    Some(value) => Some(value.into()),
152
450
                },
153
450
                roles: user.roles,
154
439
                password: user.password,
155
439
                salt: user.salt,
156
439
                name: user.name,
157
439
                info: bson::from_document(user.info)?,
158
450
            }));
159
450
        }
160
11
        Ok(None)
161
450
    }
162

            
163
549
    async fn add(&self, user: &User) -> Result<(), Box<dyn StdError>> {
164
549
        let item = Schema {
165
549
            user_id: user.user_id.clone(),
166
549
            account: user.account.to_lowercase(),
167
549
            created_at: user.created_at.into(),
168
549
            modified_at: user.modified_at.into(),
169
549
            verified_at: match user.verified_at {
170
549
                None => None,
171
549
                Some(value) => Some(value.into()),
172
549
            },
173
549
            expired_at: match user.expired_at {
174
549
                None => None,
175
549
                Some(value) => Some(value.into()),
176
549
            },
177
549
            disabled_at: match user.disabled_at {
178
549
                None => None,
179
549
                Some(value) => Some(value.into()),
180
549
            },
181
549
            roles: user.roles.clone(),
182
549
            password: user.password.clone(),
183
549
            salt: user.salt.clone(),
184
549
            name: user.name.clone(),
185
549
            info: bson::to_document(&user.info)?,
186
549
        };
187
549
        self.conn
188
549
            .collection::<Schema>(COL_NAME)
189
549
            .insert_one(item, None)
190
1100
            .await?;
191
549
        Ok(())
192
549
    }
193

            
194
5
    async fn del(&self, user_id: &str) -> Result<(), Box<dyn StdError>> {
195
5
        let filter = doc! {"userId": user_id};
196
5
        self.conn
197
5
            .collection::<Schema>(COL_NAME)
198
5
            .delete_one(filter, None)
199
10
            .await?;
200
5
        Ok(())
201
5
    }
202

            
203
24
    async fn update(&self, user_id: &str, updates: &Updates) -> Result<(), Box<dyn StdError>> {
204
24
        let filter = doc! {"userId": user_id};
205
24
        if let Some(updates) = get_update_doc(updates) {
206
24
            self.conn
207
23
                .collection::<Schema>(COL_NAME)
208
23
                .update_one(filter, updates, None)
209
46
                .await?;
210
24
        }
211
24
        return Ok(());
212
24
    }
213
}
214

            
215
impl DbCursor {
216
    /// To create the cursor instance with a collection cursor.
217
72
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
218
72
        DbCursor { cursor, offset: 0 }
219
72
    }
220
}
221

            
222
#[async_trait]
223
impl Cursor for DbCursor {
224
1208
    async fn try_next(&mut self) -> Result<Option<User>, Box<dyn StdError>> {
225
1208
        if let Some(item) = self.cursor.try_next().await? {
226
1208
            self.offset += 1;
227
1136
            return Ok(Some(User {
228
1136
                user_id: item.user_id,
229
1136
                account: item.account,
230
1136
                created_at: item.created_at.into(),
231
1136
                modified_at: item.modified_at.into(),
232
1136
                verified_at: match item.verified_at {
233
1208
                    None => None,
234
1208
                    Some(value) => Some(value.into()),
235
1208
                },
236
1208
                expired_at: match item.expired_at {
237
1208
                    None => None,
238
1208
                    Some(value) => Some(value.into()),
239
1208
                },
240
1208
                disabled_at: match item.disabled_at {
241
1208
                    None => None,
242
1208
                    Some(value) => Some(value.into()),
243
1208
                },
244
1208
                roles: item.roles,
245
1136
                password: item.password,
246
1136
                salt: item.salt,
247
1136
                name: item.name,
248
1136
                info: bson::from_document(item.info)?,
249
1208
            }));
250
1208
        }
251
72
        Ok(None)
252
1208
    }
253

            
254
4
    fn offset(&self) -> u64 {
255
4
        self.offset
256
4
    }
257
}
258

            
259
/// Transforms query conditions to the MongoDB document.
260
450
fn get_query_filter(cond: &QueryCond) -> Document {
261
450
    let mut filter = Document::new();
262
450
    if let Some(value) = cond.user_id {
263
299
        filter.insert("userId", value);
264
299
    }
265
450
    if let Some(value) = cond.account {
266
151
        filter.insert("account", value.to_lowercase().as_str());
267
299
    }
268
450
    filter
269
450
}
270

            
271
/// Transforms query conditions to the MongoDB document.
272
93
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
273
93
    let mut filter = Document::new();
274
93
    if let Some(value) = cond.user_id {
275
2
        filter.insert("userId", value);
276
91
    }
277
93
    if let Some(value) = cond.account {
278
11
        filter.insert("account", value.to_lowercase().as_str());
279
82
    }
280
93
    if let Some(value) = cond.account_contains {
281
30
        filter.insert(
282
30
            "account",
283
30
            Regex {
284
30
                pattern: value.to_lowercase(),
285
30
                options: "i".to_string(),
286
30
            },
287
30
        );
288
63
    }
289
93
    if let Some(value) = cond.verified_at {
290
6
        if value {
291
4
            filter.insert("verifiedAt", doc! {"$ne": Bson::Null});
292
4
        } else {
293
2
            filter.insert("verifiedAt", Bson::Null);
294
2
        }
295
87
    }
296
93
    if let Some(value) = cond.disabled_at {
297
4
        if value {
298
2
            filter.insert("disabledAt", doc! {"$ne": Bson::Null});
299
2
        } else {
300
2
            filter.insert("disabledAt", Bson::Null);
301
2
        }
302
89
    }
303
93
    if let Some(value) = cond.name_contains {
304
6
        filter.insert(
305
6
            "name",
306
6
            Regex {
307
6
                pattern: value.to_string(),
308
6
                options: "i".to_string(),
309
6
            },
310
6
        );
311
87
    }
312
93
    filter
313
93
}
314

            
315
/// Transforms model options to the options.
316
72
fn get_find_options(opts: &ListOptions) -> FindOptions {
317
72
    let mut options = FindOptions::builder().build();
318
72
    if let Some(offset) = opts.offset {
319
10
        options.skip = Some(offset);
320
62
    }
321
72
    if let Some(limit) = opts.limit {
322
38
        if limit > 0 {
323
37
            options.limit = Some(limit as i64);
324
37
        }
325
34
    }
326
72
    if let Some(sort_list) = opts.sort.as_ref() {
327
60
        if sort_list.len() > 0 {
328
59
            let mut sort_opts = Document::new();
329
65
            for cond in sort_list.iter() {
330
65
                let key = match cond.key {
331
41
                    SortKey::Account => "account",
332
6
                    SortKey::CreatedAt => "createdAt",
333
4
                    SortKey::ModifiedAt => "modifiedAt",
334
4
                    SortKey::VerifiedAt => "verifiedAt",
335
2
                    SortKey::ExpiredAt => "expiredAt",
336
2
                    SortKey::DisabledAt => "disabledAt",
337
6
                    SortKey::Name => "name",
338
                };
339
65
                if cond.asc {
340
52
                    sort_opts.insert(key.to_string(), 1);
341
52
                } else {
342
13
                    sort_opts.insert(key.to_string(), -1);
343
13
                }
344
            }
345
59
            options.sort = Some(sort_opts);
346
1
        }
347
12
    }
348
72
    options
349
72
}
350

            
351
/// Transforms the model object to the MongoDB document.
352
24
fn get_update_doc(updates: &Updates) -> Option<Document> {
353
24
    let mut count = 0;
354
24
    let mut document = Document::new();
355
24
    if let Some(value) = updates.modified_at.as_ref() {
356
23
        document.insert(
357
23
            "modifiedAt",
358
23
            DateTime::from_millis(value.timestamp_millis()),
359
23
        );
360
23
        count += 1;
361
23
    }
362
24
    if let Some(value) = updates.verified_at.as_ref() {
363
4
        document.insert(
364
4
            "verifiedAt",
365
4
            DateTime::from_millis(value.timestamp_millis()),
366
4
        );
367
4
        count += 1;
368
20
    }
369
24
    if let Some(value) = updates.expired_at.as_ref() {
370
4
        match value {
371
3
            None => {
372
3
                document.insert("expiredAt", Bson::Null);
373
3
            }
374
1
            Some(value) => {
375
1
                document.insert("expiredAt", DateTime::from_millis(value.timestamp_millis()));
376
1
            }
377
        }
378
4
        count += 1;
379
20
    }
380
24
    if let Some(value) = updates.disabled_at.as_ref() {
381
8
        match value {
382
4
            None => {
383
4
                document.insert("disabledAt", Bson::Null);
384
4
            }
385
4
            Some(value) => {
386
4
                document.insert(
387
4
                    "disabledAt",
388
4
                    DateTime::from_millis(value.timestamp_millis()),
389
4
                );
390
4
            }
391
        }
392
8
        count += 1;
393
16
    }
394
24
    if let Some(value) = updates.roles {
395
6
        let mut doc = Document::new();
396
17
        for (k, v) in value {
397
11
            doc.insert(k, v);
398
11
        }
399
6
        document.insert("roles", doc);
400
6
        count += 1;
401
18
    }
402
24
    if let Some(value) = updates.password.as_ref() {
403
7
        document.insert("password", value);
404
7
        count += 1;
405
17
    }
406
24
    if let Some(value) = updates.salt.as_ref() {
407
7
        document.insert("salt", value);
408
7
        count += 1;
409
17
    }
410
24
    if let Some(value) = updates.name {
411
16
        document.insert("name", value);
412
16
        count += 1;
413
16
    }
414
24
    if let Some(value) = updates.info {
415
16
        document.insert(
416
16
            "info",
417
16
            match bson::to_document(value) {
418
                Err(_) => return None,
419
16
                Ok(doc) => doc,
420
16
            },
421
16
        );
422
16
        count += 1;
423
8
    }
424
24
    if count == 0 {
425
1
        return None;
426
23
    }
427
23
    Some(doc! {"$set": document})
428
24
}