1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::{Body, Bytes},
5
    extract::State,
6
    http::header,
7
    response::IntoResponse,
8
    Extension,
9
};
10
use chrono::{TimeZone, Utc};
11
use csv::WriterBuilder;
12
use log::error;
13
use serde_json;
14

            
15
use sylvia_iot_corelib::{
16
    constants::ContentType,
17
    err::ErrResp,
18
    http::{Json, Query},
19
    strings,
20
};
21

            
22
use super::{
23
    super::{
24
        super::{middleware::GetTokenInfoData, State as AppState},
25
        get_unit_cond,
26
    },
27
    request, response,
28
};
29
use crate::models::network_dldata::{ListOptions, ListQueryCond, NetworkDlData, SortCond, SortKey};
30

            
31
/// Page size applied by the list API when the request does not carry a `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as `cursor_max` in the list options of the list API.
const LIST_CURSOR_MAX: u64 = 100;
/// Header row emitted at the start of CSV exports (newline-terminated).
const CSV_FIELDS: &str =
    "dataId,proc,pub,resp,status,unitId,deviceId,networkCode,networkAddr,profile,data,extension\n";
35

            
36
/// `GET /{base}/api/v1/network-dldata/count`
37
108
pub async fn get_count(
38
108
    State(state): State<AppState>,
39
108
    Extension(token_info): Extension<GetTokenInfoData>,
40
108
    Query(mut query): Query<request::GetCountQuery>,
41
108
) -> impl IntoResponse {
42
    const FN_NAME: &'static str = "get_count";
43

            
44
108
    if let Some(network) = query.network {
45
6
        query.network = Some(network.to_lowercase());
46
102
    }
47
108
    if let Some(addr) = query.addr {
48
4
        query.addr = Some(addr.to_lowercase());
49
104
    }
50

            
51
108
    let unit_cond = get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await?;
52
100
    let cond = match get_list_cond(&query, &unit_cond).await {
53
30
        Err(e) => return Err(e.into_response()),
54
70
        Ok(cond) => cond,
55
70
    };
56
70
    match state.model.network_dldata().count(&cond).await {
57
        Err(e) => {
58
            error!("[{}] count error: {}", FN_NAME, e);
59
            Err(ErrResp::ErrDb(Some(e.to_string())).into_response())
60
        }
61
70
        Ok(count) => Ok(Json(response::GetCount {
62
70
            data: response::GetCountData { count },
63
70
        })),
64
    }
65
108
}
66

            
67
/// `GET /{base}/api/v1/network-dldata/list`
68
162
pub async fn get_list(
69
162
    State(state): State<AppState>,
70
162
    Extension(token_info): Extension<GetTokenInfoData>,
71
162
    Query(query): Query<request::GetListQuery>,
72
162
) -> impl IntoResponse {
73
    const FN_NAME: &'static str = "get_list";
74

            
75
162
    let cond_query = request::GetCountQuery {
76
162
        unit: query.unit.clone(),
77
162
        device: query.device.clone(),
78
162
        network: match query.network.as_ref() {
79
156
            None => None,
80
6
            Some(network) => Some(network.to_lowercase()),
81
        },
82
162
        addr: match query.addr.as_ref() {
83
158
            None => None,
84
4
            Some(addr) => Some(addr.to_lowercase()),
85
        },
86
162
        profile: query.profile.clone(),
87
162
        tfield: query.tfield.clone(),
88
162
        tstart: query.tstart,
89
162
        tend: query.tend,
90
    };
91
162
    let unit_cond = match get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await {
92
8
        Err(e) => return Ok(e),
93
154
        Ok(cond) => cond,
94
    };
95
154
    let cond = match get_list_cond(&cond_query, &unit_cond).await {
96
30
        Err(e) => return Err(e),
97
124
        Ok(cond) => cond,
98
    };
99
124
    let sort_cond = match get_sort_cond(&query.sort) {
100
10
        Err(e) => return Err(e),
101
114
        Ok(cond) => cond,
102
    };
103
114
    let opts = ListOptions {
104
114
        cond: &cond,
105
114
        offset: query.offset,
106
114
        limit: match query.limit {
107
96
            None => Some(LIST_LIMIT_DEFAULT),
108
18
            Some(limit) => Some(limit),
109
        },
110
114
        sort: Some(sort_cond.as_slice()),
111
114
        cursor_max: Some(LIST_CURSOR_MAX),
112
    };
113

            
114
114
    let (list, cursor) = match state.model.network_dldata().list(&opts, None).await {
115
        Err(e) => {
116
            error!("[{}] list error: {}", FN_NAME, e);
117
            return Err(ErrResp::ErrDb(Some(e.to_string())));
118
        }
119
114
        Ok((list, cursor)) => match cursor {
120
100
            None => match query.format.as_ref() {
121
                Some(request::ListFormat::Array) => {
122
2
                    return Ok(Json(list_transform(&list)).into_response())
123
                }
124
                Some(request::ListFormat::Csv) => {
125
2
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
126
                    {
127
                        Err(e) => {
128
                            let e = format!("transform CSV error: {}", e);
129
                            return Err(ErrResp::ErrUnknown(Some(e)));
130
                        }
131
2
                        Ok(bytes) => bytes,
132
2
                    };
133
2
                    return Ok((
134
2
                        [
135
2
                            (header::CONTENT_TYPE, ContentType::CSV),
136
2
                            (
137
2
                                header::CONTENT_DISPOSITION,
138
2
                                "attachment;filename=network-dldata.csv",
139
2
                            ),
140
2
                        ],
141
2
                        bytes,
142
2
                    )
143
2
                        .into_response());
144
                }
145
                _ => {
146
96
                    return Ok(Json(response::GetList {
147
96
                        data: list_transform(&list),
148
96
                    })
149
96
                    .into_response())
150
                }
151
            },
152
14
            Some(_) => (list, cursor),
153
14
        },
154
14
    };
155
14

            
156
14
    let query_format = query.format.clone();
157
14
    let body = Body::from_stream(async_stream::stream! {
158
14
        let query = query.clone();
159
14
        let cond_query = request::GetCountQuery {
160
14
            unit: query.unit.clone(),
161
14
            device: query.device.clone(),
162
14
            network: query.network.clone(),
163
14
            addr: query.addr.clone(),
164
14
            profile: query.profile.clone(),
165
14
            tfield: query.tfield.clone(),
166
14
            tstart: query.tstart,
167
14
            tend: query.tend,
168
14
        };
169
14
        let cond = match get_list_cond(&cond_query, &unit_cond).await {
170
14
            Err(_) => return,
171
14
            Ok(cond) => cond,
172
14
        };
173
14
        let opts = ListOptions {
174
14
            cond: &cond,
175
14
            offset: query.offset,
176
14
            limit: match query.limit {
177
14
                None => Some(LIST_LIMIT_DEFAULT),
178
14
                Some(limit) => Some(limit),
179
14
            },
180
14
            sort: Some(sort_cond.as_slice()),
181
14
            cursor_max: Some(LIST_CURSOR_MAX),
182
14
        };
183
14

            
184
14
        let mut list = list;
185
14
        let mut cursor = cursor;
186
14
        let mut is_first = true;
187
14
        loop {
188
14
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
189
14
            is_first = false;
190
14
            if cursor.is_none() {
191
14
                break;
192
14
            }
193
14
            let (_list, _cursor) = match state.model.network_dldata().list(&opts, cursor).await {
194
14
                Err(_) => break,
195
14
                Ok((list, cursor)) => (list, cursor),
196
14
            };
197
14
            list = _list;
198
14
            cursor = _cursor;
199
14
        }
200
14
    });
201
4
    match query_format {
202
2
        Some(request::ListFormat::Csv) => Ok((
203
2
            [
204
2
                (header::CONTENT_TYPE, ContentType::CSV),
205
2
                (
206
2
                    header::CONTENT_DISPOSITION,
207
2
                    "attachment;filename=network-dldata.csv",
208
2
                ),
209
2
            ],
210
2
            body,
211
2
        )
212
2
            .into_response()),
213
12
        _ => Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response()),
214
    }
