1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::{
10
    super::application::{
11
        Application, ApplicationModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
12
        UpdateQueryCond, Updates,
13
    },
14
    build_where_like,
15
};
16

            
17
/// Model instance.
18
pub struct Model {
19
    /// The associated database connection.
20
    conn: Arc<SqlitePool>,
21
}
22

            
23
/// Cursor instance.
///
/// The SQLite implementation re-uses the caller's original list options and only keeps the
/// progress offset, i.e. the number of rows that have already been consumed through this cursor.
///
/// `Default` yields the same state as a freshly created cursor (offset zero).
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
29

            
30
/// SQLite schema.
31
#[derive(sqlx::FromRow)]
32
struct Schema {
33
    application_id: String,
34
    code: String,
35
    unit_id: String,
36
    unit_code: String,
37
    /// i64 as time tick from Epoch in milliseconds.
38
    created_at: i64,
39
    /// i64 as time tick from Epoch in milliseconds.
40
    modified_at: i64,
41
    host_uri: String,
42
    name: String,
43
    info: String,
44
}
45

            
46
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
47
#[derive(sqlx::FromRow)]
48
struct CountSchema {
49
    #[sqlx(rename = "COUNT(*)")]
50
    count: i64,
51
}
52

            
53
/// Name of the SQLite table that stores applications.
const TABLE_NAME: &str = "application";
/// Column names of the table, in the order values are supplied for INSERT statements.
const FIELDS: &[&str] = &[
    "application_id",
    "code",
    "unit_id",
    "unit_code",
    "created_at",
    "modified_at",
    "host_uri",
    "name",
    "info",
];
/// Statement that creates the table (with its unique constraints) if it does not exist yet.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS application (\
    application_id TEXT NOT NULL UNIQUE,\
    code TEXT NOT NULL,\
    unit_id TEXT NOT NULL,\
    unit_code TEXT NOT NULL,\
    created_at INTEGER NOT NULL,\
    modified_at INTEGER NOT NULL,\
    host_uri TEXT NOT NULL,\
    name TEXT NOT NULL,\
    info TEXT,\
    UNIQUE (unit_id,code),\
    PRIMARY KEY (application_id))";
78

            
79
impl Model {
80
    /// To create the model instance with a database connection.
81
56
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
82
56
        let model = Model { conn };
83
56
        model.init().await?;
84
56
        Ok(model)
85
56
    }
86
}
87

            
88
#[async_trait]
89
impl ApplicationModel for Model {
90
96
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
91
96
        let _ = sqlx::query(TABLE_INIT_SQL)
92
96
            .execute(self.conn.as_ref())
93
96
            .await?;
94
96
        Ok(())
95
192
    }
96

            
97
92
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
98
92
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
99

            
100
92
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
101
92
            .fetch_one(self.conn.as_ref())
102
92
            .await;
103

            
104
92
        let row = match result {
105
            Err(e) => return Err(Box::new(e)),
106
92
            Ok(row) => row,
107
92
        };
108
92
        Ok(row.count as u64)
109
184
    }
110

            
111
    async fn list(
112
        &self,
113
        opts: &ListOptions,
114
        cursor: Option<Box<dyn Cursor>>,
115
302
    ) -> Result<(Vec<Application>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
116
302
        let mut cursor = match cursor {
117
262
            None => Box::new(DbCursor::new()),
118
40
            Some(cursor) => cursor,
119
        };
120

            
121
302
        let mut opts = ListOptions { ..*opts };
122
302
        if let Some(offset) = opts.offset {
123
48
            opts.offset = Some(offset + cursor.offset());
124
254
        } else {
125
254
            opts.offset = Some(cursor.offset());
126
254
        }
127
302
        let opts_limit = opts.limit;
128
302
        if let Some(limit) = opts_limit {
129
172
            if limit > 0 {
130
170
                if cursor.offset() >= limit {
131
14
                    return Ok((vec![], None));
132
156
                }
133
156
                opts.limit = Some(limit - cursor.offset());
134
2
            }
135
130
        }
136
288
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
137
288
        build_limit_offset(&mut builder, &opts);
138
288
        build_sort(&mut builder, &opts);
139
288
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
140

            
141
288
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
142
288

            
143
288
        let mut count: u64 = 0;
144
288
        let mut list = vec![];
145
4480
        while let Some(row) = rows.try_next().await? {
146
4252
            let _ = cursor.as_mut().try_next().await?;
147
4252
            list.push(Application {
148
4252
                application_id: row.application_id,
149
4252
                code: row.code,
150
4252
                unit_id: row.unit_id,
151
4252
                unit_code: row.unit_code,
152
4252
                created_at: Utc.timestamp_nanos(row.created_at * 1000000),
153
4252
                modified_at: Utc.timestamp_nanos(row.modified_at * 1000000),
154
4252
                host_uri: row.host_uri,
155
4252
                name: row.name,
156
4252
                info: serde_json::from_str(row.info.as_str())?,
157
            });
158
4252
            if let Some(limit) = opts_limit {
159
2486
                if limit > 0 && cursor.offset() >= limit {
160
34
                    if let Some(cursor_max) = opts.cursor_max {
161
32
                        if (count + 1) >= cursor_max {
162
14
                            return Ok((list, Some(cursor)));
163
18
                        }
164
2
                    }
165
20
                    return Ok((list, None));
166
2452
                }
167
1766
            }
168
4218
            if let Some(cursor_max) = opts.cursor_max {
169
4050
                count += 1;
170
4050
                if count >= cursor_max {
171
26
                    return Ok((list, Some(cursor)));
172
4024
                }
173
168
            }
174
        }
175
228
        Ok((list, None))
176
604
    }
