1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_dldata::{
13
    Cursor, ListOptions, ListQueryCond, NetworkDlData, NetworkDlDataModel, QueryCond, SortKey,
14
    UpdateQueryCond, Updates, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "pub")]
38
    pub publish: DateTime,
39
    pub resp: Option<DateTime>,
40
    pub status: i32,
41
    #[serde(rename = "unitId")]
42
    pub unit_id: String,
43
    #[serde(rename = "deviceId")]
44
    pub device_id: String,
45
    #[serde(rename = "networkCode")]
46
    pub network_code: String,
47
    #[serde(rename = "networkAddr")]
48
    pub network_addr: String,
49
    pub profile: String,
50
    pub data: String,
51
    #[serde(skip_serializing_if = "Option::is_none")]
52
    pub extension: Option<Document>,
53
}
54

            
55
/// Name of the MongoDB collection read and written by this module.
const COL_NAME: &str = "networkDlData";
56

            
57
impl Model {
58
    /// To create the model instance with a database connection.
59
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
60
14
        let model = Model { conn };
61
14
        model.init().await?;
62
14
        Ok(model)
63
14
    }
64
}
65

            
66
#[async_trait]
67
impl NetworkDlDataModel for Model {
68
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
69
24
        let indexes = vec![
70
24
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
71
24
            doc! {"name": "status_1", "key": {"status": 1}},
72
24
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
73
24
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
74
24
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
75
24
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
76
24
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
24
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
78
24
            doc! {"name": "pub_1", "key": {"pub": 1}},
79
24
            doc! {"name": "resp_1", "key": {"resp": 1}},
80
24
        ];
81
24
        let command = doc! {
82
24
            "createIndexes": COL_NAME,
83
24
            "indexes": indexes,
84
24
        };
85
24
        self.conn.run_command(command).await?;
86
24
        Ok(())
87
48
    }
88

            
89
88
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
88
        let filter = get_list_query_filter(cond);
91
88
        let count = self
92
88
            .conn
93
88
            .collection::<Schema>(COL_NAME)
94
88
            .count_documents(filter)
95
88
            .await?;
96
88
        Ok(count)
97
176
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
198
    ) -> Result<(Vec<NetworkDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
198
        let mut cursor = match cursor {
105
            None => {
106
172
                let filter = get_list_query_filter(opts.cond);
107
172
                Box::new(DbCursor::new(
108
172
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
109
172
                        .await?,
110
                ))
111
            }
112
26
            Some(cursor) => cursor,
113
        };
114

            
115
198
        let mut count: u64 = 0;
116
198
        let mut list = Vec::new();
117
2534
        while let Some(item) = cursor.try_next().await? {
118
2362
            list.push(item);
119
2362
            if let Some(cursor_max) = opts.cursor_max {
120
2178
                count += 1;
121
2178
                if count >= cursor_max {
122
26
                    return Ok((list, Some(cursor)));
123
2152
                }
124
184
            }
125
        }
126
172
        Ok((list, None))
127
396
    }
128

            
129
938
    async fn add(&self, data: &NetworkDlData) -> Result<(), Box<dyn StdError>> {
130
938
        let item = Schema {
131
938
            data_id: data.data_id.clone(),
132
938
            proc: data.proc.into(),
133
938
            publish: data.publish.into(),
134
938
            resp: match data.resp {
135
468
                None => None,
136
470
                Some(resp) => Some(resp.into()),
137
            },
138
938
            status: data.status,
139
938
            unit_id: data.unit_id.clone(),
140
938
            device_id: data.device_id.clone(),
141
938
            network_code: data.network_code.clone(),
142
938
            network_addr: data.network_addr.clone(),
143
938
            profile: data.profile.clone(),
144
938
            data: data.data.clone(),
145
938
            extension: match data.extension.as_ref() {
146
516
                None => None,
147
422
                Some(extension) => Some(bson::to_document(extension)?),
148
            },
149
        };
150
938
        self.conn
151
938
            .collection::<Schema>(COL_NAME)
152
938
            .insert_one(item)
153
938
            .await?;
154
936
        Ok(())
155
1876
    }
156

            
157
10
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
158
10
        let filter = get_query_filter(cond);
159
10
        self.conn
160
10
            .collection::<Schema>(COL_NAME)
161
10
            .delete_many(filter)
162
10
            .await?;
163
10
        Ok(())
164
20
    }
165

            
166
    async fn update(
167
        &self,
168
        cond: &UpdateQueryCond,
169
        updates: &Updates,
170
22
    ) -> Result<(), Box<dyn StdError>> {
171
22
        let filter = get_update_query_filter(cond, updates.status);
172
22
        if let Some(updates) = get_update_doc(updates) {
173
22
            self.conn
174
22
                .collection::<Schema>(COL_NAME)
175
22
                .update_one(filter, updates)
176
22
                .await?;
177
        }
178
22
        return Ok(());
179
44
    }
180
}
181

            
182
impl DbCursor {
183
    /// To create the cursor instance with a collection cursor.
184
172
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
185
172
        DbCursor { cursor, offset: 0 }
186
172
    }
