1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    bson::{self, doc, Bson, DateTime, Document},
7
    options::FindOptions,
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::application_dldata::{
13
    ApplicationDlData, ApplicationDlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
14
    SortKey, UpdateQueryCond, Updates, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
19298
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(skip_serializing_if = "Option::is_none")]
38
    pub resp: Option<DateTime>,
39
    pub status: i32,
40
    #[serde(rename = "unitId")]
41
    pub unit_id: String,
42
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
43
    pub device_id: Option<String>,
44
    #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
45
    pub network_code: Option<String>,
46
    #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
47
    pub network_addr: Option<String>,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
/// Name of the MongoDB collection that stores application downlink data.
const COL_NAME: &str = "applicationDlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
7
        let model = Model { conn };
60
14
        model.init().await?;
61
7
        Ok(model)
62
7
    }
63
}
64

            
65
#[async_trait]
66
impl ApplicationDlDataModel for Model {
67
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
12
        let indexes = vec![
69
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
12
            doc! {"name": "status_1", "key": {"status": 1}},
71
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
72
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
73
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
74
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
75
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
76
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
77
12
            doc! {"name": "resp_1", "key": {"resp": 1}},
78
12
        ];
79
12
        let command = doc! {
80
12
            "createIndexes": COL_NAME,
81
12
            "indexes": indexes,
82
12
        };
83
24
        self.conn.run_command(command, None).await?;
84
12
        Ok(())
85
24
    }
86

            
87
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
88
37
        let filter = get_list_query_filter(cond);
89
37
        let count = self
90
37
            .conn
91
37
            .collection::<Schema>(COL_NAME)
92
37
            .count_documents(filter, None)
93
75
            .await?;
94
37
        Ok(count)
95
74
    }
96

            
97
    async fn list(
98
        &self,
99
        opts: &ListOptions,
100
        cursor: Option<Box<dyn Cursor>>,
101
88
    ) -> Result<(Vec<ApplicationDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
102
88
        let mut cursor = match cursor {
103
            None => {
104
75
                let filter = get_list_query_filter(opts.cond);
105
75
                let options = get_find_options(opts);
106
75
                Box::new(DbCursor::new(
107
75
                    self.conn
108
75
                        .collection::<Schema>(COL_NAME)
109
75
                        .find(filter, options)
110
150
                        .await?,
111
                ))
112
            }
113
13
            Some(cursor) => cursor,
114
        };
115

            
116
88
        let mut count: u64 = 0;
117
88
        let mut list = Vec::new();
118
1211
        while let Some(item) = cursor.try_next().await? {
119
1136
            list.push(item);
120
1136
            if let Some(cursor_max) = opts.cursor_max {
121
1058
                count += 1;
122
1058
                if count >= cursor_max {
123
13
                    return Ok((list, Some(cursor)));
124
1045
                }
125
78
            }
126
        }
127
75
        Ok((list, None))
128
176
    }
129

            
130
472
    async fn add(&self, data: &ApplicationDlData) -> Result<(), Box<dyn StdError>> {
131
472
        let item = Schema {
132
472
            data_id: data.data_id.clone(),
133
472
            proc: data.proc.into(),
134
472
            resp: match data.resp {
135
237
                None => None,
136
235
                Some(resp) => Some(resp.into()),
137
            },
138
472
            status: data.status,
139
472
            unit_id: data.unit_id.clone(),
140
472
            device_id: data.device_id.clone(),
141
472
            network_code: data.network_code.clone(),
142
472
            network_addr: data.network_addr.clone(),
143
472
            profile: data.profile.clone(),
144
472
            data: data.data.clone(),
145
472
            extension: match data.extension.as_ref() {
146
261
                None => None,
147
211
                Some(extension) => Some(bson::to_document(extension)?),
148
            },
149
        };
150
472
        self.conn
151
472
            .collection::<Schema>(COL_NAME)
152
472
            .insert_one(item, None)
153
944
            .await?;
154
471
        Ok(())
155
944
    }
156

            
157
6
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
158
6
        let filter = get_query_filter(cond);
159
6
        self.conn
160
6
            .collection::<Schema>(COL_NAME)
161
6
            .delete_many(filter, None)
162
12
            .await?;
163
6
        Ok(())
164
12
    }
165

            
166
    async fn update(
167
        &self,
168
        cond: &UpdateQueryCond,
169
        updates: &Updates,
170
11
    ) -> Result<(), Box<dyn StdError>> {
171
11
        let filter = get_update_query_filter(cond, updates.status);
172
11
        if let Some(updates) = get_update_doc(updates) {
173
11
            self.conn
174
11
                .collection::<Schema>(COL_NAME)
175
11
                .update_one(filter, updates, None)
176
22
                .await?;
177
        }
178
11
        return Ok(());
179
22
    }
180
}
181

            
182
impl DbCursor {
183
    /// To create the cursor instance with a collection cursor.
184
75
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
185
75
        DbCursor { cursor, offset: 0 }
186
75
    }
