1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::{Body, Bytes},
5
    extract::State,
6
    http::{header, HeaderValue},
7
    response::{IntoResponse, Response},
8
    Extension,
9
};
10
use chrono::{TimeZone, Utc};
11
use csv::WriterBuilder;
12
use log::error;
13
use serde_json;
14

            
15
use sylvia_iot_corelib::{
16
    constants::ContentType,
17
    err::ErrResp,
18
    http::{Json, Query},
19
    role::Role,
20
    strings,
21
};
22

            
23
use super::{
24
    super::{
25
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
26
        get_user_inner,
27
    },
28
    request, response,
29
};
30
use crate::models::coremgr_opdata::{CoremgrOpData, ListOptions, ListQueryCond, SortCond, SortKey};
31

            
32
const LIST_LIMIT_DEFAULT: u64 = 100;
33
const LIST_CURSOR_MAX: u64 = 100;
34
const CSV_FIELDS: &'static str =
35
    "dataId,reqTime,resTime,latencyMs,status,method,path,body,userId,clientId,errCode,errMessage\n";
36

            
37
/// `GET /{base}/api/v1/coremgr-opdata/count`
38
56
pub async fn get_count(
39
56
    State(state): State<AppState>,
40
56
    Extension(token_info): Extension<GetTokenInfoData>,
41
56
    Query(query): Query<request::GetCountQuery>,
42
56
) -> impl IntoResponse {
43
    const FN_NAME: &'static str = "get_count";
44

            
45
56
    let user_cond = get_user_cond(FN_NAME, &token_info, query.user.as_ref(), &state).await?;
46
54
    let cond = match get_list_cond(&query, &user_cond).await {
47
30
        Err(e) => return Err(e.into_response()),
48
24
        Ok(cond) => cond,
49
24
    };
50
48
    match state.model.coremgr_opdata().count(&cond).await {
51
        Err(e) => {
52
            error!("[{}] count error: {}", FN_NAME, e);
53
            Err(ErrResp::ErrDb(Some(e.to_string())).into_response())
54
        }
55
24
        Ok(count) => Ok(Json(response::GetCount {
56
24
            data: response::GetCountData { count },
57
24
        })),
58
    }
59
56
}
60

            
61
/// `GET /{base}/api/v1/coremgr-opdata/list`
62
102
pub async fn get_list(
63
102
    State(state): State<AppState>,
64
102
    Extension(token_info): Extension<GetTokenInfoData>,
65
102
    Query(query): Query<request::GetListQuery>,
66
102
) -> impl IntoResponse {
67
    const FN_NAME: &'static str = "get_list";
68

            
69
102
    let cond_query = request::GetCountQuery {
70
102
        user: query.user.clone(),
71
102
        tfield: query.tfield.clone(),
72
102
        tstart: query.tstart,
73
102
        tend: query.tend,
74
102
    };
75
102
    let user_cond = match get_user_cond(FN_NAME, &token_info, query.user.as_ref(), &state).await {
76
2
        Err(e) => return Ok(e),
77
100
        Ok(cond) => cond,
78
    };
79
100
    let cond = match get_list_cond(&cond_query, &user_cond).await {
80
30
        Err(e) => return Err(e),
81
70
        Ok(cond) => cond,
82
    };
83
70
    let sort_cond = match get_sort_cond(&query.sort) {
84
10
        Err(e) => return Err(e),
85
60
        Ok(cond) => cond,
86
    };
87
60
    let opts = ListOptions {
88
60
        cond: &cond,
89
60
        offset: query.offset,
90
60
        limit: match query.limit {
91
42
            None => Some(LIST_LIMIT_DEFAULT),
92
18
            Some(limit) => Some(limit),
93
        },
94
60
        sort: Some(sort_cond.as_slice()),
95
60
        cursor_max: Some(LIST_CURSOR_MAX),
96
    };
97

            
98
120
    let (list, cursor) = match state.model.coremgr_opdata().list(&opts, None).await {
99
        Err(e) => {
100
            error!("[{}] list error: {}", FN_NAME, e);
101
            return Err(ErrResp::ErrDb(Some(e.to_string())));
102
        }
103
60
        Ok((list, cursor)) => match cursor {
104
46
            None => match query.format.as_ref() {
105
                Some(request::ListFormat::Array) => {
106
2
                    return Ok(Json(list_transform(&list)).into_response())
107
                }
108
                Some(request::ListFormat::Csv) => {
109
2
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
110
                    {
111
                        Err(e) => {
112
                            let e = format!("transform CSV error: {}", e);
113
                            return Err(ErrResp::ErrUnknown(Some(e)));
114
                        }
115
2
                        Ok(bytes) => bytes,
116
2
                    };
117
2
                    return Ok((
118
2
                        [
119
2
                            (header::CONTENT_TYPE, ContentType::CSV),
120
2
                            (
121
2
                                header::CONTENT_DISPOSITION,
122
2
                                "attachment;filename=coremgr-opdata.csv",
123
2
                            ),
124
2
                        ],
125
2
                        bytes,
126
2
                    )
127
2
                        .into_response());
128
                }
129
                _ => {
130
42
                    return Ok(Json(response::GetList {
131
42
                        data: list_transform(&list),
132
42
                    })
133
42
                    .into_response())
134
                }
135
            },
136
14
            Some(_) => (list, cursor),
137
14
        },
138
14
    };
139
14

            
140
14
    let query_format = query.format.clone();
141
14
    let body = Body::from_stream(async_stream::stream! {
142
14
        let cond_query = request::GetCountQuery {
143
14
            user: query.user.clone(),
144
14
            tfield: query.tfield.clone(),
145
14
            tstart: query.tstart,
146
14
            tend: query.tend,
147
14
        };
148
14
        let cond = match get_list_cond(&cond_query, &user_cond).await {
149
14
            Err(_) => return,
150
14
            Ok(cond) => cond,
151
14
        };
152
14
        let opts = ListOptions {
153
14
            cond: &cond,
154
14
            offset: query.offset,
155
14
            limit: match query.limit {
156
14
                None => Some(LIST_LIMIT_DEFAULT),
157
14
                Some(limit) => Some(limit),
158
14
            },
159
14
            sort: Some(sort_cond.as_slice()),
160
14
            cursor_max: Some(LIST_CURSOR_MAX),
161
14
        };
162
14

            
163
14
        let mut list = list;
164
14
        let mut cursor = cursor;
165
14
        let mut is_first = true;
166
14
        loop {
167
14
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
168
14
            is_first = false;
169
14
            if cursor.is_none() {
170
14
                break;
171
14
            }
172
14
            let (_list, _cursor) = match state.model.coremgr_opdata().list(&opts, cursor).await {
173
14
                Err(_) => break,
174
14
                Ok((list, cursor)) => (list, cursor),
175
14
            };
176
14
            list = _list;
177
14
            cursor = _cursor;
178
14
        }
179
14
    });
180
4
    match query_format {
181
2
        Some(request::ListFormat::Csv) => Ok((
182
2
            [
183
2
                (header::CONTENT_TYPE, ContentType::CSV),
184
2
                (
185
2
                    header::CONTENT_DISPOSITION,
186
2
                    "attachment;filename=coremgr-opdata.csv",
187
2
                ),
188
2
            ],
189
2
            body,
190
2
        )
191
2
            .into_response()),
192
12
        _ => Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response()),
193
    }
