1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, DateTime, Document, Regex, doc, raw::CString},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::application::{
13
    Application, ApplicationModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
14
    UpdateQueryCond, Updates,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "applicationId")]
35
    application_id: String,
36
    code: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: String,
41
    #[serde(rename = "createdAt")]
42
    created_at: DateTime,
43
    #[serde(rename = "modifiedAt")]
44
    modified_at: DateTime,
45
    #[serde(rename = "hostUri")]
46
    host_uri: String,
47
    name: String,
48
    info: Document,
49
}
50

            
51
const COL_NAME: &'static str = "application";
52

            
53
impl Model {
54
    /// To create the model instance with a database connection.
55
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
56
24
        let model = Model { conn };
57
24
        model.init().await?;
58
24
        Ok(model)
59
24
    }
60
}
61

            
62
#[async_trait]
63
impl ApplicationModel for Model {
64
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
65
        let indexes = vec![
66
            doc! {"name": "applicationId_1", "key": {"applicationId": 1}, "unique": true},
67
            doc! {"name": "unitId_1_code_1", "key": {"unitId": 1, "code": 1}, "unique": true},
68
            doc! {"name": "code_1", "key": {"code": 1}},
69
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
70
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
71
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
72
            doc! {"name": "name_1", "key": {"name": 1}},
73
        ];
74
        let command = doc! {
75
            "createIndexes": COL_NAME,
76
            "indexes": indexes,
77
        };
78
        self.conn.run_command(command).await?;
79
        Ok(())
80
38
    }
81

            
82
60
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
83
        let filter = get_list_query_filter(cond);
84
        let count = self
85
            .conn
86
            .collection::<Schema>(COL_NAME)
87
            .count_documents(filter)
88
            .await?;
89
        Ok(count)
90
60
    }
91

            
92
    async fn list(
93
        &self,
94
        opts: &ListOptions,
95
        cursor: Option<Box<dyn Cursor>>,
96
192
    ) -> Result<(Vec<Application>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
97
        let mut cursor = match cursor {
98
            None => {
99
                let filter = get_list_query_filter(opts.cond);
100
                Box::new(DbCursor::new(
101
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
102
                        .await?,
103
                ))
104
            }
105
            Some(cursor) => cursor,
106
        };
107

            
108
        let mut count: u64 = 0;
109
        let mut list = Vec::new();
110
        while let Some(item) = cursor.try_next().await? {
111
            list.push(item);
112
192
            if let Some(cursor_max) = opts.cursor_max {
113
                count += 1;
114
                if count >= cursor_max {
115
                    return Ok((list, Some(cursor)));
116
                }
117
            }
118
        }
119
        Ok((list, None))
120
192
    }
121

            
122
432
    async fn get(&self, cond: &QueryCond) -> Result<Option<Application>, Box<dyn StdError>> {
123
        let filter = get_query_filter(cond);
124
        let mut cursor = self
125
            .conn
126
            .collection::<Schema>(COL_NAME)
127
            .find(filter)
128
            .await?;
129
        if let Some(item) = cursor.try_next().await? {
130
            return Ok(Some(Application {
131
                application_id: item.application_id,
132
                code: item.code,
133
                unit_id: item.unit_id,
134
                unit_code: item.unit_code,
135
                created_at: item.created_at.into(),
136
                modified_at: item.modified_at.into(),
137
                host_uri: item.host_uri,
138
                name: item.name,
139
                info: bson::deserialize_from_document(item.info)?,
140
            }));
141
        }
142
        Ok(None)
143
432
    }
144

            
145
1560
    async fn add(&self, application: &Application) -> Result<(), Box<dyn StdError>> {
146
        let item = Schema {
147
            application_id: application.application_id.clone(),
148
            code: application.code.clone(),
149
            unit_id: application.unit_id.clone(),
150
            unit_code: application.unit_code.clone(),
151
            created_at: application.created_at.into(),
152
            modified_at: application.modified_at.into(),
153
            host_uri: application.host_uri.clone(),
154
            name: application.name.clone(),
155
            info: bson::serialize_to_document(&application.info)?,
156
        };
157
        self.conn
158
            .collection::<Schema>(COL_NAME)
159
            .insert_one(item)
160
            .await?;
161
        Ok(())
162
1560
    }
163

            
164
34
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
165
        let filter = get_query_filter(cond);
166
        self.conn
167
            .collection::<Schema>(COL_NAME)
168
            .delete_many(filter)
169
            .await?;
170
        Ok(())
171
34
    }
172

            
173
    async fn update(
174
        &self,
175
        cond: &UpdateQueryCond,
176
        updates: &Updates,
177
18
    ) -> Result<(), Box<dyn StdError>> {
178
        let filter = get_update_query_filter(cond);
179
        if let Some(updates) = get_update_doc(updates) {
180
            self.conn
181
                .collection::<Schema>(COL_NAME)
182
                .update_one(filter, updates)
183
                .await?;
184
        }
185
        return Ok(());
186
18
    }
187
}
188

            
189
impl DbCursor {
190
    /// To create the cursor instance with a collection cursor.
191
168
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
192
168
        DbCursor { cursor, offset: 0 }
193
168
    }
