1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::application_dldata::{
13
    ApplicationDlData, ApplicationDlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
14
    SortKey, UpdateQueryCond, Updates, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(skip_serializing_if = "Option::is_none")]
38
    pub resp: Option<DateTime>,
39
    pub status: i32,
40
    #[serde(rename = "unitId")]
41
    pub unit_id: String,
42
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
43
    pub device_id: Option<String>,
44
    #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
45
    pub network_code: Option<String>,
46
    #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
47
    pub network_addr: Option<String>,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
const COL_NAME: &'static str = "applicationDlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
14
        let model = Model { conn };
60
14
        model.init().await?;
61
14
        Ok(model)
62
14
    }
63
}
64

            
65
#[async_trait]
66
impl ApplicationDlDataModel for Model {
67
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
24
        let indexes = vec![
69
24
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
24
            doc! {"name": "status_1", "key": {"status": 1}},
71
24
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
72
24
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
73
24
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
74
24
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
75
24
            doc! {"name": "profile_1", "key": {"profile": 1}},
76
24
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
77
24
            doc! {"name": "resp_1", "key": {"resp": 1}},
78
24
        ];
79
24
        let command = doc! {
80
24
            "createIndexes": COL_NAME,
81
24
            "indexes": indexes,
82
24
        };
83
24
        self.conn.run_command(command).await?;
84
24
        Ok(())
85
48
    }
86

            
87
74
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
88
74
        let filter = get_list_query_filter(cond);
89
74
        let count = self
90
74
            .conn
91
74
            .collection::<Schema>(COL_NAME)
92
74
            .count_documents(filter)
93
74
            .await?;
94
74
        Ok(count)
95
148
    }
96

            
97
    async fn list(
98
        &self,
99
        opts: &ListOptions,
100
        cursor: Option<Box<dyn Cursor>>,
101
176
    ) -> Result<(Vec<ApplicationDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
102
176
        let mut cursor = match cursor {
103
            None => {
104
150
                let filter = get_list_query_filter(opts.cond);
105
150
                Box::new(DbCursor::new(
106
150
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
107
150
                        .await?,
108
                ))
109
            }
110
26
            Some(cursor) => cursor,
111
        };
112

            
113
176
        let mut count: u64 = 0;
114
176
        let mut list = Vec::new();
115
2422
        while let Some(item) = cursor.try_next().await? {
116
2272
            list.push(item);
117
2272
            if let Some(cursor_max) = opts.cursor_max {
118
2116
                count += 1;
119
2116
                if count >= cursor_max {
120
26
                    return Ok((list, Some(cursor)));
121
2090
                }
122
156
            }
123
        }
124
150
        Ok((list, None))
125
352
    }
126

            
127
944
    async fn add(&self, data: &ApplicationDlData) -> Result<(), Box<dyn StdError>> {
128
944
        let item = Schema {
129
944
            data_id: data.data_id.clone(),
130
944
            proc: data.proc.into(),
131
944
            resp: match data.resp {
132
474
                None => None,
133
470
                Some(resp) => Some(resp.into()),
134
            },
135
944
            status: data.status,
136
944
            unit_id: data.unit_id.clone(),
137
944
            device_id: data.device_id.clone(),
138
944
            network_code: data.network_code.clone(),
139
944
            network_addr: data.network_addr.clone(),
140
944
            profile: data.profile.clone(),
141
944
            data: data.data.clone(),
142
944
            extension: match data.extension.as_ref() {
143
522
                None => None,
144
422
                Some(extension) => Some(bson::to_document(extension)?),
145
            },
146
        };
147
944
        self.conn
148
944
            .collection::<Schema>(COL_NAME)
149
944
            .insert_one(item)
150
944
            .await?;
151
942
        Ok(())
152
1888
    }
153

            
154
12
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
155
12
        let filter = get_query_filter(cond);
156
12
        self.conn
157
12
            .collection::<Schema>(COL_NAME)
158
12
            .delete_many(filter)
159
12
            .await?;
160
12
        Ok(())
161
24
    }
162

            
163
    async fn update(
164
        &self,
165
        cond: &UpdateQueryCond,
166
        updates: &Updates,
167
22
    ) -> Result<(), Box<dyn StdError>> {
168
22
        let filter = get_update_query_filter(cond, updates.status);
169
22
        if let Some(updates) = get_update_doc(updates) {
170
22
            self.conn
171
22
                .collection::<Schema>(COL_NAME)
172
22
                .update_one(filter, updates)
173
22
                .await?;
174
        }
175
22
        return Ok(());
176
44
    }
177
}
178

            
179
impl DbCursor {
180
    /// To create the cursor instance with a collection cursor.
181
150
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
182
150
        DbCursor { cursor, offset: 0 }
183
150
    }
