1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::application_uldata::{
13
    ApplicationUlData, ApplicationUlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
14
    SortKey, EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "pub")]
38
    pub publish: DateTime,
39
    #[serde(rename = "unitCode")]
40
    pub unit_code: Option<String>,
41
    #[serde(rename = "networkCode")]
42
    pub network_code: String,
43
    #[serde(rename = "networkAddr")]
44
    pub network_addr: String,
45
    #[serde(rename = "unitId")]
46
    pub unit_id: String,
47
    #[serde(rename = "deviceId")]
48
    pub device_id: String,
49
    pub time: DateTime,
50
    pub profile: String,
51
    pub data: String,
52
    #[serde(skip_serializing_if = "Option::is_none")]
53
    pub extension: Option<Document>,
54
}
55

            
56
/// Name of the MongoDB collection that stores application uplink data.
const COL_NAME: &str = "applicationUlData";
57

            
58
impl Model {
59
    /// To create the model instance with a database connection.
60
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
61
7
        let model = Model { conn };
62
7
        model.init().await?;
63
7
        Ok(model)
64
7
    }
65
}
66

            
67
#[async_trait]
68
impl ApplicationUlDataModel for Model {
69
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
70
12
        let indexes = vec![
71
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
72
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
73
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
74
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
75
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
76
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
78
12
            doc! {"name": "pub_1", "key": {"pub": 1}},
79
12
            doc! {"name": "time_1", "key": {"time": 1}},
80
12
        ];
81
12
        let command = doc! {
82
12
            "createIndexes": COL_NAME,
83
12
            "indexes": indexes,
84
12
        };
85
12
        self.conn.run_command(command).await?;
86
12
        Ok(())
87
24
    }
88

            
89
44
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
44
        let filter = get_list_query_filter(cond);
91
44
        let count = self
92
44
            .conn
93
44
            .collection::<Schema>(COL_NAME)
94
44
            .count_documents(filter)
95
44
            .await?;
96
44
        Ok(count)
97
88
    }
98

            
99
    async fn list(
100
        &self,
101
        opts: &ListOptions,
102
        cursor: Option<Box<dyn Cursor>>,
103
99
    ) -> Result<(Vec<ApplicationUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
104
99
        let mut cursor = match cursor {
105
            None => {
106
86
                let filter = get_list_query_filter(opts.cond);
107
86
                Box::new(DbCursor::new(
108
86
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
109
86
                        .await?,
110
                ))
111
            }
112
13
            Some(cursor) => cursor,
113
        };
114

            
115
99
        let mut count: u64 = 0;
116
99
        let mut list = Vec::new();
117
1258
        while let Some(item) = cursor.try_next().await? {
118
1172
            list.push(item);
119
1172
            if let Some(cursor_max) = opts.cursor_max {
120
1080
                count += 1;
121
1080
                if count >= cursor_max {
122
13
                    return Ok((list, Some(cursor)));
123
1067
                }
124
92
            }
125
        }
126
86
        Ok((list, None))
127
198
    }
128

            
129
459
    async fn add(&self, data: &ApplicationUlData) -> Result<(), Box<dyn StdError>> {
130
459
        let item = Schema {
131
459
            data_id: data.data_id.clone(),
132
459
            proc: data.proc.into(),
133
459
            publish: data.publish.into(),
134
459
            unit_code: data.unit_code.clone(),
135
459
            network_code: data.network_code.clone(),
136
459
            network_addr: data.network_addr.clone(),
137
459
            unit_id: data.unit_id.clone(),
138
459
            device_id: data.device_id.clone(),
139
459
            time: data.time.into(),
140
459
            profile: data.profile.clone(),
141
459
            data: data.data.clone(),
142
459
            extension: match data.extension.as_ref() {
143
248
                None => None,
144
211
                Some(extension) => Some(bson::to_document(extension)?),
145
            },
146
        };
147
459
        self.conn
148
459
            .collection::<Schema>(COL_NAME)
149
459
            .insert_one(item)
150
459
            .await?;
151
458
        Ok(())
152
918
    }
153

            
154
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
155
5
        let filter = get_query_filter(cond);
156
5
        self.conn
157
5
            .collection::<Schema>(COL_NAME)
158
5
            .delete_many(filter)
159
5
            .await?;
160
5
        Ok(())
161
10
    }
162
}
163

            
164
impl DbCursor {
165
    /// To create the cursor instance with a collection cursor.
166
86
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
167
86
        DbCursor { cursor, offset: 0 }
168
86
    }
