1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    bson::{self, doc, Bson, DateTime, Document},
7
    options::FindOptions,
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_dldata::{
13
    Cursor, ListOptions, ListQueryCond, NetworkDlData, NetworkDlDataModel, QueryCond, SortKey,
14
    UpdateQueryCond, Updates, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
28858
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "pub")]
38
    pub publish: DateTime,
39
    pub resp: Option<DateTime>,
40
    pub status: i32,
41
    #[serde(rename = "unitId")]
42
    pub unit_id: String,
43
    #[serde(rename = "deviceId")]
44
    pub device_id: String,
45
    #[serde(rename = "networkCode")]
46
    pub network_code: String,
47
    #[serde(rename = "networkAddr")]
48
    pub network_addr: String,
49
    pub profile: String,
50
    pub data: String,
51
    #[serde(skip_serializing_if = "Option::is_none")]
52
    pub extension: Option<Document>,
53
}
54

            
55
/// Name of the MongoDB collection this model reads and writes.
const COL_NAME: &str = "networkDlData";
56

            
57
impl Model {
58
    /// To create the model instance with a database connection.
59
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
60
7
        let model = Model { conn };
61
14
        model.init().await?;
62
7
        Ok(model)
63
7
    }
64
}
65

            
66
#[async_trait]
67
impl NetworkDlDataModel for Model {
68
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
69
12
        let indexes = vec![
70
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
71
12
            doc! {"name": "status_1", "key": {"status": 1}},
72
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
73
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
74
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
75
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
76
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
78
12
            doc! {"name": "pub_1", "key": {"pub": 1}},
79
12
            doc! {"name": "resp_1", "key": {"resp": 1}},
80
12
        ];
81
12
        let command = doc! {
82
12
            "createIndexes": COL_NAME,
83
12
            "indexes": indexes,
84
12
        };
85
24
        self.conn.run_command(command, None).await?;
86
12
        Ok(())
87
24
    }
88

            
89
44
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
44
        let filter = get_list_query_filter(cond);
91
44
        let count = self
92
44
            .conn
93
44
            .collection::<Schema>(COL_NAME)
94
44
            .count_documents(filter, None)
95
88
            .await?;
96
44
        Ok(count)
97
88
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
99
    ) -> Result<(Vec<NetworkDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
99
        let mut cursor = match cursor {
105
            None => {
106
86
                let filter = get_list_query_filter(opts.cond);
107
86
                let options = get_find_options(opts);
108
86
                Box::new(DbCursor::new(
109
86
                    self.conn
110
86
                        .collection::<Schema>(COL_NAME)
111
86
                        .find(filter, options)
112
172
                        .await?,
113
                ))
114
            }
115
13
            Some(cursor) => cursor,
116
        };
117

            
118
99
        let mut count: u64 = 0;
119
99
        let mut list = Vec::new();
120
1267
        while let Some(item) = cursor.try_next().await? {
121
1181
            list.push(item);
122
1181
            if let Some(cursor_max) = opts.cursor_max {
123
1089
                count += 1;
124
1089
                if count >= cursor_max {
125
13
                    return Ok((list, Some(cursor)));
126
1076
                }
127
92
            }
128
        }
129
86
        Ok((list, None))
130
198
    }
131

            
132
469
    async fn add(&self, data: &NetworkDlData) -> Result<(), Box<dyn StdError>> {
133
469
        let item = Schema {
134
469
            data_id: data.data_id.clone(),
135
469
            proc: data.proc.into(),
136
469
            publish: data.publish.into(),
137
469
            resp: match data.resp {
138
234
                None => None,
139
235
                Some(resp) => Some(resp.into()),
140
            },
141
469
            status: data.status,
142
469
            unit_id: data.unit_id.clone(),
143
469
            device_id: data.device_id.clone(),
144
469
            network_code: data.network_code.clone(),
145
469
            network_addr: data.network_addr.clone(),
146
469
            profile: data.profile.clone(),
147
469
            data: data.data.clone(),
148
469
            extension: match data.extension.as_ref() {
149
258
                None => None,
150
211
                Some(extension) => Some(bson::to_document(extension)?),
151
            },
152
        };
153
469
        self.conn
154
469
            .collection::<Schema>(COL_NAME)
155
469
            .insert_one(item, None)
156
939
            .await?;
157
468
        Ok(())
158
938
    }
159

            
160
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
161
5
        let filter = get_query_filter(cond);
162
5
        self.conn
163
5
            .collection::<Schema>(COL_NAME)
164
5
            .delete_many(filter, None)
165
10
            .await?;
166
5
        Ok(())
167
10
    }
168

            
169
    async fn update(
170
        &self,
171
        cond: &UpdateQueryCond,
172
        updates: &Updates,
173
11
    ) -> Result<(), Box<dyn StdError>> {
174
11
        let filter = get_update_query_filter(cond, updates.status);
175
11
        if let Some(updates) = get_update_doc(updates) {
176
11
            self.conn
177
11
                .collection::<Schema>(COL_NAME)
178
11
                .update_one(filter, updates, None)
179
22
                .await?;
180
        }
181
11
        return Ok(());
182
22
    }
183
}
184

            
185
impl DbCursor {
186
    /// To create the cursor instance with a collection cursor.
187
86
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
188
86
        DbCursor { cursor, offset: 0 }
189
86
    }
