1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, DateTime, Document, Regex, doc, raw::CString},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::strings;
13

            
14
use super::super::unit::{
15
    Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, Unit, UnitModel, UpdateQueryCond,
16
    Updates,
17
};
18

            
19
/// Model instance.
20
pub struct Model {
21
    /// The associated database connection.
22
    conn: Arc<Database>,
23
}
24

            
25
/// Cursor instance.
26
struct DbCursor {
27
    /// The associated collection cursor.
28
    cursor: MongoDbCursor<Schema>,
29
    /// (Useless) only for Cursor trait implementation.
30
    offset: u64,
31
}
32

            
33
/// MongoDB schema.
34
#[derive(Deserialize, Serialize)]
35
struct Schema {
36
    #[serde(rename = "unitId")]
37
    unit_id: String,
38
    code: String,
39
    #[serde(rename = "createdAt")]
40
    created_at: DateTime,
41
    #[serde(rename = "modifiedAt")]
42
    modified_at: DateTime,
43
    #[serde(rename = "ownerId")]
44
    owner_id: String,
45
    #[serde(rename = "memberIds")]
46
    member_ids: Vec<String>,
47
    name: String,
48
    info: Document,
49
}
50

            
51
/// Name of the MongoDB collection used by this model.
const COL_NAME: &str = "unit";
52

            
53
impl Model {
54
    /// To create the model instance with a database connection.
55
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
56
24
        let model = Model { conn };
57
24
        model.init().await?;
58
24
        Ok(model)
59
24
    }
60
}
61

            
62
#[async_trait]
63
impl UnitModel for Model {
64
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
65
        let indexes = vec![
66
            doc! {"name": "unitId_1", "key": {"unitId": 1}, "unique": true},
67
            doc! {"name": "code_1", "key": {"code": 1}, "unique": true},
68
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
69
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
70
            doc! {"name": "ownerId_1", "key": {"ownerId": 1}},
71
            doc! {"name": "memberIds_1", "key": {"memberIds": 1}},
72
            doc! {"name": "name_1", "key": {"name": 1}},
73
        ];
74
        let command = doc! {
75
            "createIndexes": COL_NAME,
76
            "indexes": indexes,
77
        };
78
        self.conn.run_command(command).await?;
79
        Ok(())
80
38
    }
81

            
82
56
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
83
        let filter = get_list_query_filter(cond);
84
        let count = self
85
            .conn
86
            .collection::<Schema>(COL_NAME)
87
            .count_documents(filter)
88
            .await?;
89
        Ok(count)
90
56
    }
91

            
92
    async fn list(
93
        &self,
94
        opts: &ListOptions,
95
        cursor: Option<Box<dyn Cursor>>,
96
166
    ) -> Result<(Vec<Unit>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
97
        let mut cursor = match cursor {
98
            None => {
99
                let filter = get_list_query_filter(opts.cond);
100
                Box::new(DbCursor::new(
101
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
102
                        .await?,
103
                ))
104
            }
105
            Some(cursor) => cursor,
106
        };
107

            
108
        let mut count: u64 = 0;
109
        let mut list = Vec::new();
110
        while let Some(item) = cursor.try_next().await? {
111
            list.push(item);
112
166
            if let Some(cursor_max) = opts.cursor_max {
113
                count += 1;
114
                if count >= cursor_max {
115
                    return Ok((list, Some(cursor)));
116
                }
117
            }
118
        }
119
        Ok((list, None))
120
166
    }
121

            
122
1336
    async fn get(&self, cond: &QueryCond) -> Result<Option<Unit>, Box<dyn StdError>> {
123
        let filter = get_query_filter(cond);
124
        let mut cursor = self
125
            .conn
126
            .collection::<Schema>(COL_NAME)
127
            .find(filter)
128
            .await?;
129
        if let Some(item) = cursor.try_next().await? {
130
            return Ok(Some(Unit {
131
                unit_id: item.unit_id,
132
                code: item.code,
133
                created_at: item.created_at.into(),
134
                modified_at: item.modified_at.into(),
135
                owner_id: item.owner_id,
136
                member_ids: item.member_ids,
137
                name: item.name,
138
                info: bson::deserialize_from_document(item.info)?,
139
            }));
140
        }
141
        Ok(None)
142
1336
    }
143

            
144
1394
    async fn add(&self, unit: &Unit) -> Result<(), Box<dyn StdError>> {
145
        let item = Schema {
146
            unit_id: unit.unit_id.clone(),
147
            code: unit.code.clone(),
148
            created_at: unit.created_at.into(),
149
            modified_at: unit.modified_at.into(),
150
            owner_id: unit.owner_id.clone(),
151
            member_ids: unit.member_ids.clone(),
152
            name: unit.name.clone(),
153
            info: bson::serialize_to_document(&unit.info)?,
154
        };
155
        self.conn
156
            .collection::<Schema>(COL_NAME)
157
            .insert_one(item)
158
            .await?;
159
        Ok(())
160
1394
    }
161

            
162
24
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
163
        let filter = get_query_filter(cond);
164
        self.conn
165
            .collection::<Schema>(COL_NAME)
166
            .delete_many(filter)
167
            .await?;
168
        Ok(())
169
24
    }
170

            
171
    async fn update(
172
        &self,
173
        cond: &UpdateQueryCond,
174
        updates: &Updates,
175
20
    ) -> Result<(), Box<dyn StdError>> {
176
        let filter = get_update_query_filter(cond);
177
        if let Some(updates) = get_update_doc(updates) {
178
            self.conn
179
                .collection::<Schema>(COL_NAME)
180
                .update_one(filter, updates)
181
                .await?;
182
        }
183
        return Ok(());
184
20
    }
