1
use std::{error::Error as StdError, sync::Arc};
2

            
3
use async_trait::async_trait;
4
use chrono::{TimeZone, Utc};
5
use futures::TryStreamExt;
6
use sql_builder::{quote, SqlBuilder};
7
use sqlx::SqlitePool;
8

            
9
use super::super::coremgr_opdata::{
10
    CoremgrOpData, CoremgrOpDataModel, Cursor, ListOptions, ListQueryCond, QueryCond, SortKey,
11
};
12

            
13
/// Model instance.
14
pub struct Model {
15
    /// The associated database connection.
16
    conn: Arc<SqlitePool>,
17
}
18

            
19
/// Cursor instance.
///
/// The SQLite implementation keeps only the progress offset, i.e. how many rows have already
/// been handed out. A follow-up `list()` call is expected to pass the original list options
/// again; the offset stored here is then applied on top of them.
///
/// `Default` yields the same state as a freshly created cursor (offset zero).
#[derive(Debug, Default)]
pub struct DbCursor {
    /// Number of rows consumed so far.
    offset: u64,
}
25

            
26
/// SQLite schema.
27
#[derive(sqlx::FromRow)]
28
struct Schema {
29
    pub data_id: String,
30
    /// i64 as time tick from Epoch in milliseconds.
31
    pub req_time: i64,
32
    /// i64 as time tick from Epoch in milliseconds.
33
    pub res_time: i64,
34
    pub latency_ms: i64,
35
    pub status: i32,
36
    pub source_ip: String,
37
    pub method: String,
38
    pub path: String,
39
    /// use empty string as NULL.
40
    pub body: String,
41
    pub user_id: String,
42
    pub client_id: String,
43
    /// use empty string as NULL.
44
    pub err_code: String,
45
    /// use empty string as NULL.
46
    pub err_message: String,
47
}
48

            
49
/// Use "COUNT(*)" instead of "COUNT(fields...)" to simplify the implementation.
50
#[derive(sqlx::FromRow)]
51
struct CountSchema {
52
    #[sqlx(rename = "COUNT(*)")]
53
    count: i64,
54
}
55

            
56
/// Name of the table that stores the operation data.
const TABLE_NAME: &str = "coremgr_opdata";

/// Column names in the order in which `add()` supplies the values of an `INSERT`.
const FIELDS: &[&str] = &[
    "data_id",
    "req_time",
    "res_time",
    "latency_ms",
    "status",
    "source_ip",
    "method",
    "path",
    "body",
    "user_id",
    "client_id",
    "err_code",
    "err_message",
];

/// Statement that creates the table when it does not exist yet.
///
/// Every column is `NOT NULL`; optional text values are stored as empty strings.
const TABLE_INIT_SQL: &str = "\
    CREATE TABLE IF NOT EXISTS coremgr_opdata (\
    data_id TEXT NOT NULL UNIQUE,\
    req_time INTEGER NOT NULL,\
    res_time INTEGER NOT NULL,\
    latency_ms INTEGER NOT NULL,\
    status INTEGER NOT NULL,\
    source_ip TEXT NOT NULL,\
    method TEXT NOT NULL,\
    path TEXT NOT NULL,\
    body TEXT NOT NULL,\
    user_id TEXT NOT NULL,\
    client_id TEXT NOT NULL,\
    err_code TEXT NOT NULL,\
    err_message TEXT NOT NULL)";