190
}
191

            
192
#[async_trait]
193
impl Cursor for DbCursor {
194
1267
    async fn try_next(&mut self) -> Result<Option<NetworkDlData>, Box<dyn StdError>> {
195
1267
        if let Some(item) = self.cursor.try_next().await? {
196
1181
            self.offset += 1;
197
1181
            return Ok(Some(NetworkDlData {
198
1181
                data_id: item.data_id,
199
1181
                proc: item.proc.into(),
200
1181
                publish: item.publish.into(),
201
1181
                resp: match item.resp {
202
756
                    None => None,
203
425
                    Some(resp) => Some(resp.into()),
204
                },
205
1181
                status: item.status,
206
1181
                unit_id: item.unit_id,
207
1181
                device_id: item.device_id,
208
1181
                network_code: item.network_code,
209
1181
                network_addr: item.network_addr,
210
1181
                profile: item.profile,
211
1181
                data: item.data,
212
1181
                extension: match item.extension {
213
924
                    None => None,
214
257
                    Some(extension) => Some(bson::from_document(extension)?),
215
                },
216
            }));
217
86
        }
218
86
        Ok(None)
219
2534
    }
220

            
221
5
    fn offset(&self) -> u64 {
222
5
        self.offset
223
5
    }
224
}
225

            
226
/// Transforms query conditions to the MongoDB document.
227
5
fn get_query_filter(cond: &QueryCond) -> Document {
228
5
    let mut filter = Document::new();
229
5
    if let Some(value) = cond.unit_id {
230
3
        filter.insert("unitId", value);
231
3
    }
232
5
    if let Some(value) = cond.device_id {
233
1
        filter.insert("deviceId", value);
234
4
    }
235
5
    let mut time_doc = Document::new();
236
5
    if let Some(value) = cond.proc_gte {
237
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
238
4
    }
239
5
    if let Some(value) = cond.proc_lte {
240
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
241
4
    }
242
5
    if time_doc.len() > 0 {
243
1
        filter.insert("proc", time_doc);
244
4
    }
245
5
    filter
246
5
}
247

            
248
/// Transforms query conditions to the MongoDB document.
249
130
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
250
130
    let mut filter = Document::new();
251
130
    if let Some(value) = cond.unit_id {
252
58
        filter.insert("unitId", value);
253
72
    }
254
130
    if let Some(value) = cond.device_id {
255
14
        filter.insert("deviceId", value);
256
116
    }
257
130
    if let Some(value) = cond.network_code {
258
8
        filter.insert("networkCode", value);
259
122
    }
260
130
    if let Some(value) = cond.network_addr {
261
6
        filter.insert("networkAddr", value);
262
124
    }
263
130
    if let Some(value) = cond.profile {
264
6
        filter.insert("profile", value);
265
124
    }
266
130
    let mut time_doc = Document::new();
267
130
    if let Some(value) = cond.proc_gte {
268
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
269
105
    }
270
130
    if let Some(value) = cond.proc_lte {
271
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
272
122
    }
273
130
    if time_doc.len() > 0 {
274
25
        filter.insert("proc", time_doc);
275
105
    }
276
130
    time_doc = Document::new();
277
130
    if let Some(value) = cond.pub_gte {
278
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
279
116
    }
280
130
    if let Some(value) = cond.pub_lte {
281
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
282
122
    }
283
130
    if time_doc.len() > 0 {
284
14
        filter.insert("pub", time_doc);
285
116
    }
286
130
    time_doc = Document::new();
287
130
    if let Some(value) = cond.resp_gte {
288
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
289
116
    }
290
130
    if let Some(value) = cond.resp_lte {
291
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
292
122
    }
293
130
    if time_doc.len() > 0 {
294
14
        filter.insert("resp", time_doc);
295
116
    }
296
130
    filter
297
130
}
298

            
299
/// Transforms model options to the options.
300
86
fn get_find_options(opts: &ListOptions) -> FindOptions {
301
86
    let mut options = FindOptions::builder().build();
302
86
    if let Some(offset) = opts.offset {
303
12
        options.skip = Some(offset);
304
74
    }
305
86
    if let Some(limit) = opts.limit {
306
65
        if limit > 0 {
307
62
            options.limit = Some(limit as i64);
308
62
        }
309
21
    }
310
86
    if let Some(sort_list) = opts.sort.as_ref() {
311
77
        if sort_list.len() > 0 {
312
77
            let mut sort_opts = Document::new();
313
93
            for cond in sort_list.iter() {
314
93
                let key = match cond.key {
315
69
                    SortKey::Proc => "proc",
316
4
                    SortKey::Pub => "pub",
317
4
                    SortKey::Resp => "resp",
318
8
                    SortKey::NetworkCode => "networkCode",
319
8
                    SortKey::NetworkAddr => "networkAddr",
320
                };
321
93
                if cond.asc {
322
43
                    sort_opts.insert(key.to_string(), 1);
323
50
                } else {
324
50
                    sort_opts.insert(key.to_string(), -1);
325
50
                }
326
            }
327
77
            options.sort = Some(sort_opts);
328
        }
329
9
    }
330
86
    options
331
86
}
332

            
333
/// Transforms query conditions to the MongoDB document.
334
11
fn get_update_query_filter(cond: &UpdateQueryCond, status: i32) -> Document {
335
11
    let mut document = doc! {"dataId": cond.data_id};
336
11
    if status >= 0 {
337
7
        document.insert("status", doc! {"$ne": 0});
338
7
    } else {
339
4
        document.insert("status", doc! {"$lt": status});
340
4
    }
341
11
    document
342
11
}
343

            
344
/// Transforms the model object to the MongoDB document.
345
11
fn get_update_doc(updates: &Updates) -> Option<Document> {
346
11
    let document = doc! {
347
11
        "resp": DateTime::from_chrono(updates.resp),
348
11
        "status": updates.status,
349
11
    };
350
11
    Some(doc! {"$set": document})
351
11
}