215
162
}
216

            
217
268
async fn get_list_cond<'a>(
218
268
    query: &'a request::GetCountQuery,
219
268
    unit_cond: &'a Option<String>,
220
268
) -> Result<ListQueryCond<'a>, ErrResp> {
221
268
    let mut cond = ListQueryCond {
222
268
        unit_id: match unit_cond.as_ref() {
223
100
            None => None,
224
168
            Some(unit_id) => Some(unit_id.as_str()),
225
        },
226
268
        ..Default::default()
227
    };
228
268
    if let Some(device_id) = query.device.as_ref() {
229
20
        if device_id.len() > 0 {
230
20
            cond.device_id = Some(device_id.as_str());
231
20
        }
232
248
    }
233
268
    if let Some(network_code) = query.network.as_ref() {
234
12
        if network_code.len() > 0 {
235
12
            cond.network_code = Some(network_code.as_str());
236
12
        }
237
256
    }
238
268
    if let Some(network_addr) = query.addr.as_ref() {
239
8
        if network_addr.len() > 0 {
240
8
            cond.network_addr = Some(network_addr.as_str());
241
8
        }
242
260
    }
243
268
    if let Some(profile) = query.profile.as_ref() {
244
8
        if profile.len() > 0 {
245
8
            cond.profile = Some(profile.as_str());
246
8
        }
247
260
    }