169
}
170

            
171
#[async_trait]
172
impl Cursor for DbCursor {
173
1258
    async fn try_next(&mut self) -> Result<Option<ApplicationUlData>, Box<dyn StdError>> {
174
1258
        if let Some(item) = self.cursor.try_next().await? {
175
1172
            self.offset += 1;
176
1172
            return Ok(Some(ApplicationUlData {
177
1172
                data_id: item.data_id,
178
1172
                proc: item.proc.into(),
179
1172
                publish: item.publish.into(),
180
1172
                unit_code: item.unit_code,
181
1172
                network_code: item.network_code,
182
1172
                network_addr: item.network_addr,
183
1172
                unit_id: item.unit_id,
184
1172
                device_id: item.device_id,
185
1172
                time: item.time.into(),
186
1172
                profile: item.profile,
187
1172
                data: item.data,
188
1172
                extension: match item.extension {
189
915
                    None => None,
190
257
                    Some(extension) => Some(bson::from_document(extension)?),
191
                },
192
            }));
193
86
        }
194
86
        Ok(None)
195
2516
    }
196

            
197
5
    fn offset(&self) -> u64 {
198
5
        self.offset
199
5
    }
200
}
201

            
202
/// Transforms query conditions to the MongoDB document.
203
5
fn get_query_filter(cond: &QueryCond) -> Document {
204
5
    let mut filter = Document::new();
205
5
    if let Some(value) = cond.unit_id {
206
3
        filter.insert("unitId", value);
207
3
    }
208
5
    if let Some(value) = cond.device_id {
209
1
        filter.insert("deviceId", value);
210
4
    }
211
5
    let mut time_doc = Document::new();
212
5
    if let Some(value) = cond.proc_gte {
213
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
214
4
    }
215
5
    if let Some(value) = cond.proc_lte {
216
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
217
4
    }
218
5
    if time_doc.len() > 0 {
219
1
        filter.insert("proc", time_doc);
220
4
    }
221
5
    filter
222
5
}
223

            
224
/// Transforms query conditions to the MongoDB document.
225
130
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
226
130
    let mut filter = Document::new();
227
130
    if let Some(value) = cond.unit_id {
228
58
        filter.insert("unitId", value);
229
72
    }
230
130
    if let Some(value) = cond.device_id {
231
14
        filter.insert("deviceId", value);
232
116
    }
233
130
    if let Some(value) = cond.network_code {
234
8
        filter.insert("networkCode", value);
235
122
    }
236
130
    if let Some(value) = cond.network_addr {
237
6
        filter.insert("networkAddr", value);
238
124
    }
239
130
    if let Some(value) = cond.profile {
240
6
        filter.insert("profile", value);
241
124
    }
242
130
    let mut time_doc = Document::new();
243
130
    if let Some(value) = cond.proc_gte {
244
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
245
105
    }
246
130
    if let Some(value) = cond.proc_lte {
247
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
248
122
    }
249
130
    if time_doc.len() > 0 {
250
25
        filter.insert("proc", time_doc);
251
105
    }
252
130
    time_doc = Document::new();
253
130
    if let Some(value) = cond.pub_gte {
254
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
255
116
    }
256
130
    if let Some(value) = cond.pub_lte {
257
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
258
122
    }
259
130
    if time_doc.len() > 0 {
260
14
        filter.insert("pub", time_doc);
261
116
    }
262
130
    time_doc = Document::new();
263
130
    if let Some(value) = cond.time_gte {
264
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
265
116
    }
266
130
    if let Some(value) = cond.time_lte {
267
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
268
122
    }
269
130
    if time_doc.len() > 0 {
270
14
        filter.insert("time", time_doc);
271
116
    }
272
130
    filter
273
130
}
274

            
275
/// Transforms model options to the options.
276
86
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
277
86
where
278
86
    T: Send + Sync,
279
86
{
280
86
    if let Some(offset) = opts.offset {
281
12
        find = find.skip(offset);
282
74
    }
283
86
    if let Some(limit) = opts.limit {
284
65
        if limit > 0 {
285
62
            find = find.limit(limit as i64);
286
62
        }
287
21
    }
288
86
    if let Some(sort_list) = opts.sort.as_ref() {
289
77
        if sort_list.len() > 0 {
290
77
            let mut sort_opts = Document::new();
291
93
            for cond in sort_list.iter() {
292
93
                let key = match cond.key {
293
69
                    SortKey::Proc => "proc",
294
4
                    SortKey::Pub => "pub",
295
4
                    SortKey::Time => "time",
296
8
                    SortKey::NetworkCode => "networkCode",
297
8
                    SortKey::NetworkAddr => "networkAddr",
298
                };
299
93
                if cond.asc {
300
43
                    sort_opts.insert(key.to_string(), 1);
301
50
                } else {
302
50
                    sort_opts.insert(key.to_string(), -1);
303
50
                }
304
            }
305
77
            find = find.sort(sort_opts);
306
        }
307
9
    }
308
86
    find
309
86
}