1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::dldata_buffer::{
10
    Cursor, DlDataBuffer, DlDataBufferModel, ListOptions, ListQueryCond, QueryCond, SortKey,
11
};
12

            
13
/// Model instance.
14
pub struct Model {
15
    /// The associated database connection.
16
    conn: Arc<SqlitePool>,
17
}
18

            
19
/// Cursor instance.
///
/// The SQLite implementation keeps only the progress offset, i.e. how many rows have already
/// been handed out through this cursor. The default value starts at offset zero.
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
25

            
26
/// SQLite schema.
27
#[derive(sqlx::FromRow)]
28
struct Schema {
29
    data_id: String,
30
    unit_id: String,
31
    unit_code: String,
32
    application_id: String,
33
    application_code: String,
34
    network_id: String,
35
    network_addr: String,
36
    device_id: String,
37
    /// i64 as time tick from Epoch in milliseconds.
38
    created_at: i64,
39
    /// i64 as time tick from Epoch in milliseconds.
40
    expired_at: i64,
41
}
42

            
43
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
44
#[derive(sqlx::FromRow)]
45
struct CountSchema {
46
    #[sqlx(rename = "COUNT(*)")]
47
    count: i64,
48
}
49

            
50
/// Name of the table that every statement in this module targets.
const TABLE_NAME: &str = "dldata_buffer";

/// Column names, in the order used for `SELECT` field lists and `INSERT` values.
const FIELDS: &[&str] = &[
    "data_id",
    "unit_id",
    "unit_code",
    "application_id",
    "application_code",
    "network_id",
    "network_addr",
    "device_id",
    "created_at",
    "expired_at",
];

/// Statement that creates the table when it does not exist yet.
///
/// The trailing backslashes join the lines without inserting any whitespace.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS dldata_buffer (\
    data_id TEXT NOT NULL UNIQUE,\
    unit_id TEXT NOT NULL,\
    unit_code TEXT NOT NULL,\
    application_id TEXT NOT NULL,\
    application_code TEXT NOT NULL,\
    network_id TEXT NOT NULL,\
    network_addr TEXT NOT NULL,\
    device_id TEXT NOT NULL,\
    created_at INTEGER NOT NULL,\
    expired_at INTEGER NOT NULL,\
    PRIMARY KEY (data_id))";
76

            
77
impl Model {
78
    /// To create the model instance with a database connection.
79
28
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
80
28
        let model = Model { conn };
81
28
        model.init().await?;
82
28
        Ok(model)
83
28
    }
84
}
85

            
86
#[async_trait]
87
impl DlDataBufferModel for Model {
88
48
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
89
48
        let _ = sqlx::query(TABLE_INIT_SQL)
90
48
            .execute(self.conn.as_ref())
91
48
            .await?;
92
48
        Ok(())
93
96
    }
94

            
95
86
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
96
86
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
97

            
98
86
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
99
86
            .fetch_one(self.conn.as_ref())
100
86
            .await;
101

            
102
86
        let row = match result {
103
            Err(e) => return Err(Box::new(e)),
104
86
            Ok(row) => row,
105
86
        };
106
86
        Ok(row.count as u64)
107
172
    }
