1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, Bson, DateTime, Document, Regex, doc, raw::CString},
9
    error::ErrorKind,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use sylvia_iot_corelib::strings;
14

            
15
use super::super::device::{
16
    Cursor, Device, DeviceModel, ListOptions, ListQueryCond, QueryCond, SortKey, UpdateQueryCond,
17
    Updates,
18
};
19

            
20
/// Model instance.
21
pub struct Model {
22
    /// The associated database connection.
23
    conn: Arc<Database>,
24
}
25

            
26
/// Cursor instance.
27
struct DbCursor {
28
    /// The associated collection cursor.
29
    cursor: MongoDbCursor<Schema>,
30
    /// (Useless) only for Cursor trait implementation.
31
    offset: u64,
32
}
33

            
34
/// MongoDB schema.
35
#[derive(Deserialize, Serialize)]
36
struct Schema {
37
    #[serde(rename = "deviceId")]
38
    device_id: String,
39
    #[serde(rename = "unitId")]
40
    unit_id: String,
41
    #[serde(rename = "unitCode")]
42
    unit_code: Option<String>,
43
    #[serde(rename = "networkId")]
44
    network_id: String,
45
    #[serde(rename = "networkCode")]
46
    network_code: String,
47
    #[serde(rename = "networkAddr")]
48
    network_addr: String,
49
    #[serde(rename = "createdAt")]
50
    created_at: DateTime,
51
    #[serde(rename = "modifiedAt")]
52
    modified_at: DateTime,
53
    profile: String,
54
    name: String,
55
    info: Document,
56
}
57

            
58
const COL_NAME: &'static str = "device";
59

            
60
impl Model {
61
    /// To create the model instance with a database connection.
62
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
63
24
        let model = Model { conn };
64
24
        model.init().await?;
65
24
        Ok(model)
66
24
    }
67
}
68

            
69
#[async_trait]
70
impl DeviceModel for Model {
71
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
72
        let indexes = vec![
73
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}, "unique": true},
74
            doc! {
75
                "name": "unitCode_1_networkCode_1_networkAddr_1",
76
                "key": {"unitCode": 1, "networkCode": 1, "networkAddr": 1},
77
                "unique": true
78
            },
79
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
80
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
81
            doc! {"name": "unitCode_1", "key": {"unitCode": 1}},
82
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
83
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
84
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
85
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
86
            doc! {"name": "profile_1", "key": {"profile": 1}},
87
            doc! {"name": "name_1", "key": {"name": 1}},
88
        ];
89
        let command = doc! {
90
            "createIndexes": COL_NAME,
91
            "indexes": indexes,
92
        };
93
        self.conn.run_command(command).await?;
94
        Ok(())
95
38
    }
96

            
97
98
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
98
        let filter = get_list_query_filter(cond);
99
        let count = self
100
            .conn
101
            .collection::<Schema>(COL_NAME)
102
            .count_documents(filter)
103
            .await?;
104
        Ok(count)
105
98
    }
106

            
107
    async fn list(
108
        &self,
109
        opts: &ListOptions,
110
        cursor: Option<Box<dyn Cursor>>,
111
408
    ) -> Result<(Vec<Device>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
112
        let mut cursor = match cursor {
113
            None => {
114
                let filter = get_list_query_filter(opts.cond);
115
                Box::new(DbCursor::new(
116
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
117
                        .await?,
118
                ))
119
            }
120
            Some(cursor) => cursor,
121
        };
122

            
123
        let mut count: u64 = 0;
124
        let mut list = Vec::new();
125
        while let Some(item) = cursor.try_next().await? {
126
            list.push(item);
127
408
            if let Some(cursor_max) = opts.cursor_max {
128
                count += 1;
129
                if count >= cursor_max {
130
                    return Ok((list, Some(cursor)));
131
                }
132
            }
133
        }
134
        Ok((list, None))
135
408
    }
136

            
137
504
    async fn get(&self, cond: &QueryCond) -> Result<Option<Device>, Box<dyn StdError>> {
138
        let filter = get_query_filter(cond);
139
        let mut cursor = self
140
            .conn
141
            .collection::<Schema>(COL_NAME)
142
            .find(filter)
143
            .await?;
144
        if let Some(item) = cursor.try_next().await? {
145
            return Ok(Some(Device {
146
                device_id: item.device_id,
147
                unit_id: item.unit_id,
148
                unit_code: item.unit_code,
149
                network_id: item.network_id,
150
                network_code: item.network_code,
151
                network_addr: item.network_addr,
152
                created_at: item.created_at.into(),
153
                modified_at: item.modified_at.into(),
154
                profile: item.profile,
155
                name: item.name,
156
                info: bson::deserialize_from_document(item.info)?,
157
            }));
158
        }
159
        Ok(None)
160
504
    }
