1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    Cursor as MongoDbCursor, Database,
7
    action::Find,
8
    bson::{self, Bson, DateTime, Document, Regex, doc, raw::CString},
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use sylvia_iot_corelib::strings;
13

            
14
use super::super::network::{
15
    Cursor, ListOptions, ListQueryCond, Network, NetworkModel, QueryCond, SortKey, UpdateQueryCond,
16
    Updates,
17
};
18

            
19
/// Model instance.
20
pub struct Model {
21
    /// The associated database connection.
22
    conn: Arc<Database>,
23
}
24

            
25
/// Cursor instance.
26
struct DbCursor {
27
    /// The associated collection cursor.
28
    cursor: MongoDbCursor<Schema>,
29
    /// (Useless) only for Cursor trait implementation.
30
    offset: u64,
31
}
32

            
33
/// MongoDB schema.
34
#[derive(Deserialize, Serialize)]
35
struct Schema {
36
    #[serde(rename = "networkId")]
37
    network_id: String,
38
    code: String,
39
    #[serde(rename = "unitId")]
40
    unit_id: Option<String>,
41
    #[serde(rename = "unitCode")]
42
    unit_code: Option<String>,
43
    #[serde(rename = "createdAt")]
44
    created_at: DateTime,
45
    #[serde(rename = "modifiedAt")]
46
    modified_at: DateTime,
47
    #[serde(rename = "hostUri")]
48
    host_uri: String,
49
    name: String,
50
    info: Document,
51
}
52

            
53
/// Name of the MongoDB collection that stores the networks.
const COL_NAME: &str = "network";
54

            
55
impl Model {
56
    /// To create the model instance with a database connection.
57
24
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
58
24
        let model = Model { conn };
59
24
        model.init().await?;
60
24
        Ok(model)
61
24
    }
62
}
63

            
64
#[async_trait]
65
impl NetworkModel for Model {
66
38
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
67
        let indexes = vec![
68
            doc! {"name": "networkId_1", "key": {"networkId": 1}, "unique": true},
69
            doc! {"name": "unitId_1_code_1", "key": {"unitId": 1, "code": 1}, "unique": true},
70
            doc! {"name": "code_1", "key": {"code": 1}},
71
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
72
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
73
            doc! {"name": "modifiedAt_1", "key": {"modifiedAt": 1}},
74
            doc! {"name": "name_1", "key": {"name": 1}},
75
        ];
76
        let command = doc! {
77
            "createIndexes": COL_NAME,
78
            "indexes": indexes,
79
        };
80
        self.conn.run_command(command).await?;
81
        Ok(())
82
38
    }
83

            
84
60
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
85
        let filter = get_list_query_filter(cond);
86
        let count = self
87
            .conn
88
            .collection::<Schema>(COL_NAME)
89
            .count_documents(filter)
90
            .await?;
91
        Ok(count)
92
60
    }
93

            
94
    async fn list(
95
        &self,
96
        opts: &ListOptions,
97
        cursor: Option<Box<dyn Cursor>>,
98
212
    ) -> Result<(Vec<Network>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
99
        let mut cursor = match cursor {
100
            None => {
101
                let filter = get_list_query_filter(opts.cond);
102
                Box::new(DbCursor::new(
103
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
104
                        .await?,
105
                ))
106
            }
107
            Some(cursor) => cursor,
108
        };
109

            
110
        let mut count: u64 = 0;
111
        let mut list = Vec::new();
112
        while let Some(item) = cursor.try_next().await? {
113
            list.push(item);
114
212
            if let Some(cursor_max) = opts.cursor_max {
115
                count += 1;
116
                if count >= cursor_max {
117
                    return Ok((list, Some(cursor)));
118
                }
119
            }
120
        }
121
        Ok((list, None))
122
212
    }
