1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Extension,
5
    body::{Body, Bytes},
6
    extract::State,
7
    http::header,
8
    response::IntoResponse,
9
};
10
use chrono::{TimeZone, Utc};
11
use csv::WriterBuilder;
12
use log::error;
13
use serde_json;
14

            
15
use sylvia_iot_corelib::{
16
    constants::ContentType,
17
    err::ErrResp,
18
    http::{Json, Query},
19
    strings,
20
};
21

            
22
use super::{
23
    super::{
24
        super::{State as AppState, middleware::GetTokenInfoData},
25
        get_unit_cond,
26
    },
27
    request, response,
28
};
29
use crate::models::application_dldata::{
30
    ApplicationDlData, ListOptions, ListQueryCond, SortCond, SortKey,
31
};
32

            
33
/// Page size applied by `get_list` when the request does not provide `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
34
/// Value passed as `cursor_max` in the list options built by `get_list`.
// NOTE(review): presumably the maximum number of items the model returns per
// cursor fetch -- confirm against the model implementation.
const LIST_CURSOR_MAX: u64 = 100;
35
/// Header row emitted (after the UTF-8 BOM) at the start of CSV output.
const CSV_FIELDS: &'static str =
36
    "dataId,proc,resp,status,unitId,deviceId,networkCode,networkAddr,profile,data,extension\n";
