1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document, Regex},
8
    error::ErrorKind,
9
    Cursor as MongoDbCursor, Database,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use super::super::device::{
14
    Cursor, Device, DeviceModel, ListOptions, ListQueryCond, QueryCond, SortKey, UpdateQueryCond,
15
    Updates,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
434538
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "deviceId")]
36
    device_id: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: Option<String>,
41
    #[serde(rename = "networkId")]
42
    network_id: String,
43
    #[serde(rename = "networkCode")]
44
    network_code: String,
45
    #[serde(rename = "networkAddr")]
46
    network_addr: String,
47
    #[serde(rename = "createdAt")]
48
    created_at: DateTime,
49
    #[serde(rename = "modifiedAt")]
50
    modified_at: DateTime,
51
    profile: String,
52
    name: String,
53
    info: Document,
54
}
55

            
56
/// Name of the MongoDB collection that stores the device documents.
const COL_NAME: &str = "device";
57

            
58
impl Model {
59
    /// To create the model instance with a database connection.
60
12
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
61
12
        let model = Model { conn };
62
24
        model.init().await?;
63
12
        Ok(model)
64
12
    }
65
}
66

            
67
#[async_trait]
68
impl DeviceModel for Model {
69
19
    /// Creates the indexes of the device collection with a single `createIndexes` command.
    ///
    /// `deviceId` and the (`unitCode`, `networkCode`, `networkAddr`) triple are declared unique;
    /// the remaining entries are plain single-field ascending indexes.
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
        let create_indexes = doc! {
            "createIndexes": COL_NAME,
            "indexes": [
                {"name": "deviceId_1", "key": {"deviceId": 1}, "unique": true},
                {
                    "name": "unitCode_1_networkCode_1_networkAddr_1",
                    "key": {"unitCode": 1, "networkCode": 1, "networkAddr": 1},
                    "unique": true
                },
                {"name": "unitId_1", "key": {"unitId": 1}},
                {"name": "networkId_1", "key": {"networkId": 1}},
                {"name": "unitCode_1", "key": {"unitCode": 1}},
                {"name": "networkCode_1", "key": {"networkCode": 1}},
                {"name": "networkAddr_1", "key": {"networkAddr": 1}},
                {"name": "createdAt_1", "key": {"createdAt": 1}},
                {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
                {"name": "profile_1", "key": {"profile": 1}},
                {"name": "name_1", "key": {"name": 1}},
            ],
        };
        self.conn.run_command(create_indexes).await?;
        Ok(())
    }
94

            
95
49
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
49
        let filter = get_list_query_filter(cond);
97
49
        let count = self
98
49
            .conn
99
49
            .collection::<Schema>(COL_NAME)
100
49
            .count_documents(filter)
101
98
            .await?;
102
49
        Ok(count)
103
98
    }
104

            
105
    async fn list(
106
        &self,
107
        opts: &ListOptions,
108
        cursor: Option<Box<dyn Cursor>>,
109
204
    ) -> Result<(Vec<Device>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
110
204
        let mut cursor = match cursor {
111
            None => {
112
181
                let filter = get_list_query_filter(opts.cond);
113
181
                Box::new(DbCursor::new(
114
181
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
115
362
                        .await?,
116
                ))
117
            }
118
23
            Some(cursor) => cursor,
119
        };
120

            
121
204
        let mut count: u64 = 0;
122
204
        let mut list = Vec::new();
123
33397
        while let Some(item) = cursor.try_next().await? {
124
33216
            list.push(item);
125
33216
            if let Some(cursor_max) = opts.cursor_max {
126
2228
                count += 1;
127
2228
                if count >= cursor_max {
128
23
                    return Ok((list, Some(cursor)));
129
2205
                }
130
30988
            }
131
        }
132
181
        Ok((list, None))
133
408
    }
134

            
135
252
    async fn get(&self, cond: &QueryCond) -> Result<Option<Device>, Box<dyn StdError>> {
136
252
        let filter = get_query_filter(cond);
137
252
        let mut cursor = self
138
252
            .conn
139
252
            .collection::<Schema>(COL_NAME)
140
252
            .find(filter)
141
504
            .await?;
142
252
        if let Some(item) = cursor.try_next().await? {
143
            return Ok(Some(Device {
144
210
                device_id: item.device_id,
145
210
                unit_id: item.unit_id,
146
210
                unit_code: item.unit_code,
147
210
                network_id: item.network_id,
148
210
                network_code: item.network_code,
149
210
                network_addr: item.network_addr,
150
210
                created_at: item.created_at.into(),
151
210
                modified_at: item.modified_at.into(),
152
210
                profile: item.profile,
153
210
                name: item.name,
154
210
                info: bson::from_document(item.info)?,
155
            }));
156
42
        }
157
42
        Ok(None)
158
504
    }