123

            
124
592
    async fn get(&self, cond: &QueryCond) -> Result<Option<Network>, Box<dyn StdError>> {
125
        let filter = get_query_filter(cond);
126
        let mut cursor = self
127
            .conn
128
            .collection::<Schema>(COL_NAME)
129
            .find(filter)
130
            .await?;
131
        if let Some(item) = cursor.try_next().await? {
132
            return Ok(Some(Network {
133
                network_id: item.network_id,
134
                code: item.code,
135
                unit_id: item.unit_id,
136
                unit_code: item.unit_code,
137
                created_at: item.created_at.into(),
138
                modified_at: item.modified_at.into(),
139
                host_uri: item.host_uri,
140
                name: item.name,
141
                info: bson::deserialize_from_document(item.info)?,
142
            }));
143
        }
144
        Ok(None)
145
592
    }
146

            
147
2600
    async fn add(&self, network: &Network) -> Result<(), Box<dyn StdError>> {
148
        let item = Schema {
149
            network_id: network.network_id.clone(),
150
            code: network.code.clone(),
151
            unit_id: network.unit_id.clone(),
152
            unit_code: network.unit_code.clone(),
153
            created_at: network.created_at.into(),
154
            modified_at: network.modified_at.into(),
155
            host_uri: network.host_uri.clone(),
156
            name: network.name.clone(),
157
            info: bson::serialize_to_document(&network.info)?,
158
        };
159
        self.conn
160
            .collection::<Schema>(COL_NAME)
161
            .insert_one(item)
162
            .await?;
163
        Ok(())
164
2600
    }
165

            
166
38
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
167
        let filter = get_query_filter(cond);
168
        self.conn
169
            .collection::<Schema>(COL_NAME)
170
            .delete_many(filter)
171
            .await?;
172
        Ok(())
173
38
    }
174

            
175
    async fn update(
176
        &self,
177
        cond: &UpdateQueryCond,
178
        updates: &Updates,
179
22
    ) -> Result<(), Box<dyn StdError>> {
180
        let filter = get_update_query_filter(cond);
181
        if let Some(updates) = get_update_doc(updates) {
182
            self.conn
183
                .collection::<Schema>(COL_NAME)
184
                .update_one(filter, updates)
185
                .await?;
186
        }
187
        return Ok(());
188
22
    }
189
}
190

            
191
impl DbCursor {
192
    /// To create the cursor instance with a collection cursor.
193
174
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
194
174
        DbCursor { cursor, offset: 0 }
195
174
    }
196
}
197

            
198
#[async_trait]
199
impl Cursor for DbCursor {
200
3822
    async fn try_next(&mut self) -> Result<Option<Network>, Box<dyn StdError>> {
201
        if let Some(item) = self.cursor.try_next().await? {
202
            self.offset += 1;
203
            return Ok(Some(Network {
204
                network_id: item.network_id,
205
                code: item.code,
206
                unit_id: item.unit_id,
207
                unit_code: item.unit_code,
208
                created_at: item.created_at.into(),
209
                modified_at: item.modified_at.into(),
210
                host_uri: item.host_uri,
211
                name: item.name,
212
                info: bson::deserialize_from_document(item.info)?,
213
            }));
214
        }
215
        Ok(None)
216
3822
    }
217

            
218
8
    fn offset(&self) -> u64 {
219
8
        self.offset
220
8
    }
221
}
222

            
223
/// Transforms query conditions to the MongoDB document.
224
630
fn get_query_filter(cond: &QueryCond) -> Document {
225
630
    let mut filter = Document::new();
226
630
    if let Some(value) = cond.unit_id {
227
64
        match value {
228
16
            None => {
229
16
                filter.insert("unitId", Bson::Null);
230
16
            }
231
48
            Some(value) => {
232
48
                filter.insert("unitId", value);
233
48
            }
234
        }
235
566
    }
236
630
    if let Some(value) = cond.network_id {
237
580
        filter.insert("networkId", value);
238
580
    }
239
630
    if let Some(value) = cond.code {
240
32
        filter.insert("code", value);
241
598
    }
242
630
    filter
243
630
}
244

            
245
/// Transforms query conditions to the MongoDB document.
246
234
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
247
234
    let mut filter = Document::new();
