1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{doc, DateTime, Document},
8
    error::ErrorKind,
9
    Cursor as MongoDbCursor, Database,
10
};
11
use serde::{Deserialize, Serialize};
12

            
13
use super::super::device_route::{
14
    Cursor, DeviceRoute, DeviceRouteModel, ListOptions, ListQueryCond, QueryCond, SortKey,
15
    UpdateQueryCond, Updates,
16
};
17

            
18
/// Model instance.
19
pub struct Model {
20
    /// The associated database connection.
21
    conn: Arc<Database>,
22
}
23

            
24
/// Cursor instance.
25
struct DbCursor {
26
    /// The associated collection cursor.
27
    cursor: MongoDbCursor<Schema>,
28
    /// (Useless) only for Cursor trait implementation.
29
    offset: u64,
30
}
31

            
32
/// MongoDB schema.
33
259560
#[derive(Deserialize, Serialize)]
34
struct Schema {
35
    #[serde(rename = "routeId")]
36
    route_id: String,
37
    #[serde(rename = "unitId")]
38
    unit_id: String,
39
    #[serde(rename = "unitCode")]
40
    unit_code: String,
41
    #[serde(rename = "applicationId")]
42
    application_id: String,
43
    #[serde(rename = "applicationCode")]
44
    application_code: String,
45
    #[serde(rename = "deviceId")]
46
    device_id: String,
47
    #[serde(rename = "networkId")]
48
    network_id: String,
49
    #[serde(rename = "networkCode")]
50
    network_code: String,
51
    #[serde(rename = "networkAddr")]
52
    network_addr: String,
53
    profile: String,
54
    #[serde(rename = "createdAt")]
55
    created_at: DateTime,
56
    #[serde(rename = "modifiedAt")]
57
    modified_at: DateTime,
58
}
59

            
60
/// Name of the MongoDB collection that stores device routes.
const COL_NAME: &str = "deviceRoute";
61

            
62
impl Model {
63
    /// To create the model instance with a database connection.
64
12
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
65
12
        let model = Model { conn };
66
24
        model.init().await?;
67
12
        Ok(model)
68
12
    }
69
}
70

            
71
#[async_trait]
72
impl DeviceRouteModel for Model {
73
19
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
74
19
        let indexes = vec![
75
19
            doc! {"name": "routeId_1", "key": {"routeId": 1}, "unique": true},
76
19
            doc! {
77
19
                "name": "applicationId_1_deviceId_1",
78
19
                "key": {"applicationId": 1, "deviceId": 1},
79
19
                "unique": true
80
19
            },
81
19
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
82
19
            doc! {"name": "applicationId_1", "key": {"applicationId": 1}},
83
19
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
84
19
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
85
19
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
86
19
        ];
87
19
        let command = doc! {
88
19
            "createIndexes": COL_NAME,
89
19
            "indexes": indexes,
90
19
        };
91
38
        self.conn.run_command(command).await?;
92
19
        Ok(())
93
38
    }
94

            
95
70
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
70
        let filter = get_list_query_filter(cond);
97
70
        let count = self
98
70
            .conn
99
70
            .collection::<Schema>(COL_NAME)
100
70
            .count_documents(filter)
101
140
            .await?;
102
70
        Ok(count)
103
140
    }
104

            
105
    async fn list(
106
        &self,
107
        opts: &ListOptions,
108
        cursor: Option<Box<dyn Cursor>>,
109
196
    ) -> Result<(Vec<DeviceRoute>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
110
196
        let mut cursor = match cursor {
111
            None => {
112
177
                let filter = get_list_query_filter(opts.cond);
113
177
                Box::new(DbCursor::new(
114
177
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
115
354
                        .await?,
116
                ))
117
            }
118
19
            Some(cursor) => cursor,
119
        };
120

            
121
196
        let mut count: u64 = 0;
122
196
        let mut list = Vec::new();
123
18639
        while let Some(item) = cursor.try_next().await? {
124
18462
            list.push(item);
125
18462
            if let Some(cursor_max) = opts.cursor_max {
126
1802
                count += 1;
127
1802
                if count >= cursor_max {
128
19
                    return Ok((list, Some(cursor)));
129
1783
                }
130
16660
            }
131
        }
132
177
        Ok((list, None))
133
392
    }
134

            
135
115
    async fn get(&self, route_id: &str) -> Result<Option<DeviceRoute>, Box<dyn StdError>> {
136
115
        let filter = doc! {"routeId": route_id};
137
115
        let mut cursor = self
138
115
            .conn
139
115
            .collection::<Schema>(COL_NAME)
140
115
            .find(filter)
141
230
            .await?;
142
115
        if let Some(route) = cursor.try_next().await? {
143
78
            return Ok(Some(DeviceRoute {
144
78
                route_id: route.route_id,
145
78
                unit_id: route.unit_id,
146
78
                unit_code: route.unit_code,
147
78
                application_id: route.application_id,
148
78
                application_code: route.application_code,
149
78
                device_id: route.device_id,
150
78
                network_id: route.network_id,
151
78
                network_code: route.network_code,
152
78
                network_addr: route.network_addr,
153
78
                profile: route.profile,
154
78
                created_at: route.created_at.into(),
155
78
                modified_at: route.modified_at.into(),
156
78
            }));
157
37
        }
158
37
        Ok(None)
159
230
    }
