1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{doc, DateTime, Document},
8
    error::ErrorKind,
9
    Cursor as MongoDbCursor, Database,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use super::super::device_route::{
14
    Cursor, DeviceRoute, DeviceRouteModel, ListOptions, ListQueryCond, QueryCond, SortKey,
15
    UpdateQueryCond, Updates,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "routeId")]
36
    route_id: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: String,
41
    #[serde(rename = "applicationId")]
42
    application_id: String,
43
    #[serde(rename = "applicationCode")]
44
    application_code: String,
45
    #[serde(rename = "deviceId")]
46
    device_id: String,
47
    #[serde(rename = "networkId")]
48
    network_id: String,
49
    #[serde(rename = "networkCode")]
50
    network_code: String,
51
    #[serde(rename = "networkAddr")]
52
    network_addr: String,
53
    profile: String,
54
    #[serde(rename = "createdAt")]
55
    created_at: DateTime,
56
    #[serde(rename = "modifiedAt")]
57
    modified_at: DateTime,
58
}
59

            
60
const COL_NAME: &'static str = "deviceRoute";
61

            
62
impl Model {
63
    /// To create the model instance with a database connection.
64
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
65
24
        let model = Model { conn };
66
24
        model.init().await?;
67
24
        Ok(model)
68
24
    }
69
}
70

            
71
#[async_trait]
72
impl DeviceRouteModel for Model {
73
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
74
38
        let indexes = vec![
75
38
            doc! {"name": "routeId_1", "key": {"routeId": 1}, "unique": true},
76
38
            doc! {
77
38
                "name": "applicationId_1_deviceId_1",
78
38
                "key": {"applicationId": 1, "deviceId": 1},
79
38
                "unique": true
80
38
            },
81
38
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
82
38
            doc! {"name": "applicationId_1", "key": {"applicationId": 1}},
83
38
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
84
38
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
85
38
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
86
38
        ];
87
38
        let command = doc! {
88
38
            "createIndexes": COL_NAME,
89
38
            "indexes": indexes,
90
38
        };
91
38
        self.conn.run_command(command).await?;
92
38
        Ok(())
93
76
    }
94

            
95
140
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
140
        let filter = get_list_query_filter(cond);
97
140
        let count = self
98
140
            .conn
99
140
            .collection::<Schema>(COL_NAME)
100
140
            .count_documents(filter)
101
140
            .await?;
102
140
        Ok(count)
103
280
    }
104

            
105
    async fn list(
106
        &self,
107
        opts: &ListOptions,
108
        cursor: Option<Box<dyn Cursor>>,
109
392
    ) -> Result<(Vec<DeviceRoute>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
110
392
        let mut cursor = match cursor {
111
            None => {
112
354
                let filter = get_list_query_filter(opts.cond);
113
354
                Box::new(DbCursor::new(
114
354
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
115
354
                        .await?,
116
                ))
117
            }
118
38
            Some(cursor) => cursor,
119
        };
120

            
121
392
        let mut count: u64 = 0;
122
392
        let mut list = Vec::new();
123
37278
        while let Some(item) = cursor.try_next().await? {
124
36924
            list.push(item);
125
36924
            if let Some(cursor_max) = opts.cursor_max {
126
3604
                count += 1;
127
3604
                if count >= cursor_max {
128
38
                    return Ok((list, Some(cursor)));
129
3566
                }
130
33320
            }
131
        }
132
354
        Ok((list, None))
133
784
    }
134

            
135
230
    async fn get(&self, route_id: &str) -> Result<Option<DeviceRoute>, Box<dyn StdError>> {
136
230
        let filter = doc! {"routeId": route_id};
137
230
        let mut cursor = self
138
230
            .conn
139
230
            .collection::<Schema>(COL_NAME)
140
230
            .find(filter)
141
230
            .await?;
142
230
        if let Some(route) = cursor.try_next().await? {
143
156
            return Ok(Some(DeviceRoute {
144
156
                route_id: route.route_id,
145
156
                unit_id: route.unit_id,
146
156
                unit_code: route.unit_code,
147
156
                application_id: route.application_id,
148
156
                application_code: route.application_code,
149
156
                device_id: route.device_id,
150
156
                network_id: route.network_id,
151
156
                network_code: route.network_code,
152
156
                network_addr: route.network_addr,
153
156
                profile: route.profile,
154
156
                created_at: route.created_at.into(),
155
156
                modified_at: route.modified_at.into(),
156
156
            }));
157
74
        }
158
74
        Ok(None)
159
460
    }
