1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::network_uldata::{
10
    Cursor, ListOptions, ListQueryCond, NetworkUlData, NetworkUlDataModel, QueryCond, SortKey,
11
};
12

            
13
/// Model instance.
14
pub struct Model {
15
    /// The associated database connection.
16
    conn: Arc<SqlitePool>,
17
}
18

            
19
/// Cursor instance.
20
///
21
/// The SQLite implementation uses the original list options and the progress offset.
22
pub struct DbCursor {
23
    offset: u64,
24
}
25

            
26
/// SQLite schema.
27
#[derive(sqlx::FromRow)]
28
struct Schema {
29
    pub data_id: String,
30
    /// i64 as time tick from Epoch in milliseconds.
31
    pub proc: i64,
32
    /// use empty string as NULL.
33
    pub unit_code: String,
34
    pub network_code: String,
35
    pub network_addr: String,
36
    /// use empty string as NULL.
37
    pub unit_id: String,
38
    /// use empty string as NULL.
39
    pub device_id: String,
40
    /// i64 as time tick from Epoch in milliseconds.
41
    pub time: i64,
42
    pub profile: String,
43
    pub data: String,
44
    /// use empty string as NULL.
45
    pub extension: String,
46
}
47

            
48
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
49
#[derive(sqlx::FromRow)]
50
struct CountSchema {
51
    #[sqlx(rename = "COUNT(*)")]
52
    count: i64,
53
}
54

            
55
const TABLE_NAME: &'static str = "network_uldata";
56
const FIELDS: &'static [&'static str] = &[
57
    "data_id",
58
    "proc",
59
    "unit_code",
60
    "network_code",
61
    "network_addr",
62
    "unit_id",
63
    "device_id",
64
    "time",
65
    "profile",
66
    "data",
67
    "extension",
68
];
69
const TABLE_INIT_SQL: &'static str = "\
70
    CREATE TABLE IF NOT EXISTS network_uldata (\
71
    data_id TEXT NOT NULL UNIQUE,\
72
    proc INTEGER NOT NULL,\
73
    unit_code TEXT NOT NULL,\
74
    network_code TEXT NOT NULL,\
75
    network_addr TEXT NOT NULL,\
76
    unit_id TEXT NOT NULL,\
77
    device_id TEXT NOT NULL,\
78
    time INTEGER NOT NULL,\
79
    profile TEXT NOT NULL,\
80
    data TEXT NOT NULL,\
81
    extension TEXT NOT NULL,\
82
    PRIMARY KEY (data_id))";
83

            
84
impl Model {
85
    /// To create the model instance with a database connection.
86
24
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
87
24
        let model = Model { conn };
88
24
        model.init().await?;
89
24
        Ok(model)
90
24
    }
91
}
92

            
93
#[async_trait]
94
impl NetworkUlDataModel for Model {
95
40
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
96
40
        let _ = sqlx::query(TABLE_INIT_SQL)
97
40
            .execute(self.conn.as_ref())
98
40
            .await?;
99
40
        Ok(())
100
80
    }
101

            
102
74
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
103
74
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
104

            
105
74
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
106
74
            .fetch_one(self.conn.as_ref())
107
74
            .await;
108

            
109
74
        let row = match result {
110
            Err(e) => return Err(Box::new(e)),
111
74
            Ok(row) => row,
112
74
        };
113
74
        Ok(row.count as u64)
114
148
    }
