1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::application_dldata::{
10
    ApplicationDlData, ApplicationDlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
11
    SortKey, UpdateQueryCond, Updates,
12
};
13

            
14
/// Model instance.
15
pub struct Model {
16
    /// The associated database connection.
17
    conn: Arc<SqlitePool>,
18
}
19

            
20
/// Cursor instance.
///
/// The SQLite implementation only keeps the progress offset, i.e. the number of rows that
/// have already been read through this cursor. The list options themselves are not stored
/// here; they are supplied again on every `list()` call.
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
26

            
27
/// SQLite schema.
28
#[derive(sqlx::FromRow)]
29
struct Schema {
30
    pub data_id: String,
31
    /// i64 as time tick from Epoch in milliseconds.
32
    pub proc: i64,
33
    /// i64 as time tick from Epoch in milliseconds.
34
    pub resp: Option<i64>,
35
    pub status: i32,
36
    pub unit_id: String,
37
    /// use empty string as NULL.
38
    pub device_id: String,
39
    /// use empty string as NULL.
40
    pub network_code: String,
41
    /// use empty string as NULL.
42
    pub network_addr: String,
43
    pub profile: String,
44
    pub data: String,
45
    /// use empty string as NULL.
46
    pub extension: String,
47
}
48

            
49
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
50
#[derive(sqlx::FromRow)]
51
struct CountSchema {
52
    #[sqlx(rename = "COUNT(*)")]
53
    count: i64,
54
}
55

            
56
/// Name of the table this model operates on.
const TABLE_NAME: &str = "application_dldata";
/// Column names in the order used by `INSERT` statements. The order must match the value
/// list built in `add()`.
const FIELDS: &[&str] = &[
    "data_id",
    "proc",
    "resp",
    "status",
    "unit_id",
    "device_id",
    "network_code",
    "network_addr",
    "profile",
    "data",
    "extension",
];
/// Statement that creates the table when it does not exist yet.
///
/// Only `resp` is nullable; the optional text columns store an empty string instead of `NULL`.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS application_dldata (\
    data_id TEXT NOT NULL UNIQUE,\
    proc INTEGER NOT NULL,\
    resp INTEGER,\
    status INTEGER NOT NULL,\
    unit_id TEXT NOT NULL,\
    device_id TEXT NOT NULL,\
    network_code TEXT NOT NULL,\
    network_addr TEXT NOT NULL,\
    profile TEXT NOT NULL,\
    data TEXT NOT NULL,\
    extension TEXT NOT NULL,\
    PRIMARY KEY (data_id))";
84

            
85
impl Model {
86
    /// To create the model instance with a database connection.
87
12
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
88
12
        let model = Model { conn };
89
24
        model.init().await?;
90
12
        Ok(model)
91
12
    }
92
}
93

            
94
#[async_trait]
95
impl ApplicationDlDataModel for Model {
96
21
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
97
21
        let _ = sqlx::query(TABLE_INIT_SQL)
98
21
            .execute(self.conn.as_ref())
99
42
            .await?;
100
21
        Ok(())
101
42
    }
102

            
103
37
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
104
37
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
105

            
106
37
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
107
37
            .fetch_one(self.conn.as_ref())
108
74
            .await;
109

            
110
37
        let row = match result {
111
            Err(e) => return Err(Box::new(e)),
112
37
            Ok(row) => row,
113
37
        };
114
37
        Ok(row.count as u64)
115
74
    }