160

            
161
1124
    async fn add(&self, route: &DeviceRoute) -> Result<(), Box<dyn StdError>> {
162
1124
        let item = Schema {
163
1124
            route_id: route.route_id.clone(),
164
1124
            unit_id: route.unit_id.clone(),
165
1124
            unit_code: route.unit_code.clone(),
166
1124
            application_id: route.application_id.clone(),
167
1124
            application_code: route.application_code.clone(),
168
1124
            device_id: route.device_id.clone(),
169
1124
            network_id: route.network_id.clone(),
170
1124
            network_code: route.network_code.clone(),
171
1124
            network_addr: route.network_addr.clone(),
172
1124
            profile: route.profile.clone(),
173
1124
            created_at: route.created_at.into(),
174
1124
            modified_at: route.modified_at.into(),
175
1124
        };
176
1124
        self.conn
177
1124
            .collection::<Schema>(COL_NAME)
178
1124
            .insert_one(item)
179
1124
            .await?;
180
1120
        Ok(())
181
2248
    }
182

            
183
38
    async fn add_bulk(&self, routes: &Vec<DeviceRoute>) -> Result<(), Box<dyn StdError>> {
184
38
        let mut items = vec![];
185
29278
        for route in routes.iter() {
186
29278
            items.push(Schema {
187
29278
                route_id: route.route_id.clone(),
188
29278
                unit_id: route.unit_id.clone(),
189
29278
                unit_code: route.unit_code.clone(),
190
29278
                application_id: route.application_id.clone(),
191
29278
                application_code: route.application_code.clone(),
192
29278
                device_id: route.device_id.clone(),
193
29278
                network_id: route.network_id.clone(),
194
29278
                network_code: route.network_code.clone(),
195
29278
                network_addr: route.network_addr.clone(),
196
29278
                profile: route.profile.clone(),
197
29278
                created_at: route.created_at.into(),
198
29278
                modified_at: route.modified_at.into(),
199
29278
            });
200
29278
        }
201
38
        if let Err(e) = self
202
38
            .conn
203
38
            .collection::<Schema>(COL_NAME)
204
38
            .insert_many(items)
205
38
            .ordered(false)
206
38
            .await
207
        {
208
14
            match e.kind.as_ref() {
209
14
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
210
                    None => return Err(Box::new(e)),
211
14
                    Some(errs) => {
212
12490
                        for err in errs {
213
12476
                            if err.code != 11000 {
214
                                return Err(Box::new(e));
215
12476
                            }
216
                        }
217
14
                        ()
218
                    }
219
                },
220
                _ => return Err(Box::new(e)),
221
            }
222
24
        }
223

            
224
38
        Ok(())
225
76
    }
226

            
227
124
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
228
124
        let filter = get_query_filter(cond);
229
124
        self.conn
230
124
            .collection::<Schema>(COL_NAME)
231
124
            .delete_many(filter)
232
124
            .await?;
233
124
        Ok(())
234
248
    }
235

            
236
    async fn update(
237
        &self,
238
        cond: &UpdateQueryCond,
239
        updates: &Updates,
240
24
    ) -> Result<(), Box<dyn StdError>> {
241
24
        let filter = get_update_query_filter(cond);
242
24
        if let Some(updates) = get_update_doc(updates) {
243
22
            self.conn
244
22
                .collection::<Schema>(COL_NAME)
245
22
                .update_many(filter, updates)
246
22
                .await?;
247
2
        }
248
24
        return Ok(());
249
48
    }
250
}
251

            
252
impl DbCursor {
253
    /// To create the cursor instance with a collection cursor.
254
354
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
255
354
        DbCursor { cursor, offset: 0 }
256
354
    }
257
}
258

            
259
#[async_trait]
260
impl Cursor for DbCursor {
261
37278
    async fn try_next(&mut self) -> Result<Option<DeviceRoute>, Box<dyn StdError>> {
262
37278
        if let Some(item) = self.cursor.try_next().await? {
263
36924
            self.offset += 1;
264
36924
            return Ok(Some(DeviceRoute {
265
36924
                route_id: item.route_id,
266
36924
                unit_id: item.unit_id,
267
36924
                unit_code: item.unit_code,
268
36924
                application_id: item.application_id,
269
36924
                application_code: item.application_code,
270
36924
                device_id: item.device_id,
271
36924
                network_id: item.network_id,
272
36924
                network_code: item.network_code,
273
36924
                network_addr: item.network_addr,
274
36924
                profile: item.profile,
275
36924
                created_at: item.created_at.into(),
276
36924
                modified_at: item.modified_at.into(),
277
36924
            }));
278
354
        }
279
354
        Ok(None)
280
74556
    }
281

            
282
8
    fn offset(&self) -> u64 {
283
8
        self.offset
284
8
    }
285
}
286

            
287
/// Transforms query conditions to the MongoDB document.
288
124
fn get_query_filter(cond: &QueryCond) -> Document {
289
124
    let mut filter = Document::new();
290
124
    if let Some(value) = cond.route_id {
291
12
        filter.insert("routeId", value);
292
112
    }
293
124
    if let Some(value) = cond.unit_id {
294
34
        filter.insert("unitId", value);
295
90
    }
296
124
    if let Some(value) = cond.application_id {
297
20
        filter.insert("applicationId", value);
298
104
    }
299
124
    if let Some(value) = cond.network_id {
300
34
        filter.insert("networkId", value);
301
90
    }
302
124
    if let Some(value) = cond.device_id {
303
46
        filter.insert("deviceId", value);
304
78
    }
305
124
    if let Some(value) = cond.network_addrs {
306
24
        let mut in_cond = Document::new();
307
24
        in_cond.insert("$in", value);
308
24
        filter.insert("networkAddr", in_cond);
309
100
    }
310
124
    filter
311
124
}
312

            
313
/// Transforms query conditions to the MongoDB document.
314
494
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
315
494
    let mut filter = Document::new();