194
102
}
195

            
196
168
async fn get_list_cond<'a>(
197
168
    query: &'a request::GetCountQuery,
198
168
    user_id: &'a Option<String>,
199
168
) -> Result<ListQueryCond<'a>, ErrResp> {
200
168
    let mut cond = ListQueryCond {
201
168
        user_id: match user_id.as_ref() {
202
88
            None => None,
203
80
            Some(user_id) => Some(user_id.as_str()),
204
        },
205
168
        ..Default::default()
206
    };
207
168
    if let Some(start) = query.tstart.as_ref() {
208
72
        match query.tfield.as_ref() {
209
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
210
48
            Some(tfield) => match tfield.as_str() {
211
48
                "req" => cond.req_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
212
12
                "res" => cond.res_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
213
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
214
            },
215
        }
216
96
    }
217
132
    if let Some(end) = query.tend.as_ref() {
218
24
        match query.tfield.as_ref() {
219
12
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
220
12
            Some(tfield) => match tfield.as_str() {
221
12
                "req" => cond.req_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
222
12
                "res" => cond.res_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
223
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
224
            },
225
        }
226
108
    }
227

            
228
108
    Ok(cond)
229
168
}
230

            
231
158
async fn get_user_cond(
232
158
    fn_name: &str,
233
158
    token_info: &GetTokenInfoData,
234
158
    query_user: Option<&String>,
235
158
    state: &AppState,
236
158
) -> Result<Option<String>, Response> {
237
158
    if !Role::is_role(&token_info.roles, Role::ADMIN)
238
158
        && !Role::is_role(&token_info.roles, Role::MANAGER)
239
    {
240
72
        return Ok(Some(token_info.user_id.clone()));
241
86
    }
242
86
    let auth_base = state.auth_base.as_str();
243
86
    let client = state.client.clone();
244
86

            
245
86
    match query_user {
246
70
        None => Ok(None),
247
16
        Some(user_id) => match user_id.len() {
248
4
            0 => Ok(None),
249
            _ => {
250
12
                let token =
251
12
                    match HeaderValue::from_str(format!("Bearer {}", token_info.token).as_str()) {
252
                        Err(e) => {
253
                            error!("[{}] get token error: {}", fn_name, e);
254
                            return Err(ErrResp::ErrRsc(Some(format!("get token error: {}", e)))
255
                                .into_response());
256
                        }
257
12
                        Ok(value) => value,
258
12
                    };
259
12
                match get_user_inner(fn_name, &client, auth_base, user_id, &token).await {
260
                    Err(e) => {
261
                        error!("[{}] get unit error", fn_name);
262
                        return Err(e);
263
                    }
264
12
                    Ok(unit) => match unit {
265
                        None => {
266
4
                            return Err(ErrResp::Custom(
267
4
                                ErrReq::USER_NOT_EXIST.0,
268
4
                                ErrReq::USER_NOT_EXIST.1,
269
4
                                None,
270
4
                            )
271
4
                            .into_response())
272
                        }
273
8
                        Some(_) => Ok(Some(user_id.clone())),
274
                    },
275
                }
276
            }
277
        },
278
    }
279
158
}
280

            
281
70
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
282
70
    match sort_args.as_ref() {
283
26
        None => Ok(vec![SortCond {
284
26
            key: SortKey::ReqTime,
285
26
            asc: false,
286
26
        }]),
287
44
        Some(args) => {
288
44
            let mut args = args.split(",");
289
44
            let mut sort_cond = vec![];
290
80
            while let Some(arg) = args.next() {
291
46
                let mut cond = arg.split(":");
292
46
                let key = match cond.next() {
293
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
294
46
                    Some(field) => match field {
295
46
                        "req" => SortKey::ReqTime,
296
14
                        "res" => SortKey::ResTime,
297
8
                        "latency" => SortKey::Latency,
298
                        _ => {
299
4
                            return Err(ErrResp::ErrParam(Some(format!(
300
4
                                "invalid sort key {}",
301
4
                                field
302
4
                            ))))
303
                        }
304
                    },
305
                };
306
42
                let asc = match cond.next() {
307
2
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
308
40
                    Some(asc) => match asc {
309
40
                        "asc" => true,
310
8
                        "desc" => false,
311
                        _ => {
312
2
                            return Err(ErrResp::ErrParam(Some(format!(
313
2
                                "invalid sort asc {}",
314
2
                                asc
315
2
                            ))))
316
                        }
317
                    },
318
                };
319
38
                if cond.next().is_some() {
320
2
                    return Err(ErrResp::ErrParam(Some(
321
2
                        "invalid sort condition".to_string(),
322
2
                    )));
323
36
                }
324
36
                sort_cond.push(SortCond { key, asc });
325
            }
326
34
            Ok(sort_cond)
327
        }
328
    }
