1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::network_route::{
10
    Cursor, ListOptions, ListQueryCond, NetworkRoute, NetworkRouteModel, QueryCond, SortKey,
11
};
12

            
13
/// Model instance.
14
pub struct Model {
15
    /// The associated database connection.
16
    conn: Arc<SqlitePool>,
17
}
18

            
19
/// Cursor instance.
///
/// The SQLite implementation keeps only the progress offset. `list()` combines it with the
/// caller's original list options to work out where the next query has to resume.
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows consumed so far; bumped by one on every `try_next()` call.
    offset: u64,
}
25

            
26
/// SQLite schema.
27
#[derive(sqlx::FromRow)]
28
struct Schema {
29
    route_id: String,
30
    unit_id: String,
31
    unit_code: String,
32
    application_id: String,
33
    application_code: String,
34
    network_id: String,
35
    network_code: String,
36
    /// i64 as time tick from Epoch in milliseconds.
37
    created_at: i64,
38
}
39

            
40
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
41
#[derive(sqlx::FromRow)]
42
struct CountSchema {
43
    #[sqlx(rename = "COUNT(*)")]
44
    count: i64,
45
}
46

            
47
/// Name of the table that stores the network routes.
const TABLE_NAME: &str = "network_route";
/// Columns used for SELECT/INSERT statements, in the order `add()` supplies its values.
const FIELDS: &[&str] = &[
    "route_id",
    "unit_id",
    "unit_code",
    "application_id",
    "application_code",
    "network_id",
    "network_code",
    "created_at",
];
/// Statement that creates the table when it does not exist yet.
///
/// The trailing `\` on each line joins the pieces into one single-line statement.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS network_route (\
    route_id TEXT NOT NULL UNIQUE,\
    unit_id TEXT NOT NULL,\
    unit_code TEXT NOT NULL,\
    application_id TEXT NOT NULL,\
    application_code TEXT NOT NULL,\
    network_id TEXT NOT NULL,\
    network_code TEXT NOT NULL,\
    created_at INTEGER NOT NULL,\
    UNIQUE (application_id,network_id),\
    PRIMARY KEY (route_id))";
70

            
71
impl Model {
72
    /// To create the model instance with a database connection.
73
56
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
74
56
        let model = Model { conn };
75
56
        model.init().await?;
76
56
        Ok(model)
77
56
    }
78
}
79

            
80
#[async_trait]
81
impl NetworkRouteModel for Model {
82
96
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
83
96
        let _ = sqlx::query(TABLE_INIT_SQL)
84
96
            .execute(self.conn.as_ref())
85
96
            .await?;
86
96
        Ok(())
87
192
    }
88

            
89
176
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
90
176
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
91

            
92
176
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
93
176
            .fetch_one(self.conn.as_ref())
94
176
            .await;
95

            
96
176
        let row = match result {
97
            Err(e) => return Err(Box::new(e)),
98
176
            Ok(row) => row,
99
176
        };
100
176
        Ok(row.count as u64)
101
352
    }