161

            
162
2522
    async fn add(&self, device: &Device) -> Result<(), Box<dyn StdError>> {
163
        let item = Schema {
164
            device_id: device.device_id.clone(),
165
            unit_id: device.unit_id.clone(),
166
            unit_code: device.unit_code.clone(),
167
            network_id: device.network_id.clone(),
168
            network_code: device.network_code.clone(),
169
            network_addr: device.network_addr.clone(),
170
            created_at: device.created_at.into(),
171
            modified_at: device.modified_at.into(),
172
            profile: device.profile.clone(),
173
            name: device.name.clone(),
174
            info: bson::serialize_to_document(&device.info)?,
175
        };
176
        self.conn
177
            .collection::<Schema>(COL_NAME)
178
            .insert_one(item)
179
            .await?;
180
        Ok(())
181
2522
    }
182

            
183
74
    async fn add_bulk(&self, devices: &Vec<Device>) -> Result<(), Box<dyn StdError>> {
184
        let mut items = vec![];
185
        for device in devices.iter() {
186
            items.push(Schema {
187
                device_id: device.device_id.clone(),
188
                unit_id: device.unit_id.clone(),
189
                unit_code: device.unit_code.clone(),
190
                network_id: device.network_id.clone(),
191
                network_code: device.network_code.clone(),
192
                network_addr: device.network_addr.clone(),
193
                created_at: device.created_at.into(),
194
                modified_at: device.modified_at.into(),
195
                profile: device.profile.clone(),
196
                name: device.name.clone(),
197
                info: bson::serialize_to_document(&device.info)?,
198
            });
199
        }
200
        if let Err(e) = self
201
            .conn
202
            .collection::<Schema>(COL_NAME)
203
            .insert_many(items)
204
            .ordered(false)
205
            .await
206
        {
207
            match e.kind.as_ref() {
208
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
209
                    None => return Err(Box::new(e)),
210
                    Some(errs) => {
211
                        for err in errs {
212
                            if err.code != 11000 {
213
                                return Err(Box::new(e));
214
                            }
215
                        }
216
                        ()
217
                    }
218
                },
219
                _ => return Err(Box::new(e)),
220
            }
221
        }
222

            
223
        Ok(())
224
74
    }
225

            
226
106
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
227
        let filter = get_query_filter(cond);
228
        self.conn
229
            .collection::<Schema>(COL_NAME)
230
            .delete_many(filter)
231
            .await?;
232
        Ok(())
233
106
    }
234

            
235
    async fn update(
236
        &self,
237
        cond: &UpdateQueryCond,
238
        updates: &Updates,
239
38
    ) -> Result<(), Box<dyn StdError>> {
240
        let filter = get_update_query_filter(cond);
241
        if let Some(updates) = get_update_doc(updates) {
242
            self.conn
243
                .collection::<Schema>(COL_NAME)
244
                .update_one(filter, updates)
245
                .await?;
246
        }
247
        return Ok(());
248
38
    }
249
}
250

            
251
impl DbCursor {
252
    /// To create the cursor instance with a collection cursor.
253
362
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
254
362
        DbCursor { cursor, offset: 0 }
255
362
    }
256
}
257

            
258
#[async_trait]
259
impl Cursor for DbCursor {
260
66794
    async fn try_next(&mut self) -> Result<Option<Device>, Box<dyn StdError>> {
261
        if let Some(item) = self.cursor.try_next().await? {
262
            self.offset += 1;
263
            return Ok(Some(Device {
264
                device_id: item.device_id,
265
                unit_id: item.unit_id,
266
                unit_code: item.unit_code,
267
                network_id: item.network_id,
268
                network_code: item.network_code,
269
                network_addr: item.network_addr,
270
                created_at: item.created_at.into(),
271
                modified_at: item.modified_at.into(),
272
                profile: item.profile,
273
                name: item.name,
274
                info: bson::deserialize_from_document(item.info)?,
275
            }));
276
        }
277
        Ok(None)
278
66794
    }
279

            
280
8
    fn offset(&self) -> u64 {
281
8
        self.offset
282
8
    }
283
}
284

            
285
/// Transforms query conditions to the MongoDB document.
286
610
fn get_query_filter(cond: &QueryCond) -> Document {
287
610
    let mut filter = Document::new();
288
610
    if let Some(value) = cond.unit_id {
289
60
        filter.insert("unitId", value);
290
550
    }
291
610
    if let Some(value) = cond.device_id {
292
502
        filter.insert("deviceId", value);
293
502
    }
294
610
    if let Some(value) = cond.network_id {
295
38
        filter.insert("networkId", value);
296
572
    }
297
610
    if let Some(value) = cond.network_addrs {
298
24
        let mut in_cond = Document::new();
299
24
        in_cond.insert("$in", value);
300
24
        filter.insert("networkAddr", in_cond);
301
586
    }
302
610
    if let Some(value) = cond.device.as_ref() {
303
54
        if let Some(unit_code) = value.unit_code {
304
42
            filter.insert("unitCode", unit_code);
305
42
        } else {
306
12
            filter.insert("unitCode", Bson::Null);
307
12
        }
308
54
        filter.insert("networkCode", value.network_code);
309
54
        filter.insert("networkAddr", value.network_addr);
310
556
    }
311
610
    filter
312
610
}
313

            
314
/// Transforms query conditions to the MongoDB document.
315
460
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
316
460
    let mut filter = Document::new();