37

            
38
/// `GET /{base}/api/v1/application-dldata/count`
39
192
pub async fn get_count(
40
192
    State(state): State<AppState>,
41
192
    Extension(token_info): Extension<GetTokenInfoData>,
42
192
    Query(mut query): Query<request::GetCountQuery>,
43
192
) -> impl IntoResponse {
44
    const FN_NAME: &'static str = "get_count";
45

            
46
192
    if let Some(network) = query.network {
47
12
        query.network = Some(network.to_lowercase());
48
180
    }
49
192
    if let Some(addr) = query.addr {
50
8
        query.addr = Some(addr.to_lowercase());
51
184
    }
52

            
53
192
    let unit_cond = get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await?;
54
176
    let cond = match get_list_cond(&query, &unit_cond).await {
55
60
        Err(e) => return Err(e.into_response()),
56
116
        Ok(cond) => cond,
57
116
    };
58
116
    match state.model.application_dldata().count(&cond).await {
59
        Err(e) => {
60
            error!("[{}] count error: {}", FN_NAME, e);
61
            Err(ErrResp::ErrDb(Some(e.to_string())).into_response())
62
        }
63
116
        Ok(count) => Ok(Json(response::GetCount {
64
116
            data: response::GetCountData { count },
65
116
        })),
66
    }
67
192
}
68

            
69
/// `GET /{base}/api/v1/application-dldata/list`
70
292
pub async fn get_list(
71
292
    State(state): State<AppState>,
72
292
    Extension(token_info): Extension<GetTokenInfoData>,
73
292
    Query(query): Query<request::GetListQuery>,
74
292
) -> impl IntoResponse {
75
    const FN_NAME: &'static str = "get_list";
76

            
77
292
    let cond_query = request::GetCountQuery {
78
292
        unit: query.unit.clone(),
79
292
        device: query.device.clone(),
80
292
        network: match query.network.as_ref() {
81
280
            None => None,
82
12
            Some(network) => Some(network.to_lowercase()),
83
        },
84
292
        addr: match query.addr.as_ref() {
85
284
            None => None,
86
8
            Some(addr) => Some(addr.to_lowercase()),
87
        },
88
292
        profile: query.profile.clone(),
89
292
        tfield: query.tfield.clone(),
90
292
        tstart: query.tstart,
91
292
        tend: query.tend,
92
    };
93
292
    let unit_cond = match get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await {
94
16
        Err(e) => return Ok(e),
95
276
        Ok(cond) => cond,
96
    };
97
276
    let cond = match get_list_cond(&cond_query, &unit_cond).await {
98
60
        Err(e) => return Err(e),
99
216
        Ok(cond) => cond,
100
    };
101
216
    let sort_cond = match get_sort_cond(&query.sort) {
102
20
        Err(e) => return Err(e),
103
196
        Ok(cond) => cond,
104
    };
105
196
    let opts = ListOptions {
106
196
        cond: &cond,
107
196
        offset: query.offset,
108
196
        limit: match query.limit {
109
160
            None => Some(LIST_LIMIT_DEFAULT),
110
36
            Some(limit) => Some(limit),
111
        },
112
196
        sort: Some(sort_cond.as_slice()),
113
196
        cursor_max: Some(LIST_CURSOR_MAX),
114
    };
115

            
116
196
    let (list, cursor) = match state.model.application_dldata().list(&opts, None).await {
117
        Err(e) => {
118
            error!("[{}] list error: {}", FN_NAME, e);
119
            return Err(ErrResp::ErrDb(Some(e.to_string())));
120
        }
121
196
        Ok((list, cursor)) => match cursor {
122
168
            None => match query.format.as_ref() {
123
                Some(request::ListFormat::Array) => {
124
4
                    return Ok(Json(list_transform(&list)).into_response());
125
                }
126
                Some(request::ListFormat::Csv) => {
127
4
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
128
                    {
129
                        Err(e) => {
130
                            let e = format!("transform CSV error: {}", e);
131
                            return Err(ErrResp::ErrUnknown(Some(e)));
132
                        }
133
4
                        Ok(bytes) => bytes,
134
4
                    };
135
4
                    return Ok((
136
4
                        [
137
4
                            (header::CONTENT_TYPE, ContentType::CSV),
138
4
                            (
139
4
                                header::CONTENT_DISPOSITION,
140
4
                                "attachment;filename=application-dldata.csv",
141
4
                            ),
142
4
                        ],
143
4
                        bytes,
144
4
                    )
145
4
                        .into_response());
146
                }
147
                _ => {
148
160
                    return Ok(Json(response::GetList {
149
160
                        data: list_transform(&list),
150
160
                    })
151
160
                    .into_response());
152
                }
153
            },
154
28
            Some(_) => (list, cursor),
155
28
        },
156
28
    };
157
28

            
158
28
    let query_format = query.format.clone();
159
28
    let body = Body::from_stream(async_stream::stream! {
160
28
        let cond_query = request::GetCountQuery {
161
28
            unit: query.unit.clone(),
162
28
            device: query.device.clone(),
163
28
            network: query.network.clone(),
164
28
            addr: query.addr.clone(),
165
28
            profile: query.profile.clone(),
166
28
            tfield: query.tfield.clone(),
167
28
            tstart: query.tstart,
168
28
            tend: query.tend,
169
28
        };
170
28
        let cond = match get_list_cond(&cond_query, &unit_cond).await {
171
28
            Err(_) => return,
172
28
            Ok(cond) => cond,
173
28
        };
174
28
        let opts = ListOptions {
175
28
            cond: &cond,
176
28
            offset: query.offset,
177
28
            limit: match query.limit {
178
28
                None => Some(LIST_LIMIT_DEFAULT),
179
28
                Some(limit) => Some(limit),
180
28
            },
181
28
            sort: Some(sort_cond.as_slice()),
182
28
            cursor_max: Some(LIST_CURSOR_MAX),
183
28
        };
184
28

            
185
28
        let mut list = list;
186
28
        let mut cursor = cursor;
187
28
        let mut is_first = true;
188
28
        loop {
189
28
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
190
28
            is_first = false;
191
28
            if cursor.is_none() {
192
28
                break;
193
28
            }
194
28
            let (_list, _cursor) = match state.model.application_dldata().list(&opts, cursor).await {
195
28
                Err(_) => break,
196
28
                Ok((list, cursor)) => (list, cursor),
197
28
            };
198
28
            list = _list;
199
28
            cursor = _cursor;
200
28
        }
201
28
    });
202
8
    match query_format {
203
4
        Some(request::ListFormat::Csv) => Ok((
204
4
            [
205
4
                (header::CONTENT_TYPE, ContentType::CSV),
206
4
                (
207
4
                    header::CONTENT_DISPOSITION,
208
4
                    "attachment;filename=application-dldata.csv",
209
4
                ),
210
4
            ],
211
4
            body,
212
4
        )
213
4
            .into_response()),
214
24
        _ => Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response()),
215
    }
