1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::application_dldata::{
10
    ApplicationDlData, ApplicationDlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
11
    SortKey, UpdateQueryCond, Updates,
12
};
13

            
14
/// Model instance.
15
pub struct Model {
16
    /// The associated database connection.
17
    conn: Arc<SqlitePool>,
18
}
19

            
20
/// Cursor instance.
21
///
22
/// The SQLite implementation uses the original list options and the progress offset.
23
pub struct DbCursor {
24
    offset: u64,
25
}
26

            
27
/// SQLite schema.
28
#[derive(sqlx::FromRow)]
29
struct Schema {
30
    pub data_id: String,
31
    /// i64 as time tick from Epoch in milliseconds.
32
    pub proc: i64,
33
    /// i64 as time tick from Epoch in milliseconds.
34
    pub resp: Option<i64>,
35
    pub status: i32,
36
    pub unit_id: String,
37
    /// use empty string as NULL.
38
    pub device_id: String,
39
    /// use empty string as NULL.
40
    pub network_code: String,
41
    /// use empty string as NULL.
42
    pub network_addr: String,
43
    pub profile: String,
44
    pub data: String,
45
    /// use empty string as NULL.
46
    pub extension: String,
47
}
48

            
49
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
50
#[derive(sqlx::FromRow)]
51
struct CountSchema {
52
    #[sqlx(rename = "COUNT(*)")]
53
    count: i64,
54
}
55

            
56
const TABLE_NAME: &'static str = "application_dldata";
57
const FIELDS: &'static [&'static str] = &[
58
    "data_id",
59
    "proc",
60
    "resp",
61
    "status",
62
    "unit_id",
63
    "device_id",
64
    "network_code",
65
    "network_addr",
66
    "profile",
67
    "data",
68
    "extension",
69
];
70
const TABLE_INIT_SQL: &'static str = "\
71
    CREATE TABLE IF NOT EXISTS application_dldata (\
72
    data_id TEXT NOT NULL UNIQUE,\
73
    proc INTEGER NOT NULL,\
74
    resp INTEGER,\
75
    status INTEGER NOT NULL,\
76
    unit_id TEXT NOT NULL,\
77
    device_id TEXT NOT NULL,\
78
    network_code TEXT NOT NULL,\
79
    network_addr TEXT NOT NULL,\
80
    profile TEXT NOT NULL,\
81
    data TEXT NOT NULL,\
82
    extension TEXT NOT NULL,\
83
    PRIMARY KEY (data_id))";
84

            
85
impl Model {
86
    /// To create the model instance with a database connection.
87
24
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
88
24
        let model = Model { conn };
89
24
        model.init().await?;
90
24
        Ok(model)
91
24
    }
92
}
93

            
94
#[async_trait]
95
impl ApplicationDlDataModel for Model {
96
42
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
97
42
        let _ = sqlx::query(TABLE_INIT_SQL)
98
42
            .execute(self.conn.as_ref())
99
42
            .await?;
100
42
        Ok(())
101
84
    }
102

            
103
74
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
104
74
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
105

            
106
74
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
107
74
            .fetch_one(self.conn.as_ref())
108
74
            .await;
109

            
110
74
        let row = match result {
111
            Err(e) => return Err(Box::new(e)),
112
74
            Ok(row) => row,
113
74
        };
114
74
        Ok(row.count as u64)
115
148
    }
