1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{doc, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::dldata_buffer::{
13
    Cursor, DlDataBuffer, DlDataBufferModel, ListOptions, ListQueryCond, QueryCond, SortKey,
14
};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// Cursor instance.
23
struct DbCursor {
24
    /// The associated collection cursor.
25
    cursor: MongoDbCursor<Schema>,
26
    /// (Useless) only for Cursor trait implementation.
27
    offset: u64,
28
}
29

            
30
/// MongoDB schema.
31
#[derive(Deserialize, Serialize)]
32
struct Schema {
33
    #[serde(rename = "dataId")]
34
    data_id: String,
35
    #[serde(rename = "unitId")]
36
    unit_id: String,
37
    #[serde(rename = "unitCode")]
38
    unit_code: String,
39
    #[serde(rename = "applicationId")]
40
    application_id: String,
41
    #[serde(rename = "applicationCode")]
42
    application_code: String,
43
    #[serde(rename = "networkId")]
44
    network_id: String,
45
    #[serde(rename = "networkAddr")]
46
    network_addr: String,
47
    #[serde(rename = "deviceId")]
48
    device_id: String,
49
    #[serde(rename = "createdAt")]
50
    created_at: DateTime,
51
    #[serde(rename = "expiredAt")]
52
    expired_at: DateTime,
53
}
54

            
55
/// Name of the MongoDB collection used by this model.
const COL_NAME: &str = "dldataBuffer";
56

            
57
impl Model {
58
    /// To create the model instance with a database connection.
59
12
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
60
12
        let model = Model { conn };
61
12
        model.init().await?;
62
12
        Ok(model)
63
12
    }
64
}
65

            
66
#[async_trait]
67
impl DlDataBufferModel for Model {
68
19
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
69
19
        let indexes = vec![
70
19
            doc! {"name": "dataId_1", "key": {"dataId": 1}, "unique": true},
71
19
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
72
19
            doc! {"name": "applicationId_1", "key": {"applicationId": 1}},
73
19
            doc! {"name": "applicationCode", "key": {"applicationCode": 1}},
74
19
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
75
19
            doc! {"name": "networkAddr_1", "key": {"networkAddr": 1}},
76
19
            doc! {"name": "deviceId_1", "key": {"deviceId": 1}},
77
19
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
78
19
            doc! {"name": "expiredAt_1", "key": {"expiredAt": 1}},
79
19
        ];
80
19
        let command = doc! {
81
19
            "createIndexes": COL_NAME,
82
19
            "indexes": indexes,
83
19
        };
84
19
        self.conn.run_command(command).await?;
85
19
        Ok(())
86
38
    }
87

            
88
46
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
89
46
        let filter = get_list_query_filter(cond);
90
46
        let count = self
91
46
            .conn
92
46
            .collection::<Schema>(COL_NAME)
93
46
            .count_documents(filter)
94
46
            .await?;
95
46
        Ok(count)
96
92
    }
97

            
98
    async fn list(
99
        &self,
100
        opts: &ListOptions,
101
        cursor: Option<Box<dyn Cursor>>,
102
108
    ) -> Result<(Vec<DlDataBuffer>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
103
108
        let mut cursor = match cursor {
104
            None => {
105
89
                let filter = get_list_query_filter(opts.cond);
106
89
                Box::new(DbCursor::new(
107
89
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
108
89
                        .await?,
109
                ))
110
            }
111
19
            Some(cursor) => cursor,
112
        };
113

            
114
108
        let mut count: u64 = 0;
115
108
        let mut list = Vec::new();
116
1971
        while let Some(item) = cursor.try_next().await? {
117
1882
            list.push(item);
118
1882
            if let Some(cursor_max) = opts.cursor_max {
119
1773
                count += 1;
120
1773
                if count >= cursor_max {
121
19
                    return Ok((list, Some(cursor)));
122
1754
                }
123
109
            }
124
        }
125
89
        Ok((list, None))
126
216
    }
127

            
128
108
    async fn get(&self, data_id: &str) -> Result<Option<DlDataBuffer>, Box<dyn StdError>> {
129
108
        let filter = doc! {"dataId": data_id};
130
108
        let mut cursor = self
131
108
            .conn
132
108
            .collection::<Schema>(COL_NAME)
133
108
            .find(filter)
134
108
            .await?;
135
108
        if let Some(data) = cursor.try_next().await? {
136
71
            return Ok(Some(DlDataBuffer {
137
71
                data_id: data.data_id,
138
71
                unit_id: data.unit_id,
139
71
                unit_code: data.unit_code,
140
71
                application_id: data.application_id,
141
71
                application_code: data.application_code,
142
71
                network_id: data.network_id,
143
71
                network_addr: data.network_addr,
144
71
                device_id: data.device_id,
145
71
                created_at: data.created_at.into(),
146
71
                expired_at: data.expired_at.into(),
147
71
            }));
148
37
        }
149
37
        Ok(None)
150
216
    }