248
268
    if let Some(start) = query.tstart.as_ref() {
249
144
        match query.tfield.as_ref() {
250
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
251
120
            Some(tfield) => match tfield.as_str() {
252
120
                "proc" => cond.proc_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
253
60
                "pub" => cond.pub_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
254
36
                "resp" => cond.resp_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
255
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
256
            },
257
        }
258
124
    }
259
232
    if let Some(end) = query.tend.as_ref() {
260
60
        match query.tfield.as_ref() {
261
12
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
262
48
            Some(tfield) => match tfield.as_str() {
263
48
                "proc" => cond.proc_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
264
36
                "pub" => cond.pub_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
265
24
                "resp" => cond.resp_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
266
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
267
            },
268
        }
269
172
    }
270

            
271
208
    Ok(cond)
272
268
}
273

            
274
124
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
275
124
    match sort_args.as_ref() {
276
72
        None => Ok(vec![SortCond {
277
72
            key: SortKey::Proc,
278
72
            asc: false,
279
72
        }]),
280
52
        Some(args) => {
281
52
            let mut args = args.split(",");
282
52
            let mut sort_cond = vec![];
283
112
            while let Some(arg) = args.next() {
284
70
                let mut cond = arg.split(":");
285
70
                let key = match cond.next() {
286
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
287
70
                    Some(field) => match field {
288
70
                        "proc" => SortKey::Proc,
289
30
                        "pub" => SortKey::Pub,
290
26
                        "resp" => SortKey::Resp,
291
20
                        "network" => SortKey::NetworkCode,
292
12
                        "addr" => SortKey::NetworkAddr,
293
                        _ => {
294
4
                            return Err(ErrResp::ErrParam(Some(format!(
295
4
                                "invalid sort key {}",
296
4
                                field
297
4
                            ))))
298
                        }
299
                    },
300
                };
301
66
                let asc = match cond.next() {
302
2
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
303
64
                    Some(asc) => match asc {
304
64
                        "asc" => true,
305
16
                        "desc" => false,
306
                        _ => {
307
2
                            return Err(ErrResp::ErrParam(Some(format!(
308
2
                                "invalid sort asc {}",
309
2
                                asc
310
2
                            ))))
311
                        }
312
                    },
313
                };
314
62
                if cond.next().is_some() {
315
2
                    return Err(ErrResp::ErrParam(Some(
316
2
                        "invalid sort condition".to_string(),
317
2
                    )));
318
60
                }
319
60
                sort_cond.push(SortCond { key, asc });
320
            }
321
42
            Ok(sort_cond)
322
        }
323
    }
