1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{self, doc, Bson, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_uldata::{
13
    Cursor, ListOptions, ListQueryCond, NetworkUlData, NetworkUlDataModel, QueryCond, SortKey,
14
    EXPIRES,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<Database>,
21
}
22

            
23
/// Cursor instance.
24
struct DbCursor {
25
    /// The associated collection cursor.
26
    cursor: MongoDbCursor<Schema>,
27
    /// (Useless) only for Cursor trait implementation.
28
    offset: u64,
29
}
30

            
31
/// MongoDB schema.
32
#[derive(Deserialize, Serialize)]
33
struct Schema {
34
    #[serde(rename = "dataId")]
35
    pub data_id: String,
36
    pub proc: DateTime,
37
    #[serde(rename = "unitCode")]
38
    pub unit_code: Option<String>,
39
    #[serde(rename = "networkCode")]
40
    pub network_code: String,
41
    #[serde(rename = "networkAddr")]
42
    pub network_addr: String,
43
    #[serde(rename = "unitId", skip_serializing_if = "Option::is_none")]
44
    pub unit_id: Option<String>,
45
    #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
46
    pub device_id: Option<String>,
47
    pub time: DateTime,
48
    pub profile: String,
49
    pub data: String,
50
    #[serde(skip_serializing_if = "Option::is_none")]
51
    pub extension: Option<Document>,
52
}
53

            
54
/// Name of the MongoDB collection this model reads from and writes to.
const COL_NAME: &str = "networkUlData";
55

            
56
impl Model {
57
    /// To create the model instance with a database connection.
58
14
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
59
14
        let model = Model { conn };
60
14
        model.init().await?;
61
14
        Ok(model)
62
14
    }
63
}
64

            
65
#[async_trait]
66
impl NetworkUlDataModel for Model {
67
24
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
68
24
        let indexes = vec![
69
24
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
70
24
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
71
24
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
72
24
            doc! {"name": "networkCode_1", "key": {"networkCode": 1}},
73
24
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
74
24
            doc! {"name": "proc_1", "key": {"proc": 1}, "expireAfterSeconds": EXPIRES},
75
24
            doc! {"name": "time_1", "key": {"time": 1}},
76
24
            doc! {"name": "profile_1", "key": {"profile": 1}},
77
24
        ];
78
24
        let command = doc! {
79
24
            "createIndexes": COL_NAME,
80
24
            "indexes": indexes,
81
24
        };
82
24
        self.conn.run_command(command).await?;
83
24
        Ok(())
84
48
    }
85

            
86
74
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
87
74
        let filter = get_list_query_filter(cond);
88
74
        let count = self
89
74
            .conn
90
74
            .collection::<Schema>(COL_NAME)
91
74
            .count_documents(filter)
92
74
            .await?;
93
74
        Ok(count)
94
148
    }
95

            
96
    async fn list(
97
        &self,
98
        opts: &ListOptions,
99
        cursor: Option<Box<dyn Cursor>>,
100
176
    ) -> Result<(Vec<NetworkUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
101
176
        let mut cursor = match cursor {
102
            None => {
103
150
                let filter = get_list_query_filter(opts.cond);
104
150
                Box::new(DbCursor::new(
105
150
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
106
150
                        .await?,
107
                ))
108
            }
109
26
            Some(cursor) => cursor,
110
        };
111

            
112
176
        let mut count: u64 = 0;
113
176
        let mut list = Vec::new();
114
2402
        while let Some(item) = cursor.try_next().await? {
115
2252
            list.push(item);
116
2252
            if let Some(cursor_max) = opts.cursor_max {
117
2098
                count += 1;
118
2098
                if count >= cursor_max {
119
26
                    return Ok((list, Some(cursor)));
120
2072
                }
121
154
            }
122
        }
123
150
        Ok((list, None))
124
352
    }
125

            
126
918
    async fn add(&self, data: &NetworkUlData) -> Result<(), Box<dyn StdError>> {
127
918
        let item = Schema {
128
918
            data_id: data.data_id.clone(),
129
918
            proc: data.proc.into(),
130
918
            unit_code: data.unit_code.clone(),
131
918
            network_code: data.network_code.clone(),
132
918
            network_addr: data.network_addr.clone(),
133
918
            unit_id: data.unit_id.clone(),
134
918
            device_id: data.device_id.clone(),
135
918
            time: data.time.into(),
136
918
            profile: data.profile.clone(),
137
918
            data: data.data.clone(),
138
918
            extension: match data.extension.as_ref() {
139
496
                None => None,
140
422
                Some(extension) => Some(bson::to_document(extension)?),
141
            },
142
        };
143
918
        self.conn
144
918
            .collection::<Schema>(COL_NAME)
145
918
            .insert_one(item)
146
918
            .await?;
147
916
        Ok(())
148
1836
    }
149

            
150
10
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
151
10
        let filter = get_query_filter(cond);
152
10
        self.conn
153
10
            .collection::<Schema>(COL_NAME)
154
10
            .delete_many(filter)
155
10
            .await?;
156
10
        Ok(())
157
20
    }