87

            
88
impl Model {
89
    /// To create the model instance with a database connection.
90
24
    pub async fn new(conn: Arc<SqlitePool>) -> Result<Self, Box<dyn StdError>> {
91
24
        let model = Model { conn };
92
24
        model.init().await?;
93
24
        Ok(model)
94
24
    }
95
}
96

            
97
#[async_trait]
98
impl CoremgrOpDataModel for Model {
99
40
    async fn init(&self) -> Result<(), Box<dyn StdError>> {
100
40
        let _ = sqlx::query(TABLE_INIT_SQL)
101
40
            .execute(self.conn.as_ref())
102
40
            .await?;
103
40
        Ok(())
104
80
    }
105

            
106
36
    async fn count(&self, cond: &ListQueryCond) -> Result<u64, Box<dyn StdError>> {
107
36
        let sql = build_list_where(SqlBuilder::select_from(TABLE_NAME).count("*"), &cond).sql()?;
108

            
109
36
        let result: Result<CountSchema, sqlx::Error> = sqlx::query_as(sql.as_str())
110
36
            .fetch_one(self.conn.as_ref())
111
36
            .await;
112

            
113
36
        let row = match result {
114
            Err(e) => return Err(Box::new(e)),
115
36
            Ok(row) => row,
116
36
        };
117
36
        Ok(row.count as u64)
118
72
    }
119

            
120
    async fn list(
121
        &self,
122
        opts: &ListOptions,
123
        cursor: Option<Box<dyn Cursor>>,
124
138
    ) -> Result<(Vec<CoremgrOpData>, Option<Box<dyn Cursor>>), Box<dyn StdError>> {
125
138
        let mut cursor = match cursor {
126
112
            None => Box::new(DbCursor::new()),
127
26
            Some(cursor) => cursor,
128
        };
129

            
130
138
        let mut opts = ListOptions { ..*opts };
131
138
        if let Some(offset) = opts.offset {
132
38
            opts.offset = Some(offset + cursor.offset());
133
100
        } else {
134
100
            opts.offset = Some(cursor.offset());
135
100
        }
136
138
        let opts_limit = opts.limit;
137
138
        if let Some(limit) = opts_limit {
138
100
            if limit > 0 {
139
88
                if cursor.offset() >= limit {
140
6
                    return Ok((vec![], None));
141
82
                }
142
82
                opts.limit = Some(limit - cursor.offset());
143
12
            }
144
38
        }
145
132
        let mut builder = SqlBuilder::select_from(TABLE_NAME);
146
132
        build_limit_offset(&mut builder, &opts);
147
132
        build_sort(&mut builder, &opts);
148
132
        let sql = build_list_where(&mut builder, opts.cond).sql()?;
149

            
150
132
        let mut rows = sqlx::query_as::<_, Schema>(sql.as_str()).fetch(self.conn.as_ref());
151
132

            
152
132
        let mut count: u64 = 0;
153
132
        let mut list = vec![];
154
2242
        while let Some(row) = rows.try_next().await? {
155
2154
            let _ = cursor.as_mut().try_next().await?;
156
2154
            list.push(CoremgrOpData {
157
2154
                data_id: row.data_id,
158
2154
                req_time: Utc.timestamp_nanos(row.req_time * 1000000),
159
2154
                res_time: Utc.timestamp_nanos(row.res_time * 1000000),
160
2154
                latency_ms: row.latency_ms,
161
2154
                status: row.status,
162
2154
                source_ip: row.source_ip,
163
2154
                method: row.method,
164
2154
                path: row.path,
165
2154
                body: match row.body.len() {
166
1650
                    0 => None,
167
504
                    _ => Some(serde_json::from_str(row.body.as_str())?),
168
                },
169
2154
                user_id: row.user_id,
170
2154
                client_id: row.client_id,
171
2154
                err_code: match row.err_code.len() {
172
1652
                    0 => None,
173
502
                    _ => Some(row.err_code),
174
                },
175
2154
                err_message: match row.err_message.len() {
176
1652
                    0 => None,
177
502
                    _ => Some(row.err_message),
178
                },
179
            });
180
2154
            if let Some(limit) = opts_limit {
181
2042
                if limit > 0 && cursor.offset() >= limit {
182
24
                    if let Some(cursor_max) = opts.cursor_max {
183
22
                        if (count + 1) >= cursor_max {
184
6
                            return Ok((list, Some(cursor)));
185
16
                        }
186
2
                    }
187
18
                    return Ok((list, None));
188
2018
                }
189
112
            }
190
2130
            if let Some(cursor_max) = opts.cursor_max {
191
2000
                count += 1;
192
2000
                if count >= cursor_max {
193
20
                    return Ok((list, Some(cursor)));
194
1980
                }
195
130
            }
196
        }
197
88
        Ok((list, None))
198
276
    }
199

            
200
926
    async fn add(&self, data: &CoremgrOpData) -> Result<(), Box<dyn StdError>> {
201
926
        let body = match data.body.as_ref() {
202
500
            None => quote(""),
203
426
            Some(body) => match serde_json::to_string(body) {
204
                Err(_) => quote("{}"),
205
426
                Ok(value) => quote(value.as_str()),
206
            },
207
        };
208
926
        let err_code = match data.err_code.as_deref() {
209
500
            None => quote(""),
210
426
            Some(value) => quote(value),
211
        };
212
926
        let err_message = match data.err_message.as_deref() {
213
500
            None => quote(""),
214
426
            Some(value) => quote(value),
215
        };
216
926
        let values = vec![
217
926
            quote(data.data_id.as_str()),
218
926
            data.req_time.timestamp_millis().to_string(),
219
926
            data.res_time.timestamp_millis().to_string(),
220
926
            data.latency_ms.to_string(),
221
926
            data.status.to_string(),
222
926
            quote(data.source_ip.as_str()),
223
926
            quote(data.method.as_str()),
224
926
            quote(data.path.as_str()),
225
926
            body,
226
926
            quote(data.user_id.as_str()),
227
926
            quote(data.client_id.as_str()),
228
926
            err_code,
229
926
            err_message,
230
926
        ];
231
926
        let sql = SqlBuilder::insert_into(TABLE_NAME)
232
926
            .fields(FIELDS)
233
926
            .values(&values)
234
926
            .sql()?;
235
926
        let _ = sqlx::query(sql.as_str())
236
926
            .execute(self.conn.as_ref())
237
926
            .await?;
238
924
        Ok(())
239
1852
    }
240

            
241
82
    async fn del(&self, cond: &QueryCond) -> Result<(), Box<dyn StdError>> {
242
82
        let sql = build_where(&mut SqlBuilder::delete_from(TABLE_NAME), cond).sql()?;
243
82
        let _ = sqlx::query(sql.as_str())
244
82
            .execute(self.conn.as_ref())
245
82
            .await?;
246
82
        Ok(())
247
164
    }
248
}
249

            
250
impl DbCursor {
251
    /// To create the cursor instance.
252
112
    pub fn new() -> Self {
253
112
        DbCursor { offset: 0 }
254
112
    }
255
}
256

            
257
#[async_trait]
258
impl Cursor for DbCursor {
259
2154
    async fn try_next(&mut self) -> Result<Option<CoremgrOpData>, Box<dyn StdError>> {
260
2154
        self.offset += 1;
261
2154
        Ok(None)
262
4308
    }
263

            
264
1552
    fn offset(&self) -> u64 {
265
1552
        self.offset
266
1552
    }
267
}
268

            
269
/// Transforms query conditions to the SQL builder.
270
82
fn build_where<'a>(builder: &'a mut SqlBuilder, cond: &QueryCond<'a>) -> &'a mut SqlBuilder {
271
82
    if let Some(value) = cond.user_id {
272
6
        builder.and_where_eq("user_id", quote(value));
273
76
    }
274
82
    if let Some(value) = cond.client_id {
275
2
        builder.and_where_eq("client_id", quote(value));
276
80
    }
277
82
    if let Some(value) = cond.req_gte {
278
2
        builder.and_where_ge("req_time", value.timestamp_millis());
279
80
    }
280
82
    if let Some(value) = cond.req_lte {
281
2
        builder.and_where_le("req_time", value.timestamp_millis());
282
80
    }
283
82
    builder
284
82
}
285

            
286
/// Transforms query conditions to the SQL builder.
287
168
fn build_list_where<'a>(
288
168
    builder: &'a mut SqlBuilder,
