1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::network_uldata::{
10
    Cursor, ListOptions, ListQueryCond, NetworkUlData, NetworkUlDataModel, QueryCond, SortKey,
11
};
12

            
13
/// Model instance.
14
pub struct Model {
15
    /// The associated database connection.
16
    conn: Arc<SqlitePool>,
17
}
18

            
19
/// Cursor instance.
///
/// The SQLite implementation only keeps the progress offset, i.e. the number of rows that
/// have already been consumed through this cursor.
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
25

            
26
/// SQLite schema.
27
#[derive(sqlx::FromRow)]
28
struct Schema {
29
    pub data_id: String,
30
    /// i64 as time tick from Epoch in milliseconds.
31
    pub proc: i64,
32
    /// use empty string as NULL.
33
    pub unit_code: String,
34
    pub network_code: String,
35
    pub network_addr: String,
36
    /// use empty string as NULL.
37
    pub unit_id: String,
38
    /// use empty string as NULL.
39
    pub device_id: String,
40
    /// i64 as time tick from Epoch in milliseconds.
41
    pub time: i64,
42
    pub profile: String,
43
    pub data: String,
44
    /// use empty string as NULL.
45
    pub extension: String,
46
}
47

            
48
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
49
#[derive(sqlx::FromRow)]
50
struct CountSchema {
51
    #[sqlx(rename = "COUNT(*)")]
52
    count: i64,
53
}
54

            
55
/// Name of the table that stores the uplink data.
const TABLE_NAME: &str = "network_uldata";
/// Column names in the order used by the `INSERT` statement.
const FIELDS: &[&str] = &[
    "data_id",
    "proc",
    "unit_code",
    "network_code",
    "network_addr",
    "unit_id",
    "device_id",
    "time",
    "profile",
    "data",
    "extension",
];
/// Statement that creates the table when it does not exist yet.
///
/// The trailing backslashes join the lines without inserting any whitespace.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS network_uldata (\
    data_id TEXT NOT NULL UNIQUE,\
    proc INTEGER NOT NULL,\
    unit_code TEXT NOT NULL,\
    network_code TEXT NOT NULL,\
    network_addr TEXT NOT NULL,\
    unit_id TEXT NOT NULL,\
    device_id TEXT NOT NULL,\
    time INTEGER NOT NULL,\
    profile TEXT NOT NULL,\
    data TEXT NOT NULL,\
    extension TEXT NOT NULL,\
    PRIMARY KEY (data_id))";
83

            
84
impl Model {
85
    /// To create the model instance with a database connection.
86
12
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
87
12
        let model = Model { conn };
88
12
        model.init().await?;
89
12
        Ok(model)
90
12
    }
91
}
92

            
93
#[async_trait]
94
impl NetworkUlDataModel for Model {
95
20
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
96
20
        let _ = sqlx::query(TABLE_INIT_SQL)
97
20
            .execute(self.conn.as_ref())
98
20
            .await?;
99
20
        Ok(())
100
40
    }
101

            
102
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
103
37
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
104

            
105
37
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
106
37
            .fetch_one(self.conn.as_ref())
107
37
            .await;
108

            
109
37
        let row = match result {
110
            Err(e) => return Err(Box::new(e)),
111
37
            Ok(row) => row,
112
37
        };
113
37
        Ok(row.count as u64)
114
74
    }