187
}
188

            
189
#[async_trait]
190
impl Cursor for DbCursor {
191
1211
    async fn try_next(&mut self) -> Result<Option<ApplicationDlData>, Box<dyn StdError>> {
192
1211
        if let Some(item) = self.cursor.try_next().await? {
193
1136
            self.offset += 1;
194
1136
            return Ok(Some(ApplicationDlData {
195
1136
                data_id: item.data_id,
196
1136
                proc: item.proc.into(),
197
1136
                resp: match item.resp {
198
745
                    None => None,
199
391
                    Some(resp) => Some(resp.into()),
200
                },
201
1136
                status: item.status,
202
1136
                unit_id: item.unit_id,
203
1136
                device_id: item.device_id,
204
1136
                network_code: item.network_code,
205
1136
                network_addr: item.network_addr,
206
1136
                profile: item.profile,
207
1136
                data: item.data,
208
1136
                extension: match item.extension {
209
883
                    None => None,
210
253
                    Some(extension) => Some(bson::from_document(extension)?),
211
                },
212
            }));
213
75
        }
214
75
        Ok(None)
215
2422
    }
216

            
217
5
    fn offset(&self) -> u64 {
218
5
        self.offset
219
5
    }
220
}
221

            
222
/// Transforms query conditions to the MongoDB document.
223
6
fn get_query_filter(cond: &QueryCond) -> Document {
224
6
    let mut filter = Document::new();
225
6
    if let Some(value) = cond.unit_id {
226
3
        filter.insert("unitId", value);
227
3
    }
228
6
    if let Some(value) = cond.device_id {
229
1
        filter.insert("deviceId", value);
230
5
    }
231
6
    if let Some(value) = cond.network_code {
232
1
        filter.insert("networkCode", value);
233
5
    }
234
6
    if let Some(value) = cond.network_addr {
235
1
        filter.insert("networkAddr", value);
236
5
    }
237
6
    let mut time_doc = Document::new();
238
6
    if let Some(value) = cond.proc_gte {
239
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
240
5
    }
241
6
    if let Some(value) = cond.proc_lte {
242
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
243
5
    }
244
6
    if time_doc.len() > 0 {
245
1
        filter.insert("proc", time_doc);
246
5
    }
247
6
    filter
248
6
}
249

            
250
/// Transforms query conditions to the MongoDB document.
251
112
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
252
112
    let mut filter = Document::new();
253
112
    if let Some(value) = cond.unit_id {
254
46
        filter.insert("unitId", value);
255
66
    }
256
112
    if let Some(value) = cond.device_id {
257
14
        filter.insert("deviceId", value);
258
98
    }
259
112
    if let Some(value) = cond.network_code {
260
8
        filter.insert("networkCode", value);
261
104
    }
262
112
    if let Some(value) = cond.network_addr {
263
6
        filter.insert("networkAddr", value);
264
106
    }
265
112
    if let Some(value) = cond.profile {
266
6
        filter.insert("profile", value);
267
106
    }
268
112
    let mut time_doc = Document::new();
269
112
    if let Some(value) = cond.proc_gte {
270
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
271
87
    }
272
112
    if let Some(value) = cond.proc_lte {
273
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
274
104
    }
275
112
    if time_doc.len() > 0 {
276
25
        filter.insert("proc", time_doc);
277
87
    }
278
112
    time_doc = Document::new();
279
112
    if let Some(value) = cond.resp_gte {
280
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
281
98
    }
282
112
    if let Some(value) = cond.resp_lte {
283
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
284
104
    }
285
112
    if time_doc.len() > 0 {
286
14
        filter.insert("resp", time_doc);
287
98
    }
288
112
    filter
289
112
}
290

            
291
/// Transforms model options to the options.
292
75
fn get_find_options(opts: &ListOptions) -> FindOptions {
293
75
    let mut options = FindOptions::builder().build();
294
75
    if let Some(offset) = opts.offset {
295
12
        options.skip = Some(offset);
296
63
    }
297
75
    if let Some(limit) = opts.limit {
298
57
        if limit > 0 {
299
54
            options.limit = Some(limit as i64);
300
54
        }
301
18
    }
302
75
    if let Some(sort_list) = opts.sort.as_ref() {
303
67
        if sort_list.len() > 0 {
304
67
            let mut sort_opts = Document::new();
305
83
            for cond in sort_list.iter() {
306
83
                let key = match cond.key {
307
63
                    SortKey::Proc => "proc",
308
4
                    SortKey::Resp => "resp",
309
8
                    SortKey::NetworkCode => "networkCode",
310
8
                    SortKey::NetworkAddr => "networkAddr",
311
                };
312
83
                if cond.asc {
313
41
                    sort_opts.insert(key.to_string(), 1);
314
42
                } else {
315
42
                    sort_opts.insert(key.to_string(), -1);
316
42
                }
317
            }
318
67
            options.sort = Some(sort_opts);
319
        }
320
8
    }
321
75
    options
322
75
}
323

            
324
/// Transforms query conditions to the MongoDB document.
325
11
fn get_update_query_filter(cond: &UpdateQueryCond, status: i32) -> Document {
326
11
    let mut document = doc! {"dataId": cond.data_id};
327
11
    if status >= 0 {
328
7
        document.insert("status", doc! {"$ne": 0});
329
7
    } else {
330
4
        document.insert("status", doc! {"$lt": status});
331
4
    }
332
11
    document
333
11
}
334

            
335
/// Transforms the model object to the MongoDB document.
336
11
fn get_update_doc(updates: &Updates) -> Option<Document> {
337
11
    let document = doc! {
338
11
        "resp": DateTime::from_chrono(updates.resp),
339
11
        "status": updates.status,
340
11
    };
341
11
    Some(doc! {"$set": document})
342
11
}