108

            
109
    async fn list(
110
        &self,
111
        opts: &ListOptions,
112
        cursor: Option<Box<dyn Cursor>>,
113
190
    ) -> Result<(Vec<DlDataBuffer>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
114
190
        let mut cursor = match cursor {
115
156
            None => Box::new(DbCursor::new()),
116
34
            Some(cursor) => cursor,
117
        };
118

            
119
190
        let mut opts = ListOptions { ..*opts };
120
190
        if let Some(offset) = opts.offset {
121
24
            opts.offset = Some(offset + cursor.offset());
122
166
        } else {
123
166
            opts.offset = Some(cursor.offset());
124
166
        }
125
190
        let opts_limit = opts.limit;
126
190
        if let Some(limit) = opts_limit {
127
158
            if limit > 0 {
128
157
                if cursor.offset() >= limit {
129
17
                    return Ok((vec![], None));
130
140
                }
131
140
                opts.limit = Some(limit - cursor.offset());
132
1
            }
133
32
        }
134
173
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
135
173
        build_limit_offset(&mut builder, &opts);
136
173
        build_sort(&mut builder, &opts);
137
173
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
138

            
139
173
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
140
173

            
141
173
        let mut count: u64 = 0;
142
173
        let mut list = vec![];
143
3771
        while let Some(row) = rows.try_next().await? {
144
3642
            let _ = cursor.as_mut().try_next().await?;
145
3642
            list.push(DlDataBuffer {
146
3642
                data_id: row.data_id,
147
3642
                unit_id: row.unit_id,
148
3642
                unit_code: row.unit_code,
149
3642
                application_id: row.application_id,
150
3642
                application_code: row.application_code,
151
3642
                network_id: row.network_id,
152
3642
                network_addr: row.network_addr,
153
3642
                device_id: row.device_id,
154
3642
                created_at: Utc.timestamp_nanos(row.created_at * 1000000),
155
3642
                expired_at: Utc.timestamp_nanos(row.expired_at * 1000000),
156
3642
            });
157
3642
            if let Some(limit) = opts_limit {
158
2339
                if limit > 0 && cursor.offset() >= limit {
159
27
                    if let Some(cursor_max) = opts.cursor_max {
160
26
                        if (count + 1) >= cursor_max {
161
17
                            return Ok((list, Some(cursor)));
162
9
                        }
163
1
                    }
164
10
                    return Ok((list, None));
165
2312
                }
166
1303
            }
167
3615
            if let Some(cursor_max) = opts.cursor_max {
168
3507
                count += 1;
169
3507
                if count >= cursor_max {
170
17
                    return Ok((list, Some(cursor)));
171
3490
                }
172
108
            }
173
        }
174
129
        Ok((list, None))
175
380
    }
176

            
177
193
    async fn get(&self, data_id: &str) -> Result<Option<DlDataBuffer>, Box<dyn StdError>> {
178
193
        let sql = SqlBuilder::select_from(TABLE_NAME)
179
193
            .fields(FIELDS)
180
193
            .and_where_eq("data_id", quote(data_id))
181
193
            .sql()?;
182

            
183
193
        let result: Result<Schema, sqlx::Error> = sqlx::query_as(sql.as_str())
184
193
            .fetch_one(self.conn.as_ref())
185
193
            .await;
186

            
187
193
        let row = match result {
188
63
            Err(e) => match e {
189
63
                sqlx::Error::RowNotFound => return Ok(None),
190
                _ => return Err(Box::new(e)),
191
            },
192
130
            Ok(row) => row,
193
130
        };
194
130

            
195
130
        Ok(Some(DlDataBuffer {
196
130
            data_id: row.data_id,
197
130
            unit_id: row.unit_id,
198
130
            unit_code: row.unit_code,
199
130
            application_id: row.application_id,
200
130
            application_code: row.application_code,
201
130
            network_id: row.network_id,
202
130
            network_addr: row.network_addr,
203
130
            device_id: row.device_id,
204
130
            created_at: Utc.timestamp_nanos(row.created_at * 1000000),
205
130
            expired_at: Utc.timestamp_nanos(row.expired_at * 1000000),
206
130
        }))
207
386
    }
208

            
209
1142
    async fn add(&self, dldata: &DlDataBuffer) -> Result<(), Box<dyn StdError>> {
210
1142
        let values = vec![
211
1142
            quote(dldata.data_id.as_str()),
212
1142
            quote(dldata.unit_id.as_str()),
213
1142
            quote(dldata.unit_code.as_str()),
214
1142
            quote(dldata.application_id.as_str()),
215
1142
            quote(dldata.application_code.as_str()),
216
1142
            quote(dldata.network_id.as_str()),
217
1142
            quote(dldata.network_addr.as_str()),
218
1142
            quote(dldata.device_id.as_str()),
219
1142
            dldata.created_at.timestamp_millis().to_string(),
220
1142
            dldata.expired_at.timestamp_millis().to_string(),
221
1142
        ];
222
1142
        let sql = SqlBuilder::insert_into(TABLE_NAME)
223
1142
            .fields(FIELDS)
224
1142
            .values(&values)
225
1142
            .sql()?;
226
1142
        let _ = sqlx::query(sql.as_str())
227
1142
            .execute(self.conn.as_ref())
228
1142
            .await?;
229
1141
        Ok(())
230
2284
    }
