1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, Bson, DateTime, Document, Regex, doc},
9
    error::ErrorKind,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use super::super::device::{
14
    Cursor, Device, DeviceModel, ListOptions, ListQueryCond, QueryCond, SortKey, UpdateQueryCond,
15
    Updates,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "deviceId")]
36
    device_id: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: Option<String>,
41
    #[serde(rename = "networkId")]
42
    network_id: String,
43
    #[serde(rename = "networkCode")]
44
    network_code: String,
45
    #[serde(rename = "networkAddr")]
46
    network_addr: String,
47
    #[serde(rename = "createdAt")]
48
    created_at: DateTime,
49
    #[serde(rename = "modifiedAt")]
50
    modified_at: DateTime,
51
    profile: String,
52
    name: String,
53
    info: Document,
54
}
55

            
56
/// Name of the MongoDB collection that stores devices.
const COL_NAME: &str = "device";
57

            
58
impl Model {
59
    /// To create the model instance with a database connection.
60
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
61
24
        let model = Model { conn };
62
24
        model.init().await?;
63
24
        Ok(model)
64
24
    }
65
}
66

            
67
#[async_trait]
68
impl DeviceModel for Model {
69
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
70
38
        let indexes = vec![
71
38
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}, "unique": true},
72
38
            doc! {
73
38
                "name": "unitCode_1_networkCode_1_networkAddr_1",
74
38
                "key": {"unitCode": 1, "networkCode": 1, "networkAddr": 1},
75
38
                "unique": true
76
38
            },
77
38
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
78
38
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
79
38
            doc! {"name": "unitCode_1", "key": {"unitCode": 1}},
80
38
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
81
38
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
82
38
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
83
38
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
84
38
            doc! {"name": "profile_1", "key": {"profile": 1}},
85
38
            doc! {"name": "name_1", "key": {"name": 1}},
86
38
        ];
87
38
        let command = doc! {
88
38
            "createIndexes": COL_NAME,
89
38
            "indexes": indexes,
90
38
        };
91
38
        self.conn.run_command(command).await?;
92
38
        Ok(())
93
76
    }
94

            
95
98
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
98
        let filter = get_list_query_filter(cond);
97
98
        let count = self
98
98
            .conn
99
98
            .collection::<Schema>(COL_NAME)
100
98
            .count_documents(filter)
101
98
            .await?;
102
98
        Ok(count)
103
196
    }
104

            
105
    async fn list(
106
        &self,
107
        opts: &ListOptions,
108
        cursor: Option<Box<dyn Cursor>>,
109
408
    ) -> Result<(Vec<Device>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
110
408
        let mut cursor = match cursor {
111
            None => {
112
362
                let filter = get_list_query_filter(opts.cond);
113
362
                Box::new(DbCursor::new(
114
362
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
115
362
                        .await?,
116
                ))
117
            }
118
46
            Some(cursor) => cursor,
119
        };
120

            
121
408
        let mut count: u64 = 0;
122
408
        let mut list = Vec::new();
123
66794
        while let Some(item) = cursor.try_next().await? {
124
66432
            list.push(item);
125
66432
            if let Some(cursor_max) = opts.cursor_max {
126
4456
                count += 1;
127
4456
                if count >= cursor_max {
128
46
                    return Ok((list, Some(cursor)));
129
4410
                }
130
61976
            }
131
        }
132
362
        Ok((list, None))
133
816
    }
134

            
135
504
    async fn get(&self, cond: &QueryCond) -> Result<Option<Device>, Box<dyn StdError>> {
136
504
        let filter = get_query_filter(cond);
137
504
        let mut cursor = self
138
504
            .conn
139
504
            .collection::<Schema>(COL_NAME)
140
504
            .find(filter)
141
504
            .await?;
142
504
        if let Some(item) = cursor.try_next().await? {
143
            return Ok(Some(Device {
144
420
                device_id: item.device_id,
145
420
                unit_id: item.unit_id,
146
420
                unit_code: item.unit_code,
147
420
                network_id: item.network_id,
148
420
                network_code: item.network_code,
149
420
                network_addr: item.network_addr,
150
420
                created_at: item.created_at.into(),
151
420
                modified_at: item.modified_at.into(),
152
420
                profile: item.profile,
153
420
                name: item.name,
154
420
                info: bson::from_document(item.info)?,
155
            }));
156
84
        }
157
84
        Ok(None)
158
1008
    }