248
234
    if let Some(value) = cond.unit_id {
249
108
        match value {
250
20
            None => {
251
20
                filter.insert("unitId", Bson::Null);
252
20
            }
253
88
            Some(value) => {
254
88
                filter.insert("unitId", value);
255
88
            }
256
        }
257
126
    }
258
234
    if let Some(value) = cond.network_id {
259
12
        filter.insert("networkId", value);
260
222
    }
261
234
    if let Some(value) = cond.code {
262
32
        filter.insert("code", value);
263
202
    }
264
234
    if let Some(value) = cond.code_contains {
265
44
        let escaped = strings::escape_regex_str(value);
266
44
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
267
44
            if let Ok(options) = CString::try_from("i") {
268
44
                filter.insert("code", Regex { pattern, options });
269
44
            }
270
        }
271
190
    }
272
234
    if let Some(value) = cond.name_contains {
273
16
        let escaped = strings::escape_regex_str(value);
274
16
        if let Ok(pattern) = CString::try_from(escaped.as_str()) {
275
16
            if let Ok(options) = CString::try_from("i") {
276
16
                filter.insert("name", Regex { pattern, options });
277
16
            }
278
        }
279
218
    }
280
234
    filter
281
234
}
282

            
283
/// Transforms model options to the options.
284
174
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
285
174
where
286
174
    T: Send + Sync,
287
{
288
174
    if let Some(offset) = opts.offset {
289
20
        find = find.skip(offset);
290
154
    }
291
174
    if let Some(limit) = opts.limit {
292
82
        if limit > 0 {
293
80
            find = find.limit(limit as i64);
294
80
        }
295
92
    }
296
174
    if let Some(sort_list) = opts.sort.as_ref() {
297
116
        if sort_list.len() > 0 {
298
114
            let mut sort_opts = Document::new();
299
118
            for cond in sort_list.iter() {
300
118
                let key = match cond.key {
301
12
                    SortKey::CreatedAt => "createdAt",
302
8
                    SortKey::ModifiedAt => "modifiedAt",
303
86
                    SortKey::Code => "code",
304
12
                    SortKey::Name => "name",
305
                };
306
118
                if cond.asc {
307
100
                    sort_opts.insert(key.to_string(), 1);
308
100
                } else {
309
18
                    sort_opts.insert(key.to_string(), -1);
310
18
                }
311
            }
312
114
            find = find.sort(sort_opts);
313
2
        }
314
58
    }
315
174
    find
316
174
}
317

            
318
/// Transforms query conditions to the MongoDB document.
319
22
fn get_update_query_filter(cond: &UpdateQueryCond) -> Document {
320
22
    doc! {"networkId": cond.network_id}
321
22
}
322

            
323
/// Transforms the model object to the MongoDB document.
324
22
fn get_update_doc(updates: &Updates) -> Option<Document> {
325
22
    let mut count = 0;
326
22
    let mut document = Document::new();
327
22
    if let Some(value) = updates.modified_at.as_ref() {
328
20
        document.insert(
329
20
            "modifiedAt",
330
20
            DateTime::from_millis(value.timestamp_millis()),
331
20
        );
332
20
        count += 1;
333
20
    }
334
22
    if let Some(value) = updates.host_uri {
335
10
        document.insert("hostUri", value);
336
10
        count += 1;
337
12
    }
338
22
    if let Some(value) = updates.name {
339
16
        document.insert("name", value);
340
16
        count += 1;
341
16
    }
342
22
    if let Some(value) = updates.info {
343
16
        document.insert(
344
            "info",
345
16
            match bson::serialize_to_document(value) {
346
                Err(_) => return None,
347
16
                Ok(doc) => doc,
348
            },
349
        );
350
16
        count += 1;
351
6
    }
352
22
    if count == 0 {
353
2
        return None;
354
20
    }
355
20
    Some(doc! {"$set": document})
356
22
}