187
}
188

            
189
#[async_trait]
190
impl Cursor for DbCursor {
191
2534
    async fn try_next(&mut self) -> Result<Option<NetworkDlData>, Box<dyn StdError>> {
192
2534
        if let Some(item) = self.cursor.try_next().await? {
193
2362
            self.offset += 1;
194
2362
            return Ok(Some(NetworkDlData {
195
2362
                data_id: item.data_id,
196
2362
                proc: item.proc.into(),
197
2362
                publish: item.publish.into(),
198
2362
                resp: match item.resp {
199
1512
                    None => None,
200
850
                    Some(resp) => Some(resp.into()),
201
                },
202
2362
                status: item.status,
203
2362
                unit_id: item.unit_id,
204
2362
                device_id: item.device_id,
205
2362
                network_code: item.network_code,
206
2362
                network_addr: item.network_addr,
207
2362
                profile: item.profile,
208
2362
                data: item.data,
209
2362
                extension: match item.extension {
210
1848
                    None => None,
211
514
                    Some(extension) => Some(bson::from_document(extension)?),
212
                },
213
            }));
214
172
        }
215
172
        Ok(None)
216
5068
    }
217

            
218
10
    fn offset(&self) -> u64 {
219
10
        self.offset
220
10
    }
221
}
222

            
223
/// Transforms query conditions to the MongoDB document.
224
10
fn get_query_filter(cond: &QueryCond) -> Document {
225
10
    let mut filter = Document::new();
226
10
    if let Some(value) = cond.unit_id {
227
6
        filter.insert("unitId", value);
228
6
    }
229
10
    if let Some(value) = cond.device_id {
230
2
        filter.insert("deviceId", value);
231
8
    }
232
10
    let mut time_doc = Document::new();
233
10
    if let Some(value) = cond.proc_gte {
234
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
235
8
    }
236
10
    if let Some(value) = cond.proc_lte {
237
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
238
8
    }
239
10
    if time_doc.len() > 0 {
240
2
        filter.insert("proc", time_doc);
241
8
    }
242
10
    filter
243
10
}
244

            
245
/// Transforms query conditions to the MongoDB document.
246
260
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
247
260
    let mut filter = Document::new();
248
260
    if let Some(value) = cond.unit_id {
249
116
        filter.insert("unitId", value);
250
144
    }
251
260
    if let Some(value) = cond.device_id {
252
28
        filter.insert("deviceId", value);
253
232
    }
254
260
    if let Some(value) = cond.network_code {
255
16
        filter.insert("networkCode", value);
256
244
    }
257
260
    if let Some(value) = cond.network_addr {
258
12
        filter.insert("networkAddr", value);
259
248
    }
260
260
    if let Some(value) = cond.profile {
261
12
        filter.insert("profile", value);
262
248
    }
263
260
    let mut time_doc = Document::new();
264
260
    if let Some(value) = cond.proc_gte {
265
50
        time_doc.insert("$gte", Bson::DateTime(value.into()));
266
210
    }
267
260
    if let Some(value) = cond.proc_lte {
268
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
269
244
    }
270
260
    if time_doc.len() > 0 {
271
50
        filter.insert("proc", time_doc);
272
210
    }
273
260
    time_doc = Document::new();
274
260
    if let Some(value) = cond.pub_gte {
275
28
        time_doc.insert("$gte", Bson::DateTime(value.into()));
276
232
    }
277
260
    if let Some(value) = cond.pub_lte {
278
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
279
244
    }
280
260
    if time_doc.len() > 0 {
281
28
        filter.insert("pub", time_doc);
282
232
    }
283
260
    time_doc = Document::new();
284
260
    if let Some(value) = cond.resp_gte {
285
28
        time_doc.insert("$gte", Bson::DateTime(value.into()));
286
232
    }
287
260
    if let Some(value) = cond.resp_lte {
288
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
289
244
    }
290
260
    if time_doc.len() > 0 {
291
28
        filter.insert("resp", time_doc);
292
232
    }
293
260
    filter
294
260
}
295

            
296
/// Transforms model options to the options.
297
172
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
298
172
where
299
172
    T: Send + Sync,
300
172
{
301
172
    if let Some(offset) = opts.offset {
302
24
        find = find.skip(offset);
303
148
    }
304
172
    if let Some(limit) = opts.limit {
305
130
        if limit > 0 {
306
124
            find = find.limit(limit as i64);
307
124
        }
308
42
    }
309
172
    if let Some(sort_list) = opts.sort.as_ref() {
310
154
        if sort_list.len() > 0 {
311
154
            let mut sort_opts = Document::new();
312
186
            for cond in sort_list.iter() {
313
186
                let key = match cond.key {
314
138
                    SortKey::Proc => "proc",
315
8
                    SortKey::Pub => "pub",
316
8
                    SortKey::Resp => "resp",
317
16
                    SortKey::NetworkCode => "networkCode",
318
16
                    SortKey::NetworkAddr => "networkAddr",
319
                };
320
186
                if cond.asc {
321
86
                    sort_opts.insert(key.to_string(), 1);
322
100
                } else {
323
100
                    sort_opts.insert(key.to_string(), -1);
324
100
                }
325
            }
326
154
            find = find.sort(sort_opts);
327
        }
328
18
    }
329
172
    find
330
172
}
331

            
332
/// Transforms query conditions to the MongoDB document.
333
22
fn get_update_query_filter(cond: &UpdateQueryCond, status: i32) -> Document {
334
22
    let mut document = doc! {"dataId": cond.data_id};
335
22
    if status >= 0 {
336
14
        document.insert("status", doc! {"$ne": 0});
337
14
    } else {
338
8
        document.insert("status", doc! {"$lt": status});
339
8
    }
340
22
    document
341
22
}
342

            
343
/// Transforms the model object to the MongoDB document.
344
22
fn get_update_doc(updates: &Updates) -> Option<Document> {
345
22
    let document = doc! {
346
22
        "resp": DateTime::from_chrono(updates.resp),
347
22
        "status": updates.status,
348
22
    };
349
22
    Some(doc! {"$set": document})
350
22
}