115

            
116
    async fn list(
117
        &self,
118
        opts: &ListOptions,
119
        cursor: Option<Box<dyn Cursor>>,
120
92
    ) -> Result<(Vec<NetworkUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
121
92
        let mut cursor = match cursor {
122
79
            None => Box::new(DbCursor::new()),
123
13
            Some(cursor) => cursor,
124
        };
125

            
126
92
        let mut opts = ListOptions { ..*opts };
127
92
        if let Some(offset) = opts.offset {
128
19
            opts.offset = Some(offset + cursor.offset());
129
73
        } else {
130
73
            opts.offset = Some(cursor.offset());
131
73
        }
132
92
        let opts_limit = opts.limit;
133
92
        if let Some(limit) = opts_limit {
134
69
            if limit > 0 {
135
63
                if cursor.offset() >= limit {
136
3
                    return Ok((vec![], None));
137
60
                }
138
60
                opts.limit = Some(limit - cursor.offset());
139
6
            }
140
23
        }
141
89
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
142
89
        build_limit_offset(&mut builder, &opts);
143
89
        build_sort(&mut builder, &opts);
144
89
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
145

            
146
89
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
147
89

            
148
89
        let mut count: u64 = 0;
149
89
        let mut list = vec![];
150
1197
        while let Some(row) = rows.try_next().await? {
151
1130
            let _ = cursor.as_mut().try_next().await?;
152
1130
            list.push(NetworkUlData {
153
1130
                data_id: row.data_id,
154
1130
                proc: Utc.timestamp_nanos(row.proc * 1000000),
155
1130
                unit_code: match row.unit_code.len() {
156
738
                    0 => None,
157
392
                    _ => Some(row.unit_code),
158
                },
159
1130
                network_code: row.network_code,
160
1130
                network_addr: row.network_addr,
161
1130
                unit_id: match row.unit_id.len() {
162
739
                    0 => None,
163
391
                    _ => Some(row.unit_id),
164
                },
165
1130
                device_id: match row.device_id.len() {
166
739
                    0 => None,
167
391
                    _ => Some(row.device_id),
168
                },
169
1130
                time: Utc.timestamp_nanos(row.time * 1000000),
170
1130
                profile: row.profile,
171
1130
                data: row.data,
172
1130
                extension: match row.extension.len() {
173
875
                    0 => None,
174
255
                    _ => serde_json::from_str(row.extension.as_str())?,
175
                },
176
            });
177
1130
            if let Some(limit) = opts_limit {
178
1059
                if limit > 0 && cursor.offset() >= limit {
179
12
                    if let Some(cursor_max) = opts.cursor_max {
180
11
                        if (count + 1) >= cursor_max {
181
3
                            return Ok((list, Some(cursor)));
182
8
                        }
183
1
                    }
184
9
                    return Ok((list, None));
185
1047
                }
186
71
            }
187
1118
            if let Some(cursor_max) = opts.cursor_max {
188
1038
                count += 1;
189
1038
                if count >= cursor_max {
190
10
                    return Ok((list, Some(cursor)));
191
1028
                }
192
80
            }
193
        }
194
67
        Ok((list, None))
195
184
    }
196

            
197
463
    async fn add(&self, data: &NetworkUlData) -> Result<(), Box<dyn StdError>> {
198
463
        let unit_code = match data.unit_code.as_deref() {
199
214
            None => quote(""),
200
249
            Some(value) => quote(value),
201
        };
202
463
        let unit_id = match data.unit_id.as_deref() {
203
213
            None => quote(""),
204
250
            Some(value) => quote(value),
205
        };
206
463
        let device_id = match data.device_id.as_deref() {
207
213
            None => quote(""),
208
250
            Some(value) => quote(value),
209
        };
210
463
        let extension = match data.extension.as_ref() {
211
250
            None => quote(""),
212
213
            Some(extension) => match serde_json::to_string(extension) {
213
                Err(_) => quote("{}"),
214
213
                Ok(value) => quote(value.as_str()),
215
            },
216
        };
217
463
        let values = vec![
218
463
            quote(data.data_id.as_str()),
219
463
            data.proc.timestamp_millis().to_string(),
220
463
            unit_code,
221
463
            quote(data.network_code.as_str()),
222
463
            quote(data.network_addr.as_str()),
223
463
            unit_id,
224
463
            device_id,
225
463
            data.time.timestamp_millis().to_string(),
226
463
            quote(data.profile.as_str()),
227
463
            quote(data.data.as_str()),
228
463
            extension,
229
463
        ];
230
463
        let sql = SqlBuilder::insert_into(TABLE_NAME)
231
463
            .fields(FIELDS)
232
463
            .values(&values)
233
463
            .sql()?;
234
463
        let _ = sqlx::query(sql.as_str())
235
463
            .execute(self.conn.as_ref())
236
463
            .await?;
237
462
        Ok(())
238
926
    }