316
494
    if let Some(value) = cond.route_id {
317
68
        filter.insert("routeId", value);
318
426
    }
319
494
    if let Some(value) = cond.unit_id {
320
138
        filter.insert("unitId", value);
321
356
    }
322
494
    if let Some(value) = cond.unit_code {
323
12
        filter.insert("unitCode", value);
324
482
    }
325
494
    if let Some(value) = cond.application_id {
326
150
        filter.insert("applicationId", value);
327
344
    }
328
494
    if let Some(value) = cond.application_code {
329
12
        filter.insert("applicationCode", value);
330
482
    }
331
494
    if let Some(value) = cond.network_id {
332
114
        filter.insert("networkId", value);
333
380
    }
334
494
    if let Some(value) = cond.network_code {
335
12
        filter.insert("networkCode", value);
336
482
    }
337
494
    if let Some(value) = cond.network_addr {
338
12
        filter.insert("networkAddr", value);
339
482
    }
340
494
    if let Some(value) = cond.network_addrs {
341
8
        let mut in_cond = Document::new();
342
8
        in_cond.insert("$in", value);
343
8
        filter.insert("networkAddr", in_cond);
344
486
    }
345
494
    if let Some(value) = cond.device_id {
346
110
        filter.insert("deviceId", value);
347
384
    }
348
494
    filter
349
494
}
350

            
351
/// Transforms model options to the options.
352
354
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
353
354
where
354
354
    T: Send + Sync,
355
354
{
356
354
    if let Some(offset) = opts.offset {
357
20
        find = find.skip(offset);
358
334
    }
359
354
    if let Some(limit) = opts.limit {
360
184
        if limit > 0 {
361
182
            find = find.limit(limit as i64);
362
182
        }
363
170
    }
364
354
    if let Some(sort_list) = opts.sort.as_ref() {
365
224
        if sort_list.len() > 0 {
366
222
            let mut sort_opts = Document::new();
367
494
            for cond in sort_list.iter() {
368
494
                let key = match cond.key {
369
140
                    SortKey::CreatedAt => "createdAt",
370
8
                    SortKey::ModifiedAt => "modifiedAt",
371
8
                    SortKey::ApplicationCode => "applicationCode",
372
136
                    SortKey::NetworkCode => "networkCode",
373
202
                    SortKey::NetworkAddr => "networkAddr",
374
                };
375
494
                if cond.asc {
376
348
                    sort_opts.insert(key.to_string(), 1);
377
348
                } else {
378
146
                    sort_opts.insert(key.to_string(), -1);
379
146
                }
380
            }
381
222
            find = find.sort(sort_opts);
382
2
        }
383
130
    }
384
354
    find
385
354
}
386

            
387
/// Transforms query conditions to the MongoDB document.
388
24
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
389
24
    doc! {"deviceId": cond.device_id}
390
24
}
391

            
392
/// Transforms the model object to the MongoDB document.
393
24
fn get_update_doc(updates: &Updates) -> Option<Document> {
394
24
    let mut count = 0;
395
24
    let mut document = Document::new();
396
24
    if let Some(value) = updates.modified_at.as_ref() {
397
22
        document.insert(
398
22
            "modifiedAt",
399
22
            DateTime::from_millis(value.timestamp_millis()),
400
22
        );
401
22
        count += 1;
402
22
    }
403
24
    if let Some(value) = updates.profile {
404
20
        document.insert("profile", value);
405
20
        count += 1;
406
20
    }
407
24
    if count == 0 {
408
2
        return None;
409
22
    }
410
22
    Some(doc! {"$set": document})
411
24
}