1
use std::error::Error as StdError;
2

            
3
use actix_web::{
4
    http::header::{self, HeaderValue},
5
    web::{self, Bytes},
6
    HttpMessage, HttpRequest, HttpResponse, Responder, ResponseError,
7
};
8
use chrono::{TimeZone, Utc};
9
use csv::WriterBuilder;
10
use log::error;
11
use serde_json;
12

            
13
use sylvia_iot_corelib::{err::ErrResp, role::Role, strings};
14

            
15
use super::{
16
    super::{
17
        super::{middleware::FullTokenInfo, ErrReq, State},
18
        get_user_inner,
19
    },
20
    request, response,
21
};
22
use crate::models::coremgr_opdata::{CoremgrOpData, ListOptions, ListQueryCond, SortCond, SortKey};
23

            
24
/// Value used for the list `limit` option when the request does not provide one.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as the `cursor_max` list option for every model query issued by `get_list`.
const LIST_CURSOR_MAX: u64 = 100;
/// Header row written at the very beginning of a CSV response.
///
/// NOTE(review): `data_transform_csv` also fills a `source_ip` field but there is no `sourceIp`
/// column here — confirm against the field order of `response::GetListCsvData` that the header
/// and the serialized rows line up.
const CSV_FIELDS: &str =
    "dataId,reqTime,resTime,latencyMs,status,method,path,body,userId,clientId,errCode,errMessage\n";