239

            
240
41
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
241
41
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
242
41
        let _ = sqlx::query(sql.as_str())
243
41
            .execute(self.conn.as_ref())
244
41
            .await?;
245
41
        Ok(())
246
82
    }
247
}
248

            
249
impl DbCursor {
250
    /// To create the cursor instance.
251
79
    pub fn new() -> Self {
252
79
        DbCursor { offset: 0 }
253
79
    }
254
}
255

            
256
#[async_trait]
257
impl Cursor for DbCursor {
258
1130
    async fn try_next(&mut self) -> Result<Option<NetworkUlData>, Box<dyn StdError>> {
259
1130
        self.offset += 1;
260
1130
        Ok(None)
261
2260
    }
262

            
263
875
    fn offset(&self) -> u64 {
264
875
        self.offset
265
875
    }
266
}
267

            
268
/// Transforms query conditions to the SQL builder.
269
41
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
270
41
    if let Some(value) = cond.unit_id {
271
3
        builder.and_where_eq("unit_id", quote(value));
272
38
    }
273
41
    if let Some(value) = cond.device_id {
274
1
        builder.and_where_eq("device_id", quote(value));
275
40
    }
276
41
    if let Some(value) = cond.proc_gte {
277
1
        builder.and_where_ge("proc", value.timestamp_millis());
278
40
    }
279
41
    if let Some(value) = cond.proc_lte {
280
1
        builder.and_where_le("proc", value.timestamp_millis());
281
40
    }
282
41
    builder
283
41
}
284

            
285
/// Transforms query conditions to the SQL builder.
286
126
fn build_list_where<'a>(
287
126
    builder: &'a mut SqlBuilder,
288
126
    cond: &ListQueryCond<'a>,
289
126
) -> &'a mut SqlBuilder {
290
126
    if let Some(value) = cond.unit_id {
291
46
        builder.and_where_eq("unit_id", quote(value));
292
80
    }
293
126
    if let Some(value) = cond.device_id {
294
14
        builder.and_where_eq("device_id", quote(value));
295
112
    }
296
126
    if let Some(value) = cond.network_code {
297
8
        builder.and_where_eq("network_code", quote(value));
298
118
    }
299
126
    if let Some(value) = cond.network_addr {
300
6
        builder.and_where_eq("network_addr", quote(value));
301
120
    }
302
126
    if let Some(value) = cond.profile {
303
6
        builder.and_where_eq("profile", quote(value));
304
120
    }
305
126
    if let Some(value) = cond.proc_gte {
306
31
        builder.and_where_ge("proc", value.timestamp_millis());
307
95
    }
308
126
    if let Some(value) = cond.proc_lte {
309
8
        builder.and_where_le("proc", value.timestamp_millis());
310
118
    }
311
126
    if let Some(value) = cond.time_gte {
312
14
        builder.and_where_ge("time", value.timestamp_millis());
313
112
    }
314
126
    if let Some(value) = cond.time_lte {
315
8
        builder.and_where_le("time", value.timestamp_millis());
316
118
    }
317
126
    builder
318
126
}
319

            
320
/// Transforms model options to the SQL builder.
321
89
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
322
89
    if let Some(value) = opts.limit {
323
66
        if value > 0 {
324
60
            builder.limit(value);
325
60
        }
326
23
    }
327
89
    if let Some(value) = opts.offset {
328
89
        match opts.limit {
329
23
            None => builder.limit(-1).offset(value),
330
6
            Some(0) => builder.limit(-1).offset(value),
331
60
            _ => builder.offset(value),
332
        };
333
    }
334
89
    builder
335
89
}
336

            
337
/// Transforms model options to the SQL builder.
338
89
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
339
89
    if let Some(sort_cond) = opts.sort.as_ref() {
340
93
        for cond in sort_cond.iter() {
341
93
            let key = match cond.key {
342
73
                SortKey::Proc => "proc",
343
4
                SortKey::Time => "time",
344
8
                SortKey::NetworkCode => "network_code",
345
8
                SortKey::NetworkAddr => "network_addr",
346
            };
347
93
            builder.order_by(key, !cond.asc);
348
        }
349
12
    }
350
89
    builder
351
89
}