1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::{Body, Bytes},
5
    extract::State,
6
    http::{header, HeaderValue},
7
    response::{IntoResponse, Response},
8
    Extension,
9
};
10
use chrono::{TimeZone, Utc};
11
use csv::WriterBuilder;
12
use log::error;
13
use serde_json;
14

            
15
use sylvia_iot_corelib::{
16
    constants::ContentType,
17
    err::ErrResp,
18
    http::{Json, Query},
19
    role::Role,
20
    strings,
21
};
22

            
23
use super::{
24
    super::{
25
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
26
        get_user_inner,
27
    },
28
    request, response,
29
};
30
use crate::models::coremgr_opdata::{CoremgrOpData, ListOptions, ListQueryCond, SortCond, SortKey};
31

            
32
const LIST_LIMIT_DEFAULT: u64 = 100;
33
const LIST_CURSOR_MAX: u64 = 100;
34
const CSV_FIELDS: &'static str =
35
    "dataId,reqTime,resTime,latencyMs,status,method,path,body,userId,clientId,errCode,errMessage\n";
36

            
37
/// `GET /{base}/api/v1/coremgr-opdata/count`
38
112
pub async fn get_count(
39
112
    State(state): State<AppState>,
40
112
    Extension(token_info): Extension<GetTokenInfoData>,
41
112
    Query(query): Query<request::GetCountQuery>,
42
112
) -> impl IntoResponse {
43
    const FN_NAME: &'static str = "get_count";
44

            
45
112
    let user_cond = get_user_cond(FN_NAME, &token_info, query.user.as_ref(), &state).await?;
46
108
    let cond = match get_list_cond(&query, &user_cond).await {
47
60
        Err(e) => return Err(e.into_response()),
48
48
        Ok(cond) => cond,
49
48
    };
50
48
    match state.model.coremgr_opdata().count(&cond).await {
51
        Err(e) => {
52
            error!("[{}] count error: {}", FN_NAME, e);
53
            Err(ErrResp::ErrDb(Some(e.to_string())).into_response())
54
        }
55
48
        Ok(count) => Ok(Json(response::GetCount {
56
48
            data: response::GetCountData { count },
57
48
        })),
58
    }
59
112
}
60

            
61
/// `GET /{base}/api/v1/coremgr-opdata/list`
62
204
pub async fn get_list(
63
204
    State(state): State<AppState>,
64
204
    Extension(token_info): Extension<GetTokenInfoData>,
65
204
    Query(query): Query<request::GetListQuery>,
66
204
) -> impl IntoResponse {
67
    const FN_NAME: &'static str = "get_list";
68

            
69
204
    let cond_query = request::GetCountQuery {
70
204
        user: query.user.clone(),
71
204
        tfield: query.tfield.clone(),
72
204
        tstart: query.tstart,
73
204
        tend: query.tend,
74
204
    };
75
204
    let user_cond = match get_user_cond(FN_NAME, &token_info, query.user.as_ref(), &state).await {
76
4
        Err(e) => return Ok(e),
77
200
        Ok(cond) => cond,
78
    };
79
200
    let cond = match get_list_cond(&cond_query, &user_cond).await {
80
60
        Err(e) => return Err(e),
81
140
        Ok(cond) => cond,
82
    };
83
140
    let sort_cond = match get_sort_cond(&query.sort) {
84
20
        Err(e) => return Err(e),
85
120
        Ok(cond) => cond,
86
    };
87
120
    let opts = ListOptions {
88
120
        cond: &cond,
89
120
        offset: query.offset,
90
120
        limit: match query.limit {
91
84
            None => Some(LIST_LIMIT_DEFAULT),
92
36
            Some(limit) => Some(limit),
93
        },
94
120
        sort: Some(sort_cond.as_slice()),
95
120
        cursor_max: Some(LIST_CURSOR_MAX),
96
    };
97

            
98
120
    let (list, cursor) = match state.model.coremgr_opdata().list(&opts, None).await {
99
        Err(e) => {
100
            error!("[{}] list error: {}", FN_NAME, e);
101
            return Err(ErrResp::ErrDb(Some(e.to_string())));
102
        }
103
120
        Ok((list, cursor)) => match cursor {
104
92
            None => match query.format.as_ref() {
105
                Some(request::ListFormat::Array) => {
106
4
                    return Ok(Json(list_transform(&list)).into_response())
107
                }
108
                Some(request::ListFormat::Csv) => {
109
4
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
110
                    {
111
                        Err(e) => {
112
                            let e = format!("transform CSV error: {}", e);
113
                            return Err(ErrResp::ErrUnknown(Some(e)));
114
                        }
115
4
                        Ok(bytes) => bytes,
116
4
                    };
117
4
                    return Ok((
118
4
                        [
119
4
                            (header::CONTENT_TYPE, ContentType::CSV),
120
4
                            (
121
4
                                header::CONTENT_DISPOSITION,
122
4
                                "attachment;filename=coremgr-opdata.csv",
123
4
                            ),
124
4
                        ],
125
4
                        bytes,
126
4
                    )
127
4
                        .into_response());
128
                }
129
                _ => {
130
84
                    return Ok(Json(response::GetList {
131
84
                        data: list_transform(&list),
132
84
                    })
133
84
                    .into_response())
134
                }
135
            },
136
28
            Some(_) => (list, cursor),
137
28
        },
138
28
    };
139
28

            
140
28
    let query_format = query.format.clone();
141
28
    let body = Body::from_stream(async_stream::stream! {
142
28
        let cond_query = request::GetCountQuery {
143
28
            user: query.user.clone(),
144
28
            tfield: query.tfield.clone(),
145
28
            tstart: query.tstart,
146
28
            tend: query.tend,
147
28
        };
148
28
        let cond = match get_list_cond(&cond_query, &user_cond).await {
149
28
            Err(_) => return,
150
28
            Ok(cond) => cond,
151
28
        };
152
28
        let opts = ListOptions {
153
28
            cond: &cond,
154
28
            offset: query.offset,
155
28
            limit: match query.limit {
156
28
                None => Some(LIST_LIMIT_DEFAULT),
157
28
                Some(limit) => Some(limit),
158
28
            },
159
28
            sort: Some(sort_cond.as_slice()),
160
28
            cursor_max: Some(LIST_CURSOR_MAX),
161
28
        };
162
28

            
163
28
        let mut list = list;
164
28
        let mut cursor = cursor;
165
28
        let mut is_first = true;
166
28
        loop {
167
28
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
168
28
            is_first = false;
169
28
            if cursor.is_none() {
170
28
                break;
171
28
            }
172
28
            let (_list, _cursor) = match state.model.coremgr_opdata().list(&opts, cursor).await {
173
28
                Err(_) => break,
174
28
                Ok((list, cursor)) => (list, cursor),
175
28
            };
176
28
            list = _list;
177
28
            cursor = _cursor;
178
28
        }
179
28
    });
180
8
    match query_format {
181
4
        Some(request::ListFormat::Csv) => Ok((
182
4
            [
183
4
                (header::CONTENT_TYPE, ContentType::CSV),
184
4
                (
185
4
                    header::CONTENT_DISPOSITION,
186
4
                    "attachment;filename=coremgr-opdata.csv",
187
4
                ),
188
4
            ],
189
4
            body,
190
4
        )
191
4
            .into_response()),
192
24
        _ => Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response()),
193
    }
