1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document, Regex},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network::{
13
    Cursor, ListOptions, ListQueryCond, Network, NetworkModel, QueryCond, SortKey, UpdateQueryCond,
14
    Updates,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "networkId")]
35
    network_id: String,
36
    code: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: Option<String>,
39
    #[serde(rename = "unitCode")]
40
    unit_code: Option<String>,
41
    #[serde(rename = "createdAt")]
42
    created_at: DateTime,
43
    #[serde(rename = "modifiedAt")]
44
    modified_at: DateTime,
45
    #[serde(rename = "hostUri")]
46
    host_uri: String,
47
    name: String,
48
    info: Document,
49
}
50

            
51
const COL_NAME: &'static str = "network";
52

            
53
impl Model {
54
    /// To create the model instance with a database connection.
55
12
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
56
12
        let model = Model { conn };
57
12
        model.init().await?;
58
12
        Ok(model)
59
12
    }
60
}
61

            
62
#[async_trait]
63
impl NetworkModel for Model {
64
19
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
65
19
        let indexes = vec![
66
19
            doc! {"name": "networkId_1", "key": {"networkId": 1}, "unique": true},
67
19
            doc! {"name": "unitId_1_code_1", "key": {"unitId": 1, "code": 1}, "unique": true},
68
19
            doc! {"name": "code_1", "key": {"code": 1}},
69
19
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
70
19
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
71
19
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
72
19
            doc! {"name": "name_1", "key": {"name": 1}},
73
19
        ];
74
19
        let command = doc! {
75
19
            "createIndexes": COL_NAME,
76
19
            "indexes": indexes,
77
19
        };
78
19
        self.conn.run_command(command).await?;
79
19
        Ok(())
80
38
    }
81

            
82
30
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
83
30
        let filter = get_list_query_filter(cond);
84
30
        let count = self
85
30
            .conn
86
30
            .collection::<Schema>(COL_NAME)
87
30
            .count_documents(filter)
88
30
            .await?;
89
30
        Ok(count)
90
60
    }
91

            
92
    async fn list(
93
        &self,
94
        opts: &ListOptions,
95
        cursor: Option<Box<dyn Cursor>>,
96
106
    ) -> Result<(Vec<Network>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
97
106
        let mut cursor = match cursor {
98
            None => {
99
87
                let filter = get_list_query_filter(opts.cond);
100
87
                Box::new(DbCursor::new(
101
87
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
102
87
                        .await?,
103
                ))
104
            }
105
19
            Some(cursor) => cursor,
106
        };
107

            
108
106
        let mut count: u64 = 0;
109
106
        let mut list = Vec::new();
110
1911
        while let Some(item) = cursor.try_next().await? {
111
1824
            list.push(item);
112
1824
            if let Some(cursor_max) = opts.cursor_max {
113
1739
                count += 1;
114
1739
                if count >= cursor_max {
115
19
                    return Ok((list, Some(cursor)));
116
1720
                }
117
85
            }
118
        }
119
87
        Ok((list, None))
120
212
    }
121

            
122
296
    async fn get(&self, cond: &QueryCond) -> Result<Option<Network>, Box<dyn StdError>> {
123
296
        let filter = get_query_filter(cond);
124
296
        let mut cursor = self
125
296
            .conn
126
296
            .collection::<Schema>(COL_NAME)
127
296
            .find(filter)
128
296
            .await?;
129
296
        if let Some(item) = cursor.try_next().await? {
130
            return Ok(Some(Network {
131
248
                network_id: item.network_id,
132
248
                code: item.code,
133
248
                unit_id: item.unit_id,
134
248
                unit_code: item.unit_code,
135
248
                created_at: item.created_at.into(),
136
248
                modified_at: item.modified_at.into(),
137
248
                host_uri: item.host_uri,
138
248
                name: item.name,
139
248
                info: bson::from_document(item.info)?,
140
            }));
141
48
        }
142
48
        Ok(None)
143
592
    }
144

            
145
1300
    async fn add(&self, network: &Network) -> Result<(), Box<dyn StdError>> {
146
1300
        let item = Schema {
147
1300
            network_id: network.network_id.clone(),
148
1300
            code: network.code.clone(),
149
1300
            unit_id: network.unit_id.clone(),
150
1300
            unit_code: network.unit_code.clone(),
151
1300
            created_at: network.created_at.into(),
152
1300
            modified_at: network.modified_at.into(),
153
1300
            host_uri: network.host_uri.clone(),
154
1300
            name: network.name.clone(),
155
1300
            info: bson::to_document(&network.info)?,
156
        };
157
1300
        self.conn
158
1300
            .collection::<Schema>(COL_NAME)
159
1300
            .insert_one(item)
160
1300
            .await?;
161
1296
        Ok(())
162
2600
    }
163

            
164
19
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
165
19
        let filter = get_query_filter(cond);
166
19
        self.conn
167
19
            .collection::<Schema>(COL_NAME)
168
19
            .delete_many(filter)
169
19
            .await?;
170
19
        Ok(())
171
38
    }
172

            
173
    async fn update(
174
        &self,
175
        cond: &UpdateQueryCond,
176
        updates: &Updates,
177
11
    ) -> Result<(), Box<dyn StdError>> {
178
11
        let filter = get_update_query_filter(cond);
179
11
        if let Some(updates) = get_update_doc(updates) {
180
10
            self.conn
181
10
                .collection::<Schema>(COL_NAME)
182
10
                .update_one(filter, updates)
183
10
                .await?;
184
1
        }
185
11
        return Ok(());
186
22
    }