28

            
29
/// `GET /{base}/api/v1/coremgr-opdata/count`
30
56
pub async fn get_count(
31
56
    req: HttpRequest,
32
56
    query: web::Query<request::GetCountQuery>,
33
56
    state: web::Data<State>,
34
56
) -> impl Responder {
35
    const FN_NAME: &'static str = "get_count";
36

            
37
56
    let user_cond = match get_user_cond(FN_NAME, &req, query.user.as_ref(), &state).await {
38
2
        Err(e) => return e,
39
54
        Ok(cond) => cond,
40
    };
41
54
    let cond = match get_list_cond(&query, &user_cond).await {
42
30
        Err(e) => return e.error_response(),
43
24
        Ok(cond) => cond,
44
24
    };
45
48
    match state.model.coremgr_opdata().count(&cond).await {
46
        Err(e) => {
47
            error!("[{}] count error: {}", FN_NAME, e);
48
            ErrResp::ErrDb(Some(e.to_string())).error_response()
49
        }
50
24
        Ok(count) => HttpResponse::Ok().json(response::GetCount {
51
24
            data: response::GetCountData { count },
52
24
        }),
53
    }
54
56
}
55

            
56
/// `GET /{base}/api/v1/coremgr-opdata/list`
57
102
pub async fn get_list(
58
102
    req: HttpRequest,
59
102
    query: web::Query<request::GetListQuery>,
60
102
    state: web::Data<State>,
61
102
) -> impl Responder {
62
102
    const FN_NAME: &'static str = "get_list";
63
102

            
64
102
    let cond_query = request::GetCountQuery {
65
102
        user: query.user.clone(),
66
102
        tfield: query.tfield.clone(),
67
102
        tstart: query.tstart,
68
102
        tend: query.tend,
69
102
    };
70
102
    let user_cond = match get_user_cond(FN_NAME, &req, query.user.as_ref(), &state).await {
71
2
        Err(e) => return Ok(e),
72
100
        Ok(cond) => cond,
73
    };
74
100
    let cond = match get_list_cond(&cond_query, &user_cond).await {
75
30
        Err(e) => return Err(e),
76
70
        Ok(cond) => cond,
77
    };
78
70
    let sort_cond = match get_sort_cond(&query.sort) {
79
10
        Err(e) => return Err(e),
80
60
        Ok(cond) => cond,
81
    };
82
60
    let opts = ListOptions {
83
60
        cond: &cond,
84
60
        offset: query.offset,
85
60
        limit: match query.limit {
86
42
            None => Some(LIST_LIMIT_DEFAULT),
87
18
            Some(limit) => Some(limit),
88
        },
89
60
        sort: Some(sort_cond.as_slice()),
90
60
        cursor_max: Some(LIST_CURSOR_MAX),
91
    };
92

            
93
120
    let (list, cursor) = match state.model.coremgr_opdata().list(&opts, None).await {
94
        Err(e) => {
95
            error!("[{}] list error: {}", FN_NAME, e);
96
            return Err(ErrResp::ErrDb(Some(e.to_string())));
97
        }
98
60
        Ok((list, cursor)) => match cursor {
99
46
            None => match query.format.as_ref() {
100
                Some(request::ListFormat::Array) => {
101
2
                    return Ok(HttpResponse::Ok().json(list_transform(&list)))
102
                }
103
                Some(request::ListFormat::Csv) => {
104
2
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
105
                    {
106
                        Err(e) => {
107
                            return Err(ErrResp::ErrUnknown(Some(format!(
108
                                "transform CSV error: {}",
109
                                e
110
                            ))))
111
                        }
112
2
                        Ok(bytes) => bytes,
113
2
                    };
114
2
                    return Ok(HttpResponse::Ok()
115
2
                        .insert_header((header::CONTENT_TYPE, "text/csv"))
116
2
                        .insert_header((
117
2
                            header::CONTENT_DISPOSITION,
118
2
                            "attachment;filename=coremgr-opdata.csv",
119
2
                        ))
120
2
                        .body(bytes));
121
                }
122
                _ => {
123
42
                    return Ok(HttpResponse::Ok().json(response::GetList {
124
42
                        data: list_transform(&list),
125
42
                    }))
126
                }
127
            },
128
14
            Some(_) => (list, cursor),
129
14
        },
130
14
    };
131
14

            
132
14
    // TODO: detect client disconnect
133
14
    let query_format = query.format.clone();
134
14
    let stream = async_stream::stream! {
135
14
        let query = query.0.clone();
136
14
        let cond_query = request::GetCountQuery {
137
14
            user: query.user.clone(),
138
14
            tfield: query.tfield.clone(),
139
14
            tstart: query.tstart,
140
14
            tend: query.tend,
141
14
        };
142
14
        let cond = match get_list_cond(&cond_query, &user_cond).await {
143
            Err(_) => return,
144
14
            Ok(cond) => cond,
145
        };
146
14
        let opts = ListOptions {
147
14
            cond: &cond,
148
14
            offset: query.offset,
149
14
            limit: match query.limit {
150
4
                None => Some(LIST_LIMIT_DEFAULT),
151
10
                Some(limit) => Some(limit),
152
            },
153
14
            sort: Some(sort_cond.as_slice()),
154
14
            cursor_max: Some(LIST_CURSOR_MAX),
155
14
        };
156
14

            
157
14
        let mut list = list;
158
14
        let mut cursor = cursor;
159
14
        let mut is_first = true;
160
        loop {
161
30
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
162
30
            is_first = false;
163
30
            if cursor.is_none() {
164
14
                break;
165
16
            }
166
22
            let (_list, _cursor) = match state.model.coremgr_opdata().list(&opts, cursor).await {
167
                Err(_) => break,
168
16
                Ok((list, cursor)) => (list, cursor),
169
16
            };
170
16
            list = _list;
171
16
            cursor = _cursor;
172
        }
173
    };
174
4
    match query_format {
175
2
        Some(request::ListFormat::Csv) => Ok(HttpResponse::Ok()
176
2
            .insert_header((header::CONTENT_TYPE, "text/csv"))
177
2
            .insert_header((
178
2
                header::CONTENT_DISPOSITION,
179
2
                "attachment;filename=coremgr-opdata.csv",
180
2
            ))
181
2
            .streaming(stream)),
182
12
        _ => Ok(HttpResponse::Ok().streaming(stream)),
183
    }
184
102
}
185

            
186
168
async fn get_list_cond<'a>(
187
168
    query: &'a request::GetCountQuery,
188
168
    user_id: &'a Option<String>,
189
168
) -> Result<ListQueryCond<'a>, ErrResp> {
190
168
    let mut cond = ListQueryCond {
191
168
        user_id: match user_id.as_ref() {
192
88
            None => None,
193
80
            Some(user_id) => Some(user_id.as_str()),
194
        },
195
168
        ..Default::default()
196
    };
197
168
    if let Some(start) = query.tstart.as_ref() {
198
72
        match query.tfield.as_ref() {
199
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
200
48
            Some(tfield) => match tfield.as_str() {
201
48
                "req" => cond.req_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
202
12
                "res" => cond.res_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
203
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
204
            },
205
        }
206
96
    }