159

            
160
1261
    async fn add(&self, device: &Device) -> Result<(), Box<dyn StdError>> {
161
1261
        let item = Schema {
162
1261
            device_id: device.device_id.clone(),
163
1261
            unit_id: device.unit_id.clone(),
164
1261
            unit_code: device.unit_code.clone(),
165
1261
            network_id: device.network_id.clone(),
166
1261
            network_code: device.network_code.clone(),
167
1261
            network_addr: device.network_addr.clone(),
168
1261
            created_at: device.created_at.into(),
169
1261
            modified_at: device.modified_at.into(),
170
1261
            profile: device.profile.clone(),
171
1261
            name: device.name.clone(),
172
1261
            info: bson::to_document(&device.info)?,
173
        };
174
1261
        self.conn
175
1261
            .collection::<Schema>(COL_NAME)
176
1261
            .insert_one(item)
177
2524
            .await?;
178
1259
        Ok(())
179
2522
    }
180

            
181
37
    async fn add_bulk(&self, devices: &Vec<Device>) -> Result<(), Box<dyn StdError>> {
182
37
        let mut items = vec![];
183
20803
        for device in devices.iter() {
184
20803
            items.push(Schema {
185
20803
                device_id: device.device_id.clone(),
186
20803
                unit_id: device.unit_id.clone(),
187
20803
                unit_code: device.unit_code.clone(),
188
20803
                network_id: device.network_id.clone(),
189
20803
                network_code: device.network_code.clone(),
190
20803
                network_addr: device.network_addr.clone(),
191
20803
                created_at: device.created_at.into(),
192
20803
                modified_at: device.modified_at.into(),
193
20803
                profile: device.profile.clone(),
194
20803
                name: device.name.clone(),
195
20803
                info: bson::to_document(&device.info)?,
196
            });
197
        }
198
37
        if let Err(e) = self
199
37
            .conn
200
37
            .collection::<Schema>(COL_NAME)
201
37
            .insert_many(items)
202
37
            .ordered(false)
203
79
            .await
204
        {
205
5
            match e.kind.as_ref() {
206
5
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
207
                    None => return Err(Box::new(e)),
208
5
                    Some(errs) => {
209
4197
                        for err in errs {
210
4192
                            if err.code != 11000 {
211
                                return Err(Box::new(e));
212
4192
                            }
213
                        }
214
5
                        ()
215
                    }
216
                },
217
                _ => return Err(Box::new(e)),
218
            }
219
32
        }
220

            
221
37
        Ok(())
222
74
    }
223

            
224
53
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
225
53
        let filter = get_query_filter(cond);
226
53
        self.conn
227
53
            .collection::<Schema>(COL_NAME)
228
53
            .delete_many(filter)
229
106
            .await?;
230
53
        Ok(())
231
106
    }
232

            
233
    async fn update(
234
        &self,
235
        cond: &UpdateQueryCond,
236
        updates: &Updates,
237
19
    ) -> Result<(), Box<dyn StdError>> {
238
19
        let filter = get_update_query_filter(cond);
239
19
        if let Some(updates) = get_update_doc(updates) {
240
18
            self.conn
241
18
                .collection::<Schema>(COL_NAME)
242
18
                .update_one(filter, updates)
243
36
                .await?;
244
1
        }
245
19
        return Ok(());
246
38
    }
247
}
248

            
249
impl DbCursor {
250
    /// To create the cursor instance with a collection cursor.
251
181
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
252
181
        DbCursor { cursor, offset: 0 }
253
181
    }
254
}
255

            
256
#[async_trait]
257
impl Cursor for DbCursor {
258
33397
    async fn try_next(&mut self) -> Result<Option<Device>, Box<dyn StdError>> {
259
33397
        if let Some(item) = self.cursor.try_next().await? {
260
33216
            self.offset += 1;
261
33216
            return Ok(Some(Device {
262
33216
                device_id: item.device_id,
263
33216
                unit_id: item.unit_id,
264
33216
                unit_code: item.unit_code,
265
33216
                network_id: item.network_id,
266
33216
                network_code: item.network_code,
267
33216
                network_addr: item.network_addr,
268
33216
                created_at: item.created_at.into(),
269
33216
                modified_at: item.modified_at.into(),
270
33216
                profile: item.profile,
271
33216
                name: item.name,
272
33216
                info: bson::from_document(item.info)?,
273
            }));
274
181
        }
275
181
        Ok(None)
276
66794
    }
277

            
278
4
    fn offset(&self) -> u64 {
279
4
        self.offset
280
4
    }
281
}
282

            
283
/// Transforms query conditions to the MongoDB document.
284
305
fn get_query_filter(cond: &QueryCond) -> Document {
285
305
    let mut filter = Document::new();
286
305
    if let Some(value) = cond.unit_id {
287
30
        filter.insert("unitId", value);
288
275
    }
289
305
    if let Some(value) = cond.device_id {
290
251
        filter.insert("deviceId", value);
291
251
    }
292
305
    if let Some(value) = cond.network_id {
293
19
        filter.insert("networkId", value);
294
286
    }
295
305
    if let Some(value) = cond.network_addrs {
296
12
        let mut in_cond = Document::new();
297
12
        in_cond.insert("$in", value);
298
12
        filter.insert("networkAddr", in_cond);
299
293
    }
300
305
    if let Some(value) = cond.device.as_ref() {
301
27
        if let Some(unit_code) = value.unit_code {
302
21
            filter.insert("unitCode", unit_code);
303
21
        } else {
304
6
            filter.insert("unitCode", Bson::Null);
305
6
        }
306
27
        filter.insert("networkCode", value.network_code);
307
27
        filter.insert("networkAddr", value.network_addr);
308
278
    }
309
305
    filter
310
305
}
311

            
312
/// Transforms query conditions to the MongoDB document.
313
230
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
314
230
    let mut filter = Document::new();
