1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, Bson, DateTime, Document, Regex, doc},
9
    error::ErrorKind,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use super::super::device::{
14
    Cursor, Device, DeviceModel, ListOptions, ListQueryCond, QueryCond, SortKey, UpdateQueryCond,
15
    Updates,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "deviceId")]
36
    device_id: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: Option<String>,
41
    #[serde(rename = "networkId")]
42
    network_id: String,
43
    #[serde(rename = "networkCode")]
44
    network_code: String,
45
    #[serde(rename = "networkAddr")]
46
    network_addr: String,
47
    #[serde(rename = "createdAt")]
48
    created_at: DateTime,
49
    #[serde(rename = "modifiedAt")]
50
    modified_at: DateTime,
51
    profile: String,
52
    name: String,
53
    info: Document,
54
}
55

            
56
/// Name of the MongoDB collection that stores device documents.
const COL_NAME: &str = "device";
57

            
58
impl Model {
59
    /// To create the model instance with a database connection.
60
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
61
24
        let model = Model { conn };
62
24
        model.init().await?;
63
24
        Ok(model)
64
24
    }
65
}
66

            
67
#[async_trait]
68
impl DeviceModel for Model {
69
38
    /// Creates the indexes of the device collection with a single `createIndexes` command.
    ///
    /// `deviceId` and the (`unitCode`, `networkCode`, `networkAddr`) triple are unique indexes;
    /// the remaining fields get plain ascending indexes.
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
        let create_indexes = doc! {
            "createIndexes": COL_NAME,
            "indexes": [
                {"name": "deviceId_1", "key": {"deviceId": 1}, "unique": true},
                {
                    "name": "unitCode_1_networkCode_1_networkAddr_1",
                    "key": {"unitCode": 1, "networkCode": 1, "networkAddr": 1},
                    "unique": true
                },
                {"name": "unitId_1", "key": {"unitId": 1}},
                {"name": "networkId_1", "key": {"networkId": 1}},
                {"name": "unitCode_1", "key": {"unitCode": 1}},
                {"name": "networkCode_1", "key": {"networkCode": 1}},
                {"name": "networkAddr_1", "key": {"networkAddr": 1}},
                {"name": "createdAt_1", "key": {"createdAt": 1}},
                {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
                {"name": "profile_1", "key": {"profile": 1}},
                {"name": "name_1", "key": {"name": 1}}
            ],
        };
        self.conn.run_command(create_indexes).await?;
        Ok(())
    }
94

            
95
98
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
        let filter = get_list_query_filter(cond);
97
        let count = self
98
            .conn
99
            .collection::<Schema>(COL_NAME)
100
            .count_documents(filter)
101
            .await?;
102
        Ok(count)
103
98
    }
104

            
105
    async fn list(
106
        &self,
107
        opts: &ListOptions,
108
        cursor: Option<Box<dyn Cursor>>,
109
408
    ) -> Result<(Vec<Device>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
110
        let mut cursor = match cursor {
111
            None => {
112
                let filter = get_list_query_filter(opts.cond);
113
                Box::new(DbCursor::new(
114
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
115
                        .await?,
116
                ))
117
            }
118
            Some(cursor) => cursor,
119
        };
120

            
121
        let mut count: u64 = 0;
122
        let mut list = Vec::new();
123
        while let Some(item) = cursor.try_next().await? {
124
            list.push(item);
125
408
            if let Some(cursor_max) = opts.cursor_max {
126
                count += 1;
127
                if count >= cursor_max {
128
                    return Ok((list, Some(cursor)));
129
                }
130
            }
131
        }
132
        Ok((list, None))
133
408
    }
134

            
135
504
    async fn get(&self, cond: &QueryCond) -> Result<Option<Device>, Box<dyn StdError>> {
136
        let filter = get_query_filter(cond);
137
        let mut cursor = self
138
            .conn
139
            .collection::<Schema>(COL_NAME)
140
            .find(filter)
141
            .await?;
142
        if let Some(item) = cursor.try_next().await? {
143
            return Ok(Some(Device {
144
                device_id: item.device_id,
145
                unit_id: item.unit_id,
146
                unit_code: item.unit_code,
147
                network_id: item.network_id,
148
                network_code: item.network_code,
149
                network_addr: item.network_addr,
150
                created_at: item.created_at.into(),
151
                modified_at: item.modified_at.into(),
152
                profile: item.profile,
153
                name: item.name,
154
                info: bson::from_document(item.info)?,
155
            }));
156
        }
157
        Ok(None)
158
504
    }