207
132
    if let Some(end) = query.tend.as_ref() {
208
24
        match query.tfield.as_ref() {
209
12
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
210
12
            Some(tfield) => match tfield.as_str() {
211
12
                "req" => cond.req_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
212
12
                "res" => cond.res_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
213
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
214
            },
215
        }
216
108
    }
217

            
218
108
    Ok(cond)
219
168
}
220

            
221
158
async fn get_user_cond(
222
158
    fn_name: &str,
223
158
    req: &HttpRequest,
224
158
    query_user: Option<&String>,
225
158
    state: &web::Data<State>,
226
158
) -> Result<Option<String>, HttpResponse> {
227
158
    let token_info = match req.extensions_mut().get::<FullTokenInfo>() {
228
        None => {
229
            error!("[{}] token not found", fn_name);
230
            return Err(
231
                ErrResp::ErrUnknown(Some("token info not found".to_string())).error_response(),
232
            );
233
        }
234
158
        Some(token_info) => {
235
158
            if !Role::is_role(&token_info.info.roles, Role::ADMIN)
236
158
                && !Role::is_role(&token_info.info.roles, Role::MANAGER)
237
            {
238
72
                return Ok(Some(token_info.info.user_id.clone()));
239
86
            }
240
86
            token_info.clone()
241
86
        }
242
86
    };
243
86
    let auth_base = state.auth_base.as_str();
244
86
    let client = state.client.clone();
245
86

            
246
86
    match query_user {
247
70
        None => Ok(None),
248
16
        Some(user_id) => match user_id.len() {
249
4
            0 => Ok(None),
250
            _ => {
251
12
                let token = match HeaderValue::from_str(token_info.token.as_str()) {
252
                    Err(e) => {
253
                        error!("[{}] get token error: {}", fn_name, e);
254
                        return Err(ErrResp::ErrUnknown(Some(format!("get token error: {}", e)))
255
                            .error_response());
256
                    }
257
12
                    Ok(value) => value,
258
12
                };
259
12
                match get_user_inner(fn_name, &client, auth_base, user_id, &token).await {
260
                    Err(e) => {
261
                        error!("[{}] get unit error", fn_name);
262
                        return Err(e);
263
                    }
264
12
                    Ok(unit) => match unit {
265
                        None => {
266
4
                            return Err(ErrResp::Custom(
267
4
                                ErrReq::USER_NOT_EXIST.0,
268
4
                                ErrReq::USER_NOT_EXIST.1,
269
4
                                None,
270
4
                            )
271
4
                            .error_response())
272
                        }
273
8
                        Some(_) => Ok(Some(user_id.clone())),
274
                    },
275
                }
276
            }
277
        },
278
    }
279
158
}
280

            
281
70
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
282
70
    match sort_args.as_ref() {
283
26
        None => Ok(vec![SortCond {
284
26
            key: SortKey::ReqTime,
285
26
            asc: false,
286
26
        }]),
287
44
        Some(args) => {
288
44
            let mut args = args.split(",");
289
44
            let mut sort_cond = vec![];
290
80
            while let Some(arg) = args.next() {
291
46
                let mut cond = arg.split(":");
292
46
                let key = match cond.next() {
293
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
294
46
                    Some(field) => match field {
295
46
                        "req" => SortKey::ReqTime,
296
14
                        "res" => SortKey::ResTime,
297
8
                        "latency" => SortKey::Latency,
298
                        _ => {
299
4
                            return Err(ErrResp::ErrParam(Some(format!(
300
4
                                "invalid sort key {}",
301
4
                                field
302
4
                            ))))
303
                        }
304
                    },
305
                };
306
42
                let asc = match cond.next() {
307
2
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
308
40
                    Some(asc) => match asc {
309
40
                        "asc" => true,
310
8
                        "desc" => false,
311
                        _ => {
312
2
                            return Err(ErrResp::ErrParam(Some(format!(
313
2
                                "invalid sort asc {}",
314
2
                                asc
315
2
                            ))))
316
                        }
317
                    },
318
                };
319
38
                if cond.next().is_some() {
320
2
                    return Err(ErrResp::ErrParam(Some(
321
2
                        "invalid sort condition".to_string(),
322
2
                    )));
323
36
                }
324
36
                sort_cond.push(SortCond { key, asc });
325
            }
326
34
            Ok(sort_cond)
327
        }
328
    }
