1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{Bson, DateTime, Document, Regex, doc, raw::CString},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::strings;
13

            
14
use super::super::client::{
15
    Client, ClientModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, UpdateQueryCond,
16
    Updates,
17
};
18

            
19
/// Model instance.
20
pub struct Model {
21
    /// The associated database connection.
22
    conn: Arc<Database>,
23
}
24

            
25
/// Cursor instance.
26
struct DbCursor {
27
    /// The associated collection cursor.
28
    cursor: MongoDbCursor<Schema>,
29
    /// (Useless) only for Cursor trait implementation.
30
    offset: u64,
31
}
32

            
33
/// MongoDB schema.
34
#[derive(Deserialize, Serialize)]
35
struct Schema {
36
    #[serde(rename = "clientId")]
37
    client_id: String,
38
    #[serde(rename = "createdAt")]
39
    created_at: DateTime,
40
    #[serde(rename = "modifiedAt")]
41
    modified_at: DateTime,
42
    #[serde(rename = "clientSecret")]
43
    client_secret: Option<String>,
44
    #[serde(rename = "redirectUris")]
45
    redirect_uris: Vec<String>,
46
    scopes: Vec<String>,
47
    #[serde(rename = "userId")]
48
    user_id: String,
49
    name: String,
50
    #[serde(rename = "imageUrl")]
51
    image_url: Option<String>,
52
}
53

            
54
/// Name of the MongoDB collection that stores the client documents.
const COL_NAME: &str = "client";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
14
        let model = Model { conn };
60
14
        model.init().await?;
61
14
        Ok(model)
62
14
    }
63
}
64

            
65
#[async_trait]
66
impl ClientModel for Model {
67
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
        let indexes = vec![
69
            doc! {"name": "clientId_1", "key": {"clientId": 1}, "unique": true},
70
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
71
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
72
            doc! {"name": "userId_1", "key": {"userId": 1}},
73
            doc! {"name": "name_1", "key": {"name": 1}},
74
        ];
75
        let command = doc! {
76
            "createIndexes": COL_NAME,
77
            "indexes": indexes,
78
        };
79
        self.conn.run_command(command).await?;
80
        Ok(())
81
24
    }
82

            
83
20
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
84
        let filter = get_list_query_filter(cond);
85
        let count = self
86
            .conn
87
            .collection::<Schema>(COL_NAME)
88
            .count_documents(filter)
89
            .await?;
90
        Ok(count)
91
20
    }
92

            
93
    async fn list(
94
        &self,
95
        opts: &ListOptions,
96
        cursor: Option<Box<dyn Cursor>>,
97
114
    ) -> Result<(Vec<Client>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
98
        let mut cursor = match cursor {
99
            None => {
100
                let filter = get_list_query_filter(opts.cond);
101
                Box::new(DbCursor::new(
102
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
103
                        .await?,
104
                ))
105
            }
106
            Some(cursor) => cursor,
107
        };
108

            
109
        let mut count: u64 = 0;
110
        let mut list = Vec::new();
111
        while let Some(item) = cursor.try_next().await? {
112
            list.push(item);
113
114
            if let Some(cursor_max) = opts.cursor_max {
114
                count += 1;
115
                if count >= cursor_max {
116
                    return Ok((list, Some(cursor)));
117
                }
118
            }
119
        }
120
        Ok((list, None))
121
114
    }
122

            
123
1726
    async fn get(&self, cond: &QueryCond) -> Result<Option<Client>, Box<dyn StdError>> {
124
        let filter = get_query_filter(cond);
125
        let mut cursor = self
126
            .conn
127
            .collection::<Schema>(COL_NAME)
128
            .find(filter)
129
            .await?;
130
        if let Some(item) = cursor.try_next().await? {
131
            return Ok(Some(Client {
132
                client_id: item.client_id,
133
                created_at: item.created_at.into(),
134
                modified_at: item.modified_at.into(),
135
                client_secret: item.client_secret,
136
                redirect_uris: item.redirect_uris,
137
                scopes: item.scopes,
138
                user_id: item.user_id,
139
                name: item.name,
140
                image_url: item.image_url,
141
            }));
142
        }
143
        Ok(None)
144
1726
    }
145

            
146
1068
    async fn add(&self, client: &Client) -> Result<(), Box<dyn StdError>> {
147
        let item = Schema {
148
            client_id: client.client_id.clone(),
149
            created_at: client.created_at.into(),
150
            modified_at: client.modified_at.into(),
151
            client_secret: client.client_secret.clone(),
152
            redirect_uris: client.redirect_uris.clone(),
153
            scopes: client.scopes.clone(),
154
            user_id: client.user_id.clone(),
155
            name: client.name.clone(),
156
            image_url: client.image_url.clone(),
157
        };
158
        self.conn
159
            .collection::<Schema>(COL_NAME)
160
            .insert_one(item)
161
            .await?;
162
        Ok(())
163
1068
    }
164

            
165
20
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
166
        let filter = get_query_filter(cond);
167
        self.conn
168
            .collection::<Schema>(COL_NAME)
169
            .delete_many(filter)
170
            .await?;
171
        Ok(())
172
20
    }
173

            
174
    async fn update(
175
        &self,
176
        cond: &UpdateQueryCond,
177
        updates: &Updates,
178
28
    ) -> Result<(), Box<dyn StdError>> {
179
        let filter = get_update_query_filter(cond);
180
        if let Some(updates) = get_update_doc(updates) {
181
            self.conn
182
                .collection::<Schema>(COL_NAME)
183
                .update_one(filter, updates)
184
                .await?;
185
        }
186
        return Ok(());
187
28
    }
