1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_uldata::{
13
    Cursor, ListOptions, ListQueryCond, NetworkUlData, NetworkUlDataModel, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "unitCode")]
38
    pub unit_code: Option<String>,
39
    #[serde(rename = "networkCode")]
40
    pub network_code: String,
41
    #[serde(rename = "networkAddr")]
42
    pub network_addr: String,
43
    #[serde(rename = "unitId", skip_serializing_if = "Option::is_none")]
44
    pub unit_id: Option<String>,
45
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
46
    pub device_id: Option<String>,
47
    pub time: DateTime,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
/// Name of the MongoDB collection used by this model.
const COL_NAME: &str = "networkUlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
7
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
7
        let model = Model { conn };
60
7
        model.init().await?;
61
7
        Ok(model)
62
7
    }
63
}
64

            
65
#[async_trait]
66
impl NetworkUlDataModel for Model {
67
12
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
12
        let indexes = vec![
69
12
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
12
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
71
12
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
72
12
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
73
12
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
74
12
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
75
12
            doc! {"name": "time_1", "key": {"time": 1}},
76
12
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
12
        ];
78
12
        let command = doc! {
79
12
            "createIndexes": COL_NAME,
80
12
            "indexes": indexes,
81
12
        };
82
12
        self.conn.run_command(command).await?;
83
12
        Ok(())
84
24
    }
85

            
86
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
87
37
        let filter = get_list_query_filter(cond);
88
37
        let count = self
89
37
            .conn
90
37
            .collection::<Schema>(COL_NAME)
91
37
            .count_documents(filter)
92
37
            .await?;
93
37
        Ok(count)
94
74
    }
95

            
96
    async fn list(
97
        &self,
98
        opts: &ListOptions,
99
        cursor: Option<Box<dyn Cursor>>,
100
88
    ) -> Result<(Vec<NetworkUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
101
88
        let mut cursor = match cursor {
102
            None => {
103
75
                let filter = get_list_query_filter(opts.cond);
104
75
                Box::new(DbCursor::new(
105
75
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
106
75
                        .await?,
107
                ))
108
            }
109
13
            Some(cursor) => cursor,
110
        };
111

            
112
88
        let mut count: u64 = 0;
113
88
        let mut list = Vec::new();
114
1201
        while let Some(item) = cursor.try_next().await? {
115
1126
            list.push(item);
116
1126
            if let Some(cursor_max) = opts.cursor_max {
117
1049
                count += 1;
118
1049
                if count >= cursor_max {
119
13
                    return Ok((list, Some(cursor)));
120
1036
                }
121
77
            }
122
        }
123
75
        Ok((list, None))
124
176
    }
125

            
126
459
    async fn add(&self, data: &NetworkUlData) -> Result<(), Box<dyn StdError>> {
127
459
        let item = Schema {
128
459
            data_id: data.data_id.clone(),
129
459
            proc: data.proc.into(),
130
459
            unit_code: data.unit_code.clone(),
131
459
            network_code: data.network_code.clone(),
132
459
            network_addr: data.network_addr.clone(),
133
459
            unit_id: data.unit_id.clone(),
134
459
            device_id: data.device_id.clone(),
135
459
            time: data.time.into(),
136
459
            profile: data.profile.clone(),
137
459
            data: data.data.clone(),
138
459
            extension: match data.extension.as_ref() {
139
248
                None => None,
140
211
                Some(extension) => Some(bson::to_document(extension)?),
141
            },
142
        };
143
459
        self.conn
144
459
            .collection::<Schema>(COL_NAME)
145
459
            .insert_one(item)
146
459
            .await?;
147
458
        Ok(())
148
918
    }
149

            
150
5
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
151
5
        let filter = get_query_filter(cond);
152
5
        self.conn
153
5
            .collection::<Schema>(COL_NAME)
154
5
            .delete_many(filter)
155
5
            .await?;
156
5
        Ok(())
157
10
    }
158
}
159

            
160
impl DbCursor {
161
    /// To create the cursor instance with a collection cursor.
162
75
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
163
75
        DbCursor { cursor, offset: 0 }
164
75
    }