160

            
161
562
    async fn add(&self, route: &DeviceRoute) -> Result<(), Box<dyn StdError>> {
162
562
        let item = Schema {
163
562
            route_id: route.route_id.clone(),
164
562
            unit_id: route.unit_id.clone(),
165
562
            unit_code: route.unit_code.clone(),
166
562
            application_id: route.application_id.clone(),
167
562
            application_code: route.application_code.clone(),
168
562
            device_id: route.device_id.clone(),
169
562
            network_id: route.network_id.clone(),
170
562
            network_code: route.network_code.clone(),
171
562
            network_addr: route.network_addr.clone(),
172
562
            profile: route.profile.clone(),
173
562
            created_at: route.created_at.into(),
174
562
            modified_at: route.modified_at.into(),
175
562
        };
176
562
        self.conn
177
562
            .collection::<Schema>(COL_NAME)
178
562
            .insert_one(item)
179
1126
            .await?;
180
560
        Ok(())
181
1124
    }
182

            
183
19
    async fn add_bulk(&self, routes: &Vec<DeviceRoute>) -> Result<(), Box<dyn StdError>> {
184
19
        let mut items = vec![];
185
14639
        for route in routes.iter() {
186
14639
            items.push(Schema {
187
14639
                route_id: route.route_id.clone(),
188
14639
                unit_id: route.unit_id.clone(),
189
14639
                unit_code: route.unit_code.clone(),
190
14639
                application_id: route.application_id.clone(),
191
14639
                application_code: route.application_code.clone(),
192
14639
                device_id: route.device_id.clone(),
193
14639
                network_id: route.network_id.clone(),
194
14639
                network_code: route.network_code.clone(),
195
14639
                network_addr: route.network_addr.clone(),
196
14639
                profile: route.profile.clone(),
197
14639
                created_at: route.created_at.into(),
198
14639
                modified_at: route.modified_at.into(),
199
14639
            });
200
14639
        }
201
19
        if let Err(e) = self
202
19
            .conn
203
19
            .collection::<Schema>(COL_NAME)
204
19
            .insert_many(items)
205
19
            .ordered(false)
206
45
            .await
207
        {
208
7
            match e.kind.as_ref() {
209
7
                ErrorKind::InsertMany(imerr) => match imerr.write_errors.as_ref() {
210
                    None => return Err(Box::new(e)),
211
7
                    Some(errs) => {
212
6245
                        for err in errs {
213
6238
                            if err.code != 11000 {
214
                                return Err(Box::new(e));
215
6238
                            }
216
                        }
217
7
                        ()
218
                    }
219
                },
220
                _ => return Err(Box::new(e)),
221
            }
222
12
        }
223

            
224
19
        Ok(())
225
38
    }
226

            
227
62
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
228
62
        let filter = get_query_filter(cond);
229
62
        self.conn
230
62
            .collection::<Schema>(COL_NAME)
231
62
            .delete_many(filter)
232
124
            .await?;
233
62
        Ok(())
234
124
    }
235

            
236
    async fn update(
237
        &self,
238
        cond: &UpdateQueryCond,
239
        updates: &Updates,
240
12
    ) -> Result<(), Box<dyn StdError>> {
241
12
        let filter = get_update_query_filter(cond);
242
12
        if let Some(updates) = get_update_doc(updates) {
243
11
            self.conn
244
11
                .collection::<Schema>(COL_NAME)
245
11
                .update_many(filter, updates)
246
22
                .await?;
247
1
        }
248
12
        return Ok(());
249
24
    }
250
}
251

            
252
impl DbCursor {
253
    /// To create the cursor instance with a collection cursor.
254
177
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
255
177
        DbCursor { cursor, offset: 0 }
256
177
    }