188
}
189

            
190
impl DbCursor {
191
    /// To create the cursor instance with a collection cursor.
192
92
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
193
92
        DbCursor { cursor, offset: 0 }
194
92
    }
195
}
196

            
197
#[async_trait]
198
impl Cursor for DbCursor {
199
1970
    async fn try_next(&mut self) -> Result<Option<Client>, Box<dyn StdError>> {
200
        if let Some(item) = self.cursor.try_next().await? {
201
            self.offset += 1;
202
            return Ok(Some(Client {
203
                client_id: item.client_id,
204
                created_at: item.created_at.into(),
205
                modified_at: item.modified_at.into(),
206
                client_secret: item.client_secret,
207
                redirect_uris: item.redirect_uris,
208
                scopes: item.scopes,
209
                user_id: item.user_id,
210
                name: item.name,
211
                image_url: item.image_url,
212
            }));
213
        }
214
        Ok(None)
215
1970
    }
216

            
217
8
    fn offset(&self) -> u64 {
218
8
        self.offset
219
8
    }
220
}
221

            
222
/// Transforms query conditions to the MongoDB document.
223
1746
fn get_query_filter(cond: &QueryCond) -> Document {
224
1746
    let mut filter = Document::new();
225
1746
    if let Some(value) = cond.user_id {
226
24
        filter.insert("userId", value);
227
1722
    }
228
1746
    if let Some(value) = cond.client_id {
229
1742
        filter.insert("clientId", value);
230
1742
    }
231
1746
    filter
232
1746
}
233

            
234
/// Transforms query conditions to the MongoDB document.
235
112
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
236
112
    let mut filter = Document::new();
237
112
    if let Some(value) = cond.user_id {
238
50
        filter.insert("userId", value);
239
62
    }
240
112
    if let Some(value) = cond.client_id {
241
8
        filter.insert("clientId", value);
242
104
    }
243
112
    if let Some(value) = cond.name_contains {
244
16
        let escaped = strings::escape_regex_str(value);
245
16
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
246
16
            if let Ok(options) = CString::try_from("i") {
247
16
                filter.insert("name", Regex { pattern, options });
248
16
            }
249
        }
250
96
    }
251
112
    filter
252
112
}
253

            
254
/// Transforms model options to the options.
255
92
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
256
92
where
257
92
    T: Send + Sync,
258
{
259
92
    if let Some(offset) = opts.offset {
260
20
        find = find.skip(offset);
261
72
    }
262
92
    if let Some(limit) = opts.limit {
263
50
        if limit > 0 {
264
48
            find = find.limit(limit as i64);
265
48
        }
266
42
    }
267
92
    if let Some(sort_list) = opts.sort.as_ref() {
268
76
        if sort_list.len() > 0 {
269
74
            let mut sort_opts = Document::new();
270
88
            for cond in sort_list.iter() {
271
88
                let key = match cond.key {
272
22
                    SortKey::CreatedAt => "createdAt",
273
8
                    SortKey::ModifiedAt => "modifiedAt",
274
58
                    SortKey::Name => "name",
275
                };
276
88
                if cond.asc {
277
74
                    sort_opts.insert(key.to_string(), 1);
278
74
                } else {
279
14
                    sort_opts.insert(key.to_string(), -1);
280
14
                }
281
            }
282
74
            find = find.sort(sort_opts);
283
2
        }
284
16
    }
285
92
    find
286
92
}
287

            
288
/// Transforms query conditions to the MongoDB document.
289
28
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
290
28
    doc! {
291
28
        "userId": cond.user_id,
292
28
        "clientId": cond.client_id,
293
    }
294
28
}
295

            
296
/// Transforms the model object to the MongoDB document.
297
28
fn get_update_doc(updates: &Updates) -> Option<Document> {
298
28
    let mut count = 0;
299
28
    let mut document = Document::new();
300
28
    if let Some(value) = updates.modified_at.as_ref() {
301
26
        document.insert(
302
26
            "modifiedAt",
303
26
            DateTime::from_millis(value.timestamp_millis()),
304
26
        );
305
26
        count += 1;
306
26
    }
307
28
    if let Some(value) = updates.client_secret.as_ref() {
308
6
        match value {
309
2
            None => {
310
2
                document.insert("clientSecret", Bson::Null);
311
2
            }
312
4
            Some(value) => {
313
4
                document.insert("clientSecret", value);
314
4
            }
315
        }
316
6
        count += 1;
317
22
    }
318
28
    if let Some(value) = updates.redirect_uris {
319
16
        document.insert("redirectUris", value);
320
16
        count += 1;
321
16
    }
322
28
    if let Some(value) = updates.scopes {
323
16
        document.insert("scopes", value);
324
16
        count += 1;
325
16
    }
326
28
    if let Some(value) = updates.name {
327
10
        document.insert("name", value);
328
10
        count += 1;
329
18
    }
330
28
    if let Some(value) = updates.image_url.as_ref() {
331
16
        match value {
332
8
            None => {
333
8
                document.insert("imageUrl", Bson::Null);
334
8
            }
335
8
            Some(value) => {
336
8
                document.insert("imageUrl", value);
337
8
            }
338
        }
339
16
        count += 1;
340
12
    }
341
28
    if count == 0 {
342
2
        return None;
343
26
    }
344
26
    Some(doc! {"$set": document})
345
28
}