151

            
152
644
    async fn add(&self, dldata: &DlDataBuffer) -> Result<(), Box<dyn StdError>> {
153
644
        let item = Schema {
154
644
            data_id: dldata.data_id.clone(),
155
644
            unit_id: dldata.unit_id.clone(),
156
644
            unit_code: dldata.unit_code.clone(),
157
644
            application_id: dldata.application_id.clone(),
158
644
            application_code: dldata.application_code.clone(),
159
644
            network_id: dldata.network_id.clone(),
160
644
            network_addr: dldata.network_addr.clone(),
161
644
            device_id: dldata.device_id.clone(),
162
644
            created_at: dldata.created_at.into(),
163
644
            expired_at: dldata.expired_at.into(),
164
644
        };
165
644
        self.conn
166
644
            .collection::<Schema>(COL_NAME)
167
644
            .insert_one(item)
168
644
            .await?;
169
643
        Ok(())
170
1288
    }
171

            
172
49
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
173
49
        let filter = get_query_filter(cond);
174
49
        self.conn
175
49
            .collection::<Schema>(COL_NAME)
176
49
            .delete_many(filter)
177
49
            .await?;
178
49
        Ok(())
179
98
    }
180
}
181

            
182
impl DbCursor {
183
    /// To create the cursor instance with a collection cursor.
184
89
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
185
89
        DbCursor { cursor, offset: 0 }
186
89
    }
187
}
188

            
189
#[async_trait]
190
impl Cursor for DbCursor {
191
1971
    async fn try_next(&mut self) -> Result<Option<DlDataBuffer>, Box<dyn StdError>> {
192
1971
        if let Some(item) = self.cursor.try_next().await? {
193
1882
            self.offset += 1;
194
1882
            return Ok(Some(DlDataBuffer {
195
1882
                data_id: item.data_id,
196
1882
                unit_id: item.unit_id,
197
1882
                unit_code: item.unit_code,
198
1882
                application_id: item.application_id,
199
1882
                application_code: item.application_code,
200
1882
                network_id: item.network_id,
201
1882
                network_addr: item.network_addr,
202
1882
                device_id: item.device_id,
203
1882
                created_at: item.created_at.into(),
204
1882
                expired_at: item.expired_at.into(),
205
1882
            }));
206
89
        }
207
89
        Ok(None)
208
3942
    }
209

            
210
4
    fn offset(&self) -> u64 {
211
4
        self.offset
212
4
    }
213
}
214

            
215
/// Transforms query conditions to the MongoDB document.
216
49
fn get_query_filter(cond: &QueryCond) -> Document {
217
49
    let mut filter = Document::new();
218
49
    if let Some(value) = cond.data_id {
219
9
        filter.insert("dataId", value);
220
40
    }
221
49
    if let Some(value) = cond.unit_id {
222
17
        filter.insert("unitId", value);
223
32
    }
224
49
    if let Some(value) = cond.application_id {
225
6
        filter.insert("applicationId", value);
226
43
    }
227
49
    if let Some(value) = cond.network_id {
228
13
        filter.insert("networkId", value);
229
36
    }
230
49
    if let Some(value) = cond.network_addrs {
231
8
        let mut in_cond = Document::new();
232
8
        in_cond.insert("$in", value);
233
8
        filter.insert("networkAddr", in_cond);
234
41
    }
235
49
    if let Some(value) = cond.device_id {
236
11
        filter.insert("deviceId", value);
237
38
    }
238
49
    filter
239
49
}
240

            
241
/// Transforms query conditions to the MongoDB document.
242
135
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
243
135
    let mut filter = Document::new();
244
135
    if let Some(value) = cond.unit_id {
245
65
        filter.insert("unitId", value);
246
70
    }
247
135
    if let Some(value) = cond.application_id {
248
33
        filter.insert("applicationId", value);
249
102
    }
250
135
    if let Some(value) = cond.network_id {
251
33
        filter.insert("networkId", value);
252
102
    }
253
135
    if let Some(value) = cond.device_id {
254
14
        filter.insert("deviceId", value);
255
121
    }
256
135
    filter
257
135
}
258

            
259
/// Transforms model options to the options.
260
89
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
261
89
where
262
89
    T: Send + Sync,
263
89
{
264
89
    if let Some(offset) = opts.offset {
265
10
        find = find.skip(offset);
266
79
    }
267
89
    if let Some(limit) = opts.limit {
268
71
        if limit > 0 {
269
70
            find = find.limit(limit as i64);
270
70
        }
271
18
    }
272
89
    if let Some(sort_list) = opts.sort.as_ref() {
273
84
        if sort_list.len() > 0 {
274
83
            let mut sort_opts = Document::new();
275
148
            for cond in sort_list.iter() {
276
148
                let key = match cond.key {
277
64
                    SortKey::CreatedAt => "createdAt",
278
8
                    SortKey::ExpiredAt => "expiredAt",
279
76
                    SortKey::ApplicationCode => "applicationCode",
280
                };
281
148
                if cond.asc {
282
79
                    sort_opts.insert(key.to_string(), 1);
283
79
                } else {
284
69
                    sort_opts.insert(key.to_string(), -1);
285
69
                }
286
            }
287
83
            find = find.sort(sort_opts);
288
1
        }
289
5
    }
290
89
    find
291
89
}