1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, DateTime, Document, Regex},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::unit::{
13
    Cursor, ListOptions, ListQueryCond, QueryCond, SortKey, Unit, UnitModel, UpdateQueryCond,
14
    Updates,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "unitId")]
35
    unit_id: String,
36
    code: String,
37
    #[serde(rename = "createdAt")]
38
    created_at: DateTime,
39
    #[serde(rename = "modifiedAt")]
40
    modified_at: DateTime,
41
    #[serde(rename = "ownerId")]
42
    owner_id: String,
43
    #[serde(rename = "memberIds")]
44
    member_ids: Vec<String>,
45
    name: String,
46
    info: Document,
47
}
48

            
49
/// Name of the MongoDB collection that stores units.
const COL_NAME: &str = "unit";
50

            
51
impl Model {
52
    /// To create the model instance with a database connection.
53
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
54
24
        let model = Model { conn };
55
24
        model.init().await?;
56
24
        Ok(model)
57
24
    }
58
}
59

            
60
#[async_trait]
61
impl UnitModel for Model {
62
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
63
38
        let indexes = vec![
64
38
            doc! {"name": "unitId_1", "key": {"unitId": 1}, "unique": true},
65
38
            doc! {"name": "code_1", "key": {"code": 1}, "unique": true},
66
38
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
67
38
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
68
38
            doc! {"name": "ownerId_1", "key": {"ownerId": 1}},
69
38
            doc! {"name": "memberIds_1", "key": {"memberIds": 1}},
70
38
            doc! {"name": "name_1", "key": {"name": 1}},
71
38
        ];
72
38
        let command = doc! {
73
38
            "createIndexes": COL_NAME,
74
38
            "indexes": indexes,
75
38
        };
76
38
        self.conn.run_command(command).await?;
77
38
        Ok(())
78
76
    }
79

            
80
56
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
81
56
        let filter = get_list_query_filter(cond);
82
56
        let count = self
83
56
            .conn
84
56
            .collection::<Schema>(COL_NAME)
85
56
            .count_documents(filter)
86
56
            .await?;
87
56
        Ok(count)
88
112
    }
89

            
90
    async fn list(
91
        &self,
92
        opts: &ListOptions,
93
        cursor: Option<Box<dyn Cursor>>,
94
166
    ) -> Result<(Vec<Unit>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
95
166
        let mut cursor = match cursor {
96
            None => {
97
144
                let filter = get_list_query_filter(opts.cond);
98
144
                Box::new(DbCursor::new(
99
144
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
100
144
                        .await?,
101
                ))
102
            }
103
22
            Some(cursor) => cursor,
104
        };
105

            
106
166
        let mut count: u64 = 0;
107
166
        let mut list = Vec::new();
108
2122
        while let Some(item) = cursor.try_next().await? {
109
1978
            list.push(item);
110
1978
            if let Some(cursor_max) = opts.cursor_max {
111
1810
                count += 1;
112
1810
                if count >= cursor_max {
113
22
                    return Ok((list, Some(cursor)));
114
1788
                }
115
168
            }
116
        }
117
144
        Ok((list, None))
118
332
    }
119

            
120
1336
    async fn get(&self, cond: &QueryCond) -> Result<Option<Unit>, Box<dyn StdError>> {
121
1336
        let filter = get_query_filter(cond);
122
1336
        let mut cursor = self
123
1336
            .conn
124
1336
            .collection::<Schema>(COL_NAME)
125
1336
            .find(filter)
126
1336
            .await?;
127
1336
        if let Some(item) = cursor.try_next().await? {
128
            return Ok(Some(Unit {
129
1092
                unit_id: item.unit_id,
130
1092
                code: item.code,
131
1092
                created_at: item.created_at.into(),
132
1092
                modified_at: item.modified_at.into(),
133
1092
                owner_id: item.owner_id,
134
1092
                member_ids: item.member_ids,
135
1092
                name: item.name,
136
1092
                info: bson::from_document(item.info)?,
137
            }));
138
244
        }
139
244
        Ok(None)
140
2672
    }
141

            
142
1394
    async fn add(&self, unit: &Unit) -> Result<(), Box<dyn StdError>> {
143
1394
        let item = Schema {
144
1394
            unit_id: unit.unit_id.clone(),
145
1394
            code: unit.code.clone(),
146
1394
            created_at: unit.created_at.into(),
147
1394
            modified_at: unit.modified_at.into(),
148
1394
            owner_id: unit.owner_id.clone(),
149
1394
            member_ids: unit.member_ids.clone(),
150
1394
            name: unit.name.clone(),
151
1394
            info: bson::to_document(&unit.info)?,
152
        };
153
1394
        self.conn
154
1394
            .collection::<Schema>(COL_NAME)
155
1394
            .insert_one(item)
156
1394
            .await?;
157
1390
        Ok(())
158
2788
    }
159

            
160
24
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
161
24
        let filter = get_query_filter(cond);
162
24
        self.conn
163
24
            .collection::<Schema>(COL_NAME)
164
24
            .delete_many(filter)
165
24
            .await?;
166
24
        Ok(())
167
48
    }
168

            
169
    async fn update(
170
        &self,
171
        cond: &UpdateQueryCond,
172
        updates: &Updates,
173
20
    ) -> Result<(), Box<dyn StdError>> {
174
20
        let filter = get_update_query_filter(cond);
175
20
        if let Some(updates) = get_update_doc(updates) {
176
18
            self.conn
177
18
                .collection::<Schema>(COL_NAME)
178
18
                .update_one(filter, updates)
179
18
                .await?;
180
2
        }
181
20
        return Ok(());
182
40
    }
