1
use std::{collections::HashMap, error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, Bson, DateTime, Document, Regex, doc, raw::CString},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::strings;
13

            
14
use super::super::user::{
15
    Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, Updates, User, UserModel,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "userId")]
36
    user_id: String,
37
    account: String,
38
    #[serde(rename = "createdAt")]
39
    created_at: DateTime,
40
    #[serde(rename = "modifiedAt")]
41
    modified_at: DateTime,
42
    #[serde(rename = "verifiedAt")]
43
    verified_at: Option<DateTime>,
44
    #[serde(rename = "expiredAt")]
45
    expired_at: Option<DateTime>,
46
    #[serde(rename = "disabledAt")]
47
    disabled_at: Option<DateTime>,
48
    roles: HashMap<String, bool>,
49
    password: String,
50
    salt: String,
51
    name: String,
52
    info: Document,
53
}
54

            
55
const COL_NAME: &'static str = "user";
56

            
57
impl Model {
58
    /// To create the model instance with a database connection.
59
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
60
14
        let model = Model { conn };
61
14
        model.init().await?;
62
14
        Ok(model)
63
14
    }
64
}
65

            
66
#[async_trait]
67
impl UserModel for Model {
68
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
69
        let indexes = vec![
70
            doc! {"name": "userId_1", "key": {"userId": 1}, "unique": true},
71
            doc! {"name": "account_1", "key": {"account": 1}, "unique": true},
72
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
73
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
74
            doc! {"name": "verifiedAt_1", "key": {"verifiedAt": 1}},
75
            doc! {"name": "expiredAt_1", "key": {"expiredAt": 1}},
76
            doc! {"name": "disabledAt_1", "key": {"disabledAt": 1}},
77
            doc! {"name": "name_1", "key": {"name": 1}},
78
        ];
79
        let command = doc! {
80
            "createIndexes": COL_NAME,
81
            "indexes": indexes,
82
        };
83
        self.conn.run_command(command).await?;
84
        Ok(())
85
24
    }
86

            
87
42
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
88
        let filter = get_list_query_filter(cond);
89
        let count = self
90
            .conn
91
            .collection::<Schema>(COL_NAME)
92
            .count_documents(filter)
93
            .await?;
94
        Ok(count)
95
42
    }
96

            
97
    async fn list(
98
        &self,
99
        opts: &ListOptions,
100
        cursor: Option<Box<dyn Cursor>>,
101
166
    ) -> Result<(Vec<User>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
102
        let mut cursor = match cursor {
103
            None => {
104
                let filter = get_list_query_filter(opts.cond);
105
                Box::new(DbCursor::new(
106
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
107
                        .await?,
108
                ))
109
            }
110
            Some(cursor) => cursor,
111
        };
112

            
113
        let mut count: u64 = 0;
114
        let mut list = Vec::new();
115
        while let Some(item) = cursor.try_next().await? {
116
            list.push(item);
117
166
            if let Some(cursor_max) = opts.cursor_max {
118
                count += 1;
119
                if count >= cursor_max {
120
                    return Ok((list, Some(cursor)));
121
                }
122
            }
123
        }
124
        Ok((list, None))
125
166
    }
126

            
127
898
    async fn get(&self, cond: &QueryCond) -> Result<Option<User>, Box<dyn StdError>> {
128
        let filter = get_query_filter(cond);
129
        let mut cursor = self
130
            .conn
131
            .collection::<Schema>(COL_NAME)
132
            .find(filter)
133
            .await?;
134
        if let Some(user) = cursor.try_next().await? {
135
            return Ok(Some(User {
136
                user_id: user.user_id,
137
                account: user.account,
138
                created_at: user.created_at.into(),
139
                modified_at: user.modified_at.into(),
140
                verified_at: match user.verified_at {
141
                    None => None,
142
                    Some(value) => Some(value.into()),
143
                },
144
                expired_at: match user.expired_at {
145
                    None => None,
146
                    Some(value) => Some(value.into()),
147
                },
148
                disabled_at: match user.disabled_at {
149
                    None => None,
150
                    Some(value) => Some(value.into()),
151
                },
152
                roles: user.roles,
153
                password: user.password,
154
                salt: user.salt,
155
                name: user.name,
156
                info: bson::deserialize_from_document(user.info)?,
157
            }));
158
        }
159
        Ok(None)
160
898
    }