116

            
117
    async fn list(
118
        &self,
119
        opts: &ListOptions,
120
        cursor: Option<Box<dyn Cursor>>,
121
184
    ) -> Result<(Vec<ApplicationDlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
122
184
        let mut cursor = match cursor {
123
158
            None => Box::new(DbCursor::new()),
124
26
            Some(cursor) => cursor,
125
        };
126

            
127
184
        let mut opts = ListOptions { ..*opts };
128
184
        if let Some(offset) = opts.offset {
129
38
            opts.offset = Some(offset + cursor.offset());
130
146
        } else {
131
146
            opts.offset = Some(cursor.offset());
132
146
        }
133
184
        let opts_limit = opts.limit;
134
184
        if let Some(limit) = opts_limit {
135
138
            if limit > 0 {
136
126
                if cursor.offset() >= limit {
137
6
                    return Ok((vec![], None));
138
120
                }
139
120
                opts.limit = Some(limit - cursor.offset());
140
12
            }
141
46
        }
142
178
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
143
178
        build_limit_offset(&mut builder, &opts);
144
178
        build_sort(&mut builder, &opts);
145
178
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
146

            
147
178
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
148
178

            
149
178
        let mut count: u64 = 0;
150
178
        let mut list = vec![];
151
2414
        while let Some(row) = rows.try_next().await? {
152
2280
            let _ = cursor.as_mut().try_next().await?;
153
2280
            list.push(ApplicationDlData {
154
2280
                data_id: row.data_id,
155
2280
                proc: Utc.timestamp_nanos(row.proc * 1000000),
156
2280
                resp: match row.resp {
157
1498
                    None => None,
158
782
                    Some(resp) => Some(Utc.timestamp_nanos(resp * 1000000)),
159
                },
160
2280
                status: row.status,
161
2280
                unit_id: row.unit_id,
162
2280
                device_id: match row.device_id.len() {
163
1770
                    0 => None,
164
510
                    _ => Some(row.device_id),
165
                },
166
2280
                network_code: match row.network_code.len() {
167
1476
                    0 => None,
168
804
                    _ => Some(row.network_code),
169
                },
170
2280
                network_addr: match row.network_addr.len() {
171
1476
                    0 => None,
172
804
                    _ => Some(row.network_addr),
173
                },
174
2280
                profile: row.profile,
175
2280
                data: row.data,
176
2280
                extension: match row.extension.len() {
177
1770
                    0 => None,
178
510
                    _ => serde_json::from_str(row.extension.as_str())?,
179
                },
180
            });
181
2280
            if let Some(limit) = opts_limit {
182
2136
                if limit > 0 && cursor.offset() >= limit {
183
24
                    if let Some(cursor_max) = opts.cursor_max {
184
22
                        if (count + 1) >= cursor_max {
185
6
                            return Ok((list, Some(cursor)));
186
16
                        }
187
2
                    }
188
18
                    return Ok((list, None));
189
2112
                }
190
144
            }
191
2256
            if let Some(cursor_max) = opts.cursor_max {
192
2094
                count += 1;
193
2094
                if count >= cursor_max {
194
20
                    return Ok((list, Some(cursor)));
195
2074
                }
196
162
            }
197
        }
198
134
        Ok((list, None))
199
368
    }
200

            
201
952
    async fn add(&self, data: &ApplicationDlData) -> Result<(), Box<dyn StdError>> {
202
952
        let device_id = match data.device_id.as_deref() {
203
508
            None => quote(""),
204
444
            Some(value) => quote(value),
205
        };
206
952
        let network_code = match data.network_code.as_deref() {
207
444
            None => quote(""),
208
508
            Some(value) => quote(value),
209
        };
210
952
        let network_addr = match data.network_addr.as_deref() {
211
444
            None => quote(""),
212
508
            Some(value) => quote(value),
213
        };
214
952
        let extension = match data.extension.as_ref() {
215
526
            None => quote(""),
216
426
            Some(extension) => match serde_json::to_string(extension) {
217
                Err(_) => quote("{}"),
218
426
                Ok(value) => quote(value.as_str()),
219
            },
220
        };
221
952
        let values = vec![
222
952
            quote(data.data_id.as_str()),
223
952
            data.proc.timestamp_millis().to_string(),
224
952
            match data.resp {
225
482
                None => "NULL".to_string(),
226
470
                Some(resp) => resp.timestamp_millis().to_string(),
227
            },
228
952
            data.status.to_string(),
229
952
            quote(data.unit_id.as_str()),
230
952
            device_id,
231
952
            network_code,
232
952
            network_addr,
233
952
            quote(data.profile.as_str()),
234
952
            quote(data.data.as_str()),
235
952
            extension,
236
        ];
237
952
        let sql = SqlBuilder::insert_into(TABLE_NAME)
238
952
            .fields(FIELDS)
239
952
            .values(&values)
240
952
            .sql()?;
241
952
        let _ = sqlx::query(sql.as_str())
242
952
            .execute(self.conn.as_ref())
243
952
            .await?;
244
950
        Ok(())
245
1904
    }
246

            
247
84
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
248
84
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
249
84
        let _ = sqlx::query(sql.as_str())
250
84
            .execute(self.conn.as_ref())
251
84
            .await?;
252
84
        Ok(())
253
168
    }
254

            
255
    async fn update(
256
        &self,
257
        cond: &UpdateQueryCond,
258
        updates: &Updates,
259
50
    ) -> Result<(), Box<dyn StdError>> {
260
50
        let sql = match build_update_where(&mut SqlBuilder::update_table(TABLE_NAME), cond, updates)
261
        {
262
            None => return Ok(()),
263
50
            Some(builder) => builder.sql()?,
264
        };
265
50
        let _ = sqlx::query(sql.as_str())
266
50
            .execute(self.conn.as_ref())
267
50
            .await?;
268
50
        Ok(())
269
100
    }