194
}
195

            
196
#[async_trait]
197
impl Cursor for DbCursor {
198
2394
    async fn try_next(&mut self) -> Result<Option<Application>, Box<dyn StdError>> {
199
        if let Some(item) = self.cursor.try_next().await? {
200
            self.offset += 1;
201
            return Ok(Some(Application {
202
                application_id: item.application_id,
203
                code: item.code,
204
                unit_id: item.unit_id,
205
                unit_code: item.unit_code,
206
                created_at: item.created_at.into(),
207
                modified_at: item.modified_at.into(),
208
                host_uri: item.host_uri,
209
                name: item.name,
210
                info: bson::deserialize_from_document(item.info)?,
211
            }));
212
        }
213
        Ok(None)
214
2394
    }
215

            
216
8
    fn offset(&self) -> u64 {
217
8
        self.offset
218
8
    }
219
}
220

            
221
/// Transforms query conditions to the MongoDB document.
222
466
fn get_query_filter(cond: &QueryCond) -> Document {
223
466
    let mut filter = Document::new();
224
466
    if let Some(value) = cond.unit_id {
225
46
        filter.insert("unitId", value);
226
420
    }
227
466
    if let Some(value) = cond.application_id {
228
432
        filter.insert("applicationId", value);
229
432
    }
230
466
    if let Some(value) = cond.code {
231
18
        filter.insert("code", value);
232
448
    }
233
466
    filter
234
466
}
235

            
236
/// Transforms query conditions to the MongoDB document.
237
228
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
238
228
    let mut filter = Document::new();
239
228
    if let Some(value) = cond.unit_id {
240
100
        filter.insert("unitId", value);
241
128
    }
242
228
    if let Some(value) = cond.application_id {
243
12
        filter.insert("applicationId", value);
244
216
    }
245
228
    if let Some(value) = cond.code {
246
32
        filter.insert("code", value);
247
196
    }
248
228
    if let Some(value) = cond.code_contains {
249
38
        if let Ok(pattern) = CString::try_from(value) {
250
38
            if let Ok(options) = CString::try_from("i") {
251
38
                filter.insert("code", Regex { pattern, options });
252
38
            }
253
        }
254
190
    }
255
228
    if let Some(value) = cond.name_contains {
256
16
        if let Ok(pattern) = CString::try_from(value) {
257
16
            if let Ok(options) = CString::try_from("i") {
258
16
                filter.insert("name", Regex { pattern, options });
259
16
            }
260
        }
261
212
    }
262
228
    filter
263
228
}
264

            
265
/// Transforms model options to the options.
266
168
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
267
168
where
268
168
    T: Send + Sync,
269
{
270
168
    if let Some(offset) = opts.offset {
271
20
        find = find.skip(offset);
272
148
    }
273
168
    if let Some(limit) = opts.limit {
274
80
        if limit > 0 {
275
78
            find = find.limit(limit as i64);
276
78
        }
277
88
    }
278
168
    if let Some(sort_list) = opts.sort.as_ref() {
279
110
        if sort_list.len() > 0 {
280
108
            let mut sort_opts = Document::new();
281
112
            for cond in sort_list.iter() {
282
112
                let key = match cond.key {
283
12
                    SortKey::CreatedAt => "createdAt",
284
8
                    SortKey::ModifiedAt => "modifiedAt",
285
80
                    SortKey::Code => "code",
286
12
                    SortKey::Name => "name",
287
                };
288
112
                if cond.asc {
289
94
                    sort_opts.insert(key.to_string(), 1);
290
94
                } else {
291
18
                    sort_opts.insert(key.to_string(), -1);
292
18
                }
293
            }
294
108
            find = find.sort(sort_opts);
295
2
        }
296
58
    }
297
168
    find
298
168
}
299

            
300
/// Transforms query conditions to the MongoDB document.
301
18
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
302
18
    doc! {"applicationId": cond.application_id}
303
18
}
304

            
305
/// Transforms the model object to the MongoDB document.
306
18
fn get_update_doc(updates: &Updates) -> Option<Document> {
307
18
    let mut count = 0;
308
18
    let mut document = Document::new();
309
18
    if let Some(value) = updates.modified_at.as_ref() {
310
16
        document.insert(
311
16
            "modifiedAt",
312
16
            DateTime::from_millis(value.timestamp_millis()),
313
16
        );
314
16
        count += 1;
315
16
    }
316
18
    if let Some(value) = updates.host_uri {
317
8
        document.insert("hostUri", value);
318
8
        count += 1;
319
10
    }
320
18
    if let Some(value) = updates.name {
321
12
        document.insert("name", value);
322
12
        count += 1;
323
12
    }
324
18
    if let Some(value) = updates.info {
325
12
        document.insert(
326
            "info",
327
12
            match bson::serialize_to_document(value) {
328
                Err(_) => return None,
329
12
                Ok(doc) => doc,
330
            },
331
        );
332
12
        count += 1;
333
6
    }
334
18
    if count == 0 {
335
2
        return None;
336
16
    }
337
16
    Some(doc! {"$set": document})
338
18
}