161

            
162
1098
    async fn add(&self, user: &User) -> Result<(), Box<dyn StdError>> {
163
        let item = Schema {
164
            user_id: user.user_id.clone(),
165
            account: user.account.to_lowercase(),
166
            created_at: user.created_at.into(),
167
            modified_at: user.modified_at.into(),
168
1098
            verified_at: match user.verified_at {
169
                None => None,
170
                Some(value) => Some(value.into()),
171
            },
172
1098
            expired_at: match user.expired_at {
173
                None => None,
174
                Some(value) => Some(value.into()),
175
            },
176
1098
            disabled_at: match user.disabled_at {
177
                None => None,
178
                Some(value) => Some(value.into()),
179
            },
180
            roles: user.roles.clone(),
181
            password: user.password.clone(),
182
            salt: user.salt.clone(),
183
            name: user.name.clone(),
184
            info: bson::serialize_to_document(&user.info)?,
185
        };
186
        self.conn
187
            .collection::<Schema>(COL_NAME)
188
            .insert_one(item)
189
            .await?;
190
        Ok(())
191
1098
    }
192

            
193
10
    async fn del(&self, user_id: &str) -> Result<(), Box<dyn StdError>> {
194
        let filter = doc! {"userId": user_id};
195
        self.conn
196
            .collection::<Schema>(COL_NAME)
197
            .delete_one(filter)
198
            .await?;
199
        Ok(())
200
10
    }
201

            
202
48
    async fn update(&self, user_id: &str, updates: &Updates) -> Result<(), Box<dyn StdError>> {
203
        let filter = doc! {"userId": user_id};
204
        if let Some(updates) = get_update_doc(updates) {
205
            self.conn
206
                .collection::<Schema>(COL_NAME)
207
                .update_one(filter, updates)
208
                .await?;
209
        }
210
        return Ok(());
211
48
    }
212
}
213

            
214
impl DbCursor {
215
    /// To create the cursor instance with a collection cursor.
216
144
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
217
144
        DbCursor { cursor, offset: 0 }
218
144
    }
219
}
220

            
221
#[async_trait]
222
impl Cursor for DbCursor {
223
2416
    async fn try_next(&mut self) -> Result<Option<User>, Box<dyn StdError>> {
224
        if let Some(item) = self.cursor.try_next().await? {
225
            self.offset += 1;
226
            return Ok(Some(User {
227
                user_id: item.user_id,
228
                account: item.account,
229
                created_at: item.created_at.into(),
230
                modified_at: item.modified_at.into(),
231
                verified_at: match item.verified_at {
232
                    None => None,
233
                    Some(value) => Some(value.into()),
234
                },
235
                expired_at: match item.expired_at {
236
                    None => None,
237
                    Some(value) => Some(value.into()),
238
                },
239
                disabled_at: match item.disabled_at {
240
                    None => None,
241
                    Some(value) => Some(value.into()),
242
                },
243
                roles: item.roles,
244
                password: item.password,
245
                salt: item.salt,
246
                name: item.name,
247
                info: bson::deserialize_from_document(item.info)?,
248
            }));
249
        }
250
        Ok(None)
251
2416
    }
252

            
253
8
    fn offset(&self) -> u64 {
254
8
        self.offset
255
8
    }
256
}
257

            
258
/// Transforms query conditions to the MongoDB document.
259
898
fn get_query_filter(cond: &QueryCond) -> Document {
260
898
    let mut filter = Document::new();
261
898
    if let Some(value) = cond.user_id {
262
598
        filter.insert("userId", value);
263
598
    }
264
898
    if let Some(value) = cond.account {
265
300
        filter.insert("account", value.to_lowercase().as_str());
266
598
    }
267
898
    filter
268
898
}
269

            
270
/// Transforms query conditions to the MongoDB document.
271
186
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
272
186
    let mut filter = Document::new();
273
186
    if let Some(value) = cond.user_id {
274
4
        filter.insert("userId", value);
275
182
    }
276
186
    if let Some(value) = cond.account {
277
22
        filter.insert("account", value.to_lowercase().as_str());
278
164
    }
279
186
    if let Some(value) = cond.account_contains {
280
60
        let escaped = strings::escape_regex_str(value);
281
60
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
282
60
            if let Ok(options) = CString::try_from("i") {
283
60
                filter.insert("account", Regex { pattern, options });
284
60
            }
285
        }
286
126
    }
287
186
    if let Some(value) = cond.verified_at {
288
12
        if value {
289
8
            filter.insert("verifiedAt", doc! {"$ne": Bson::Null});
290
8
        } else {
291
4
            filter.insert("verifiedAt", Bson::Null);
292
4
        }
293
174
    }
294
186
    if let Some(value) = cond.disabled_at {
295
8
        if value {
296
4
            filter.insert("disabledAt", doc! {"$ne": Bson::Null});
297
4
        } else {
298
4
            filter.insert("disabledAt", Bson::Null);
299
4
        }
300
178
    }