194
204
}
195

            
196
336
async fn get_list_cond<'a>(
197
336
    query: &'a request::GetCountQuery,
198
336
    user_id: &'a Option<String>,
199
336
) -> Result<ListQueryCond<'a>, ErrResp> {
200
336
    let mut cond = ListQueryCond {
201
336
        user_id: match user_id.as_ref() {
202
176
            None => None,
203
160
            Some(user_id) => Some(user_id.as_str()),
204
        },
205
336
        ..Default::default()
206
    };
207
336
    if let Some(start) = query.tstart.as_ref() {
208
144
        match query.tfield.as_ref() {
209
48
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
210
96
            Some(tfield) => match tfield.as_str() {
211
96
                "req" => cond.req_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
212
24
                "res" => cond.res_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
213
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
214
            },
215
        }
216
192
    }
217
264
    if let Some(end) = query.tend.as_ref() {
218
48
        match query.tfield.as_ref() {
219
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
220
24
            Some(tfield) => match tfield.as_str() {
221
24
                "req" => cond.req_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
222
24
                "res" => cond.res_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
223
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
224
            },
225
        }
226
216
    }
227

            
228
216
    Ok(cond)
229
336
}
230

            
231
316
async fn get_user_cond(
232
316
    fn_name: &str,
233
316
    token_info: &GetTokenInfoData,
234
316
    query_user: Option<&String>,
235
316
    state: &AppState,
236
316
) -> Result<Option<String>, Response> {
237
316
    if !Role::is_role(&token_info.roles, Role::ADMIN)
238
316
        && !Role::is_role(&token_info.roles, Role::MANAGER)
239
    {
240
144
        return Ok(Some(token_info.user_id.clone()));
241
172
    }
242
172
    let auth_base = state.auth_base.as_str();
243
172
    let client = state.client.clone();
244
172

            
245
172
    match query_user {
246
140
        None => Ok(None),
247
32
        Some(user_id) => match user_id.len() {
248
8
            0 => Ok(None),
249
            _ => {
250
24
                let token =
251
24
                    match HeaderValue::from_str(format!("Bearer {}", token_info.token).as_str()) {
252
                        Err(e) => {
253
                            error!("[{}] get token error: {}", fn_name, e);
254
                            return Err(ErrResp::ErrRsc(Some(format!("get token error: {}", e)))
255
                                .into_response());
256
                        }
257
24
                        Ok(value) => value,
258
24
                    };
259
24
                match get_user_inner(fn_name, &client, auth_base, user_id, &token).await {
260
                    Err(e) => {
261
                        error!("[{}] get unit error", fn_name);
262
                        return Err(e);
263
                    }
264
24
                    Ok(unit) => match unit {
265
                        None => {
266
8
                            return Err(ErrResp::Custom(
267
8
                                ErrReq::USER_NOT_EXIST.0,
268
8
                                ErrReq::USER_NOT_EXIST.1,
269
8
                                None,
270
8
                            )
271
8
                            .into_response())
272
                        }
273
16
                        Some(_) => Ok(Some(user_id.clone())),
274
                    },
275
                }
276
            }
277
        },
278
    }
279
316
}
280

            
281
140
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
282
140
    match sort_args.as_ref() {
283
52
        None => Ok(vec![SortCond {
284
52
            key: SortKey::ReqTime,
285
52
            asc: false,
286
52
        }]),
287
88
        Some(args) => {
288
88
            let mut args = args.split(",");
289
88
            let mut sort_cond = vec![];
290
160
            while let Some(arg) = args.next() {
291
92
                let mut cond = arg.split(":");
292
92
                let key = match cond.next() {
293
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
294
92
                    Some(field) => match field {
295
92
                        "req" => SortKey::ReqTime,
296
28
                        "res" => SortKey::ResTime,
297
16
                        "latency" => SortKey::Latency,
298
                        _ => {
299
8
                            return Err(ErrResp::ErrParam(Some(format!(
300
8
                                "invalid sort key {}",
301
8
                                field
302
8
                            ))))
303
                        }
304
                    },
305
                };
306
84
                let asc = match cond.next() {
307
4
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
308
80
                    Some(asc) => match asc {
309
80
                        "asc" => true,
310
16
                        "desc" => false,
311
                        _ => {
312
4
                            return Err(ErrResp::ErrParam(Some(format!(
313
4
                                "invalid sort asc {}",
314
4
                                asc
315
4
                            ))))
316
                        }
317
                    },
318
                };
319
76
                if cond.next().is_some() {
320
4
                    return Err(ErrResp::ErrParam(Some(
321
4
                        "invalid sort condition".to_string(),
322
4
                    )));
323
72
                }
324
72
                sort_cond.push(SortCond { key, asc });
325
            }
326
68
            Ok(sort_cond)
327
        }
328
    }