177

            
178
826
    async fn get(&self, cond: &QueryCond) -> Result<Option<Application>, Box<dyn StdError>> {
179
826
        let sql = build_where(SqlBuilder::select_from(TABLE_NAME).fields(FIELDS), &cond).sql()?;
180

            
181
826
        let result: Result<Schema, sqlx::Error> = sqlx::query_as(sql.as_str())
182
826
            .fetch_one(self.conn.as_ref())
183
826
            .await;
184

            
185
826
        let row = match result {
186
118
            Err(e) => match e {
187
118
                sqlx::Error::RowNotFound => return Ok(None),
188
                _ => return Err(Box::new(e)),
189
            },
190
708
            Ok(row) => row,
191
708
        };
192
708

            
193
708
        Ok(Some(Application {
194
708
            application_id: row.application_id,
195
708
            code: row.code,
196
708
            unit_id: row.unit_id,
197
708
            unit_code: row.unit_code,
198
708
            created_at: Utc.timestamp_nanos(row.created_at * 1000000),
199
708
            modified_at: Utc.timestamp_nanos(row.modified_at * 1000000),
200
708
            host_uri: row.host_uri,
201
708
            name: row.name,
202
708
            info: serde_json::from_str(row.info.as_str())?,
203
        }))
204
1652
    }
205

            
206
3042
    async fn add(&self, application: &Application) -> Result<(), Box<dyn StdError>> {
207
3042
        let info = match serde_json::to_string(&application.info) {
208
            Err(_) => quote("{}"),
209
3042
            Ok(value) => quote(value.as_str()),
210
        };
211
3042
        let values = vec![
212
3042
            quote(application.application_id.as_str()),
213
3042
            quote(application.code.as_str()),
214
3042
            quote(application.unit_id.as_str()),
215
3042
            quote(application.unit_code.as_str()),
216
3042
            application.created_at.timestamp_millis().to_string(),
217
3042
            application.modified_at.timestamp_millis().to_string(),
218
3042
            quote(application.host_uri.as_str()),
219
3042
            quote(application.name.as_str()),
220
3042
            info,
221
3042
        ];
222
3042
        let sql = SqlBuilder::insert_into(TABLE_NAME)
223
3042
            .fields(FIELDS)
224
3042
            .values(&values)
225
3042
            .sql()?;
226
3042
        let _ = sqlx::query(sql.as_str())
227
3042
            .execute(self.conn.as_ref())
228
3042
            .await?;
229
3034
        Ok(())
230
6084
    }
231

            
232
58
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
233
58
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
234
58
        let _ = sqlx::query(sql.as_str())
235
58
            .execute(self.conn.as_ref())
236
58
            .await?;
237
58
        Ok(())
238
116
    }
239

            
240
    async fn update(
241
        &self,
242
        cond: &UpdateQueryCond,
243
        updates: &Updates,
244
26
    ) -> Result<(), Box<dyn StdError>> {
245
26
        let sql = match build_update_where(&mut SqlBuilder::update_table(TABLE_NAME), cond, updates)
246
        {
247
2
            None => return Ok(()),
248
24
            Some(builder) => builder.sql()?,
249
        };
250
24
        let _ = sqlx::query(sql.as_str())
251
24
            .execute(self.conn.as_ref())
252
24
            .await?;
253
24
        Ok(())
254
52
    }