115

            
116
    async fn list(
117
        &self,
118
        opts: &ListOptions,
119
        cursor: Option<Box<dyn Cursor>>,
120
184
    ) -> Result<(Vec<NetworkUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
121
184
        let mut cursor = match cursor {
122
158
            None => Box::new(DbCursor::new()),
123
26
            Some(cursor) => cursor,
124
        };
125

            
126
184
        let mut opts = ListOptions { ..*opts };
127
184
        if let Some(offset) = opts.offset {
128
38
            opts.offset = Some(offset + cursor.offset());
129
146
        } else {
130
146
            opts.offset = Some(cursor.offset());
131
146
        }
132
184
        let opts_limit = opts.limit;
133
184
        if let Some(limit) = opts_limit {
134
138
            if limit > 0 {
135
126
                if cursor.offset() >= limit {
136
6
                    return Ok((vec![], None));
137
120
                }
138
120
                opts.limit = Some(limit - cursor.offset());
139
12
            }
140
46
        }
141
178
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
142
178
        build_limit_offset(&mut builder, &opts);
143
178
        build_sort(&mut builder, &opts);
144
178
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
145

            
146
178
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
147
178

            
148
178
        let mut count: u64 = 0;
149
178
        let mut list = vec![];
150
2394
        while let Some(row) = rows.try_next().await? {
151
2260
            let _ = cursor.as_mut().try_next().await?;
152
2260
            list.push(NetworkUlData {
153
2260
                data_id: row.data_id,
154
2260
                proc: Utc.timestamp_nanos(row.proc * 1000000),
155
2260
                unit_code: match row.unit_code.len() {
156
1476
                    0 => None,
157
784
                    _ => Some(row.unit_code),
158
                },
159
2260
                network_code: row.network_code,
160
2260
                network_addr: row.network_addr,
161
2260
                unit_id: match row.unit_id.len() {
162
1478
                    0 => None,
163
782
                    _ => Some(row.unit_id),
164
                },
165
2260
                device_id: match row.device_id.len() {
166
1478
                    0 => None,
167
782
                    _ => Some(row.device_id),
168
                },
169
2260
                time: Utc.timestamp_nanos(row.time * 1000000),
170
2260
                profile: row.profile,
171
2260
                data: row.data,
172
2260
                extension: match row.extension.len() {
173
1750
                    0 => None,
174
510
                    _ => serde_json::from_str(row.extension.as_str())?,
175
                },
176
            });
177
2260
            if let Some(limit) = opts_limit {
178
2118
                if limit > 0 && cursor.offset() >= limit {
179
24
                    if let Some(cursor_max) = opts.cursor_max {
180
22
                        if (count + 1) >= cursor_max {
181
6
                            return Ok((list, Some(cursor)));
182
16
                        }
183
2
                    }
184
18
                    return Ok((list, None));
185
2094
                }
186
142
            }
187
2236
            if let Some(cursor_max) = opts.cursor_max {
188
2076
                count += 1;
189
2076
                if count >= cursor_max {
190
20
                    return Ok((list, Some(cursor)));
191
2056
                }
192
160
            }
193
        }
194
134
        Ok((list, None))
195
368
    }
196

            
197
926
    async fn add(&self, data: &NetworkUlData) -> Result<(), Box<dyn StdError>> {
198
926
        let unit_code = match data.unit_code.as_deref() {
199
428
            None => quote(""),
200
498
            Some(value) => quote(value),
201
        };
202
926
        let unit_id = match data.unit_id.as_deref() {
203
426
            None => quote(""),
204
500
            Some(value) => quote(value),
205
        };
206
926
        let device_id = match data.device_id.as_deref() {
207
426
            None => quote(""),
208
500
            Some(value) => quote(value),
209
        };
210
926
        let extension = match data.extension.as_ref() {
211
500
            None => quote(""),
212
426
            Some(extension) => match serde_json::to_string(extension) {
213
                Err(_) => quote("{}"),
214
426
                Ok(value) => quote(value.as_str()),
215
            },
216
        };
217
926
        let values = vec![
218
926
            quote(data.data_id.as_str()),
219
926
            data.proc.timestamp_millis().to_string(),
220
926
            unit_code,
221
926
            quote(data.network_code.as_str()),
222
926
            quote(data.network_addr.as_str()),
223
926
            unit_id,
224
926
            device_id,
225
926
            data.time.timestamp_millis().to_string(),
226
926
            quote(data.profile.as_str()),
227
926
            quote(data.data.as_str()),
228
926
            extension,
229
926
        ];
230
926
        let sql = SqlBuilder::insert_into(TABLE_NAME)
231
926
            .fields(FIELDS)
232
926
            .values(&values)
233
926
            .sql()?;
234
926
        let _ = sqlx::query(sql.as_str())
235
926
            .execute(self.conn.as_ref())
236
926
            .await?;
237
924
        Ok(())
238
1852
    }
