1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    bson::{self, doc, Bson, DateTime, Document},
7
    options::FindOptions,
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_uldata::{
13
    Cursor, ListOptions, ListQueryCond, NetworkUlData, NetworkUlDataModel, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
22330
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "unitCode")]
38
    pub unit_code: Option<String>,
39
    #[serde(rename = "networkCode")]
40
    pub network_code: String,
41
    #[serde(rename = "networkAddr")]
42
    pub network_addr: String,
43
    #[serde(rename = "unitId", skip_serializing_if = "Option::is_none")]
44
    pub unit_id: Option<String>,
45
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
46
    pub device_id: Option<String>,
47
    pub time: DateTime,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
const COL_NAME: &'static str = "networkUlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
7
        let model = Model { conn };
60
14
        model.init().await?;
61
7
        Ok(model)
62
7
    }
63
}
64

            
65
#[async_trait]
66
impl NetworkUlDataModel for Model {
67
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
12
        let indexes = vec![
69
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
71
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
72
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
73
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
74
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
75
12
            doc! {"name": "time_1", "key": {"time": 1}},
76
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
12
        ];
78
12
        let command = doc! {
79
12
            "createIndexes": COL_NAME,
80
12
            "indexes": indexes,
81
12
        };
82
24
        self.conn.run_command(command, None).await?;
83
12
        Ok(())
84
24
    }
85

            
86
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
87
37
        let filter = get_list_query_filter(cond);
88
37
        let count = self
89
37
            .conn
90
37
            .collection::<Schema>(COL_NAME)
91
37
            .count_documents(filter, None)
92
74
            .await?;
93
37
        Ok(count)
94
74
    }
95

            
96
    async fn list(
97
        &self,
98
        opts: &ListOptions,
99
        cursor: Option<Box<dyn Cursor>>,
100
88
    ) -> Result<(Vec<NetworkUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
101
88
        let mut cursor = match cursor {
102
            None => {
103
75
                let filter = get_list_query_filter(opts.cond);
104
75
                let options = get_find_options(opts);
105
75
                Box::new(DbCursor::new(
106
75
                    self.conn
107
75
                        .collection::<Schema>(COL_NAME)
108
75
                        .find(filter, options)
109
150
                        .await?,
110
                ))
111
            }
112
13
            Some(cursor) => cursor,
113
        };
114

            
115
88
        let mut count: u64 = 0;
116
88
        let mut list = Vec::new();
117
1201
        while let Some(item) = cursor.try_next().await? {
118
1126
            list.push(item);
119
1126
            if let Some(cursor_max) = opts.cursor_max {
120
1049
                count += 1;
121
1049
                if count >= cursor_max {
122
13
                    return Ok((list, Some(cursor)));
123
1036
                }
124
77
            }
125
        }
126
75
        Ok((list, None))
127
176
    }
128

            
129
459
    async fn add(&self, data: &NetworkUlData) -> Result<(), Box<dyn StdError>> {
130
459
        let item = Schema {
131
459
            data_id: data.data_id.clone(),
132
459
            proc: data.proc.into(),
133
459
            unit_code: data.unit_code.clone(),
134
459
            network_code: data.network_code.clone(),
135
459
            network_addr: data.network_addr.clone(),
136
459
            unit_id: data.unit_id.clone(),
137
459
            device_id: data.device_id.clone(),
138
459
            time: data.time.into(),
139
459
            profile: data.profile.clone(),
140
459
            data: data.data.clone(),
141
459
            extension: match data.extension.as_ref() {
142
248
                None => None,
143
211
                Some(extension) => Some(bson::to_document(extension)?),
144
            },
145
        };
146
459
        self.conn
147
459
            .collection::<Schema>(COL_NAME)
148
459
            .insert_one(item, None)
149
919
            .await?;
150
458
        Ok(())
151
918
    }
152

            
153
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
154
5
        let filter = get_query_filter(cond);
155
5
        self.conn
156
5
            .collection::<Schema>(COL_NAME)
157
5
            .delete_many(filter, None)
158
10
            .await?;
159
5
        Ok(())
160
10
    }
161
}
162

            
163
impl DbCursor {
164
    /// To create the cursor instance with a collection cursor.
165
75
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
166
75
        DbCursor { cursor, offset: 0 }
167
75
    }