158
}
159

            
160
impl DbCursor {
161
    /// To create the cursor instance with a collection cursor.
162
150
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
163
150
        DbCursor { cursor, offset: 0 }
164
150
    }
165
}
166

            
167
#[async_trait]
168
impl Cursor for DbCursor {
169
2402
    async fn try_next(&mut self) -> Result<Option<NetworkUlData>, Box<dyn StdError>> {
170
2402
        if let Some(item) = self.cursor.try_next().await? {
171
2252
            self.offset += 1;
172
2252
            return Ok(Some(NetworkUlData {
173
2252
                data_id: item.data_id,
174
2252
                proc: item.proc.into(),
175
2252
                unit_code: item.unit_code,
176
2252
                network_code: item.network_code,
177
2252
                network_addr: item.network_addr,
178
2252
                unit_id: item.unit_id,
179
2252
                device_id: item.device_id,
180
2252
                time: item.time.into(),
181
2252
                profile: item.profile,
182
2252
                data: item.data,
183
2252
                extension: match item.extension {
184
1746
                    None => None,
185
506
                    Some(extension) => Some(bson::from_document(extension)?),
186
                },
187
            }));
188
150
        }
189
150
        Ok(None)
190
4804
    }
191

            
192
10
    fn offset(&self) -> u64 {
193
10
        self.offset
194
10
    }
195
}
196

            
197
/// Transforms query conditions to the MongoDB document.
198
10
fn get_query_filter(cond: &QueryCond) -> Document {
199
10
    let mut filter = Document::new();
200
10
    if let Some(value) = cond.unit_id {
201
6
        filter.insert("unitId", value);
202
6
    }
203
10
    if let Some(value) = cond.device_id {
204
2
        filter.insert("deviceId", value);
205
8
    }
206
10
    let mut time_doc = Document::new();
207
10
    if let Some(value) = cond.proc_gte {
208
2
        time_doc.insert("$gte", Bson::DateTime(value.into()));
209
8
    }
210
10
    if let Some(value) = cond.proc_lte {
211
2
        time_doc.insert("$lte", Bson::DateTime(value.into()));
212
8
    }
213
10
    if time_doc.len() > 0 {
214
2
        filter.insert("proc", time_doc);
215
8
    }
216
10
    filter
217
10
}
218

            
219
/// Transforms query conditions to the MongoDB document.
220
224
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
221
224
    let mut filter = Document::new();
222
224
    if let Some(value) = cond.unit_id {
223
92
        filter.insert("unitId", value);
224
132
    }
225
224
    if let Some(value) = cond.device_id {
226
28
        filter.insert("deviceId", value);
227
196
    }
228
224
    if let Some(value) = cond.network_code {
229
16
        filter.insert("networkCode", value);
230
208
    }
231
224
    if let Some(value) = cond.network_addr {
232
12
        filter.insert("networkAddr", value);
233
212
    }
234
224
    if let Some(value) = cond.profile {
235
12
        filter.insert("profile", value);
236
212
    }
237
224
    let mut time_doc = Document::new();
238
224
    if let Some(value) = cond.proc_gte {
239
50
        time_doc.insert("$gte", Bson::DateTime(value.into()));
240
174
    }
241
224
    if let Some(value) = cond.proc_lte {
242
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
243
208
    }
244
224
    if time_doc.len() > 0 {
245
50
        filter.insert("proc", time_doc);
246
174
    }
247
224
    time_doc = Document::new();
248
224
    if let Some(value) = cond.time_gte {
249
28
        time_doc.insert("$gte", Bson::DateTime(value.into()));
250
196
    }
251
224
    if let Some(value) = cond.time_lte {
252
16
        time_doc.insert("$lte", Bson::DateTime(value.into()));
253
208
    }
254
224
    if time_doc.len() > 0 {
255
28
        filter.insert("time", time_doc);
256
196
    }
257
224
    filter
258
224
}
259

            
260
/// Transforms model options to the options.
261
150
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
262
150
where
263
150
    T: Send + Sync,
264
150
{
265
150
    if let Some(offset) = opts.offset {
266
24
        find = find.skip(offset);
267
126
    }
268
150
    if let Some(limit) = opts.limit {
269
114
        if limit > 0 {
270
108
            find = find.limit(limit as i64);
271
108
        }
272
36
    }
273
150
    if let Some(sort_list) = opts.sort.as_ref() {
274
134
        if sort_list.len() > 0 {
275
134
            let mut sort_opts = Document::new();
276
166
            for cond in sort_list.iter() {
277
166
                let key = match cond.key {
278
126
                    SortKey::Proc => "proc",
279
8
                    SortKey::Time => "time",
280
16
                    SortKey::NetworkCode => "networkCode",
281
16
                    SortKey::NetworkAddr => "networkAddr",
282
                };
283
166
                if cond.asc {
284
82
                    sort_opts.insert(key.to_string(), 1);
285
84
                } else {
286
84
                    sort_opts.insert(key.to_string(), -1);
287
84
                }
288
            }
289
134
            find = find.sort(sort_opts);
290
        }
291
16
    }
292
150
    find
293
150
}