1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::application_uldata::{
10
    ApplicationUlData, ApplicationUlDataModel, Cursor, ListOptions, ListQueryCond, QueryCond,
11
    SortKey,
12
};
13

            
14
/// Model instance.
15
pub struct Model {
16
    /// The associated database connection.
17
    conn: Arc<SqlitePool>,
18
}
19

            
20
/// Cursor instance.
///
/// The SQLite implementation only stores the progress offset. The list options are not kept in
/// the cursor; they are supplied again on every `list()` call.
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows that have been consumed through this cursor so far.
    offset: u64,
}
26

            
27
/// SQLite schema.
28
#[derive(sqlx::FromRow)]
29
struct Schema {
30
    pub data_id: String,
31
    /// i64 as time tick from Epoch in milliseconds.
32
    pub proc: i64,
33
    /// i64 as time tick from Epoch in milliseconds.
34
    #[sqlx(rename = "pub")]
35
    pub publish: i64,
36
    /// use empty string as NULL.
37
    pub unit_code: String,
38
    pub network_code: String,
39
    pub network_addr: String,
40
    pub unit_id: String,
41
    pub device_id: String,
42
    /// i64 as time tick from Epoch in milliseconds.
43
    pub time: i64,
44
    pub profile: String,
45
    pub data: String,
46
    /// use empty string as NULL.
47
    pub extension: String,
48
}
49

            
50
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
51
#[derive(sqlx::FromRow)]
52
struct CountSchema {
53
    #[sqlx(rename = "COUNT(*)")]
54
    count: i64,
55
}
56

            
57
/// Name of the table managed by this model.
const TABLE_NAME: &str = "application_uldata";
/// Column names, in the order in which `add()` supplies the values of a new row.
const FIELDS: &[&str] = &[
    "data_id",
    "proc",
    "pub",
    "unit_code",
    "network_code",
    "network_addr",
    "unit_id",
    "device_id",
    "time",
    "profile",
    "data",
    "extension",
];
/// Statement that creates the table when it does not exist yet.
///
/// The trailing backslashes join the lines without inserting any whitespace, so the statement is
/// a single line of SQL.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS application_uldata (\
    data_id TEXT NOT NULL UNIQUE,\
    proc INTEGER NOT NULL,\
    pub INTEGER NOT NULL,\
    unit_code TEXT NOT NULL,\
    network_code TEXT NOT NULL,\
    network_addr TEXT NOT NULL,\
    unit_id TEXT NOT NULL,\
    device_id TEXT NOT NULL,\
    time INTEGER NOT NULL,\
    profile TEXT NOT NULL,\
    data TEXT NOT NULL,\
    extension TEXT NOT NULL,\
    PRIMARY KEY (data_id))";
87

            
88
impl Model {
89
    /// To create the model instance with a database connection.
90
12
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
91
12
        let model = Model { conn };
92
47
        model.init().await?;
93
12
        Ok(model)
94
12
    }
95
}
96

            
97
#[async_trait]
98
impl ApplicationUlDataModel for Model {
99
20
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
100
20
        let _ = sqlx::query(TABLE_INIT_SQL)
101
20
            .execute(self.conn.as_ref())
102
63
            .await?;
103
20
        Ok(())
104
40
    }
105

            
106
44
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
107
44
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
108

            
109
44
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
110
44
            .fetch_one(self.conn.as_ref())
111
88
            .await;
112

            
113
44
        let row = match result {
114
            Err(e) => return Err(Box::new(e)),
115
44
            Ok(row) => row,
116
44
        };
117
44
        Ok(row.count as u64)
118
88
    }