329
140
}
330

            
331
88
fn list_transform(list: &Vec<CoremgrOpData>) -> Vec<response::GetListData> {
332
88
    let mut ret = vec![];
333
300
    for item in list.iter() {
334
300
        ret.push(data_transform(&item));
335
300
    }
336
88
    ret
337
88
}
338

            
339
64
fn list_transform_bytes(
340
64
    list: &Vec<CoremgrOpData>,
341
64
    with_start: bool,
342
64
    with_end: bool,
343
64
    format: Option<&request::ListFormat>,
344
64
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
345
64
    let mut build_str = match with_start {
346
32
        false => "".to_string(),
347
12
        true => match format {
348
4
            Some(request::ListFormat::Array) => "[".to_string(),
349
            Some(request::ListFormat::Csv) => {
350
8
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
351
8
                format!("{}{}", bom, CSV_FIELDS)
352
            }
353
20
            _ => "{\"data\":[".to_string(),
354
        },
355
    };
356
64
    let mut is_first = with_start;
357

            
358
3740
    for item in list {
359
860
        match format {
360
            Some(request::ListFormat::Csv) => {
361
440
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
362
440
                writer.serialize(data_transform_csv(item))?;
363
440
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
364
            }
365
            _ => {
366
3236
                if is_first {
367
24
                    is_first = false;
368
3212
                } else {
369
3212
                    build_str.push(',');
370
3212
                }
371
3236
                let json_str = match serde_json::to_string(&data_transform(item)) {
372
                    Err(e) => return Err(Box::new(e)),
373
3236
                    Ok(str) => str,
374
3236
                };
375
3236
                build_str += json_str.as_str();
376
            }
377
        }
378
    }
379

            
380
64
    if with_end {
381
32
        build_str += match format {
382
4
            Some(request::ListFormat::Array) => "]",
383
8
            Some(request::ListFormat::Csv) => "",
384
20
            _ => "]}",
385
        }
386
32
    }
387
64
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
388
64
}
389

            
390
3536
fn data_transform(data: &CoremgrOpData) -> response::GetListData {
391
3536
    response::GetListData {
392
3536
        data_id: data.data_id.clone(),
393
3536
        req_time: strings::time_str(&data.req_time),
394
3536
        res_time: strings::time_str(&data.res_time),
395
3536
        latency_ms: data.latency_ms,
396
3536
        status: data.status,
397
3536
        source_ip: data.source_ip.clone(),
398
3536
        method: data.method.clone(),
399
3536
        path: data.path.clone(),
400
3536
        body: data.body.clone(),
401
3536
        user_id: data.user_id.clone(),
402
3536
        client_id: data.client_id.clone(),
403
3536
        err_code: data.err_code.clone(),
404
3536
        err_message: data.err_message.clone(),
405
3536
    }
406
3536
}
407

            
408
440
fn data_transform_csv(data: &CoremgrOpData) -> response::GetListCsvData {
409
440
    response::GetListCsvData {
410
440
        data_id: data.data_id.clone(),
411
440
        req_time: strings::time_str(&data.req_time),
412
440
        res_time: strings::time_str(&data.res_time),
413
440
        latency_ms: data.latency_ms,
414
440
        status: data.status,
415
440
        source_ip: data.source_ip.clone(),
416
440
        method: data.method.clone(),
417
440
        path: data.path.clone(),
418
440
        body: match data.body.as_ref() {
419
416
            None => "".to_string(),
420
24
            Some(body) => match serde_json::to_string(body) {
421
                Err(_) => "".to_string(),
422
24
                Ok(body) => body,
423
            },
424
        },
425
440
        user_id: data.user_id.clone(),
426
440
        client_id: data.client_id.clone(),
427
440
        err_code: match data.err_code.as_ref() {
428
416
            None => "".to_string(),
429
24
            Some(err_code) => err_code.clone(),
430
        },
431
440
        err_message: match data.err_message.as_ref() {
432
416
            None => "".to_string(),
433
24
            Some(err_message) => err_message.clone(),
434
        },
435
    }
436
440
}