159

            
160
2522
    async fn add(&self, device: &Device) -> Result<(), Box<dyn StdError>> {
161
        let item = Schema {
162
            device_id: device.device_id.clone(),
163
            unit_id: device.unit_id.clone(),
164
            unit_code: device.unit_code.clone(),
165
            network_id: device.network_id.clone(),
166
            network_code: device.network_code.clone(),
167
            network_addr: device.network_addr.clone(),
168
            created_at: device.created_at.into(),
169
            modified_at: device.modified_at.into(),
170
            profile: device.profile.clone(),
171
            name: device.name.clone(),
172
            info: bson::to_document(&device.info)?,
173
        };
174
        self.conn
175
            .collection::<Schema>(COL_NAME)
176
            .insert_one(item)
177
            .await?;
178
        Ok(())
179
2522
    }
180

            
181
74
    async fn add_bulk(&self, devices: &Vec<Device>) -> Result<(), Box<dyn StdError>> {
182
        let mut items = vec![];
183
        for device in devices.iter() {
184
            items.push(Schema {
185
                device_id: device.device_id.clone(),
186
                unit_id: device.unit_id.clone(),
187
                unit_code: device.unit_code.clone(),
188
                network_id: device.network_id.clone(),
189
                network_code: device.network_code.clone(),
190
                network_addr: device.network_addr.clone(),
191
                created_at: device.created_at.into(),
192
                modified_at: device.modified_at.into(),
193
                profile: device.profile.clone(),
194
                name: device.name.clone(),
195
                info: bson::to_document(&device.info)?,
196
            });
197
        }
198
        if let Err(e) = self
199
            .conn
200
            .collection::<Schema>(COL_NAME)
201
            .insert_many(items)
202
            .ordered(false)
203
            .await
204
        {
205
            match e.kind.as_ref() {
206
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
207
                    None => return Err(Box::new(e)),
208
                    Some(errs) => {
209
                        for err in errs {
210
                            if err.code != 11000 {
211
                                return Err(Box::new(e));
212
                            }
213
                        }
214
                        ()
215
                    }
216
                },
217
                _ => return Err(Box::new(e)),
218
            }
219
        }
220

            
221
        Ok(())
222
74
    }
223

            
224
106
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
225
        let filter = get_query_filter(cond);
226
        self.conn
227
            .collection::<Schema>(COL_NAME)
228
            .delete_many(filter)
229
            .await?;
230
        Ok(())
231
106
    }
232

            
233
    async fn update(
234
        &self,
235
        cond: &UpdateQueryCond,
236
        updates: &Updates,
237
38
    ) -> Result<(), Box<dyn StdError>> {
238
        let filter = get_update_query_filter(cond);
239
        if let Some(updates) = get_update_doc(updates) {
240
            self.conn
241
                .collection::<Schema>(COL_NAME)
242
                .update_one(filter, updates)
243
                .await?;
244
        }
245
        return Ok(());
246
38
    }
247
}
248

            
249
impl DbCursor {
250
    /// To create the cursor instance with a collection cursor.
251
362
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
252
362
        DbCursor { cursor, offset: 0 }
253
362
    }
254
}
255

            
256
#[async_trait]
257
impl Cursor for DbCursor {
258
66794
    async fn try_next(&mut self) -> Result<Option<Device>, Box<dyn StdError>> {
259
        if let Some(item) = self.cursor.try_next().await? {
260
            self.offset += 1;
261
            return Ok(Some(Device {
262
                device_id: item.device_id,
263
                unit_id: item.unit_id,
264
                unit_code: item.unit_code,
265
                network_id: item.network_id,
266
                network_code: item.network_code,
267
                network_addr: item.network_addr,
268
                created_at: item.created_at.into(),
269
                modified_at: item.modified_at.into(),
270
                profile: item.profile,
271
                name: item.name,
272
                info: bson::from_document(item.info)?,
273
            }));
274
        }
275
        Ok(None)
276
66794
    }
277

            
278
8
    fn offset(&self) -> u64 {
279
8
        self.offset
280
8
    }
281
}
282

            
283
/// Transforms query conditions to the MongoDB document.
284
610
fn get_query_filter(cond: &QueryCond) -> Document {
285
610
    let mut filter = Document::new();
286
610
    if let Some(value) = cond.unit_id {
287
60
        filter.insert("unitId", value);
288
550
    }
289
610
    if let Some(value) = cond.device_id {
290
502
        filter.insert("deviceId", value);
291
502
    }
292
610
    if let Some(value) = cond.network_id {
293
38
        filter.insert("networkId", value);
294
572
    }
295
610
    if let Some(value) = cond.network_addrs {
296
24
        let mut in_cond = Document::new();
297
24
        in_cond.insert("$in", value);
298
24
        filter.insert("networkAddr", in_cond);
299
586
    }
300
610
    if let Some(value) = cond.device.as_ref() {
301
54
        if let Some(unit_code) = value.unit_code {
302
42
            filter.insert("unitCode", unit_code);
303
42
        } else {
304
12
            filter.insert("unitCode", Bson::Null);
305
12
        }
306
54
        filter.insert("networkCode", value.network_code);
307
54
        filter.insert("networkAddr", value.network_addr);
308
556
    }
309
610
    filter
310
610
}
311

            
312
/// Transforms query conditions to the MongoDB document.
313
460
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
314
460
    let mut filter = Document::new();