165
}
166

            
167
#[async_trait]
168
impl Cursor for DbCursor {
169
1201
    async fn try_next(&mut self) -> Result<Option<NetworkUlData>, Box<dyn StdError>> {
170
1201
        if let Some(item) = self.cursor.try_next().await? {
171
1126
            self.offset += 1;
172
1126
            return Ok(Some(NetworkUlData {
173
1126
                data_id: item.data_id,
174
1126
                proc: item.proc.into(),
175
1126
                unit_code: item.unit_code,
176
1126
                network_code: item.network_code,
177
1126
                network_addr: item.network_addr,
178
1126
                unit_id: item.unit_id,
179
1126
                device_id: item.device_id,
180
1126
                time: item.time.into(),
181
1126
                profile: item.profile,
182
1126
                data: item.data,
183
1126
                extension: match item.extension {
184
873
                    None => None,
185
253
                    Some(extension) => Some(bson::from_document(extension)?),
186
                },
187
            }));
188
75
        }
189
75
        Ok(None)
190
2402
    }
191

            
192
5
    fn offset(&self) -> u64 {
193
5
        self.offset
194
5
    }
195
}
196

            
197
/// Transforms query conditions to the MongoDB document.
198
5
fn get_query_filter(cond: &QueryCond) -> Document {
199
5
    let mut filter = Document::new();
200
5
    if let Some(value) = cond.unit_id {
201
3
        filter.insert("unitId", value);
202
3
    }
203
5
    if let Some(value) = cond.device_id {
204
1
        filter.insert("deviceId", value);
205
4
    }
206
5
    let mut time_doc = Document::new();
207
5
    if let Some(value) = cond.proc_gte {
208
1
        time_doc.insert("$gte", Bson::DateTime(value.into()));
209
4
    }
210
5
    if let Some(value) = cond.proc_lte {
211
1
        time_doc.insert("$lte", Bson::DateTime(value.into()));
212
4
    }
213
5
    if time_doc.len() > 0 {
214
1
        filter.insert("proc", time_doc);
215
4
    }
216
5
    filter
217
5
}
218

            
219
/// Transforms query conditions to the MongoDB document.
220
112
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
221
112
    let mut filter = Document::new();
222
112
    if let Some(value) = cond.unit_id {
223
46
        filter.insert("unitId", value);
224
66
    }
225
112
    if let Some(value) = cond.device_id {
226
14
        filter.insert("deviceId", value);
227
98
    }
228
112
    if let Some(value) = cond.network_code {
229
8
        filter.insert("networkCode", value);
230
104
    }
231
112
    if let Some(value) = cond.network_addr {
232
6
        filter.insert("networkAddr", value);
233
106
    }
234
112
    if let Some(value) = cond.profile {
235
6
        filter.insert("profile", value);
236
106
    }
237
112
    let mut time_doc = Document::new();
238
112
    if let Some(value) = cond.proc_gte {
239
25
        time_doc.insert("$gte", Bson::DateTime(value.into()));
240
87
    }
241
112
    if let Some(value) = cond.proc_lte {
242
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
243
104
    }
244
112
    if time_doc.len() > 0 {
245
25
        filter.insert("proc", time_doc);
246
87
    }
247
112
    time_doc = Document::new();
248
112
    if let Some(value) = cond.time_gte {
249
14
        time_doc.insert("$gte", Bson::DateTime(value.into()));
250
98
    }
251
112
    if let Some(value) = cond.time_lte {
252
8
        time_doc.insert("$lte", Bson::DateTime(value.into()));
253
104
    }
254
112
    if time_doc.len() > 0 {
255
14
        filter.insert("time", time_doc);
256
98
    }
257
112
    filter
258
112
}
259

            
260
/// Transforms model options to the options.
261
75
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
262
75
where
263
75
    T: Send + Sync,
264
75
{
265
75
    if let Some(offset) = opts.offset {
266
12
        find = find.skip(offset);
267
63
    }
268
75
    if let Some(limit) = opts.limit {
269
57
        if limit > 0 {
270
54
            find = find.limit(limit as i64);
271
54
        }
272
18
    }
273
75
    if let Some(sort_list) = opts.sort.as_ref() {
274
67
        if sort_list.len() > 0 {
275
67
            let mut sort_opts = Document::new();
276
83
            for cond in sort_list.iter() {
277
83
                let key = match cond.key {
278
63
                    SortKey::Proc => "proc",
279
4
                    SortKey::Time => "time",
280
8
                    SortKey::NetworkCode => "networkCode",
281
8
                    SortKey::NetworkAddr => "networkAddr",
282
                };
283
83
                if cond.asc {
284
41
                    sort_opts.insert(key.to_string(), 1);
285
42
                } else {
286
42
                    sort_opts.insert(key.to_string(), -1);
287
42
                }
288
            }
289
67
            find = find.sort(sort_opts);
290
        }
291
8
    }
292
75
    find
293
75
}