231

            
232
88
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
233
88
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
234
88
        let _ = sqlx::query(sql.as_str())
235
88
            .execute(self.conn.as_ref())
236
88
            .await?;
237
88
        Ok(())
238
176
    }
239
}
240

            
241
impl DbCursor {
242
    /// To create the cursor instance.
243
156
    pub fn new() -> Self {
244
156
        DbCursor { offset: 0 }
245
156
    }
246
}
247

            
248
#[async_trait]
249
impl Cursor for DbCursor {
250
3642
    async fn try_next(&mut self) -> Result<Option<DlDataBuffer>, Box<dyn StdError>> {
251
3642
        self.offset += 1;
252
3642
        Ok(None)
253
7284
    }
254

            
255
2826
    fn offset(&self) -> u64 {
256
2826
        self.offset
257
2826
    }
258
}
259

            
260
/// Transforms query conditions to the SQL builder.
261
88
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
262
88
    if let Some(value) = cond.data_id {
263
14
        builder.and_where_eq("data_id", quote(value));
264
74
    }
265
88
    if let Some(value) = cond.unit_id {
266
30
        builder.and_where_eq("unit_id", quote(value));
267
58
    }
268
88
    if let Some(value) = cond.application_id {
269
11
        builder.and_where_eq("application_id", quote(value));
270
77
    }
271
88
    if let Some(value) = cond.network_id {
272
25
        builder.and_where_eq("network_id", quote(value));
273
63
    }
274
88
    if let Some(value) = cond.network_addrs {
275
8296
        let values: Vec<String> = value.iter().map(|&x| quote(x)).collect();
276
14
        builder.and_where_in("network_addr", &values);
277
74
    }
278
88
    if let Some(value) = cond.device_id {
279
21
        builder.and_where_eq("device_id", quote(value));
280
67
    }
281
88
    builder
282
88
}
283

            
284
/// Transforms query conditions to the SQL builder.
285
259
fn build_list_where<'a>(
286
259
    builder: &'a mut SqlBuilder,
287
259
    cond: &ListQueryCond<'a>,
288
259
) -> &'a mut SqlBuilder {
289
259
    if let Some(value) = cond.unit_id {
290
128
        builder.and_where_eq("unit_id", quote(value));
291
131
    }
292
259
    if let Some(value) = cond.application_id {
293
64
        builder.and_where_eq("application_id", quote(value));
294
195
    }
295
259
    if let Some(value) = cond.network_id {
296
64
        builder.and_where_eq("network_id", quote(value));
297
195
    }
298
259
    if let Some(value) = cond.device_id {
299
26
        builder.and_where_eq("device_id", quote(value));
300
233
    }
301
259
    builder
302
259
}
303

            
304
/// Transforms model options to the SQL builder.
305
173
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
306
173
    if let Some(value) = opts.limit {
307
141
        if value > 0 {
308
140
            builder.limit(value);
309
140
        }
310
32
    }
311
173
    if let Some(value) = opts.offset {
312
173
        match opts.limit {
313
32
            None => builder.limit(-1).offset(value),
314
1
            Some(0) => builder.limit(-1).offset(value),
315
140
            _ => builder.offset(value),
316
        };
317
    }
318
173
    builder
319
173
}
320

            
321
/// Transforms model options to the SQL builder.
322
173
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
323
173
    if let Some(sort_cond) = opts.sort.as_ref() {
324
311
        for cond in sort_cond.iter() {
325
311
            let key = match cond.key {
326
139
                SortKey::CreatedAt => "created_at",
327
14
                SortKey::ExpiredAt => "expired_at",
328
158
                SortKey::ApplicationCode => "application_code",
329
            };
330
311
            builder.order_by(key, !cond.asc);
331
        }
332
5
    }
333
173
    builder
334
173
}