1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::application_dldata::{
13
    ApplicationDlData, ApplicationDlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
14
    SortKey, UpdateQueryCond, Updates, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(skip_serializing_if = "Option::is_none")]
38
    pub resp: Option<DateTime>,
39
    pub status: i32,
40
    #[serde(rename = "unitId")]
41
    pub unit_id: String,
42
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
43
    pub device_id: Option<String>,
44
    #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
45
    pub network_code: Option<String>,
46
    #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
47
    pub network_addr: Option<String>,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
/// Name of the MongoDB collection handled by this model.
const COL_NAME: &str = "applicationDlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
7
        let model = Model { conn };
60
7
        model.init().await?;
61
7
        Ok(model)
62
7
    }
63
}
64

            
65
#[async_trait]
66
impl ApplicationDlDataModel for Model {
67
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
12
        let indexes = vec![
69
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
12
            doc! {"name": "status_1", "key": {"status": 1}},
71
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
72
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
73
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
74
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
75
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
76
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
77
12
            doc! {"name": "resp_1", "key": {"resp": 1}},
78
12
        ];
79
12
        let command = doc! {
80
12
            "createIndexes": COL_NAME,
81
12
            "indexes": indexes,
82
12
        };
83
12
        self.conn.run_command(command).await?;
84
12
        Ok(())
85
24
    }
86

            
87
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
88
37
        let filter = get_list_query_filter(cond);
89
37
        let count = self
90
37
            .conn
91
37
            .collection::<Schema>(COL_NAME)
92
37
            .count_documents(filter)
93
37
            .await?;
94
37
        Ok(count)
95
74
    }
96

            
97
    async fn list(
98
        &self,
99
        opts: &ListOptions,
100
        cursor: Option<Box<dyn Cursor>>,
101
88
    ) -> Result<(Vec<ApplicationDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
102
88
        let mut cursor = match cursor {
103
            None => {
104
75
                let filter = get_list_query_filter(opts.cond);
105
75
                Box::new(DbCursor::new(
106
75
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
107
75
                        .await?,
108
                ))
109
            }
110
13
            Some(cursor) => cursor,
111
        };
112

            
113
88
        let mut count: u64 = 0;
114
88
        let mut list = Vec::new();
115
1211
        while let Some(item) = cursor.try_next().await? {
116
1136
            list.push(item);
117
1136
            if let Some(cursor_max) = opts.cursor_max {
118
1058
                count += 1;
119
1058
                if count >= cursor_max {
120
13
                    return Ok((list, Some(cursor)));
121
1045
                }
122
78
            }
123
        }
124
75
        Ok((list, None))
125
176
    }
126

            
127
472
    async fn add(&self, data: &ApplicationDlData) -> Result<(), Box<dyn StdError>> {
128
472
        let item = Schema {
129
472
            data_id: data.data_id.clone(),
130
472
            proc: data.proc.into(),
131
472
            resp: match data.resp {
132
237
                None => None,
133
235
                Some(resp) => Some(resp.into()),
134
            },
135
472
            status: data.status,
136
472
            unit_id: data.unit_id.clone(),
137
472
            device_id: data.device_id.clone(),
138
472
            network_code: data.network_code.clone(),
139
472
            network_addr: data.network_addr.clone(),
140
472
            profile: data.profile.clone(),
141
472
            data: data.data.clone(),
142
472
            extension: match data.extension.as_ref() {
143
261
                None => None,
144
211
                Some(extension) => Some(bson::to_document(extension)?),
145
            },
146
        };
147
472
        self.conn
148
472
            .collection::<Schema>(COL_NAME)
149
472
            .insert_one(item)
150
472
            .await?;
151
471
        Ok(())
152
944
    }
153

            
154
6
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
155
6
        let filter = get_query_filter(cond);
156
6
        self.conn
157
6
            .collection::<Schema>(COL_NAME)
158
6
            .delete_many(filter)
159
6
            .await?;
160
6
        Ok(())
161
12
    }
162

            
163
    async fn update(
164
        &self,
165
        cond: &UpdateQueryCond,
166
        updates: &Updates,
167
11
    ) -> Result<(), Box<dyn StdError>> {
168
11
        let filter = get_update_query_filter(cond, updates.status);
169
11
        if let Some(updates) = get_update_doc(updates) {
170
11
            self.conn
171
11
                .collection::<Schema>(COL_NAME)
172
11
                .update_one(filter, updates)
173
11
                .await?;
174
        }
175
11
        return Ok(());
176
22
    }
177
}
178

            
179
impl DbCursor {
180
    /// To create the cursor instance with a collection cursor.
181
75
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
182
75
        DbCursor { cursor, offset: 0 }
183
75
    }