289
168
    cond: &ListQueryCond<'a>,
290
168
) -> &'a mut SqlBuilder {
291
168
    if let Some(value) = cond.user_id {
292
48
        builder.and_where_eq("user_id", quote(value));
293
120
    }
294
168
    if let Some(value) = cond.client_id {
295
8
        builder.and_where_eq("client_id", quote(value));
296
160
    }
297
168
    if let Some(value) = cond.req_gte {
298
38
        builder.and_where_ge("req_time", value.timestamp_millis());
299
130
    }
300
168
    if let Some(value) = cond.req_lte {
301
4
        builder.and_where_le("req_time", value.timestamp_millis());
302
164
    }
303
168
    if let Some(value) = cond.res_gte {
304
4
        builder.and_where_ge("res_time", value.timestamp_millis());
305
164
    }
306
168
    if let Some(value) = cond.res_lte {
307
4
        builder.and_where_le("res_time", value.timestamp_millis());
308
164
    }
309
168
    builder
310
168
}
311

            
312
/// Transforms model options to the SQL builder.
313
132
fn build_limit_offset<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
314
132
    if let Some(value) = opts.limit {
315
94
        if value > 0 {
316
82
            builder.limit(value);
317
82
        }
318
38
    }
319
132
    if let Some(value) = opts.offset {
320
132
        match opts.limit {
321
38
            None => builder.limit(-1).offset(value),
322
12
            Some(0) => builder.limit(-1).offset(value),
323
82
            _ => builder.offset(value),
324
        };
325
    }
326
132
    builder
327
132
}
328

            
329
/// Transforms model options to the SQL builder.
330
132
fn build_sort<'a>(builder: &'a mut SqlBuilder, opts: &ListOptions) -> &'a mut SqlBuilder {
331
132
    if let Some(sort_cond) = opts.sort.as_ref() {
332
112
        for cond in sort_cond.iter() {
333
112
            let key = match cond.key {
334
96
                SortKey::ReqTime => "req_time",
335
8
                SortKey::ResTime => "res_time",
336
8
                SortKey::Latency => "latency_ms",
337
            };
338
112
            builder.order_by(key, !cond.asc);
339
        }
340
20
    }
341
132
    builder
342
132
}