255
}
256

            
257
impl DbCursor {
258
    /// To create the cursor instance.
259
262
    pub fn new() -> Self {
260
262
        DbCursor { offset: 0 }
261
262
    }
262
}
263

            
264
#[async_trait]
265
impl Cursor for DbCursor {
266
4252
    async fn try_next(&mut self) -> Result<Option<Application>, Box<dyn StdError>> {
267
4252
        self.offset += 1;
268
4252
        Ok(None)
269
8504
    }
270

            
271
3114
    fn offset(&self) -> u64 {
272
3114
        self.offset
273
3114
    }
274
}
275

            
276
/// Transforms query conditions to the SQL builder.
277
884
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
278
884
    if let Some(value) = cond.unit_id {
279
74
        builder.and_where_eq("unit_id", quote(value));
280
810
    }
281
884
    if let Some(value) = cond.application_id {
282
822
        builder.and_where_eq("application_id", quote(value));
283
822
    }
284
884
    if let Some(value) = cond.code {
285
32
        builder.and_where_eq("code", quote(value));
286
852
    }
287
884
    builder
288
884
}
289

            
290
/// Transforms query conditions to the SQL builder.
291
380
fn build_list_where<'a>(
292
380
    builder: &'a mut SqlBuilder,
293
380
    cond: &ListQueryCond<'a>,
294
380
) -> &'a mut SqlBuilder {
295
380
    if let Some(value) = cond.unit_id {
296
164
        builder.and_where_eq("unit_id", quote(value));
297
216
    }
298
380
    if let Some(value) = cond.application_id {
299
12
        builder.and_where_eq("application_id", quote(value));
300
368
    }
301
380
    if let Some(value) = cond.code {
302
48
        builder.and_where_eq("code", quote(value));
303
332
    }
304
380
    if let Some(value) = cond.code_contains {
305
82
        build_where_like(builder, "code", value.to_lowercase().as_str());
306
298
    }
307
380
    if let Some(value) = cond.name_contains {
308
16
        build_where_like(builder, "name", value.to_lowercase().as_str());
309
364
    }
310
380
    builder
311
380
}
312

            
313
/// Transforms model options to the SQL builder.
314
288
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
315
288
    if let Some(value) = opts.limit {
316
158
        if value > 0 {
317
156
            builder.limit(value);
318
156
        }
319
130
    }
320
288
    if let Some(value) = opts.offset {
321
288
        match opts.limit {
322
130
            None => builder.limit(-1).offset(value),
323
2
            Some(0) => builder.limit(-1).offset(value),
324
156
            _ => builder.offset(value),
325
        };
326
    }
327
288
    builder
328
288
}
329

            
330
/// Transforms model options to the SQL builder.
331
288
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
332
288
    if let Some(sort_cond) = opts.sort.as_ref() {
333
208
        for cond in sort_cond.iter() {
334
208
            let key = match cond.key {
335
16
                SortKey::CreatedAt => "created_at",
336
12
                SortKey::ModifiedAt => "modified_at",
337
164
                SortKey::Code => "code",
338
16
                SortKey::Name => "name",
339
            };
340
208
            builder.order_by(key, !cond.asc);
341
        }
342
82
    }
343
288
    builder
344
288
}
345

            
346
/// Transforms query conditions and the model object to the SQL builder.
347
26
fn build_update_where<'a>(
348
26
    builder: &'a mut SqlBuilder,
349
26
    cond: &UpdateQueryCond<'a>,
350
26
    updates: &Updates,
351
26
) -> Option<&'a mut SqlBuilder> {
352
26
    let mut count = 0;
353
26
    if let Some(value) = updates.modified_at.as_ref() {
354
24
        builder.set("modified_at", value.timestamp_millis());
355
24
        count += 1;
356
24
    }
357
26
    if let Some(value) = updates.host_uri.as_ref() {
358
12
        builder.set("host_uri", quote(value));
359
12
        count += 1;
360
14
    }
361
26
    if let Some(value) = updates.name.as_ref() {
362
20
        builder.set("name", quote(value));
363
20
        count += 1;
364
20
    }
365
26
    if let Some(value) = updates.info {
366
20
        match serde_json::to_string(value) {
367
            Err(_) => {
368
                builder.set("info", quote("{}"));
369
            }
370
20
            Ok(value) => {
371
20
                builder.set("info", quote(value));
372
20
            }
373
        }
374
20
        count += 1;
375
6
    }
376
26
    if count == 0 {
377
2
        return None;
378
24
    }
379
24

            
380
24
    builder.and_where_eq("application_id", quote(cond.application_id));
381
24
    Some(builder)
382
26
}