257
}
258

            
259
#[async_trait]
260
impl Cursor for DbCursor {
261
18639
    async fn try_next(&mut self) -> Result<Option<DeviceRoute>, Box<dyn StdError>> {
262
18639
        if let Some(item) = self.cursor.try_next().await? {
263
18462
            self.offset += 1;
264
18462
            return Ok(Some(DeviceRoute {
265
18462
                route_id: item.route_id,
266
18462
                unit_id: item.unit_id,
267
18462
                unit_code: item.unit_code,
268
18462
                application_id: item.application_id,
269
18462
                application_code: item.application_code,
270
18462
                device_id: item.device_id,
271
18462
                network_id: item.network_id,
272
18462
                network_code: item.network_code,
273
18462
                network_addr: item.network_addr,
274
18462
                profile: item.profile,
275
18462
                created_at: item.created_at.into(),
276
18462
                modified_at: item.modified_at.into(),
277
18462
            }));
278
177
        }
279
177
        Ok(None)
280
37278
    }
281

            
282
4
    fn offset(&self) -> u64 {
283
4
        self.offset
284
4
    }
285
}
286

            
287
/// Transforms query conditions to the MongoDB document.
288
62
fn get_query_filter(cond: &QueryCond) -> Document {
289
62
    let mut filter = Document::new();
290
62
    if let Some(value) = cond.route_id {
291
6
        filter.insert("routeId", value);
292
56
    }
293
62
    if let Some(value) = cond.unit_id {
294
17
        filter.insert("unitId", value);
295
45
    }
296
62
    if let Some(value) = cond.application_id {
297
10
        filter.insert("applicationId", value);
298
52
    }
299
62
    if let Some(value) = cond.network_id {
300
17
        filter.insert("networkId", value);
301
45
    }
302
62
    if let Some(value) = cond.device_id {
303
23
        filter.insert("deviceId", value);
304
39
    }
305
62
    if let Some(value) = cond.network_addrs {
306
12
        let mut in_cond = Document::new();
307
12
        in_cond.insert("$in", value);
308
12
        filter.insert("networkAddr", in_cond);
309
50
    }
310
62
    filter
311
62
}
312

            
313
/// Transforms query conditions to the MongoDB document.
314
247
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
315
247
    let mut filter = Document::new();
316
247
    if let Some(value) = cond.route_id {
317
34
        filter.insert("routeId", value);
318
213
    }
319
247
    if let Some(value) = cond.unit_id {
320
69
        filter.insert("unitId", value);
321
178
    }
322
247
    if let Some(value) = cond.unit_code {
323
6
        filter.insert("unitCode", value);
324
241
    }
325
247
    if let Some(value) = cond.application_id {
326
75
        filter.insert("applicationId", value);
327
172
    }
328
247
    if let Some(value) = cond.application_code {
329
6
        filter.insert("applicationCode", value);
330
241
    }
331
247
    if let Some(value) = cond.network_id {
332
57
        filter.insert("networkId", value);
333
190
    }
334
247
    if let Some(value) = cond.network_code {
335
6
        filter.insert("networkCode", value);
336
241
    }
337
247
    if let Some(value) = cond.network_addr {
338
6
        filter.insert("networkAddr", value);
339
241
    }
340
247
    if let Some(value) = cond.network_addrs {
341
4
        let mut in_cond = Document::new();
342
4
        in_cond.insert("$in", value);
343
4
        filter.insert("networkAddr", in_cond);
344
243
    }
345
247
    if let Some(value) = cond.device_id {
346
55
        filter.insert("deviceId", value);
347
192
    }
348
247
    filter
349
247
}
350

            
351
/// Transforms model options to the options.
352
177
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
353
177
where
354
177
    T: Send + Sync,
355
177
{
356
177
    if let Some(offset) = opts.offset {
357
10
        find = find.skip(offset);
358
167
    }
359
177
    if let Some(limit) = opts.limit {
360
92
        if limit > 0 {
361
91
            find = find.limit(limit as i64);
362
91
        }
363
85
    }
364
177
    if let Some(sort_list) = opts.sort.as_ref() {
365
112
        if sort_list.len() > 0 {
366
111
            let mut sort_opts = Document::new();
367
247
            for cond in sort_list.iter() {
368
247
                let key = match cond.key {
369
70
                    SortKey::CreatedAt => "createdAt",
370
4
                    SortKey::ModifiedAt => "modifiedAt",
371
4
                    SortKey::ApplicationCode => "applicationCode",
372
68
                    SortKey::NetworkCode => "networkCode",
373
101
                    SortKey::NetworkAddr => "networkAddr",
374
                };
375
247
                if cond.asc {
376
174
                    sort_opts.insert(key.to_string(), 1);
377
174
                } else {
378
73
                    sort_opts.insert(key.to_string(), -1);
379
73
                }
380
            }
381
111
            find = find.sort(sort_opts);
382
1
        }
383
65
    }
384
177
    find
385
177
}
386

            
387
/// Transforms query conditions to the MongoDB document.
388
12
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
389
12
    doc! {"deviceId": cond.device_id}
390
12
}
391

            
392
/// Transforms the model object to the MongoDB document.
393
12
fn get_update_doc(updates: &Updates) -> Option<Document> {
394
12
    let mut count = 0;
395
12
    let mut document = Document::new();
396
12
    if let Some(value) = updates.modified_at.as_ref() {
397
11
        document.insert(
398
11
            "modifiedAt",
399
11
            DateTime::from_millis(value.timestamp_millis()),
400
11
        );
401
11
        count += 1;
402
11
    }
403
12
    if let Some(value) = updates.profile {
404
10
        document.insert("profile", value);
405
10
        count += 1;
406
10
    }
407
12
    if count == 0 {
408
1
        return None;
409
11
    }
410
11
    Some(doc! {"$set": document})
411
12
}