183
}
184

            
185
impl DbCursor {
186
    /// To create the cursor instance with a collection cursor.
187
144
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
188
144
        DbCursor { cursor, offset: 0 }
189
144
    }
190
}
191

            
192
#[async_trait]
193
impl Cursor for DbCursor {
194
2122
    async fn try_next(&mut self) -> Result<Option<Unit>, Box<dyn StdError>> {
195
2122
        if let Some(item) = self.cursor.try_next().await? {
196
1978
            self.offset += 1;
197
1978
            return Ok(Some(Unit {
198
1978
                unit_id: item.unit_id,
199
1978
                code: item.code,
200
1978
                created_at: item.created_at.into(),
201
1978
                modified_at: item.modified_at.into(),
202
1978
                owner_id: item.owner_id,
203
1978
                member_ids: item.member_ids,
204
1978
                name: item.name,
205
1978
                info: bson::from_document(item.info)?,
206
            }));
207
144
        }
208
144
        Ok(None)
209
4244
    }
210

            
211
8
    fn offset(&self) -> u64 {
212
8
        self.offset
213
8
    }
214
}
215

            
216
/// Transforms query conditions to the MongoDB document.
217
1360
fn get_query_filter(cond: &QueryCond) -> Document {
218
1360
    let mut filter = Document::new();
219
1360
    if let Some(value) = cond.unit_id {
220
1332
        filter.insert("unitId", value);
221
1332
    }
222
1360
    if let Some(value) = cond.code {
223
26
        filter.insert("code", value);
224
1334
    }
225
1360
    if let Some(value) = cond.owner_id {
226
284
        filter.insert("ownerId", value);
227
1076
    }
228
1360
    if let Some(value) = cond.member_id {
229
436
        filter.insert("memberIds", value);
230
924
    }
231
1360
    filter
232
1360
}
233

            
234
/// Transforms query conditions to the MongoDB document.
235
200
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
236
200
    let mut filter = Document::new();
237
200
    if let Some(value) = cond.owner_id {
238
48
        filter.insert("ownerId", value);
239
152
    }
240
200
    if let Some(value) = cond.member_id {
241
74
        filter.insert("memberIds", value);
242
126
    }
243
200
    if let Some(value) = cond.unit_id {
244
12
        filter.insert("unitId", value);
245
188
    }
246
200
    if let Some(value) = cond.code_contains {
247
40
        filter.insert(
248
40
            "code",
249
40
            Regex {
250
40
                pattern: value.to_string(),
251
40
                options: "i".to_string(),
252
40
            },
253
40
        );
254
160
    }
255
200
    if let Some(value) = cond.name_contains {
256
16
        filter.insert(
257
16
            "name",
258
16
            Regex {
259
16
                pattern: value.to_string(),
260
16
                options: "i".to_string(),
261
16
            },
262
16
        );
263
184
    }
264
200
    filter
265
200
}
266

            
267
/// Transforms model options to the options.
268
144
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
269
144
where
270
144
    T: Send + Sync,
271
144
{
272
144
    if let Some(offset) = opts.offset {
273
20
        find = find.skip(offset);
274
124
    }
275
144
    if let Some(limit) = opts.limit {
276
76
        if limit > 0 {
277
74
            find = find.limit(limit as i64);
278
74
        }
279
68
    }
280
144
    if let Some(sort_list) = opts.sort.as_ref() {
281
106
        if sort_list.len() > 0 {
282
104
            let mut sort_opts = Document::new();
283
108
            for cond in sort_list.iter() {
284
108
                let key = match cond.key {
285
12
                    SortKey::CreatedAt => "createdAt",
286
8
                    SortKey::ModifiedAt => "modifiedAt",
287
76
                    SortKey::Code => "code",
288
12
                    SortKey::Name => "name",
289
                };
290
108
                if cond.asc {
291
90
                    sort_opts.insert(key.to_string(), 1);
292
90
                } else {
293
18
                    sort_opts.insert(key.to_string(), -1);
294
18
                }
295
            }
296
104
            find = find.sort(sort_opts);
297
2
        }
298
38
    }
299
144
    find
300
144
}
301

            
302
/// Transforms query conditions to the MongoDB document.
303
20
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
304
20
    doc! {"unitId": cond.unit_id}
305
20
}
306

            
307
/// Transforms the model object to the MongoDB document.
308
20
fn get_update_doc(updates: &Updates) -> Option<Document> {
309
20
    let mut count = 0;
310
20
    let mut document = Document::new();
311
20
    if let Some(value) = updates.modified_at.as_ref() {
312
18
        document.insert(
313
18
            "modifiedAt",
314
18
            DateTime::from_millis(value.timestamp_millis()),
315
18
        );
316
18
        count += 1;
317
18
    }
318
20
    if let Some(value) = updates.owner_id {
319
10
        document.insert("ownerId", value);
320
10
        count += 1;
321
10
    }
322
20
    if let Some(value) = updates.member_ids {
323
10
        document.insert("memberIds", value);
324
10
        count += 1;
325
10
    }
326
20
    if let Some(value) = updates.name {
327
12
        document.insert("name", value);
328
12
        count += 1;
329
12
    }
330
20
    if let Some(value) = updates.info {
331
12
        document.insert(
332
12
            "info",
333
12
            match bson::to_document(value) {
334
                Err(_) => return None,
335
12
                Ok(doc) => doc,
336
12
            },
337
12
        );
338
12
        count += 1;
339
8
    }
340
20
    if count == 0 {
341
2
        return None;
342
18
    }
343
18
    Some(doc! {"$set": document})
344
20
}