216
292
}
217

            
218
480
async fn get_list_cond<'a>(
219
480
    query: &'a request::GetCountQuery,
220
480
    unit_cond: &'a Option<String>,
221
480
) -> Result<ListQueryCond<'a>, ErrResp> {
222
480
    let mut cond = ListQueryCond {
223
480
        unit_id: match unit_cond.as_ref() {
224
192
            None => None,
225
288
            Some(unit_id) => Some(unit_id.as_str()),
226
        },
227
480
        ..Default::default()
228
    };
229
480
    if let Some(device_id) = query.device.as_ref() {
230
40
        if device_id.len() > 0 {
231
40
            cond.device_id = Some(device_id.as_str());
232
40
        }
233
440
    }
234
480
    if let Some(network_code) = query.network.as_ref() {
235
24
        if network_code.len() > 0 {
236
24
            cond.network_code = Some(network_code.as_str());
237
24
        }
238
456
    }
239
480
    if let Some(network_addr) = query.addr.as_ref() {
240
16
        if network_addr.len() > 0 {
241
16
            cond.network_addr = Some(network_addr.as_str());
242
16
        }
243
464
    }
244
480
    if let Some(profile) = query.profile.as_ref() {
245
16
        if profile.len() > 0 {
246
16
            cond.profile = Some(profile.as_str());
247
16
        }
248
464
    }
249
480
    if let Some(start) = query.tstart.as_ref() {
250
240
        match query.tfield.as_ref() {
251
48
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
252
192
            Some(tfield) => match tfield.as_str() {
253
192
                "proc" => cond.proc_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
254
72
                "resp" => cond.resp_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
255
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
256
            },
257
        }
258
240
    }
259
408
    if let Some(end) = query.tend.as_ref() {
260
96
        match query.tfield.as_ref() {
261
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
262
72
            Some(tfield) => match tfield.as_str() {
263
72
                "proc" => cond.proc_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
264
48
                "resp" => cond.resp_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
265
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
266
            },
267
        }
268
312
    }
269

            
270
360
    Ok(cond)
271
480
}
272

            
273
216
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
274
216
    match sort_args.as_ref() {
275
120
        None => Ok(vec![SortCond {
276
120
            key: SortKey::Proc,
277
120
            asc: false,
278
120
        }]),
279
96
        Some(args) => {
280
96
            let mut args = args.split(",");
281
96
            let mut sort_cond = vec![];
282
208
            while let Some(arg) = args.next() {
283
132
                let mut cond = arg.split(":");
284
132
                let key = match cond.next() {
285
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
286
132
                    Some(field) => match field {
287
132
                        "proc" => SortKey::Proc,
288
52
                        "resp" => SortKey::Resp,
289
40
                        "network" => SortKey::NetworkCode,
290
24
                        "addr" => SortKey::NetworkAddr,
291
                        _ => {
292
8
                            return Err(ErrResp::ErrParam(Some(format!(
293
8
                                "invalid sort key {}",
294
8
                                field
295
8
                            ))));
296
                        }
297
                    },
298
                };
299
124
                let asc = match cond.next() {
300
4
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
301
120
                    Some(asc) => match asc {
302
120
                        "asc" => true,
303
28
                        "desc" => false,
304
                        _ => {
305
4
                            return Err(ErrResp::ErrParam(Some(format!(
306
4
                                "invalid sort asc {}",
307
4
                                asc
308
4
                            ))));
309
                        }
310
                    },
311
                };
312
116
                if cond.next().is_some() {
313
4
                    return Err(ErrResp::ErrParam(Some(
314
4
                        "invalid sort condition".to_string(),
315
4
                    )));
316
112
                }
317
112
                sort_cond.push(SortCond { key, asc });
318
            }
319
76
            Ok(sort_cond)
320
        }
321
    }