315
460
    if let Some(value) = cond.unit_id {
316
176
        filter.insert("unitId", value);
317
284
    }
318
460
    if let Some(value) = cond.device_id {
319
20
        filter.insert("deviceId", value);
320
440
    }
321
460
    if let Some(value) = cond.network_id {
322
158
        filter.insert("networkId", value);
323
302
    }
324
460
    if let Some(value) = cond.network_code {
325
48
        filter.insert("networkCode", value);
326
412
    }
327
460
    if let Some(value) = cond.network_addr {
328
106
        filter.insert("networkAddr", value);
329
354
    } else if let Some(value) = cond.network_addrs {
330
48
        let mut in_cond = Document::new();
331
48
        in_cond.insert("$in", value);
332
48
        filter.insert("networkAddr", in_cond);
333
306
    }
334
460
    if let Some(value) = cond.profile {
335
16
        filter.insert("profile", value);
336
444
    }
337
460
    if let Some(value) = cond.name_contains {
338
56
        filter.insert(
339
56
            "name",
340
56
            Regex {
341
56
                pattern: value.to_string(),
342
56
                options: "i".to_string(),
343
56
            },
344
56
        );
345
404
    }
346
460
    filter
347
460
}
348

            
349
/// Transforms model options to the options.
350
362
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
351
362
where
352
362
    T: Send + Sync,
353
{
354
362
    if let Some(offset) = opts.offset {
355
20
        find = find.skip(offset);
356
342
    }
357
362
    if let Some(limit) = opts.limit {
358
130
        if limit > 0 {
359
128
            find = find.limit(limit as i64);
360
128
        }
361
232
    }
362
362
    if let Some(sort_list) = opts.sort.as_ref() {
363
218
        if sort_list.len() > 0 {
364
216
            let mut sort_opts = Document::new();
365
338
            for cond in sort_list.iter() {
366
338
                let key = match cond.key {
367
12
                    SortKey::CreatedAt => "createdAt",
368
8
                    SortKey::ModifiedAt => "modifiedAt",
369
118
                    SortKey::NetworkCode => "networkCode",
370
184
                    SortKey::NetworkAddr => "networkAddr",
371
4
                    SortKey::Profile => "profile",
372
12
                    SortKey::Name => "name",
373
                };
374
338
                if cond.asc {
375
304
                    sort_opts.insert(key.to_string(), 1);
376
304
                } else {
377
34
                    sort_opts.insert(key.to_string(), -1);
378
34
                }
379
            }
380
216
            find = find.sort(sort_opts);
381
2
        }
382
144
    }
383
362
    find
384
362
}
385

            
386
/// Transforms query conditions to the MongoDB document.
387
38
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
388
38
    doc! {"deviceId": cond.device_id}
389
38
}
390

            
391
/// Transforms the model object to the MongoDB document.
392
38
fn get_update_doc(updates: &Updates) -> Option<Document> {
393
38
    let mut count = 0;
394
38
    let mut document = Document::new();
395
38
    if let Some((network_id, network_code)) = updates.network {
396
12
        document.insert("networkId", network_id);
397
12
        document.insert("networkCode", network_code);
398
12
        count += 1;
399
26
    }
400
38
    if let Some(value) = updates.network_addr {
401
16
        document.insert("networkAddr", value);
402
16
        count += 1;
403
22
    }
404
38
    if let Some(value) = updates.modified_at.as_ref() {
405
36
        document.insert(
406
36
            "modifiedAt",
407
36
            DateTime::from_millis(value.timestamp_millis()),
408
36
        );
409
36
        count += 1;
410
36
    }
411
38
    if let Some(value) = updates.profile {
412
22
        document.insert("profile", value);
413
22
        count += 1;
414
22
    }
415
38
    if let Some(value) = updates.name {
416
22
        document.insert("name", value);
417
22
        count += 1;
418
22
    }
419
38
    if let Some(value) = updates.info {
420
20
        document.insert(
421
            "info",
422
20
            match bson::to_document(value) {
423
                Err(_) => return None,
424
20
                Ok(doc) => doc,
425
            },
426
        );
427
20
        count += 1;
428
18
    }
429
38
    if count == 0 {
430
2
        return None;
431
36
    }
432
36
    Some(doc! {"$set": document})
433
38
}