1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::{
10
    super::application::{
11
        Application, ApplicationModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
12
        UpdateQueryCond, Updates,
13
    },
14
    build_where_like,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<SqlitePool>,
21
}
22

            
23
/// Cursor instance.
///
/// The SQLite implementation re-runs the query with the original list options and only remembers
/// how many rows have already been handed out (the progress offset).
///
/// `Default` yields a cursor positioned at the first row, which is the same state `new()` creates.
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
29

            
30
/// SQLite schema.
31
#[derive(sqlx::FromRow)]
32
struct Schema {
33
    application_id: String,
34
    code: String,
35
    unit_id: String,
36
    unit_code: String,
37
    /// i64 as time tick from Epoch in milliseconds.
38
    created_at: i64,
39
    /// i64 as time tick from Epoch in milliseconds.
40
    modified_at: i64,
41
    host_uri: String,
42
    name: String,
43
    info: String,
44
}
45

            
46
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
47
#[derive(sqlx::FromRow)]
48
struct CountSchema {
49
    #[sqlx(rename = "COUNT(*)")]
50
    count: i64,
51
}
52

            
53
/// Name of the SQLite table that stores applications.
const TABLE_NAME: &str = "application";
54
/// Column names of the table, in the order used by both `SELECT` and `INSERT` statements.
const FIELDS: &[&str] = &[
    "application_id",
    "code",
    "unit_id",
    "unit_code",
    "created_at",
    "modified_at",
    "host_uri",
    "name",
    "info",
];
65
/// Statement that creates the table on first use.
///
/// The trailing backslashes join the lines without inserting any whitespace, so the statement is
/// sent as one compact line.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS application (\
    application_id TEXT NOT NULL UNIQUE,\
    code TEXT NOT NULL,\
    unit_id TEXT NOT NULL,\
    unit_code TEXT NOT NULL,\
    created_at INTEGER NOT NULL,\
    modified_at INTEGER NOT NULL,\
    host_uri TEXT NOT NULL,\
    name TEXT NOT NULL,\
    info TEXT,\
    UNIQUE (unit_id,code),\
    PRIMARY KEY (application_id))";
78

            
79
impl Model {
80
    /// To create the model instance with a database connection.
81
28
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
82
28
        let model = Model { conn };
83
28
        model.init().await?;
84
28
        Ok(model)
85
28
    }
86
}
87

            
88
#[async_trait]
89
impl ApplicationModel for Model {
90
48
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
91
48
        let _ = sqlx::query(TABLE_INIT_SQL)
92
48
            .execute(self.conn.as_ref())
93
48
            .await?;
94
48
        Ok(())
95
96
    }
96

            
97
46
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
98
46
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
99

            
100
46
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
101
46
            .fetch_one(self.conn.as_ref())
102
46
            .await;
103

            
104
46
        let row = match result {
105
            Err(e) => return Err(Box::new(e)),
106
46
            Ok(row) => row,
107
46
        };
108
46
        Ok(row.count as u64)
109
92
    }
110

            
111
    async fn list(
112
        &self,
113
        opts: &ListOptions,
114
        cursor: Option<Box<dyn Cursor>>,
115
151
    ) -> Result<(Vec<Application>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
116
151
        let mut cursor = match cursor {
117
131
            None => Box::new(DbCursor::new()),
118
20
            Some(cursor) => cursor,
119
        };
120

            
121
151
        let mut opts = ListOptions { ..*opts };
122
151
        if let Some(offset) = opts.offset {
123
24
            opts.offset = Some(offset + cursor.offset());
124
127
        } else {
125
127
            opts.offset = Some(cursor.offset());
126
127
        }
127
151
        let opts_limit = opts.limit;
128
151
        if let Some(limit) = opts_limit {
129
86
            if limit > 0 {
130
85
                if cursor.offset() >= limit {
131
7
                    return Ok((vec![], None));
132
78
                }
133
78
                opts.limit = Some(limit - cursor.offset());
134
1
            }
135
65
        }
136
144
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
137
144
        build_limit_offset(&mut builder, &opts);
138
144
        build_sort(&mut builder, &opts);
139
144
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
140

            
141
144
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
142
144

            
143
144
        let mut count: u64 = 0;
144
144
        let mut list = vec![];
145
2240
        while let Some(row) = rows.try_next().await? {
146
2126
            let _ = cursor.as_mut().try_next().await?;
147
2126
            list.push(Application {
148
2126
                application_id: row.application_id,
149
2126
                code: row.code,
150
2126
                unit_id: row.unit_id,
151
2126
                unit_code: row.unit_code,
152
2126
                created_at: Utc.timestamp_nanos(row.created_at * 1000000),
153
2126
                modified_at: Utc.timestamp_nanos(row.modified_at * 1000000),
154
2126
                host_uri: row.host_uri,
155
2126
                name: row.name,
156
2126
                info: serde_json::from_str(row.info.as_str())?,
157
            });
158
2126
            if let Some(limit) = opts_limit {
159
1243
                if limit > 0 && cursor.offset() >= limit {
160
17
                    if let Some(cursor_max) = opts.cursor_max {
161
16
                        if (count + 1) >= cursor_max {
162
7
                            return Ok((list, Some(cursor)));
163
9
                        }
164
1
                    }
165
10
                    return Ok((list, None));
166
1226
                }
167
883
            }
168
2109
            if let Some(cursor_max) = opts.cursor_max {
169
2025
                count += 1;
170
2025
                if count >= cursor_max {
171
13
                    return Ok((list, Some(cursor)));
172
2012
                }
173
84
            }
174
        }
175
114
        Ok((list, None))
176
302
    }