322
216
}
323

            
324
164
fn list_transform(list: &Vec<ApplicationDlData>) -> Vec<response::GetListData> {
325
164
    let mut ret = vec![];
326
488
    for item in list.iter() {
327
488
        ret.push(data_transform(&item));
328
488
    }
329
164
    ret
330
164
}
331

            
332
64
fn list_transform_bytes(
333
64
    list: &Vec<ApplicationDlData>,
334
64
    with_start: bool,
335
64
    with_end: bool,
336
64
    format: Option<&request::ListFormat>,
337
64
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
338
64
    let mut build_str = match with_start {
339
32
        false => "".to_string(),
340
12
        true => match format {
341
4
            Some(request::ListFormat::Array) => "[".to_string(),
342
            Some(request::ListFormat::Csv) => {
343
8
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
344
8
                format!("{}{}", bom, CSV_FIELDS)
345
            }
346
20
            _ => "{\"data\":[".to_string(),
347
        },
348
    };
349
64
    let mut is_first = with_start;
350

            
351
3740
    for item in list {
352
860
        match format {
353
            Some(request::ListFormat::Csv) => {
354
440
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
355
440
                writer.serialize(data_transform_csv(item))?;
356
440
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
357
            }
358
            _ => {
359
3236
                if is_first {
360
24
                    is_first = false;
361
3212
                } else {
362
3212
                    build_str.push(',');
363
3212
                }
364
3236
                let json_str = match serde_json::to_string(&data_transform(item)) {
365
                    Err(e) => return Err(Box::new(e)),
366
3236
                    Ok(str) => str,
367
3236
                };
368
3236
                build_str += json_str.as_str();
369
            }
370
        }
371
    }
372

            
373
64
    if with_end {
374
32
        build_str += match format {
375
4
            Some(request::ListFormat::Array) => "]",
376
8
            Some(request::ListFormat::Csv) => "",
377
20
            _ => "]}",
378
        }
379
32
    }
380
64
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
381
64
}
382

            
383
3724
fn data_transform(data: &ApplicationDlData) -> response::GetListData {
384
3724
    response::GetListData {
385
3724
        data_id: data.data_id.clone(),
386
3724
        proc: strings::time_str(&data.proc),
387
3724
        resp: match data.resp {
388
2492
            None => None,
389
1232
            Some(resp) => Some(strings::time_str(&resp)),
390
        },
391
3724
        status: data.status,
392
3724
        unit_id: data.unit_id.clone(),
393
3724
        device_id: data.device_id.clone(),
394
3724
        network_code: data.network_code.clone(),
395
3724
        network_addr: data.network_addr.clone(),
396
3724
        profile: data.profile.clone(),
397
3724
        data: data.data.clone(),
398
3724
        extension: data.extension.clone(),
399
3724
    }
400
3724
}
401

            
402
440
fn data_transform_csv(data: &ApplicationDlData) -> response::GetListCsvData {
403
440
    response::GetListCsvData {
404
440
        data_id: data.data_id.clone(),
405
440
        proc: strings::time_str(&data.proc),
406
440
        resp: match data.resp {
407
416
            None => "".to_string(),
408
24
            Some(resp) => strings::time_str(&resp),
409
        },
410
440
        status: data.status,
411
440
        unit_id: data.unit_id.clone(),
412
440
        device_id: match data.device_id.as_ref() {
413
416
            None => "".to_string(),
414
24
            Some(device_id) => device_id.clone(),
415
        },
416
440
        network_code: match data.network_code.as_ref() {
417
416
            None => "".to_string(),
418
24
            Some(network_code) => network_code.clone(),
419
        },
420
440
        network_addr: match data.network_addr.as_ref() {
421
416
            None => "".to_string(),
422
24
            Some(network_addr) => network_addr.clone(),
423
        },
424
440
        profile: data.profile.clone(),
425
440
        data: data.data.clone(),
426
440
        extension: match data.extension.as_ref() {
427
416
            None => "".to_string(),
428
24
            Some(extension) => match serde_json::to_string(extension) {
429
                Err(_) => "".to_string(),
430
24
                Ok(extension) => extension,
431
            },
432
        },
433
    }
434
440
}