329
70
}
330

            
331
44
fn list_transform(list: &Vec<CoremgrOpData>) -> Vec<response::GetListData> {
332
44
    let mut ret = vec![];
333
150
    for item in list.iter() {
334
150
        ret.push(data_transform(&item));
335
150
    }
336
44
    ret
337
44
}
338

            
339
32
fn list_transform_bytes(
340
32
    list: &Vec<CoremgrOpData>,
341
32
    with_start: bool,
342
32
    with_end: bool,
343
32
    format: Option<&request::ListFormat>,
344
32
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
345
32
    let mut build_str = match with_start {
346
16
        false => "".to_string(),
347
6
        true => match format {
348
2
            Some(request::ListFormat::Array) => "[".to_string(),
349
            Some(request::ListFormat::Csv) => {
350
4
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
351
4
                format!("{}{}", bom, CSV_FIELDS)
352
            }
353
10
            _ => "{\"data\":[".to_string(),
354
        },
355
    };
356
32
    let mut is_first = with_start;
357

            
358
1870
    for item in list {
359
430
        match format {
360
            Some(request::ListFormat::Csv) => {
361
220
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
362
220
                writer.serialize(data_transform_csv(item))?;
363
220
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
364
            }
365
            _ => {
366
1618
                if is_first {
367
12
                    is_first = false;
368
1606
                } else {
369
1606
                    build_str.push(',');
370
1606
                }
371
1618
                let json_str = match serde_json::to_string(&data_transform(item)) {
372
                    Err(e) => return Err(Box::new(e)),
373
1618
                    Ok(str) => str,
374
1618
                };
375
1618
                build_str += json_str.as_str();
376
            }
377
        }
378
    }
379

            
380
32
    if with_end {
381
16
        build_str += match format {
382
2
            Some(request::ListFormat::Array) => "]",
383
4
            Some(request::ListFormat::Csv) => "",
384
10
            _ => "]}",
385
        }
386
16
    }
387
32
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
388
32
}
389

            
390
1768
fn data_transform(data: &CoremgrOpData) -> response::GetListData {
391
1768
    response::GetListData {
392
1768
        data_id: data.data_id.clone(),
393
1768
        req_time: strings::time_str(&data.req_time),
394
1768
        res_time: strings::time_str(&data.res_time),
395
1768
        latency_ms: data.latency_ms,
396
1768
        status: data.status,
397
1768
        source_ip: data.source_ip.clone(),
398
1768
        method: data.method.clone(),
399
1768
        path: data.path.clone(),
400
1768
        body: data.body.clone(),
401
1768
        user_id: data.user_id.clone(),
402
1768
        client_id: data.client_id.clone(),
403
1768
        err_code: data.err_code.clone(),
404
1768
        err_message: data.err_message.clone(),
405
1768
    }
406
1768
}
407

            
408
220
fn data_transform_csv(data: &CoremgrOpData) -> response::GetListCsvData {
409
220
    response::GetListCsvData {
410
220
        data_id: data.data_id.clone(),
411
220
        req_time: strings::time_str(&data.req_time),
412
220
        res_time: strings::time_str(&data.res_time),
413
220
        latency_ms: data.latency_ms,
414
220
        status: data.status,
415
220
        source_ip: data.source_ip.clone(),
416
220
        method: data.method.clone(),
417
220
        path: data.path.clone(),
418
220
        body: match data.body.as_ref() {
419
208
            None => "".to_string(),
420
12
            Some(body) => match serde_json::to_string(body) {
421
                Err(_) => "".to_string(),
422
12
                Ok(body) => body,
423
            },
424
        },
425
220
        user_id: data.user_id.clone(),
426
220
        client_id: data.client_id.clone(),
427
220
        err_code: match data.err_code.as_ref() {
428
208
            None => "".to_string(),
429
12
            Some(err_code) => err_code.clone(),
430
        },
431
220
        err_message: match data.err_message.as_ref() {
432
208
            None => "".to_string(),
433
12
            Some(err_message) => err_message.clone(),
434
        },
435
    }
436
220
}