159

            
160
2522
    async fn add(&self, device: &Device) -> Result<(), Box<dyn StdError>> {
161
2522
        let item = Schema {
162
2522
            device_id: device.device_id.clone(),
163
2522
            unit_id: device.unit_id.clone(),
164
2522
            unit_code: device.unit_code.clone(),
165
2522
            network_id: device.network_id.clone(),
166
2522
            network_code: device.network_code.clone(),
167
2522
            network_addr: device.network_addr.clone(),
168
2522
            created_at: device.created_at.into(),
169
2522
            modified_at: device.modified_at.into(),
170
2522
            profile: device.profile.clone(),
171
2522
            name: device.name.clone(),
172
2522
            info: bson::to_document(&device.info)?,
173
        };
174
2522
        self.conn
175
2522
            .collection::<Schema>(COL_NAME)
176
2522
            .insert_one(item)
177
2522
            .await?;
178
2518
        Ok(())
179
5044
    }
180

            
181
74
    async fn add_bulk(&self, devices: &Vec<Device>) -> Result<(), Box<dyn StdError>> {
182
74
        let mut items = vec![];
183
41606
        for device in devices.iter() {
184
41606
            items.push(Schema {
185
41606
                device_id: device.device_id.clone(),
186
41606
                unit_id: device.unit_id.clone(),
187
41606
                unit_code: device.unit_code.clone(),
188
41606
                network_id: device.network_id.clone(),
189
41606
                network_code: device.network_code.clone(),
190
41606
                network_addr: device.network_addr.clone(),
191
41606
                created_at: device.created_at.into(),
192
41606
                modified_at: device.modified_at.into(),
193
41606
                profile: device.profile.clone(),
194
41606
                name: device.name.clone(),
195
41606
                info: bson::to_document(&device.info)?,
196
            });
197
        }
198
74
        if let Err(e) = self
199
74
            .conn
200
74
            .collection::<Schema>(COL_NAME)
201
74
            .insert_many(items)
202
74
            .ordered(false)
203
74
            .await
204
        {
205
10
            match e.kind.as_ref() {
206
10
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
207
                    None => return Err(Box::new(e)),
208
10
                    Some(errs) => {
209
8394
                        for err in errs {
210
8384
                            if err.code != 11000 {
211
                                return Err(Box::new(e));
212
8384
                            }
213
                        }
214
10
                        ()
215
                    }
216
                },
217
                _ => return Err(Box::new(e)),
218
            }
219
64
        }
220

            
221
74
        Ok(())
222
148
    }
223

            
224
106
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
225
106
        let filter = get_query_filter(cond);
226
106
        self.conn
227
106
            .collection::<Schema>(COL_NAME)
228
106
            .delete_many(filter)
229
106
            .await?;
230
106
        Ok(())
231
212
    }
232

            
233
    async fn update(
234
        &self,
235
        cond: &UpdateQueryCond,
236
        updates: &Updates,
237
38
    ) -> Result<(), Box<dyn StdError>> {
238
38
        let filter = get_update_query_filter(cond);
239
38
        if let Some(updates) = get_update_doc(updates) {
240
36
            self.conn
241
36
                .collection::<Schema>(COL_NAME)
242
36
                .update_one(filter, updates)
243
36
                .await?;
244
2
        }
245
38
        return Ok(());
246
76
    }
247
}
248

            
249
impl DbCursor {
250
    /// To create the cursor instance with a collection cursor.
251
362
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
252
362
        DbCursor { cursor, offset: 0 }
253
362
    }
254
}
255

            
256
#[async_trait]
257
impl Cursor for DbCursor {
258
66794
    async fn try_next(&mut self) -> Result<Option<Device>, Box<dyn StdError>> {
259
66794
        if let Some(item) = self.cursor.try_next().await? {
260
66432
            self.offset += 1;
261
66432
            return Ok(Some(Device {
262
66432
                device_id: item.device_id,
263
66432
                unit_id: item.unit_id,
264
66432
                unit_code: item.unit_code,
265
66432
                network_id: item.network_id,
266
66432
                network_code: item.network_code,
267
66432
                network_addr: item.network_addr,
268
66432
                created_at: item.created_at.into(),
269
66432
                modified_at: item.modified_at.into(),
270
66432
                profile: item.profile,
271
66432
                name: item.name,
272
66432
                info: bson::from_document(item.info)?,
273
            }));
274
362
        }
275
362
        Ok(None)
276
133588
    }
277

            
278
8
    fn offset(&self) -> u64 {
279
8
        self.offset
280
8
    }
281
}
282

            
283
/// Transforms query conditions to the MongoDB document.
284
610
fn get_query_filter(cond: &QueryCond) -> Document {
285
610
    let mut filter = Document::new();
286
610
    if let Some(value) = cond.unit_id {
287
60
        filter.insert("unitId", value);
288
550
    }
289
610
    if let Some(value) = cond.device_id {
290
502
        filter.insert("deviceId", value);
291
502
    }
292
610
    if let Some(value) = cond.network_id {
293
38
        filter.insert("networkId", value);
294
572
    }
295
610
    if let Some(value) = cond.network_addrs {
296
24
        let mut in_cond = Document::new();
297
24
        in_cond.insert("$in", value);
298
24
        filter.insert("networkAddr", in_cond);
299
586
    }
300
610
    if let Some(value) = cond.device.as_ref() {
301
54
        if let Some(unit_code) = value.unit_code {
302
42
            filter.insert("unitCode", unit_code);
303
42
        } else {
304
12
            filter.insert("unitCode", Bson::Null);
305
12
        }
306
54
        filter.insert("networkCode", value.network_code);
307
54
        filter.insert("networkAddr", value.network_addr);
308
556
    }
309
610
    filter
310
610
}
311

            
312
/// Transforms query conditions to the MongoDB document.
313
460
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
314
460
    let mut filter = Document::new();
