1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use futures::TryStreamExt;
5
use mongodb::{
6
    action::Find,
7
    bson::{doc, DateTime, Document},
8
    Cursor as MongoDbCursor, Database,
9
};
10
use serde::{Deserialize, Serialize};
11

            
12
use super::super::network_route::{
13
    Cursor, ListOptions, ListQueryCond, NetworkRoute, NetworkRouteModel, QueryCond, SortKey,
14
};
15

            
16
/// Model instance.
17
pub struct Model {
18
    /// The associated database connection.
19
    conn: Arc<Database>,
20
}
21

            
22
/// Cursor instance.
23
struct DbCursor {
24
    /// The associated collection cursor.
25
    cursor: MongoDbCursor<Schema>,
26
    /// (Useless) only for Cursor trait implementation.
27
    offset: u64,
28
}
29

            
30
/// MongoDB schema.
31
#[derive(Deserialize, Serialize)]
32
struct Schema {
33
    #[serde(rename = "routeId")]
34
    route_id: String,
35
    #[serde(rename = "unitId")]
36
    unit_id: String,
37
    #[serde(rename = "unitCode")]
38
    unit_code: String,
39
    #[serde(rename = "applicationId")]
40
    application_id: String,
41
    #[serde(rename = "applicationCode")]
42
    application_code: String,
43
    #[serde(rename = "networkId")]
44
    network_id: String,
45
    #[serde(rename = "networkCode")]
46
    network_code: String,
47
    #[serde(rename = "createdAt")]
48
    created_at: DateTime,
49
}
50

            
51
/// Name of the MongoDB collection that stores network routes.
const COL_NAME: &str = "networkRoute";
52

            
53
impl Model {
54
    /// To create the model instance with a database connection.
55
12
    pub async fn new(conn: Arc<Database>) -> Result<Self, Box<dyn StdError>> {
56
12
        let model = Model { conn };
57
12
        model.init().await?;
58
12
        Ok(model)
59
12
    }
60
}
61

            
62
#[async_trait]
63
impl NetworkRouteModel for Model {
64
19
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
65
19
        let indexes = vec![
66
19
            doc! {"name": "routeId_1", "key": {"routeId": 1}, "unique": true},
67
19
            doc! {
68
19
                "name": "applicationId_1_networkId_1",
69
19
                "key": {"applicationId": 1, "networkId": 1},
70
19
                "unique": true
71
19
            },
72
19
            doc! {"name": "unitId_1", "key": {"unitId": 1}},
73
19
            doc! {"name": "networkId_1", "key": {"networkId": 1}},
74
19
            doc! {"name": "applicationId_1", "key": {"applicationId": 1}},
75
19
            doc! {"name": "createdAt_1", "key": {"createdAt": 1}},
76
19
        ];
77
19
        let command = doc! {
78
19
            "createIndexes": COL_NAME,
79
19
            "indexes": indexes,
80
19
        };
81
19
        self.conn.run_command(command).await?;
82
19
        Ok(())
83
38
    }
84

            
85
54
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
86
54
        let filter = get_list_query_filter(cond);
87
54
        let count = self
88
54
            .conn
89
54
            .collection::<Schema>(COL_NAME)
90
54
            .count_documents(filter)
91
54
            .await?;
92
54
        Ok(count)
93
108
    }
94

            
95
    async fn list(
96
        &self,
97
        opts: &ListOptions,
98
        cursor: Option<Box<dyn Cursor>>,
99
139
    ) -> Result<(Vec<NetworkRoute>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
100
139
        let mut cursor = match cursor {
101
            None => {
102
122
                let filter = get_list_query_filter(opts.cond);
103
122
                Box::new(DbCursor::new(
104
122
                    build_find_options(opts, self.conn.collection::<Schema>(COL_NAME).find(filter))
105
122
                        .await?,
106
                ))
107
            }
108
17
            Some(cursor) => cursor,
109
        };
110

            
111
139
        let mut count: u64 = 0;
112
139
        let mut list = Vec::new();
113
1745
        while let Some(item) = cursor.try_next().await? {
114
1623
            list.push(item);
115
1623
            if let Some(cursor_max) = opts.cursor_max {
116
1547
                count += 1;
117
1547
                if count >= cursor_max {
118
17
                    return Ok((list, Some(cursor)));
119
1530
                }
120
76
            }
121
        }
122
122
        Ok((list, None))
123
278
    }
124

            
125
77
    async fn get(&self, route_id: &str) -> Result<Option<NetworkRoute>, Box<dyn StdError>> {
126
77
        let filter = doc! {"routeId": route_id};
127
77
        let mut cursor = self
128
77
            .conn
129
77
            .collection::<Schema>(COL_NAME)
130
77
            .find(filter)
131
77
            .await?;
132
77
        if let Some(route) = cursor.try_next().await? {
133
54
            return Ok(Some(NetworkRoute {
134
54
                route_id: route.route_id,
135
54
                unit_id: route.unit_id,
136
54
                unit_code: route.unit_code,
137
54
                application_id: route.application_id,
138
54
                application_code: route.application_code,
139
54
                network_id: route.network_id,
140
54
                network_code: route.network_code,
141
54
                created_at: route.created_at.into(),
142
54
            }));
143
23
        }
144
23
        Ok(None)
145
154
    }