270
}
271

            
272
impl DbCursor {
273
    /// To create the cursor instance.
274
158
    pub fn new() -> Self {
275
158
        DbCursor { offset: 0 }
276
158
    }
277
}
278

            
279
#[async_trait]
280
impl Cursor for DbCursor {
281
2280
    async fn try_next(&mut self) -> Result<Option<ApplicationDlData>, Box<dyn StdError>> {
282
2280
        self.offset += 1;
283
2280
        Ok(None)
284
4560
    }
285

            
286
1768
    fn offset(&self) -> u64 {
287
1768
        self.offset
288
1768
    }
289
}
290

            
291
/// Transforms query conditions to the SQL builder.
292
84
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
293
84
    if let Some(value) = cond.unit_id {
294
6
        builder.and_where_eq("unit_id", quote(value));
295
78
    }
296
84
    if let Some(value) = cond.device_id {
297
2
        builder.and_where_eq("device_id", quote(value));
298
82
    }
299
84
    if let Some(value) = cond.network_code {
300
2
        builder.and_where_eq("network_code", quote(value));
301
82
    }
302
84
    if let Some(value) = cond.network_addr {
303
2
        builder.and_where_eq("network_addr", quote(value));
304
82
    }
305
84
    if let Some(value) = cond.proc_gte {
306
2
        builder.and_where_ge("proc", value.timestamp_millis());
307
82
    }
308
84
    if let Some(value) = cond.proc_lte {
309
2
        builder.and_where_le("proc", value.timestamp_millis());
310
82
    }
311
84
    builder
312
84
}
313

            
314
/// Transforms query conditions to the SQL builder.
315
252
fn build_list_where<'a>(
316
252
    builder: &'a mut SqlBuilder,
317
252
    cond: &ListQueryCond<'a>,
318
252
) -> &'a mut SqlBuilder {
319
252
    if let Some(value) = cond.unit_id {
320
92
        builder.and_where_eq("unit_id", quote(value));
321
160
    }
322
252
    if let Some(value) = cond.device_id {
323
28
        builder.and_where_eq("device_id", quote(value));
324
224
    }
325
252
    if let Some(value) = cond.network_code {
326
16
        builder.and_where_eq("network_code", quote(value));
327
236
    }
328
252
    if let Some(value) = cond.network_addr {
329
12
        builder.and_where_eq("network_addr", quote(value));
330
240
    }
331
252
    if let Some(value) = cond.profile {
332
12
        builder.and_where_eq("profile", quote(value));
333
240
    }
334
252
    if let Some(value) = cond.proc_gte {
335
62
        builder.and_where_ge("proc", value.timestamp_millis());
336
190
    }
337
252
    if let Some(value) = cond.proc_lte {
338
16
        builder.and_where_le("proc", value.timestamp_millis());
339
236
    }
340
252
    if let Some(value) = cond.resp_gte {
341
28
        builder.and_where_ge("resp", value.timestamp_millis());
342
224
    }
343
252
    if let Some(value) = cond.resp_lte {
344
16
        builder.and_where_le("resp", value.timestamp_millis());
345
236
    }
346
252
    builder
347
252
}
348

            
349
/// Transforms model options to the SQL builder.
350
178
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
351
178
    if let Some(value) = opts.limit {
352
132
        if value > 0 {
353
120
            builder.limit(value);
354
120
        }
355
46
    }
356
178
    if let Some(value) = opts.offset {
357
178
        match opts.limit {
358
46
            None => builder.limit(-1).offset(value),
359
12
            Some(0) => builder.limit(-1).offset(value),
360
120
            _ => builder.offset(value),
361
        };
362
    }
363
178
    builder
364
178
}
365

            
366
/// Transforms model options to the SQL builder.
367
178
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
368
178
    if let Some(sort_cond) = opts.sort.as_ref() {
369
186
        for cond in sort_cond.iter() {
370
186
            let key = match cond.key {
371
146
                SortKey::Proc => "proc",
372
8
                SortKey::Resp => "resp",
373
16
                SortKey::NetworkCode => "network_code",
374
16
                SortKey::NetworkAddr => "network_addr",
375
            };
376
186
            builder.order_by(key, !cond.asc);
377
        }
378
24
    }
379
178
    builder
380
178
}
381

            
382
/// Transforms query conditions and the model object to the SQL builder.
383
50
fn build_update_where<'a>(
384
50
    builder: &'a mut SqlBuilder,
385
50
    cond: &UpdateQueryCond<'a>,
386
50
    updates: &Updates,
387
50
) -> Option<&'a mut SqlBuilder> {
388
50
    builder.set("resp", updates.resp.timestamp_millis());
389
50
    builder.set("status", updates.status);
390
50
    builder.and_where_eq("data_id", quote(cond.data_id));
391
50
    if updates.status >= 0 {
392
28
        builder.and_where_ne("status", 0);
393
28
    } else {
394
22
        builder.and_where_lt("status", updates.status);
395
22
    }
396
50
    Some(builder)
397
50
}