185
}
186

            
187
impl DbCursor {
188
    /// To create the cursor instance with a collection cursor.
189
144
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
190
144
        DbCursor { cursor, offset: 0 }
191
144
    }
192
}
193

            
194
#[async_trait]
195
impl Cursor for DbCursor {
196
2122
    async fn try_next(&mut self) -> Result<Option<Unit>, Box<dyn StdError>> {
197
        if let Some(item) = self.cursor.try_next().await? {
198
            self.offset += 1;
199
            return Ok(Some(Unit {
200
                unit_id: item.unit_id,
201
                code: item.code,
202
                created_at: item.created_at.into(),
203
                modified_at: item.modified_at.into(),
204
                owner_id: item.owner_id,
205
                member_ids: item.member_ids,
206
                name: item.name,
207
                info: bson::deserialize_from_document(item.info)?,
208
            }));
209
        }
210
        Ok(None)
211
2122
    }
212

            
213
8
    fn offset(&self) -> u64 {
214
8
        self.offset
215
8
    }
216
}
217

            
218
/// Transforms query conditions to the MongoDB document.
219
1360
fn get_query_filter(cond: &QueryCond) -> Document {
220
1360
    let mut filter = Document::new();
221
1360
    if let Some(value) = cond.unit_id {
222
1332
        filter.insert("unitId", value);
223
1332
    }
224
1360
    if let Some(value) = cond.code {
225
26
        filter.insert("code", value);
226
1334
    }
227
1360
    if let Some(value) = cond.owner_id {
228
284
        filter.insert("ownerId", value);
229
1076
    }
230
1360
    if let Some(value) = cond.member_id {
231
436
        filter.insert("memberIds", value);
232
924
    }
233
1360
    filter
234
1360
}
235

            
236
/// Transforms query conditions to the MongoDB document.
237
200
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
238
200
    let mut filter = Document::new();
239
200
    if let Some(value) = cond.owner_id {
240
48
        filter.insert("ownerId", value);
241
152
    }
242
200
    if let Some(value) = cond.member_id {
243
74
        filter.insert("memberIds", value);
244
126
    }
245
200
    if let Some(value) = cond.unit_id {
246
12
        filter.insert("unitId", value);
247
188
    }
248
200
    if let Some(value) = cond.code_contains {
249
40
        let escaped = strings::escape_regex_str(value);
250
40
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
251
40
            if let Ok(options) = CString::try_from("i") {
252
40
                filter.insert("code", Regex { pattern, options });
253
40
            }
254
        }
255
160
    }
256
200
    if let Some(value) = cond.name_contains {
257
16
        let escaped = strings::escape_regex_str(value);
258
16
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
259
16
            if let Ok(options) = CString::try_from("i") {
260
16
                filter.insert("name", Regex { pattern, options });
261
16
            }
262
        }
263
184
    }
264
200
    filter
265
200
}
266

            
267
/// Transforms model options to the options.
268
144
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
269
144
where
270
144
    T: Send + Sync,
271
{
272
144
    if let Some(offset) = opts.offset {
273
20
        find = find.skip(offset);
274
124
    }
275
144
    if let Some(limit) = opts.limit {
276
76
        if limit > 0 {
277
74
            find = find.limit(limit as i64);
278
74
        }
279
68
    }
280
144
    if let Some(sort_list) = opts.sort.as_ref() {
281
106
        if sort_list.len() > 0 {
282
104
            let mut sort_opts = Document::new();
283
108
            for cond in sort_list.iter() {
284
108
                let key = match cond.key {
285
12
                    SortKey::CreatedAt => "createdAt",
286
8
                    SortKey::ModifiedAt => "modifiedAt",
287
76
                    SortKey::Code => "code",
288
12
                    SortKey::Name => "name",
289
                };
290
108
                if cond.asc {
291
90
                    sort_opts.insert(key.to_string(), 1);
292
90
                } else {
293
18
                    sort_opts.insert(key.to_string(), -1);
294
18
                }
295
            }
296
104
            find = find.sort(sort_opts);
297
2
        }
298
38
    }
299
144
    find
300
144
}
301

            
302
/// Transforms query conditions to the MongoDB document.
303
20
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
304
20
    doc! {"unitId": cond.unit_id}
305
20
}
306

            
307
/// Transforms the model object to the MongoDB document.
308
20
fn get_update_doc(updates: &Updates) -> Option<Document> {
309
20
    let mut count = 0;
310
20
    let mut document = Document::new();
311
20
    if let Some(value) = updates.modified_at.as_ref() {
312
18
        document.insert(
313
18
            "modifiedAt",
314
18
            DateTime::from_millis(value.timestamp_millis()),
315
18
        );
316
18
        count += 1;
317
18
    }
318
20
    if let Some(value) = updates.owner_id {
319
10
        document.insert("ownerId", value);
320
10
        count += 1;
321
10
    }
322
20
    if let Some(value) = updates.member_ids {
323
10
        document.insert("memberIds", value);
324
10
        count += 1;
325
10
    }
326
20
    if let Some(value) = updates.name {
327
12
        document.insert("name", value);
328
12
        count += 1;
329
12
    }
330
20
    if let Some(value) = updates.info {
331
12
        document.insert(
332
            "info",
333
12
            match bson::serialize_to_document(value) {
334
                Err(_) => return None,
335
12
                Ok(doc) => doc,
336
            },
337
        );
338
12
        count += 1;
339
8
    }
340
20
    if count == 0 {
341
2
        return None;
342
18
    }
343
18
    Some(doc! {"$set": document})
344
20
}