317
460
    if let Some(value) = cond.unit_id {
318
176
        filter.insert("unitId", value);
319
284
    }
320
460
    if let Some(value) = cond.device_id {
321
20
        filter.insert("deviceId", value);
322
440
    }
323
460
    if let Some(value) = cond.network_id {
324
158
        filter.insert("networkId", value);
325
302
    }
326
460
    if let Some(value) = cond.network_code {
327
48
        filter.insert("networkCode", value);
328
412
    }
329
460
    if let Some(value) = cond.network_addr {
330
106
        filter.insert("networkAddr", value);
331
354
    } else if let Some(value) = cond.network_addrs {
332
48
        let mut in_cond = Document::new();
333
48
        in_cond.insert("$in", value);
334
48
        filter.insert("networkAddr", in_cond);
335
306
    }
336
460
    if let Some(value) = cond.profile {
337
16
        filter.insert("profile", value);
338
444
    }
339
460
    if let Some(value) = cond.name_contains {
340
56
        let escaped = strings::escape_regex_str(value);
341
56
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
342
56
            if let Ok(options) = CString::try_from("i") {
343
56
                filter.insert("name", Regex { pattern, options });
344
56
            }
345
        }
346
404
    }
347
460
    filter
348
460
}
349

            
350
/// Transforms model options to the options.
351
362
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
352
362
where
353
362
    T: Send + Sync,
354
{
355
362
    if let Some(offset) = opts.offset {
356
20
        find = find.skip(offset);
357
342
    }
358
362
    if let Some(limit) = opts.limit {
359
130
        if limit > 0 {
360
128
            find = find.limit(limit as i64);
361
128
        }
362
232
    }
363
362
    if let Some(sort_list) = opts.sort.as_ref() {
364
218
        if sort_list.len() > 0 {
365
216
            let mut sort_opts = Document::new();
366
338
            for cond in sort_list.iter() {
367
338
                let key = match cond.key {
368
12
                    SortKey::CreatedAt => "createdAt",
369
8
                    SortKey::ModifiedAt => "modifiedAt",
370
118
                    SortKey::NetworkCode => "networkCode",
371
184
                    SortKey::NetworkAddr => "networkAddr",
372
4
                    SortKey::Profile => "profile",
373
12
                    SortKey::Name => "name",
374
                };
375
338
                if cond.asc {
376
304
                    sort_opts.insert(key.to_string(), 1);
377
304
                } else {
378
34
                    sort_opts.insert(key.to_string(), -1);
379
34
                }
380
            }
381
216
            find = find.sort(sort_opts);
382
2
        }
383
144
    }
384
362
    find
385
362
}
386

            
387
/// Transforms query conditions to the MongoDB document.
388
38
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
389
38
    doc! {"deviceId": cond.device_id}
390
38
}
391

            
392
/// Transforms the model object to the MongoDB document.
393
38
fn get_update_doc(updates: &Updates) -> Option<Document> {
394
38
    let mut count = 0;
395
38
    let mut document = Document::new();
396
38
    if let Some((network_id, network_code)) = updates.network {
397
12
        document.insert("networkId", network_id);
398
12
        document.insert("networkCode", network_code);
399
12
        count += 1;
400
26
    }
401
38
    if let Some(value) = updates.network_addr {
402
16
        document.insert("networkAddr", value);
403
16
        count += 1;
404
22
    }
405
38
    if let Some(value) = updates.modified_at.as_ref() {
406
36
        document.insert(
407
36
            "modifiedAt",
408
36
            DateTime::from_millis(value.timestamp_millis()),
409
36
        );
410
36
        count += 1;
411
36
    }
412
38
    if let Some(value) = updates.profile {
413
22
        document.insert("profile", value);
414
22
        count += 1;
415
22
    }
416
38
    if let Some(value) = updates.name {
417
22
        document.insert("name", value);
418
22
        count += 1;
419
22
    }
420
38
    if let Some(value) = updates.info {
421
20
        document.insert(
422
            "info",
423
20
            match bson::serialize_to_document(value) {
424
                Err(_) => return None,
425
20
                Ok(doc) => doc,
426
            },
427
        );
428
20
        count += 1;
429
18
    }
430
38
    if count == 0 {
431
2
        return None;
432
36
    }
433
36
    Some(doc! {"$set": document})
434
38
}