116

            
117
    async fn list(
118
        &self,
119
        opts: &ListOptions,
120
        cursor: Option<Box<dyn Cursor>>,
121
92
    ) -> Result<(Vec<ApplicationDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
122
92
        let mut cursor = match cursor {
123
79
            None => Box::new(DbCursor::new()),
124
13
            Some(cursor) => cursor,
125
        };
126

            
127
92
        let mut opts = ListOptions { ..*opts };
128
92
        if let Some(offset) = opts.offset {
129
19
            opts.offset = Some(offset + cursor.offset());
130
73
        } else {
131
73
            opts.offset = Some(cursor.offset());
132
73
        }
133
92
        let opts_limit = opts.limit;
134
92
        if let Some(limit) = opts_limit {
135
69
            if limit > 0 {
136
63
                if cursor.offset() >= limit {
137
3
                    return Ok((vec![], None));
138
60
                }
139
60
                opts.limit = Some(limit - cursor.offset());
140
6
            }
141
23
        }
142
89
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
143
89
        build_limit_offset(&mut builder, &opts);
144
89
        build_sort(&mut builder, &opts);
145
89
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
146

            
147
89
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
148
89

            
149
89
        let mut count: u64 = 0;
150
89
        let mut list = vec![];
151
1207
        while let Some(row) = rows.try_next().await? {
152
1140
            let _ = cursor.as_mut().try_next().await?;
153
1140
            list.push(ApplicationDlData {
154
1140
                data_id: row.data_id,
155
1140
                proc: Utc.timestamp_nanos(row.proc * 1000000),
156
1140
                resp: match row.resp {
157
749
                    None => None,
158
391
                    Some(resp) => Some(Utc.timestamp_nanos(resp * 1000000)),
159
                },
160
1140
                status: row.status,
161
1140
                unit_id: row.unit_id,
162
1140
                device_id: match row.device_id.len() {
163
885
                    0 => None,
164
255
                    _ => Some(row.device_id),
165
                },
166
1140
                network_code: match row.network_code.len() {
167
738
                    0 => None,
168
402
                    _ => Some(row.network_code),
169
                },
170
1140
                network_addr: match row.network_addr.len() {
171
738
                    0 => None,
172
402
                    _ => Some(row.network_addr),
173
                },
174
1140
                profile: row.profile,
175
1140
                data: row.data,
176
1140
                extension: match row.extension.len() {
177
885
                    0 => None,
178
255
                    _ => serde_json::from_str(row.extension.as_str())?,
179
                },
180
            });
181
1140
            if let Some(limit) = opts_limit {
182
1068
                if limit > 0 && cursor.offset() >= limit {
183
12
                    if let Some(cursor_max) = opts.cursor_max {
184
11
                        if (count + 1) >= cursor_max {
185
3
                            return Ok((list, Some(cursor)));
186
8
                        }
187
1
                    }
188
9
                    return Ok((list, None));
189
1056
                }
190
72
            }
191
1128
            if let Some(cursor_max) = opts.cursor_max {
192
1047
                count += 1;
193
1047
                if count >= cursor_max {
194
10
                    return Ok((list, Some(cursor)));
195
1037
                }
196
81
            }
197
        }
198
67
        Ok((list, None))
199
184
    }
200

            
201
476
    async fn add(&self, data: &ApplicationDlData) -> Result<(), Box<dyn StdError>> {
202
476
        let device_id = match data.device_id.as_deref() {
203
254
            None => quote(""),
204
222
            Some(value) => quote(value),
205
        };
206
476
        let network_code = match data.network_code.as_deref() {
207
222
            None => quote(""),
208
254
            Some(value) => quote(value),
209
        };
210
476
        let network_addr = match data.network_addr.as_deref() {
211
222
            None => quote(""),
212
254
            Some(value) => quote(value),
213
        };
214
476
        let extension = match data.extension.as_ref() {
215
263
            None => quote(""),
216
213
            Some(extension) => match serde_json::to_string(extension) {
217
                Err(_) => quote("{}"),
218
213
                Ok(value) => quote(value.as_str()),
219
            },
220
        };
221
476
        let values = vec![
222
476
            quote(data.data_id.as_str()),
223
476
            data.proc.timestamp_millis().to_string(),
224
476
            match data.resp {
225
241
                None => "NULL".to_string(),
226
235
                Some(resp) => resp.timestamp_millis().to_string(),
227
            },
228
476
            data.status.to_string(),
229
476
            quote(data.unit_id.as_str()),
230
476
            device_id,
231
476
            network_code,
232
476
            network_addr,
233
476
            quote(data.profile.as_str()),
234
476
            quote(data.data.as_str()),
235
476
            extension,
236
        ];
237
476
        let sql = SqlBuilder::insert_into(TABLE_NAME)
238
476
            .fields(FIELDS)
239
476
            .values(&values)
240
476
            .sql()?;
241
476
        let _ = sqlx::query(sql.as_str())
242
476
            .execute(self.conn.as_ref())
243
953
            .await?;
244
475
        Ok(())
245
952
    }
246

            
247
42
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
248
42
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
249
42
        let _ = sqlx::query(sql.as_str())
250
42
            .execute(self.conn.as_ref())
251
83
            .await?;
252
42
        Ok(())
253
84
    }
254

            
255
    async fn update(
256
        &self,
257
        cond: &UpdateQueryCond,
258
        updates: &Updates,
259
25
    ) -> Result<(), Box<dyn StdError>> {
260
25
        let sql = match build_update_where(&mut SqlBuilder::update_table(TABLE_NAME), cond, updates)
261
        {
262
            None => return Ok(()),
263
25
            Some(builder) => builder.sql()?,
264
        };
265
25
        let _ = sqlx::query(sql.as_str())
266
25
            .execute(self.conn.as_ref())
267
50
            .await?;
268
25
        Ok(())
269
50
    }