168
}
169

            
170
#[async_trait]
171
impl Cursor for DbCursor {
172
1201
    async fn try_next(&mut self) -> Result<Option<NetworkUlData>, Box<dyn StdError>> {
173
1201
        if let Some(item) = self.cursor.try_next().await? {
174
1126
            self.offset += 1;
175
1126
            return Ok(Some(NetworkUlData {
176
1126
                data_id: item.data_id,
177
1126
                proc: item.proc.into(),
178
1126
                unit_code: item.unit_code,
179
1126
                network_code: item.network_code,
180
1126
                network_addr: item.network_addr,
181
1126
                unit_id: item.unit_id,
182
1126
                device_id: item.device_id,
183
1126
                time: item.time.into(),
184
1126
                profile: item.profile,
185
1126
                data: item.data,
186
1126
                extension: match item.extension {
187
873
                    None => None,
188
253
                    Some(extension) => Some(bson::from_document(extension)?),
189
                },
190
            }));
191
75
        }
192
75
        Ok(None)
193
2402
    }
194

            
195
5
    fn offset(&self) -> u64 {
196
5
        self.offset
197
5
    }
198
}
199

            
200
/// Transforms query conditions to the MongoDB document.
201
5
fn get_query_filter(cond: &QueryCond) -> Document {
202
5
    let mut filter = Document::new();
203
5
    if let Some(value) = cond.unit_id {
204
3
        filter.insert("unitId", value);
205
3
    }
206
5
    if let Some(value) = cond.device_id {
207
1
        filter.insert("deviceId", value);
208
4
    }
209
5
    let mut time_doc = Document::new();
210
5
    if let Some(value) = cond.proc_gte {
211
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
212
4
    }
213
5
    if let Some(value) = cond.proc_lte {
214
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
215
4
    }
216
5
    if time_doc.len() > 0 {
217
1
        filter.insert("proc", time_doc);
218
4
    }
219
5
    filter
220
5
}
221

            
222
/// Transforms query conditions to the MongoDB document.
223
112
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
224
112
    let mut filter = Document::new();
225
112
    if let Some(value) = cond.unit_id {
226
46
        filter.insert("unitId", value);
227
66
    }
228
112
    if let Some(value) = cond.device_id {
229
14
        filter.insert("deviceId", value);
230
98
    }
231
112
    if let Some(value) = cond.network_code {
232
8
        filter.insert("networkCode", value);
233
104
    }
234
112
    if let Some(value) = cond.network_addr {
235
6
        filter.insert("networkAddr", value);
236
106
    }
237
112
    if let Some(value) = cond.profile {
238
6
        filter.insert("profile", value);
239
106
    }
240
112
    let mut time_doc = Document::new();
241
112
    if let Some(value) = cond.proc_gte {
242
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
243
87
    }
244
112
    if let Some(value) = cond.proc_lte {
245
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
246
104
    }
247
112
    if time_doc.len() > 0 {
248
25
        filter.insert("proc", time_doc);
249
87
    }
250
112
    time_doc = Document::new();
251
112
    if let Some(value) = cond.time_gte {
252
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
253
98
    }
254
112
    if let Some(value) = cond.time_lte {
255
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
256
104
    }
257
112
    if time_doc.len() > 0 {
258
14
        filter.insert("time", time_doc);
259
98
    }
260
112
    filter
261
112
}
262

            
263
/// Transforms model options to the options.
264
75
fn get_find_options(opts: &ListOptions) -> FindOptions {
265
75
    let mut options = FindOptions::builder().build();
266
75
    if let Some(offset) = opts.offset {
267
12
        options.skip = Some(offset);
268
63
    }
269
75
    if let Some(limit) = opts.limit {
270
57
        if limit > 0 {
271
54
            options.limit = Some(limit as i64);
272
54
        }
273
18
    }
274
75
    if let Some(sort_list) = opts.sort.as_ref() {
275
67
        if sort_list.len() > 0 {
276
67
            let mut sort_opts = Document::new();
277
83
            for cond in sort_list.iter() {
278
83
                let key = match cond.key {
279
63
                    SortKey::Proc => "proc",
280
4
                    SortKey::Time => "time",
281
8
                    SortKey::NetworkCode => "networkCode",
282
8
                    SortKey::NetworkAddr => "networkAddr",
283
                };
284
83
                if cond.asc {
285
41
                    sort_opts.insert(key.to_string(), 1);
286
42
                } else {
287
42
                    sort_opts.insert(key.to_string(), -1);
288
42
                }
289
            }
290
67
            options.sort = Some(sort_opts);
291
        }
292
8
    }
293
75
    options
294
75
}