177

            
178
413
    async fn get(&self, cond: &QueryCond) -> Result<Option<Application>, Box<dyn StdError>> {
179
413
        let sql = build_where(SqlBuilder::select_from(TABLE_NAME).fields(FIELDS), &cond).sql()?;
180

            
181
413
        let result: Result<Schema, sqlx::Error> = sqlx::query_as(sql.as_str())
182
413
            .fetch_one(self.conn.as_ref())
183
413
            .await;
184

            
185
413
        let row = match result {
186
59
            Err(e) => match e {
187
59
                sqlx::Error::RowNotFound => return Ok(None),
188
                _ => return Err(Box::new(e)),
189
            },
190
354
            Ok(row) => row,
191
354
        };
192
354

            
193
354
        Ok(Some(Application {
194
354
            application_id: row.application_id,
195
354
            code: row.code,
196
354
            unit_id: row.unit_id,
197
354
            unit_code: row.unit_code,
198
354
            created_at: Utc.timestamp_nanos(row.created_at * 1000000),
199
354
            modified_at: Utc.timestamp_nanos(row.modified_at * 1000000),
200
354
            host_uri: row.host_uri,
201
354
            name: row.name,
202
354
            info: serde_json::from_str(row.info.as_str())?,
203
        }))
204
826
    }
205

            
206
1521
    async fn add(&self, application: &Application) -> Result<(), Box<dyn StdError>> {
207
1521
        let info = match serde_json::to_string(&application.info) {
208
            Err(_) => quote("{}"),
209
1521
            Ok(value) => quote(value.as_str()),
210
        };
211
1521
        let values = vec![
212
1521
            quote(application.application_id.as_str()),
213
1521
            quote(application.code.as_str()),
214
1521
            quote(application.unit_id.as_str()),
215
1521
            quote(application.unit_code.as_str()),
216
1521
            application.created_at.timestamp_millis().to_string(),
217
1521
            application.modified_at.timestamp_millis().to_string(),
218
1521
            quote(application.host_uri.as_str()),
219
1521
            quote(application.name.as_str()),
220
1521
            info,
221
1521
        ];
222
1521
        let sql = SqlBuilder::insert_into(TABLE_NAME)
223
1521
            .fields(FIELDS)
224
1521
            .values(&values)
225
1521
            .sql()?;
226
1521
        let _ = sqlx::query(sql.as_str())
227
1521
            .execute(self.conn.as_ref())
228
1521
            .await?;
229
1517
        Ok(())
230
3042
    }
231

            
232
29
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
233
29
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
234
29
        let _ = sqlx::query(sql.as_str())
235
29
            .execute(self.conn.as_ref())
236
29
            .await?;
237
29
        Ok(())
238
58
    }
239

            
240
    async fn update(
241
        &self,
242
        cond: &UpdateQueryCond,
243
        updates: &Updates,
244
13
    ) -> Result<(), Box<dyn StdError>> {
245
13
        let sql = match build_update_where(&mut SqlBuilder::update_table(TABLE_NAME), cond, updates)
246
        {
247
1
            None => return Ok(()),
248
12
            Some(builder) => builder.sql()?,
249
        };
250
12
        let _ = sqlx::query(sql.as_str())
251
12
            .execute(self.conn.as_ref())
252
12
            .await?;
253
12
        Ok(())
254
26
    }