239

            
240
82
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
241
82
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
242
82
        let _ = sqlx::query(sql.as_str())
243
82
            .execute(self.conn.as_ref())
244
82
            .await?;
245
82
        Ok(())
246
164
    }
247
}
248

            
249
impl DbCursor {
250
    /// To create the cursor instance.
251
158
    pub fn new() -> Self {
252
158
        DbCursor { offset: 0 }
253
158
    }
254
}
255

            
256
#[async_trait]
257
impl Cursor for DbCursor {
258
2260
    async fn try_next(&mut self) -> Result<Option<NetworkUlData>, Box<dyn StdError>> {
259
2260
        self.offset += 1;
260
2260
        Ok(None)
261
4520
    }
262

            
263
1750
    fn offset(&self) -> u64 {
264
1750
        self.offset
265
1750
    }
266
}
267

            
268
/// Transforms query conditions to the SQL builder.
269
82
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
270
82
    if let Some(value) = cond.unit_id {
271
6
        builder.and_where_eq("unit_id", quote(value));
272
76
    }
273
82
    if let Some(value) = cond.device_id {
274
2
        builder.and_where_eq("device_id", quote(value));
275
80
    }
276
82
    if let Some(value) = cond.proc_gte {
277
2
        builder.and_where_ge("proc", value.timestamp_millis());
278
80
    }
279
82
    if let Some(value) = cond.proc_lte {
280
2
        builder.and_where_le("proc", value.timestamp_millis());
281
80
    }
282
82
    builder
283
82
}
284

            
285
/// Transforms query conditions to the SQL builder.
286
252
fn build_list_where<'a>(
287
252
    builder: &'a mut SqlBuilder,
288
252
    cond: &ListQueryCond<'a>,
289
252
) -> &'a mut SqlBuilder {
290
252
    if let Some(value) = cond.unit_id {
291
92
        builder.and_where_eq("unit_id", quote(value));
292
160
    }
293
252
    if let Some(value) = cond.device_id {
294
28
        builder.and_where_eq("device_id", quote(value));
295
224
    }
296
252
    if let Some(value) = cond.network_code {
297
16
        builder.and_where_eq("network_code", quote(value));
298
236
    }
299
252
    if let Some(value) = cond.network_addr {
300
12
        builder.and_where_eq("network_addr", quote(value));
301
240
    }
302
252
    if let Some(value) = cond.profile {
303
12
        builder.and_where_eq("profile", quote(value));
304
240
    }
305
252
    if let Some(value) = cond.proc_gte {
306
62
        builder.and_where_ge("proc", value.timestamp_millis());
307
190
    }
308
252
    if let Some(value) = cond.proc_lte {
309
16
        builder.and_where_le("proc", value.timestamp_millis());
310
236
    }
311
252
    if let Some(value) = cond.time_gte {
312
28
        builder.and_where_ge("time", value.timestamp_millis());
313
224
    }
314
252
    if let Some(value) = cond.time_lte {
315
16
        builder.and_where_le("time", value.timestamp_millis());
316
236
    }
317
252
    builder
318
252
}
319

            
320
/// Transforms model options to the SQL builder.
321
178
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
322
178
    if let Some(value) = opts.limit {
323
132
        if value > 0 {
324
120
            builder.limit(value);
325
120
        }
326
46
    }
327
178
    if let Some(value) = opts.offset {
328
178
        match opts.limit {
329
46
            None => builder.limit(-1).offset(value),
330
12
            Some(0) => builder.limit(-1).offset(value),
331
120
            _ => builder.offset(value),
332
        };
333
    }
334
178
    builder
335
178
}
336

            
337
/// Transforms model options to the SQL builder.
338
178
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
339
178
    if let Some(sort_cond) = opts.sort.as_ref() {
340
186
        for cond in sort_cond.iter() {
341
186
            let key = match cond.key {
342
146
                SortKey::Proc => "proc",
343
8
                SortKey::Time => "time",
344
16
                SortKey::NetworkCode => "network_code",
345
16
                SortKey::NetworkAddr => "network_addr",
346
            };
347
186
            builder.order_by(key, !cond.asc);
348
        }
349
24
    }
350
178
    builder
351
178
}