270
}
271

            
272
impl DbCursor {
273
    /// To create the cursor instance.
274
79
    pub fn new() -> Self {
275
79
        DbCursor { offset: 0 }
276
79
    }
277
}
278

            
279
#[async_trait]
280
impl Cursor for DbCursor {
281
1140
    async fn try_next(&mut self) -> Result<Option<ApplicationDlData>, Box<dyn StdError>> {
282
1140
        self.offset += 1;
283
1140
        Ok(None)
284
2280
    }
285

            
286
884
    fn offset(&self) -> u64 {
287
884
        self.offset
288
884
    }
289
}
290

            
291
/// Transforms query conditions to the SQL builder.
292
42
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
293
42
    if let Some(value) = cond.unit_id {
294
3
        builder.and_where_eq("unit_id", quote(value));
295
39
    }
296
42
    if let Some(value) = cond.device_id {
297
1
        builder.and_where_eq("device_id", quote(value));
298
41
    }
299
42
    if let Some(value) = cond.network_code {
300
1
        builder.and_where_eq("network_code", quote(value));
301
41
    }
302
42
    if let Some(value) = cond.network_addr {
303
1
        builder.and_where_eq("network_addr", quote(value));
304
41
    }
305
42
    if let Some(value) = cond.proc_gte {
306
1
        builder.and_where_ge("proc", value.timestamp_millis());
307
41
    }
308
42
    if let Some(value) = cond.proc_lte {
309
1
        builder.and_where_le("proc", value.timestamp_millis());
310
41
    }
311
42
    builder
312
42
}
313

            
314
/// Transforms query conditions to the SQL builder.
315
126
fn build_list_where<'a>(
316
126
    builder: &'a mut SqlBuilder,
317
126
    cond: &ListQueryCond<'a>,
318
126
) -> &'a mut SqlBuilder {
319
126
    if let Some(value) = cond.unit_id {
320
46
        builder.and_where_eq("unit_id", quote(value));
321
80
    }
322
126
    if let Some(value) = cond.device_id {
323
14
        builder.and_where_eq("device_id", quote(value));
324
112
    }
325
126
    if let Some(value) = cond.network_code {
326
8
        builder.and_where_eq("network_code", quote(value));
327
118
    }
328
126
    if let Some(value) = cond.network_addr {
329
6
        builder.and_where_eq("network_addr", quote(value));
330
120
    }
331
126
    if let Some(value) = cond.profile {
332
6
        builder.and_where_eq("profile", quote(value));
333
120
    }
334
126
    if let Some(value) = cond.proc_gte {
335
31
        builder.and_where_ge("proc", value.timestamp_millis());
336
95
    }
337
126
    if let Some(value) = cond.proc_lte {
338
8
        builder.and_where_le("proc", value.timestamp_millis());
339
118
    }
340
126
    if let Some(value) = cond.resp_gte {
341
14
        builder.and_where_ge("resp", value.timestamp_millis());
342
112
    }
343
126
    if let Some(value) = cond.resp_lte {
344
8
        builder.and_where_le("resp", value.timestamp_millis());
345
118
    }
346
126
    builder
347
126
}
348

            
349
/// Transforms model options to the SQL builder.
350
89
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
351
89
    if let Some(value) = opts.limit {
352
66
        if value > 0 {
353
60
            builder.limit(value);
354
60
        }
355
23
    }
356
89
    if let Some(value) = opts.offset {
357
89
        match opts.limit {
358
23
            None => builder.limit(-1).offset(value),
359
6
            Some(0) => builder.limit(-1).offset(value),
360
60
            _ => builder.offset(value),
361
        };
362
    }
363
89
    builder
364
89
}
365

            
366
/// Transforms model options to the SQL builder.
367
89
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
368
89
    if let Some(sort_cond) = opts.sort.as_ref() {
369
93
        for cond in sort_cond.iter() {
370
93
            let key = match cond.key {
371
73
                SortKey::Proc => "proc",
372
4
                SortKey::Resp => "resp",
373
8
                SortKey::NetworkCode => "network_code",
374
8
                SortKey::NetworkAddr => "network_addr",
375
            };
376
93
            builder.order_by(key, !cond.asc);
377
        }
378
12
    }
379
89
    builder
380
89
}
381

            
382
/// Transforms query conditions and the model object to the SQL builder.
383
25
fn build_update_where<'a>(
384
25
    builder: &'a mut SqlBuilder,
385
25
    cond: &UpdateQueryCond<'a>,
386
25
    updates: &Updates,
387
25
) -> Option<&'a mut SqlBuilder> {
388
25
    builder.set("resp", updates.resp.timestamp_millis());
389
25
    builder.set("status", updates.status);
390
25
    builder.and_where_eq("data_id", quote(cond.data_id));
391
25
    if updates.status >= 0 {
392
14
        builder.and_where_ne("status", 0);
393
14
    } else {
394
11
        builder.and_where_lt("status", updates.status);
395
11
    }
396
25
    Some(builder)
397
25
}