119

            
120
    async fn list(
121
        &self,
122
        opts: &ListOptions,
123
        cursor: Option<Box<dyn Cursor>>,
124
103
    ) -> Result<(Vec<ApplicationUlData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
125
103
        let mut cursor = match cursor {
126
90
            None => Box::new(DbCursor::new()),
127
13
            Some(cursor) => cursor,
128
        };
129

            
130
103
        let mut opts = ListOptions { ..*opts };
131
103
        if let Some(offset) = opts.offset {
132
19
            opts.offset = Some(offset + cursor.offset());
133
84
        } else {
134
84
            opts.offset = Some(cursor.offset());
135
84
        }
136
103
        let opts_limit = opts.limit;
137
103
        if let Some(limit) = opts_limit {
138
77
            if limit > 0 {
139
71
                if cursor.offset() >= limit {
140
3
                    return Ok((vec![], None));
141
68
                }
142
68
                opts.limit = Some(limit - cursor.offset());
143
6
            }
144
26
        }
145
100
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
146
100
        build_limit_offset(&mut builder, &opts);
147
100
        build_sort(&mut builder, &opts);
148
100
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
149

            
150
100
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
151
100

            
152
100
        let mut count: u64 = 0;
153
100
        let mut list = vec![];
154
1254
        while let Some(row) = rows.try_next().await? {
155
1176
            let _ = cursor.as_mut().try_next().await?;
156
1176
            list.push(ApplicationUlData {
157
1176
                data_id: row.data_id,
158
1176
                proc: Utc.timestamp_nanos(row.proc * 1000000),
159
1176
                publish: Utc.timestamp_nanos(row.publish * 1000000),
160
1176
                unit_code: match row.unit_code.len() {
161
742
                    0 => None,
162
434
                    _ => Some(row.unit_code),
163
                },
164
1176
                network_code: row.network_code,
165
1176
                network_addr: row.network_addr,
166
1176
                unit_id: row.unit_id,
167
1176
                device_id: row.device_id,
168
1176
                time: Utc.timestamp_nanos(row.time * 1000000),
169
1176
                profile: row.profile,
170
1176
                data: row.data,
171
1176
                extension: match row.extension.len() {
172
917
                    0 => None,
173
259
                    _ => serde_json::from_str(row.extension.as_str())?,
174
                },
175
            });
176
1176
            if let Some(limit) = opts_limit {
177
1090
                if limit > 0 && cursor.offset() >= limit {
178
12
                    if let Some(cursor_max) = opts.cursor_max {
179
11
                        if (count + 1) >= cursor_max {
180
3
                            return Ok((list, Some(cursor)));
181
8
                        }
182
1
                    }
183
9
                    return Ok((list, None));
184
1078
                }
185
86
            }
186
1164
            if let Some(cursor_max) = opts.cursor_max {
187
1069
                count += 1;
188
1069
                if count >= cursor_max {
189
10
                    return Ok((list, Some(cursor)));
190
1059
                }
191
95
            }
192
        }
193
78
        Ok((list, None))
194
206
    }
195

            
196
463
    async fn add(&self, data: &ApplicationUlData) -> Result<(), Box<dyn StdError>> {
197
463
        let unit_code = match data.unit_code.as_deref() {
198
214
            None => quote(""),
199
249
            Some(value) => quote(value),
200
        };
201
463
        let extension = match data.extension.as_ref() {
202
250
            None => quote(""),
203
213
            Some(extension) => match serde_json::to_string(extension) {
204
                Err(_) => quote("{}"),
205
213
                Ok(value) => quote(value.as_str()),
206
            },
207
        };
208
463
        let values = vec![
209
463
            quote(data.data_id.as_str()),
210
463
            data.proc.timestamp_millis().to_string(),
211
463
            data.publish.timestamp_millis().to_string(),
212
463
            unit_code,
213
463
            quote(data.network_code.as_str()),
214
463
            quote(data.network_addr.as_str()),
215
463
            quote(data.unit_id.as_str()),
216
463
            quote(data.device_id.as_str()),
217
463
            data.time.timestamp_millis().to_string(),
218
463
            quote(data.profile.as_str()),
219
463
            quote(data.data.as_str()),
220
463
            extension,
221
463
        ];
222
463
        let sql = SqlBuilder::insert_into(TABLE_NAME)
223
463
            .fields(FIELDS)
224
463
            .values(&values)
225
463
            .sql()?;
226
463
        let _ = sqlx::query(sql.as_str())
227
463
            .execute(self.conn.as_ref())
228
924
            .await?;
229
462
        Ok(())
230
926
    }
231

            
232
41
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
233
41
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
234
41
        let _ = sqlx::query(sql.as_str())
235
41
            .execute(self.conn.as_ref())
236
73
            .await?;
237
41
        Ok(())
238
82
    }