255
}
256

            
257
impl DbCursor {
258
    /// To create the cursor instance.
259
131
    pub fn new() -> Self {
260
131
        DbCursor { offset: 0 }
261
131
    }
262
}
263

            
264
#[async_trait]
265
impl Cursor for DbCursor {
266
2126
    async fn try_next(&mut self) -> Result<Option<Application>, Box<dyn StdError>> {
267
2126
        self.offset += 1;
268
2126
        Ok(None)
269
4252
    }
270

            
271
1557
    fn offset(&self) -> u64 {
272
1557
        self.offset
273
1557
    }
274
}
275

            
276
/// Transforms query conditions to the SQL builder.
277
442
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
278
442
    if let Some(value) = cond.unit_id {
279
37
        builder.and_where_eq("unit_id", quote(value));
280
405
    }
281
442
    if let Some(value) = cond.application_id {
282
411
        builder.and_where_eq("application_id", quote(value));
283
411
    }
284
442
    if let Some(value) = cond.code {
285
16
        builder.and_where_eq("code", quote(value));
286
426
    }
287
442
    builder
288
442
}
289

            
290
/// Transforms query conditions to the SQL builder.
291
190
fn build_list_where<'a>(
292
190
    builder: &'a mut SqlBuilder,
293
190
    cond: &ListQueryCond<'a>,
294
190
) -> &'a mut SqlBuilder {
295
190
    if let Some(value) = cond.unit_id {
296
82
        builder.and_where_eq("unit_id", quote(value));
297
108
    }
298
190
    if let Some(value) = cond.application_id {
299
6
        builder.and_where_eq("application_id", quote(value));
300
184
    }
301
190
    if let Some(value) = cond.code {
302
24
        builder.and_where_eq("code", quote(value));
303
166
    }
304
190
    if let Some(value) = cond.code_contains {
305
41
        build_where_like(builder, "code", value.to_lowercase().as_str());
306
149
    }
307
190
    if let Some(value) = cond.name_contains {
308
8
        build_where_like(builder, "name", value.to_lowercase().as_str());
309
182
    }
310
190
    builder
311
190
}
312

            
313
/// Transforms model options to the SQL builder.
314
144
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
315
144
    if let Some(value) = opts.limit {
316
79
        if value > 0 {
317
78
            builder.limit(value);
318
78
        }
319
65
    }
320
144
    if let Some(value) = opts.offset {
321
144
        match opts.limit {
322
65
            None => builder.limit(-1).offset(value),
323
1
            Some(0) => builder.limit(-1).offset(value),
324
78
            _ => builder.offset(value),
325
        };
326
    }
327
144
    builder
328
144
}
329

            
330
/// Transforms model options to the SQL builder.
331
144
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
332
144
    if let Some(sort_cond) = opts.sort.as_ref() {
333
104
        for cond in sort_cond.iter() {
334
104
            let key = match cond.key {
335
8
                SortKey::CreatedAt => "created_at",
336
6
                SortKey::ModifiedAt => "modified_at",
337
82
                SortKey::Code => "code",
338
8
                SortKey::Name => "name",
339
            };
340
104
            builder.order_by(key, !cond.asc);
341
        }
342
41
    }
343
144
    builder
344
144
}
345

            
346
/// Transforms query conditions and the model object to the SQL builder.
347
13
fn build_update_where<'a>(
348
13
    builder: &'a mut SqlBuilder,
349
13
    cond: &UpdateQueryCond<'a>,
350
13
    updates: &Updates,
351
13
) -> Option<&'a mut SqlBuilder> {
352
13
    let mut count = 0;
353
13
    if let Some(value) = updates.modified_at.as_ref() {
354
12
        builder.set("modified_at", value.timestamp_millis());
355
12
        count += 1;
356
12
    }
357
13
    if let Some(value) = updates.host_uri.as_ref() {
358
6
        builder.set("host_uri", quote(value));
359
6
        count += 1;
360
7
    }
361
13
    if let Some(value) = updates.name.as_ref() {
362
10
        builder.set("name", quote(value));
363
10
        count += 1;
364
10
    }
365
13
    if let Some(value) = updates.info {
366
10
        match serde_json::to_string(value) {
367
            Err(_) => {
368
                builder.set("info", quote("{}"));
369
            }
370
10
            Ok(value) => {
371
10
                builder.set("info", quote(value));
372
10
            }
373
        }
374
10
        count += 1;
375
3
    }
376
13
    if count == 0 {
377
1
        return None;
378
12
    }
379
12

            
380
12
    builder.and_where_eq("application_id", quote(cond.application_id));
381
12
    Some(builder)
382
13
}