329
70
}
330

            
331
44
fn list_transform(list: &Vec<CoremgrOpData>) -> Vec<response::GetListData> {
332
44
    let mut ret = vec![];
333
150
    for item in list.iter() {
334
150
        ret.push(data_transform(&item));
335
150
    }
336
44
    ret
337
44
}
338

            
339
32
fn list_transform_bytes(
340
32
    list: &Vec<CoremgrOpData>,
341
32
    with_start: bool,
342
32
    with_end: bool,
343
32
    format: Option<&request::ListFormat>,
344
32
) -> Result<Bytes, Box<dyn StdError>> {
345
32
    let mut build_str = match with_start {
346
16
        false => "".to_string(),
347
6
        true => match format {
348
2
            Some(request::ListFormat::Array) => "[".to_string(),
349
            Some(request::ListFormat::Csv) => {
350
4
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
351
4
                format!("{}{}", bom, CSV_FIELDS)
352
            }
353
10
            _ => "{\"data\":[".to_string(),
354
        },
355
    };
356
32
    let mut is_first = with_start;
357

            
358
1870
    for item in list {
359
430
        match format {
360
            Some(request::ListFormat::Csv) => {
361
220
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
362
220
                writer.serialize(data_transform_csv(item))?;
363
220
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
364
            }
365
            _ => {
366
1618
                if is_first {
367
12
                    is_first = false;
368
1606
                } else {
369
1606
                    build_str.push(',');
370
1606
                }
371
1618
                let json_str = match serde_json::to_string(&data_transform(item)) {
372
                    Err(e) => return Err(Box::new(e)),
373
1618
                    Ok(str) => str,
374
1618
                };
375
1618
                build_str += json_str.as_str();
376
            }
377
        }
378
    }
379

            
380
32
    if with_end {
381
16
        build_str += match format {
382
2
            Some(request::ListFormat::Array) => "]",
383
4
            Some(request::ListFormat::Csv) => "",
384
10
            _ => "]}",
385
        }
386
16
    }
387
32
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
388
32
}
389

            
390
1768
fn data_transform(data: &CoremgrOpData) -> response::GetListData {
391
1768
    response::GetListData {
392
1768
        data_id: data.data_id.clone(),
393
1768
        req_time: strings::time_str(&data.req_time),
394
1768
        res_time: strings::time_str(&data.res_time),
395
1768
        latency_ms: data.latency_ms,
396
1768
        status: data.status,
397
1768
        source_ip: data.source_ip.clone(),
398
1768
        method: data.method.clone(),
399
1768
        path: data.path.clone(),
400
1768
        body: data.body.clone(),
401
1768
        user_id: data.user_id.clone(),
402
1768
        client_id: data.client_id.clone(),
403
1768
        err_code: data.err_code.clone(),
404
1768
        err_message: data.err_message.clone(),
405
1768
    }
406
1768
}
407

            
408
220
fn data_transform_csv(data: &CoremgrOpData) -> response::GetListCsvData {
409
220
    response::GetListCsvData {
410
220
        data_id: data.data_id.clone(),
411
220
        req_time: strings::time_str(&data.req_time),
412
220
        res_time: strings::time_str(&data.res_time),
413
220
        latency_ms: data.latency_ms,
414
220
        status: data.status,
415
220
        source_ip: data.source_ip.clone(),
416
220
        method: data.method.clone(),
417
220
        path: data.path.clone(),
418
220
        body: match data.body.as_ref() {
419
208
            None => "".to_string(),
420
12
            Some(body) => match serde_json::to_string(body) {
421
                Err(_) => "".to_string(),
422
12
                Ok(body) => body,
423
            },
424
        },
425
220
        user_id: data.user_id.clone(),
426
220
        client_id: data.client_id.clone(),
427
220
        err_code: match data.err_code.as_ref() {
428
208
            None => "".to_string(),
429
12
            Some(err_code) => err_code.clone(),
430
        },
431
220
        err_message: match data.err_message.as_ref() {
432
208
            None => "".to_string(),
433
12
            Some(err_message) => err_message.clone(),
434
        },
435
    }
436
220
}