239
}
240

            
241
impl DbCursor {
242
    /// To create the cursor instance.
243
90
    pub fn new() -> Self {
244
90
        DbCursor { offset: 0 }
245
90
    }
246
}
247

            
248
#[async_trait]
249
impl Cursor for DbCursor {
250
1176
    async fn try_next(&mut self) -> Result<Option<ApplicationUlData>, Box<dyn StdError>> {
251
1176
        self.offset += 1;
252
1176
        Ok(None)
253
2352
    }
254

            
255
933
    fn offset(&self) -> u64 {
256
933
        self.offset
257
933
    }
258
}
259

            
260
/// Transforms query conditions to the SQL builder.
261
41
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
262
41
    if let Some(value) = cond.unit_id {
263
3
        builder.and_where_eq("unit_id", quote(value));
264
38
    }
265
41
    if let Some(value) = cond.device_id {
266
1
        builder.and_where_eq("device_id", quote(value));
267
40
    }
268
41
    if let Some(value) = cond.proc_gte {
269
1
        builder.and_where_ge("proc", value.timestamp_millis());
270
40
    }
271
41
    if let Some(value) = cond.proc_lte {
272
1
        builder.and_where_le("proc", value.timestamp_millis());
273
40
    }
274
41
    builder
275
41
}
276

            
277
/// Transforms query conditions to the SQL builder.
278
144
fn build_list_where<'a>(
279
144
    builder: &'a mut SqlBuilder,
280
144
    cond: &ListQueryCond<'a>,
281
144
) -> &'a mut SqlBuilder {
282
144
    if let Some(value) = cond.unit_id {
283
58
        builder.and_where_eq("unit_id", quote(value));
284
86
    }
285
144
    if let Some(value) = cond.device_id {
286
14
        builder.and_where_eq("device_id", quote(value));
287
130
    }
288
144
    if let Some(value) = cond.network_code {
289
8
        builder.and_where_eq("network_code", quote(value));
290
136
    }
291
144
    if let Some(value) = cond.network_addr {
292
6
        builder.and_where_eq("network_addr", quote(value));
293
138
    }
294
144
    if let Some(value) = cond.profile {
295
6
        builder.and_where_eq("profile", quote(value));
296
138
    }
297
144
    if let Some(value) = cond.proc_gte {
298
31
        builder.and_where_ge("proc", value.timestamp_millis());
299
113
    }
300
144
    if let Some(value) = cond.proc_lte {
301
8
        builder.and_where_le("proc", value.timestamp_millis());
302
136
    }
303
144
    if let Some(value) = cond.pub_gte {
304
14
        builder.and_where_ge("pub", value.timestamp_millis());
305
130
    }
306
144
    if let Some(value) = cond.pub_lte {
307
8
        builder.and_where_le("pub", value.timestamp_millis());
308
136
    }
309
144
    if let Some(value) = cond.time_gte {
310
14
        builder.and_where_ge("time", value.timestamp_millis());
311
130
    }
312
144
    if let Some(value) = cond.time_lte {
313
8
        builder.and_where_le("time", value.timestamp_millis());
314
136
    }
315
144
    builder
316
144
}
317

            
318
/// Transforms model options to the SQL builder.
319
100
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
320
100
    if let Some(value) = opts.limit {
321
74
        if value > 0 {
322
68
            builder.limit(value);
323
68
        }
324
26
    }
325
100
    if let Some(value) = opts.offset {
326
100
        match opts.limit {
327
26
            None => builder.limit(-1).offset(value),
328
6
            Some(0) => builder.limit(-1).offset(value),
329
68
            _ => builder.offset(value),
330
        };
331
    }
332
100
    builder
333
100
}
334

            
335
/// Transforms model options to the SQL builder.
336
100
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
337
100
    if let Some(sort_cond) = opts.sort.as_ref() {
338
103
        for cond in sort_cond.iter() {
339
103
            let key = match cond.key {
340
79
                SortKey::Proc => "proc",
341
4
                SortKey::Pub => "pub",
342
4
                SortKey::Time => "time",
343
8
                SortKey::NetworkCode => "network_code",
344
8
                SortKey::NetworkAddr => "network_addr",
345
            };
346
103
            builder.order_by(key, !cond.asc);
347
        }
348
13
    }
349
100
    builder
350
100
}