324
124
}
325

            
326
98
fn list_transform(list: &Vec<NetworkDlData>) -> Vec<response::GetListData> {
327
98
    let mut ret = vec![];
328
306
    for item in list.iter() {
329
306
        ret.push(data_transform(&item));
330
306
    }
331
98
    ret
332
98
}
333

            
334
32
fn list_transform_bytes(
335
32
    list: &Vec<NetworkDlData>,
336
32
    with_start: bool,
337
32
    with_end: bool,
338
32
    format: Option<&request::ListFormat>,
339
32
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
340
32
    let mut build_str = match with_start {
341
16
        false => "".to_string(),
342
6
        true => match format {
343
2
            Some(request::ListFormat::Array) => "[".to_string(),
344
            Some(request::ListFormat::Csv) => {
345
4
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
346
4
                format!("{}{}", bom, CSV_FIELDS)
347
            }
348
10
            _ => "{\"data\":[".to_string(),
349
        },
350
    };
351
32
    let mut is_first = with_start;
352

            
353
1870
    for item in list {
354
430
        match format {
355
            Some(request::ListFormat::Csv) => {
356
220
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
357
220
                writer.serialize(data_transform_csv(item))?;
358
220
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
359
            }
360
            _ => {
361
1618
                if is_first {
362
12
                    is_first = false;
363
1606
                } else {
364
1606
                    build_str.push(',');
365
1606
                }
366
1618
                let json_str = match serde_json::to_string(&data_transform(item)) {
367
                    Err(e) => return Err(Box::new(e)),
368
1618
                    Ok(str) => str,
369
1618
                };
370
1618
                build_str += json_str.as_str();
371
            }
372
        }
373
    }
374

            
375
32
    if with_end {
376
16
        build_str += match format {
377
2
            Some(request::ListFormat::Array) => "]",
378
4
            Some(request::ListFormat::Csv) => "",
379
10
            _ => "]}",
380
        }
381
16
    }
382
32
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
383
32
}
384

            
385
1924
fn data_transform(data: &NetworkDlData) -> response::GetListData {
386
1924
    response::GetListData {
387
1924
        data_id: data.data_id.clone(),
388
1924
        proc: strings::time_str(&data.proc),
389
1924
        publish: strings::time_str(&data.publish),
390
1924
        resp: match data.resp {
391
1262
            None => None,
392
662
            Some(resp) => Some(strings::time_str(&resp)),
393
        },
394
1924
        status: data.status,
395
1924
        unit_id: data.unit_id.clone(),
396
1924
        device_id: data.device_id.clone(),
397
1924
        network_code: data.network_code.clone(),
398
1924
        network_addr: data.network_addr.clone(),
399
1924
        profile: data.profile.clone(),
400
1924
        data: data.data.clone(),
401
1924
        extension: data.extension.clone(),
402
1924
    }
403
1924
}
404

            
405
220
fn data_transform_csv(data: &NetworkDlData) -> response::GetListCsvData {
406
220
    response::GetListCsvData {
407
220
        data_id: data.data_id.clone(),
408
220
        proc: strings::time_str(&data.proc),
409
220
        publish: strings::time_str(&data.publish),
410
220
        resp: match data.resp {
411
208
            None => "".to_string(),
412
12
            Some(resp) => strings::time_str(&resp),
413
        },
414
220
        status: data.status,
415
220
        unit_id: data.unit_id.clone(),
416
220
        device_id: data.device_id.clone(),
417
220
        network_code: data.network_code.clone(),
418
220
        network_addr: data.network_addr.clone(),
419
220
        profile: data.profile.clone(),
420
220
        data: data.data.clone(),
421
220
        extension: match data.extension.as_ref() {
422
208
            None => "".to_string(),
423
12
            Some(extension) => match serde_json::to_string(extension) {
424
                Err(_) => "".to_string(),
425
12
                Ok(extension) => extension,
426
            },
427
        },
428
    }
429
220
}