102

            
103
    async fn list(
104
        &self,
105
        opts: &ListOptions,
106
        cursor: Option<Box<dyn Cursor>>,
107
454
    ) -> Result<(Vec<NetworkRoute>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
108
454
        let mut cursor = match cursor {
109
394
            None => Box::new(DbCursor::new()),
110
60
            Some(cursor) => cursor,
111
        };
112

            
113
454
        let mut opts = ListOptions { ..*opts };
114
454
        if let Some(offset) = opts.offset {
115
48
            opts.offset = Some(offset + cursor.offset());
116
406
        } else {
117
406
            opts.offset = Some(cursor.offset());
118
406
        }
119
454
        let opts_limit = opts.limit;
120
454
        if let Some(limit) = opts_limit {
121
308
            if limit > 0 {
122
306
                if cursor.offset() >= limit {
123
26
                    return Ok((vec![], None));
124
280
                }
125
280
                opts.limit = Some(limit - cursor.offset());
126
2
            }
127
146
        }
128
428
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
129
428
        build_limit_offset(&mut builder, &opts);
130
428
        build_sort(&mut builder, &opts);
131
428
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
132

            
133
428
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
134
428

            
135
428
        let mut count: u64 = 0;
136
428
        let mut list = vec![];
137
6666
        while let Some(row) = rows.try_next().await? {
138
6326
            let _ = cursor.as_mut().try_next().await?;
139
6326
            list.push(NetworkRoute {
140
6326
                route_id: row.route_id,
141
6326
                unit_id: row.unit_id,
142
6326
                unit_code: row.unit_code,
143
6326
                application_id: row.application_id,
144
6326
                application_code: row.application_code,
145
6326
                network_id: row.network_id,
146
6326
                network_code: row.network_code,
147
6326
                created_at: Utc.timestamp_nanos(row.created_at * 1000000),
148
6326
            });
149
6326
            if let Some(limit) = opts_limit {
150
3778
                if limit > 0 && cursor.offset() >= limit {
151
54
                    if let Some(cursor_max) = opts.cursor_max {
152
44
                        if (count + 1) >= cursor_max {
153
26
                            return Ok((list, Some(cursor)));
154
18
                        }
155
10
                    }
156
28
                    return Ok((list, None));
157
3724
                }
158
2548
            }
159
6272
            if let Some(cursor_max) = opts.cursor_max {
160
6116
                count += 1;
161
6116
                if count >= cursor_max {
162
34
                    return Ok((list, Some(cursor)));
163
6082
                }
164
156
            }
165
        }
166
340
        Ok((list, None))
167
908
    }
168

            
169
270
    async fn get(&self, route_id: &str) -> Result<Option<NetworkRoute>, Box<dyn StdError>> {
170
270
        let sql = SqlBuilder::select_from(TABLE_NAME)
171
270
            .fields(FIELDS)
172
270
            .and_where_eq("route_id", quote(route_id))
173
270
            .sql()?;
174

            
175
270
        let result: Result<Schema, sqlx::Error> = sqlx::query_as(sql.as_str())
176
270
            .fetch_one(self.conn.as_ref())
177
270
            .await;
178

            
179
270
        let row = match result {
180
74
            Err(e) => match e {
181
74
                sqlx::Error::RowNotFound => return Ok(None),
182
                _ => return Err(Box::new(e)),
183
            },
184
196
            Ok(row) => row,
185
196
        };
186
196

            
187
196
        Ok(Some(NetworkRoute {
188
196
            route_id: row.route_id,
189
196
            unit_id: row.unit_id,
190
196
            unit_code: row.unit_code,
191
196
            application_id: row.application_id,
192
196
            application_code: row.application_code,
193
196
            network_id: row.network_id,
194
196
            network_code: row.network_code,
195
196
            created_at: Utc.timestamp_nanos(row.created_at * 1000000),
196
196
        }))
197
540
    }
198

            
199
1992
    async fn add(&self, route: &NetworkRoute) -> Result<(), Box<dyn StdError>> {
200
1992
        let values = vec![
201
1992
            quote(route.route_id.as_str()),
202
1992
            quote(route.unit_id.as_str()),
203
1992
            quote(route.unit_code.as_str()),
204
1992
            quote(route.application_id.as_str()),
205
1992
            quote(route.application_code.as_str()),
206
1992
            quote(route.network_id.as_str()),
207
1992
            quote(route.network_code.as_str()),
208
1992
            route.created_at.timestamp_millis().to_string(),
209
1992
        ];
210
1992
        let sql = SqlBuilder::insert_into(TABLE_NAME)
211
1992
            .fields(FIELDS)
212
1992
            .values(&values)
213
1992
            .sql()?;
214
1992
        let _ = sqlx::query(sql.as_str())
215
1992
            .execute(self.conn.as_ref())
216
1992
            .await?;
217
1988
        Ok(())
218
3984
    }