187
}
188

            
189
impl DbCursor {
190
    /// To create the cursor instance with a collection cursor.
191
87
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
192
87
        DbCursor { cursor, offset: 0 }
193
87
    }
194
}
195

            
196
#[async_trait]
197
impl Cursor for DbCursor {
198
1911
    async fn try_next(&mut self) -> Result<Option<Network>, Box<dyn StdError>> {
199
1911
        if let Some(item) = self.cursor.try_next().await? {
200
1824
            self.offset += 1;
201
1824
            return Ok(Some(Network {
202
1824
                network_id: item.network_id,
203
1824
                code: item.code,
204
1824
                unit_id: item.unit_id,
205
1824
                unit_code: item.unit_code,
206
1824
                created_at: item.created_at.into(),
207
1824
                modified_at: item.modified_at.into(),
208
1824
                host_uri: item.host_uri,
209
1824
                name: item.name,
210
1824
                info: bson::from_document(item.info)?,
211
            }));
212
87
        }
213
87
        Ok(None)
214
3822
    }
215

            
216
4
    fn offset(&self) -> u64 {
217
4
        self.offset
218
4
    }
219
}
220

            
221
/// Transforms query conditions to the MongoDB document.
222
315
fn get_query_filter(cond: &QueryCond) -> Document {
223
315
    let mut filter = Document::new();
224
315
    if let Some(value) = cond.unit_id {
225
32
        match value {
226
8
            None => {
227
8
                filter.insert("unitId", Bson::Null);
228
8
            }
229
24
            Some(value) => {
230
24
                filter.insert("unitId", value);
231
24
            }
232
        }
233
283
    }
234
315
    if let Some(value) = cond.network_id {
235
290
        filter.insert("networkId", value);
236
290
    }
237
315
    if let Some(value) = cond.code {
238
16
        filter.insert("code", value);
239
299
    }
240
315
    filter
241
315
}
242

            
243
/// Transforms query conditions to the MongoDB document.
244
117
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
245
117
    let mut filter = Document::new();
246
117
    if let Some(value) = cond.unit_id {
247
54
        match value {
248
10
            None => {
249
10
                filter.insert("unitId", Bson::Null);
250
10
            }
251
44
            Some(value) => {
252
44
                filter.insert("unitId", value);
253
44
            }
254
        }
255
63
    }
256
117
    if let Some(value) = cond.network_id {
257
6
        filter.insert("networkId", value);
258
111
    }
259
117
    if let Some(value) = cond.code {
260
16
        filter.insert("code", value);
261
101
    }
262
117
    if let Some(value) = cond.code_contains {
263
22
        filter.insert(
264
22
            "code",
265
22
            Regex {
266
22
                pattern: value.to_string(),
267
22
                options: "i".to_string(),
268
22
            },
269
22
        );
270
95
    }
271
117
    if let Some(value) = cond.name_contains {
272
8
        filter.insert(
273
8
            "name",
274
8
            Regex {
275
8
                pattern: value.to_string(),
276
8
                options: "i".to_string(),
277
8
            },
278
8
        );
279
109
    }
280
117
    filter
281
117
}
282

            
283
/// Transforms model options to the options.
284
87
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
285
87
where
286
87
    T: Send + Sync,
287
87
{
288
87
    if let Some(offset) = opts.offset {
289
10
        find = find.skip(offset);
290
77
    }
291
87
    if let Some(limit) = opts.limit {
292
41
        if limit > 0 {
293
40
            find = find.limit(limit as i64);
294
40
        }
295
46
    }
296
87
    if let Some(sort_list) = opts.sort.as_ref() {
297
58
        if sort_list.len() > 0 {
298
57
            let mut sort_opts = Document::new();
299
59
            for cond in sort_list.iter() {
300
59
                let key = match cond.key {
301
6
                    SortKey::CreatedAt => "createdAt",
302
4
                    SortKey::ModifiedAt => "modifiedAt",
303
43
                    SortKey::Code => "code",
304
6
                    SortKey::Name => "name",
305
                };
306
59
                if cond.asc {
307
50
                    sort_opts.insert(key.to_string(), 1);
308
50
                } else {
309
9
                    sort_opts.insert(key.to_string(), -1);
310
9
                }
311
            }
312
57
            find = find.sort(sort_opts);
313
1
        }
314
29
    }
315
87
    find
316
87
}
317

            
318
/// Transforms query conditions to the MongoDB document.
319
11
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
320
11
    doc! {"networkId": cond.network_id}
321
11
}
322

            
323
/// Transforms the model object to the MongoDB document.
324
11
fn get_update_doc(updates: &Updates) -> Option<Document> {
325
11
    let mut count = 0;
326
11
    let mut document = Document::new();
327
11
    if let Some(value) = updates.modified_at.as_ref() {
328
10
        document.insert(
329
10
            "modifiedAt",
330
10
            DateTime::from_millis(value.timestamp_millis()),
331
10
        );
332
10
        count += 1;
333
10
    }
334
11
    if let Some(value) = updates.host_uri {
335
5
        document.insert("hostUri", value);
336
5
        count += 1;
337
6
    }
338
11
    if let Some(value) = updates.name {
339
8
        document.insert("name", value);
340
8
        count += 1;
341
8
    }
342
11
    if let Some(value) = updates.info {
343
8
        document.insert(
344
8
            "info",
345
8
            match bson::to_document(value) {
346
                Err(_) => return None,
347
8
                Ok(doc) => doc,
348
8
            },
349
8
        );
350
8
        count += 1;
351
3
    }
352
11
    if count == 0 {
353
1
        return None;
354
10
    }
355
10
    Some(doc! {"$set": document})
356
11
}