315
460
    if let Some(value) = cond.unit_id {
316
176
        filter.insert("unitId", value);
317
284
    }
318
460
    if let Some(value) = cond.device_id {
319
20
        filter.insert("deviceId", value);
320
440
    }
321
460
    if let Some(value) = cond.network_id {
322
158
        filter.insert("networkId", value);
323
302
    }
324
460
    if let Some(value) = cond.network_code {
325
48
        filter.insert("networkCode", value);
326
412
    }
327
460
    if let Some(value) = cond.network_addr {
328
106
        filter.insert("networkAddr", value);
329
354
    } else if let Some(value) = cond.network_addrs {
330
48
        let mut in_cond = Document::new();
331
48
        in_cond.insert("$in", value);
332
48
        filter.insert("networkAddr", in_cond);
333
306
    }
334
460
    if let Some(value) = cond.profile {
335
16
        filter.insert("profile", value);
336
444
    }
337
460
    if let Some(value) = cond.name_contains {
338
56
        filter.insert(
339
56
            "name",
340
56
            Regex {
341
56
                pattern: value.to_string(),
342
56
                options: "i".to_string(),
343
56
            },
344
56
        );
345
404
    }
346
460
    filter
347
460
}
348

            
349
/// Transforms model options to the options.
350
362
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
351
362
where
352
362
    T: Send + Sync,
353
362
{
354
362
    if let Some(offset) = opts.offset {
355
20
        find = find.skip(offset);
356
342
    }
357
362
    if let Some(limit) = opts.limit {
358
130
        if limit > 0 {
359
128
            find = find.limit(limit as i64);
360
128
        }
361
232
    }
362
362
    if let Some(sort_list) = opts.sort.as_ref() {
363
218
        if sort_list.len() > 0 {
364
216
            let mut sort_opts = Document::new();
365
338
            for cond in sort_list.iter() {
366
338
                let key = match cond.key {
367
12
                    SortKey::CreatedAt => "createdAt",
368
8
                    SortKey::ModifiedAt => "modifiedAt",
369
118
                    SortKey::NetworkCode => "networkCode",
370
184
                    SortKey::NetworkAddr => "networkAddr",
371
4
                    SortKey::Profile => "profile",
372
12
                    SortKey::Name => "name",
373
                };
374
338
                if cond.asc {
375
304
                    sort_opts.insert(key.to_string(), 1);
376
304
                } else {
377
34
                    sort_opts.insert(key.to_string(), -1);
378
34
                }
379
            }
380
216
            find = find.sort(sort_opts);
381
2
        }
382
144
    }
383
362
    find
384
362
}
385

            
386
/// Transforms query conditions to the MongoDB document.
387
38
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
388
38
    doc! {"deviceId": cond.device_id}
389
38
}
390

            
391
/// Transforms the model object to the MongoDB document.
392
38
fn get_update_doc(updates: &Updates) -> Option<Document> {
393
38
    let mut count = 0;
394
38
    let mut document = Document::new();
395
38
    if let Some((network_id, network_code)) = updates.network {
396
12
        document.insert("networkId", network_id);
397
12
        document.insert("networkCode", network_code);
398
12
        count += 1;
399
26
    }
400
38
    if let Some(value) = updates.network_addr {
401
16
        document.insert("networkAddr", value);
402
16
        count += 1;
403
22
    }
404
38
    if let Some(value) = updates.modified_at.as_ref() {
405
36
        document.insert(
406
36
            "modifiedAt",
407
36
            DateTime::from_millis(value.timestamp_millis()),
408
36
        );
409
36
        count += 1;
410
36
    }
411
38
    if let Some(value) = updates.profile {
412
22
        document.insert("profile", value);
413
22
        count += 1;
414
22
    }
415
38
    if let Some(value) = updates.name {
416
22
        document.insert("name", value);
417
22
        count += 1;
418
22
    }
419
38
    if let Some(value) = updates.info {
420
20
        document.insert(
421
20
            "info",
422
20
            match bson::to_document(value) {
423
                Err(_) => return None,
424
20
                Ok(doc) => doc,
425
20
            },
426
20
        );
427
20
        count += 1;
428
18
    }
429
38
    if count == 0 {
430
2
        return None;
431
36
    }
432
36
    Some(doc! {"$set": document})
433
38
}