219

            
220
94
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
221
94
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
222
94
        let _ = sqlx::query(sql.as_str())
223
94
            .execute(self.conn.as_ref())
224
94
            .await?;
225
94
        Ok(())
226
188
    }
227
}
228

            
229
impl DbCursor {
230
    /// To create the cursor instance.
231
394
    pub fn new() -> Self {
232
394
        DbCursor { offset: 0 }
233
394
    }
234
}
235

            
236
#[async_trait]
237
impl Cursor for DbCursor {
238
6326
    async fn try_next(&mut self) -> Result<Option<NetworkRoute>, Box<dyn StdError>> {
239
6326
        self.offset += 1;
240
6326
        Ok(None)
241
12652
    }
242

            
243
4818
    fn offset(&self) -> u64 {
244
4818
        self.offset
245
4818
    }
246
}
247

            
248
/// Transforms query conditions to the SQL builder.
249
94
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
250
94
    if let Some(value) = cond.route_id {
251
12
        builder.and_where_eq("route_id", quote(value));
252
82
    }
253
94
    if let Some(value) = cond.unit_id {
254
36
        builder.and_where_eq("unit_id", quote(value));
255
58
    }
256
94
    if let Some(value) = cond.application_id {
257
22
        builder.and_where_eq("application_id", quote(value));
258
72
    }
259
94
    if let Some(value) = cond.network_id {
260
26
        builder.and_where_eq("network_id", quote(value));
261
68
    }
262
94
    builder
263
94
}
264

            
265
/// Transforms query conditions to the SQL builder.
266
604
fn build_list_where<'a>(
267
604
    builder: &'a mut SqlBuilder,
268
604
    cond: &ListQueryCond<'a>,
269
604
) -> &'a mut SqlBuilder {
270
604
    if let Some(value) = cond.route_id {
271
52
        builder.and_where_eq("route_id", quote(value));
272
552
    }
273
604
    if let Some(value) = cond.unit_id {
274
220
        builder.and_where_eq("unit_id", quote(value));
275
384
    }
276
604
    if let Some(value) = cond.unit_code {
277
12
        builder.and_where_eq("unit_code", quote(value));
278
592
    }
279
604
    if let Some(value) = cond.application_id {
280
172
        builder.and_where_eq("application_id", quote(value));
281
432
    }
282
604
    if let Some(value) = cond.application_code {
283
12
        builder.and_where_eq("application_code", quote(value));
284
592
    }
285
604
    if let Some(value) = cond.network_id {
286
218
        builder.and_where_eq("network_id", quote(value));
287
386
    }
288
604
    if let Some(value) = cond.network_code {
289
12
        builder.and_where_eq("network_code", quote(value));
290
592
    }
291
604
    builder
292
604
}
293

            
294
/// Transforms model options to the SQL builder.
295
428
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
296
428
    if let Some(value) = opts.limit {
297
282
        if value > 0 {
298
280
            builder.limit(value);
299
280
        }
300
146
    }
301
428
    if let Some(value) = opts.offset {
302
428
        match opts.limit {
303
146
            None => builder.limit(-1).offset(value),
304
2
            Some(0) => builder.limit(-1).offset(value),
305
280
            _ => builder.offset(value),
306
        };
307
    }
308
428
    builder
309
428
}
310

            
311
/// Transforms model options to the SQL builder.
312
428
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
313
428
    if let Some(sort_cond) = opts.sort.as_ref() {
314
548
        for cond in sort_cond.iter() {
315
548
            let key = match cond.key {
316
260
                SortKey::CreatedAt => "created_at",
317
16
                SortKey::ApplicationCode => "application_code",
318
272
                SortKey::NetworkCode => "network_code",
319
            };
320
548
            builder.order_by(key, !cond.asc);
321
        }
322
134
    }
323
428
    builder
324
428
}