301
186
    if let Some(value) = cond.name_contains {
302
12
        let escaped = strings::escape_regex_str(value);
303
12
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
304
12
            if let Ok(options) = CString::try_from("i") {
305
12
                filter.insert("name", Regex { pattern, options });
306
12
            }
307
        }
308
174
    }
309
186
    filter
310
186
}
311

            
312
/// Transforms model options to the options.
313
144
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
314
144
where
315
144
    T: Send + Sync,
316
{
317
144
    if let Some(offset) = opts.offset {
318
20
        find = find.skip(offset);
319
124
    }
320
144
    if let Some(limit) = opts.limit {
321
76
        if limit > 0 {
322
74
            find = find.limit(limit as i64);
323
74
        }
324
68
    }
325
144
    if let Some(sort_list) = opts.sort.as_ref() {
326
120
        if sort_list.len() > 0 {
327
118
            let mut sort_opts = Document::new();
328
130
            for cond in sort_list.iter() {
329
130
                let key = match cond.key {
330
82
                    SortKey::Account => "account",
331
12
                    SortKey::CreatedAt => "createdAt",
332
8
                    SortKey::ModifiedAt => "modifiedAt",
333
8
                    SortKey::VerifiedAt => "verifiedAt",
334
4
                    SortKey::ExpiredAt => "expiredAt",
335
4
                    SortKey::DisabledAt => "disabledAt",
336
12
                    SortKey::Name => "name",
337
                };
338
130
                if cond.asc {
339
104
                    sort_opts.insert(key.to_string(), 1);
340
104
                } else {
341
26
                    sort_opts.insert(key.to_string(), -1);
342
26
                }
343
            }
344
118
            find = find.sort(sort_opts);
345
2
        }
346
24
    }
347
144
    find
348
144
}
349

            
350
/// Transforms the model object to the MongoDB document.
351
48
fn get_update_doc(updates: &Updates) -> Option<Document> {
352
48
    let mut count = 0;
353
48
    let mut document = Document::new();
354
48
    if let Some(value) = updates.modified_at.as_ref() {
355
46
        document.insert(
356
46
            "modifiedAt",
357
46
            DateTime::from_millis(value.timestamp_millis()),
358
46
        );
359
46
        count += 1;
360
46
    }
361
48
    if let Some(value) = updates.verified_at.as_ref() {
362
8
        document.insert(
363
8
            "verifiedAt",
364
8
            DateTime::from_millis(value.timestamp_millis()),
365
8
        );
366
8
        count += 1;
367
40
    }
368
48
    if let Some(value) = updates.expired_at.as_ref() {
369
8
        match value {
370
6
            None => {
371
6
                document.insert("expiredAt", Bson::Null);
372
6
            }
373
2
            Some(value) => {
374
2
                document.insert("expiredAt", DateTime::from_millis(value.timestamp_millis()));
375
2
            }
376
        }
377
8
        count += 1;
378
40
    }
379
48
    if let Some(value) = updates.disabled_at.as_ref() {
380
16
        match value {
381
8
            None => {
382
8
                document.insert("disabledAt", Bson::Null);
383
8
            }
384
8
            Some(value) => {
385
8
                document.insert(
386
8
                    "disabledAt",
387
8
                    DateTime::from_millis(value.timestamp_millis()),
388
8
                );
389
8
            }
390
        }
391
16
        count += 1;
392
32
    }
393
48
    if let Some(value) = updates.roles {
394
12
        let mut doc = Document::new();
395
22
        for (k, v) in value {
396
22
            doc.insert(k, v);
397
22
        }
398
12
        document.insert("roles", doc);
399
12
        count += 1;
400
36
    }
401
48
    if let Some(value) = updates.password.as_ref() {
402
14
        document.insert("password", value);
403
14
        count += 1;
404
34
    }
405
48
    if let Some(value) = updates.salt.as_ref() {
406
14
        document.insert("salt", value);
407
14
        count += 1;
408
34
    }
409
48
    if let Some(value) = updates.name {
410
32
        document.insert("name", value);
411
32
        count += 1;
412
32
    }
413
48
    if let Some(value) = updates.info {
414
32
        document.insert(
415
            "info",
416
32
            match bson::serialize_to_document(value) {
417
                Err(_) => return None,
418
32
                Ok(doc) => doc,
419
            },
420
        );
421
32
        count += 1;
422
16
    }
423
48
    if count == 0 {
424
2
        return None;
425
46
    }
426
46
    Some(doc! {"$set": document})
427
48
}