184
}
185

            
186
#[async_trait]
187
impl Cursor for DbCursor {
188
1211
    async fn try_next(&mut self) -> Result<Option<ApplicationDlData>, Box<dyn StdError>> {
189
1211
        if let Some(item) = self.cursor.try_next().await? {
190
1136
            self.offset += 1;
191
1136
            return Ok(Some(ApplicationDlData {
192
1136
                data_id: item.data_id,
193
1136
                proc: item.proc.into(),
194
1136
                resp: match item.resp {
195
745
                    None => None,
196
391
                    Some(resp) => Some(resp.into()),
197
                },
198
1136
                status: item.status,
199
1136
                unit_id: item.unit_id,
200
1136
                device_id: item.device_id,
201
1136
                network_code: item.network_code,
202
1136
                network_addr: item.network_addr,
203
1136
                profile: item.profile,
204
1136
                data: item.data,
205
1136
                extension: match item.extension {
206
883
                    None => None,
207
253
                    Some(extension) => Some(bson::from_document(extension)?),
208
                },
209
            }));
210
75
        }
211
75
        Ok(None)
212
2422
    }
213

            
214
5
    fn offset(&self) -> u64 {
215
5
        self.offset
216
5
    }
217
}
218

            
219
/// Transforms query conditions to the MongoDB document.
220
6
fn get_query_filter(cond: &QueryCond) -> Document {
221
6
    let mut filter = Document::new();
222
6
    if let Some(value) = cond.unit_id {
223
3
        filter.insert("unitId", value);
224
3
    }
225
6
    if let Some(value) = cond.device_id {
226
1
        filter.insert("deviceId", value);
227
5
    }
228
6
    if let Some(value) = cond.network_code {
229
1
        filter.insert("networkCode", value);
230
5
    }
231
6
    if let Some(value) = cond.network_addr {
232
1
        filter.insert("networkAddr", value);
233
5
    }
234
6
    let mut time_doc = Document::new();
235
6
    if let Some(value) = cond.proc_gte {
236
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
237
5
    }
238
6
    if let Some(value) = cond.proc_lte {
239
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
240
5
    }
241
6
    if time_doc.len() > 0 {
242
1
        filter.insert("proc", time_doc);
243
5
    }
244
6
    filter
245
6
}
246

            
247
/// Transforms query conditions to the MongoDB document.
248
112
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
249
112
    let mut filter = Document::new();
250
112
    if let Some(value) = cond.unit_id {
251
46
        filter.insert("unitId", value);
252
66
    }
253
112
    if let Some(value) = cond.device_id {
254
14
        filter.insert("deviceId", value);
255
98
    }
256
112
    if let Some(value) = cond.network_code {
257
8
        filter.insert("networkCode", value);
258
104
    }
259
112
    if let Some(value) = cond.network_addr {
260
6
        filter.insert("networkAddr", value);
261
106
    }
262
112
    if let Some(value) = cond.profile {
263
6
        filter.insert("profile", value);
264
106
    }
265
112
    let mut time_doc = Document::new();
266
112
    if let Some(value) = cond.proc_gte {
267
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
268
87
    }
269
112
    if let Some(value) = cond.proc_lte {
270
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
271
104
    }
272
112
    if time_doc.len() > 0 {
273
25
        filter.insert("proc", time_doc);
274
87
    }
275
112
    time_doc = Document::new();
276
112
    if let Some(value) = cond.resp_gte {
277
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
278
98
    }
279
112
    if let Some(value) = cond.resp_lte {
280
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
281
104
    }
282
112
    if time_doc.len() > 0 {
283
14
        filter.insert("resp", time_doc);
284
98
    }
285
112
    filter
286
112
}
287

            
288
/// Transforms model options to the options.
289
75
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
290
75
where
291
75
    T: Send + Sync,
292
75
{
293
75
    if let Some(offset) = opts.offset {
294
12
        find = find.skip(offset);
295
63
    }
296
75
    if let Some(limit) = opts.limit {
297
57
        if limit > 0 {
298
54
            find = find.limit(limit as i64);
299
54
        }
300
18
    }
301
75
    if let Some(sort_list) = opts.sort.as_ref() {
302
67
        if sort_list.len() > 0 {
303
67
            let mut sort_opts = Document::new();
304
83
            for cond in sort_list.iter() {
305
83
                let key = match cond.key {
306
63
                    SortKey::Proc => "proc",
307
4
                    SortKey::Resp => "resp",
308
8
                    SortKey::NetworkCode => "networkCode",
309
8
                    SortKey::NetworkAddr => "networkAddr",
310
                };
311
83
                if cond.asc {
312
41
                    sort_opts.insert(key.to_string(), 1);
313
42
                } else {
314
42
                    sort_opts.insert(key.to_string(), -1);
315
42
                }
316
            }
317
67
            find = find.sort(sort_opts);
318
        }
319
8
    }
320
75
    find
321
75
}
322

            
323
/// Transforms query conditions to the MongoDB document.
324
11
fn get_update_query_filter(cond: &UpdateQueryCond, status: i32) -> Document {
325
11
    let mut document = doc! {"dataId": cond.data_id};
326
11
    if status >= 0 {
327
7
        document.insert("status", doc! {"$ne": 0});
328
7
    } else {
329
4
        document.insert("status", doc! {"$lt": status});
330
4
    }
331
11
    document
332
11
}
333

            
334
/// Transforms the model object to the MongoDB document.
335
11
fn get_update_doc(updates: &Updates) -> Option<Document> {
336
11
    let document = doc! {
337
11
        "resp": DateTime::from_chrono(updates.resp),
338
11
        "status": updates.status,
339
11
    };
340
11
    Some(doc! {"$set": document})
341
11
}