146

            
147
517
    async fn add(&self, route: &NetworkRoute) -> Result<(), Box<dyn StdError>> {
148
517
        let item = Schema {
149
517
            route_id: route.route_id.clone(),
150
517
            unit_id: route.unit_id.clone(),
151
517
            unit_code: route.unit_code.clone(),
152
517
            application_id: route.application_id.clone(),
153
517
            application_code: route.application_code.clone(),
154
517
            network_id: route.network_id.clone(),
155
517
            network_code: route.network_code.clone(),
156
517
            created_at: route.created_at.into(),
157
517
        };
158
517
        self.conn
159
517
            .collection::<Schema>(COL_NAME)
160
517
            .insert_one(item)
161
517
            .await?;
162
515
        Ok(())
163
1034
    }
164

            
165
26
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
166
26
        let filter = get_query_filter(cond);
167
26
        self.conn
168
26
            .collection::<Schema>(COL_NAME)
169
26
            .delete_many(filter)
170
26
            .await?;
171
26
        Ok(())
172
52
    }
173
}
174

            
175
impl DbCursor {
176
    /// To create the cursor instance with a collection cursor.
177
122
    pub fn new(cursor: MongoDbCursor<Schema>) -> Self {
178
122
        DbCursor { cursor, offset: 0 }
179
122
    }
180
}
181

            
182
#[async_trait]
183
impl Cursor for DbCursor {
184
1745
    async fn try_next(&mut self) -> Result<Option<NetworkRoute>, Box<dyn StdError>> {
185
1745
        if let Some(item) = self.cursor.try_next().await? {
186
1623
            self.offset += 1;
187
1623
            return Ok(Some(NetworkRoute {
188
1623
                route_id: item.route_id,
189
1623
                unit_id: item.unit_id,
190
1623
                unit_code: item.unit_code,
191
1623
                application_id: item.application_id,
192
1623
                application_code: item.application_code,
193
1623
                network_id: item.network_id,
194
1623
                network_code: item.network_code,
195
1623
                created_at: item.created_at.into(),
196
1623
            }));
197
122
        }
198
122
        Ok(None)
199
3490
    }
200

            
201
4
    fn offset(&self) -> u64 {
202
4
        self.offset
203
4
    }
204
}
205

            
206
/// Transforms query conditions to the MongoDB document.
207
26
fn get_query_filter(cond: &QueryCond) -> Document {
208
26
    let mut filter = Document::new();
209
26
    if let Some(value) = cond.route_id {
210
5
        filter.insert("routeId", value);
211
21
    }
212
26
    if let Some(value) = cond.unit_id {
213
9
        filter.insert("unitId", value);
214
17
    }
215
26
    if let Some(value) = cond.application_id {
216
6
        filter.insert("applicationId", value);
217
20
    }
218
26
    if let Some(value) = cond.network_id {
219
7
        filter.insert("networkId", value);
220
19
    }
221
26
    filter
222
26
}
223

            
224
/// Transforms query conditions to the MongoDB document.
225
176
fn get_list_query_filter(cond: &ListQueryCond) -> Document {
226
176
    let mut filter = Document::new();
227
176
    if let Some(value) = cond.route_id {
228
26
        filter.insert("routeId", value);
229
150
    }
230
176
    if let Some(value) = cond.unit_id {
231
58
        filter.insert("unitId", value);
232
118
    }
233
176
    if let Some(value) = cond.unit_code {
234
6
        filter.insert("unitCode", value);
235
170
    }
236
176
    if let Some(value) = cond.application_id {
237
46
        filter.insert("applicationId", value);
238
130
    }
239
176
    if let Some(value) = cond.application_code {
240
6
        filter.insert("applicationCode", value);
241
170
    }
242
176
    if let Some(value) = cond.network_id {
243
63
        filter.insert("networkId", value);
244
113
    }
245
176
    if let Some(value) = cond.network_code {
246
6
        filter.insert("networkCode", value);
247
170
    }
248
176
    filter
249
176
}
250

            
251
/// Transforms model options to the options.
252
122
fn build_find_options<'a, T>(opts: &ListOptions, mut find: Find<'a, T>) -> Find<'a, T>
253
122
where
254
122
    T: Send + Sync,
255
122
{
256
122
    if let Some(offset) = opts.offset {
257
10
        find = find.skip(offset);
258
112
    }
259
122
    if let Some(limit) = opts.limit {
260
71
        if limit > 0 {
261
70
            find = find.limit(limit as i64);
262
70
        }
263
51
    }
264
122
    if let Some(sort_list) = opts.sort.as_ref() {
265
73
        if sort_list.len() > 0 {
266
72
            let mut sort_opts = Document::new();
267
131
            for cond in sort_list.iter() {
268
131
                let key = match cond.key {
269
59
                    SortKey::CreatedAt => "createdAt",
270
6
                    SortKey::ApplicationCode => "applicationCode",
271
66
                    SortKey::NetworkCode => "networkCode",
272
                };
273
131
                if cond.asc {
274
69
                    sort_opts.insert(key.to_string(), 1);
275
69
                } else {
276
62
                    sort_opts.insert(key.to_string(), -1);
277
62
                }
278
            }
279
72
            find = find.sort(sort_opts);
280
1
        }
281
49
    }
282
122
    find
283
122
}