184
}
185

            
186
#[async_trait]
187
impl Cursor for DbCursor {
188
2422
    async fn try_next(&mut self) -> Result<Option<ApplicationDlData>, Box<dyn StdError>> {
189
2422
        if let Some(item) = self.cursor.try_next().await? {
190
2272
            self.offset += 1;
191
2272
            return Ok(Some(ApplicationDlData {
192
2272
                data_id: item.data_id,
193
2272
                proc: item.proc.into(),
194
2272
                resp: match item.resp {
195
1490
                    None => None,
196
782
                    Some(resp) => Some(resp.into()),
197
                },
198
2272
                status: item.status,
199
2272
                unit_id: item.unit_id,
200
2272
                device_id: item.device_id,
201
2272
                network_code: item.network_code,
202
2272
                network_addr: item.network_addr,
203
2272
                profile: item.profile,
204
2272
                data: item.data,
205
2272
                extension: match item.extension {
206
1766
                    None => None,
207
506
                    Some(extension) => Some(bson::from_document(extension)?),
208
                },
209
            }));
210
150
        }
211
150
        Ok(None)
212
4844
    }
213

            
214
10
    fn offset(&self) -> u64 {
215
10
        self.offset
216
10
    }
217
}
218

            
219
/// Transforms query conditions to the MongoDB document.
220
12
fn get_query_filter(cond: &QueryCond) -> Document {
221
12
    let mut filter = Document::new();
222
12
    if let Some(value) = cond.unit_id {
223
6
        filter.insert("unitId", value);
224
6
    }
225
12
    if let Some(value) = cond.device_id {
226
2
        filter.insert("deviceId", value);
227
10
    }
228
12
    if let Some(value) = cond.network_code {
229
2
        filter.insert("networkCode", value);
230
10
    }
231
12
    if let Some(value) = cond.network_addr {
232
2
        filter.insert("networkAddr", value);
233
10
    }
234
12
    let mut time_doc = Document::new();
235
12
    if let Some(value) = cond.proc_gte {
236
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
237
10
    }
238
12
    if let Some(value) = cond.proc_lte {
239
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
240
10
    }
241
12
    if time_doc.len() > 0 {
242
2
        filter.insert("proc", time_doc);
243
10
    }
244
12
    filter
245
12
}
246

            
247
/// Transforms query conditions to the MongoDB document.
248
224
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
249
224
    let mut filter = Document::new();
250
224
    if let Some(value) = cond.unit_id {
251
92
        filter.insert("unitId", value);
252
132
    }
253
224
    if let Some(value) = cond.device_id {
254
28
        filter.insert("deviceId", value);
255
196
    }
256
224
    if let Some(value) = cond.network_code {
257
16
        filter.insert("networkCode", value);
258
208
    }
259
224
    if let Some(value) = cond.network_addr {
260
12
        filter.insert("networkAddr", value);
261
212
    }
262
224
    if let Some(value) = cond.profile {
263
12
        filter.insert("profile", value);
264
212
    }
265
224
    let mut time_doc = Document::new();
266
224
    if let Some(value) = cond.proc_gte {
267
50
        time_doc.insert("$gte", Bson::DateTime(value.into()));
268
174
    }
269
224
    if let Some(value) = cond.proc_lte {
270
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
271
208
    }
272
224
    if time_doc.len() > 0 {
273
50
        filter.insert("proc", time_doc);
274
174
    }
275
224
    time_doc = Document::new();
276
224
    if let Some(value) = cond.resp_gte {
277
28
        time_doc.insert("$gte", Bson::DateTime(value.into()));
278
196
    }
279
224
    if let Some(value) = cond.resp_lte {
280
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
281
208
    }
282
224
    if time_doc.len() > 0 {
283
28
        filter.insert("resp", time_doc);
284
196
    }
285
224
    filter
286
224
}
287

            
288
/// Transforms model options to the options.
289
150
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
290
150
where
291
150
    T: Send + Sync,
292
150
{
293
150
    if let Some(offset) = opts.offset {
294
24
        find = find.skip(offset);
295
126
    }
296
150
    if let Some(limit) = opts.limit {
297
114
        if limit > 0 {
298
108
            find = find.limit(limit as i64);
299
108
        }
300
36
    }
301
150
    if let Some(sort_list) = opts.sort.as_ref() {
302
134
        if sort_list.len() > 0 {
303
134
            let mut sort_opts = Document::new();
304
166
            for cond in sort_list.iter() {
305
166
                let key = match cond.key {
306
126
                    SortKey::Proc => "proc",
307
8
                    SortKey::Resp => "resp",
308
16
                    SortKey::NetworkCode => "networkCode",
309
16
                    SortKey::NetworkAddr => "networkAddr",
310
                };
311
166
                if cond.asc {
312
82
                    sort_opts.insert(key.to_string(), 1);
313
84
                } else {
314
84
                    sort_opts.insert(key.to_string(), -1);
315
84
                }
316
            }
317
134
            find = find.sort(sort_opts);
318
        }
319
16
    }
320
150
    find
321
150
}
322

            
323
/// Transforms query conditions to the MongoDB document.
324
22
fn get_update_query_filter(cond: &UpdateQueryCond, status: i32) -> Document {
325
22
    let mut document = doc! {"dataId": cond.data_id};
326
22
    if status >= 0 {
327
14
        document.insert("status", doc! {"$ne": 0});
328
14
    } else {
329
8
        document.insert("status", doc! {"$lt": status});
330
8
    }
331
22
    document
332
22
}
333

            
334
/// Transforms the model object to the MongoDB document.
335
22
fn get_update_doc(updates: &Updates) -> Option<Document> {
336
22
    let document = doc! {
337
22
        "resp": DateTime::from_chrono(updates.resp),
338
22
        "status": updates.status,
339
22
    };
340
22
    Some(doc! {"$set": document})
341
22
}