315
230
    if let Some(value) = cond.unit_id {
316
88
        filter.insert("unitId", value);
317
142
    }
318
230
    if let Some(value) = cond.device_id {
319
10
        filter.insert("deviceId", value);
320
220
    }
321
230
    if let Some(value) = cond.network_id {
322
79
        filter.insert("networkId", value);
323
151
    }
324
230
    if let Some(value) = cond.network_code {
325
24
        filter.insert("networkCode", value);
326
206
    }
327
230
    if let Some(value) = cond.network_addr {
328
53
        filter.insert("networkAddr", value);
329
177
    } else if let Some(value) = cond.network_addrs {
330
24
        let mut in_cond = Document::new();
331
24
        in_cond.insert("$in", value);
332
24
        filter.insert("networkAddr", in_cond);
333
153
    }
334
230
    if let Some(value) = cond.profile {
335
8
        filter.insert("profile", value);
336
222
    }
337
230
    if let Some(value) = cond.name_contains {
338
28
        filter.insert(
339
28
            "name",
340
28
            Regex {
341
28
                pattern: value.to_string(),
342
28
                options: "i".to_string(),
343
28
            },
344
28
        );
345
202
    }
346
230
    filter
347
230
}
348

            
349
/// Transforms model options to the options.
350
181
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
351
181
where
352
181
    T: Send + Sync,
353
181
{
354
181
    if let Some(offset) = opts.offset {
355
10
        find = find.skip(offset);
356
171
    }
357
181
    if let Some(limit) = opts.limit {
358
65
        if limit > 0 {
359
64
            find = find.limit(limit as i64);
360
64
        }
361
116
    }
362
181
    if let Some(sort_list) = opts.sort.as_ref() {
363
109
        if sort_list.len() > 0 {
364
108
            let mut sort_opts = Document::new();
365
169
            for cond in sort_list.iter() {
366
169
                let key = match cond.key {
367
6
                    SortKey::CreatedAt => "createdAt",
368
4
                    SortKey::ModifiedAt => "modifiedAt",
369
59
                    SortKey::NetworkCode => "networkCode",
370
92
                    SortKey::NetworkAddr => "networkAddr",
371
2
                    SortKey::Profile => "profile",
372
6
                    SortKey::Name => "name",
373
                };
374
169
                if cond.asc {
375
152
                    sort_opts.insert(key.to_string(), 1);
376
152
                } else {
377
17
                    sort_opts.insert(key.to_string(), -1);
378
17
                }
379
            }
380
108
            find = find.sort(sort_opts);
381
1
        }
382
72
    }
383
181
    find
384
181
}
385

            
386
/// Transforms query conditions to the MongoDB document.
387
19
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
388
19
    doc! {"deviceId": cond.device_id}
389
19
}
390

            
391
/// Transforms the model object to the MongoDB document.
392
19
fn get_update_doc(updates: &Updates) -> Option<Document> {
393
19
    let mut count = 0;
394
19
    let mut document = Document::new();
395
19
    if let Some((network_id, network_code)) = updates.network {
396
6
        document.insert("networkId", network_id);
397
6
        document.insert("networkCode", network_code);
398
6
        count += 1;
399
13
    }
400
19
    if let Some(value) = updates.network_addr {
401
8
        document.insert("networkAddr", value);
402
8
        count += 1;
403
11
    }
404
19
    if let Some(value) = updates.modified_at.as_ref() {
405
18
        document.insert(
406
18
            "modifiedAt",
407
18
            DateTime::from_millis(value.timestamp_millis()),
408
18
        );
409
18
        count += 1;
410
18
    }
411
19
    if let Some(value) = updates.profile {
412
11
        document.insert("profile", value);
413
11
        count += 1;
414
11
    }
415
19
    if let Some(value) = updates.name {
416
11
        document.insert("name", value);
417
11
        count += 1;
418
11
    }
419
19
    if let Some(value) = updates.info {
420
10
        document.insert(
421
10
            "info",
422
10
            match bson::to_document(value) {
423
                Err(_) => return None,
424
10
                Ok(doc) => doc,
425
10
            },
426
10
        );
427
10
        count += 1;
428
9
    }
429
19
    if count == 0 {
430
1
        return None;
431
18
    }
432
18
    Some(doc! {"$set": document})
433
19
}