1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Extension,
5
    body::{Body, Bytes},
6
    extract::State,
7
    http::header,
8
    response::IntoResponse,
9
};
10
use chrono::{TimeZone, Utc};
11
use csv::WriterBuilder;
12
use log::error;
13
use serde_json;
14

            
15
use sylvia_iot_corelib::{
16
    constants::ContentType,
17
    err::ErrResp,
18
    http::{Json, Query},
19
    strings,
20
};
21

            
22
use super::{
23
    super::{
24
        super::{State as AppState, middleware::GetTokenInfoData},
25
        get_unit_cond,
26
    },
27
    request, response,
28
};
29
use crate::models::application_uldata::{
30
    ApplicationUlData, ListOptions, ListQueryCond, SortCond, SortKey,
31
};
32

            
33
const LIST_LIMIT_DEFAULT: u64 = 100;
34
const LIST_CURSOR_MAX: u64 = 100;
35
const CSV_FIELDS: &'static str = "dataId,proc,pub,unitCode,networkCode,networkAddr,unitId,deviceId,time,profile,data,extension\n";
36

            
37
/// `GET /{base}/api/v1/application-uldata/count`
38
216
pub async fn get_count(
39
216
    State(state): State<AppState>,
40
216
    Extension(token_info): Extension<GetTokenInfoData>,
41
216
    Query(mut query): Query<request::GetCountQuery>,
42
216
) -> impl IntoResponse {
43
    const FN_NAME: &'static str = "get_count";
44

            
45
216
    if let Some(network) = query.network {
46
12
        query.network = Some(network.to_lowercase());
47
204
    }
48
216
    if let Some(addr) = query.addr {
49
8
        query.addr = Some(addr.to_lowercase());
50
208
    }
51

            
52
216
    let unit_cond = get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await?;
53
200
    let cond = match get_list_cond(&query, &unit_cond).await {
54
60
        Err(e) => return Err(e.into_response()),
55
140
        Ok(cond) => cond,
56
140
    };
57
140
    match state.model.application_uldata().count(&cond).await {
58
        Err(e) => {
59
            error!("[{}] count error: {}", FN_NAME, e);
60
            Err(ErrResp::ErrDb(Some(e.to_string())).into_response())
61
        }
62
140
        Ok(count) => Ok(Json(response::GetCount {
63
140
            data: response::GetCountData { count },
64
140
        })),
65
    }
66
216
}
67

            
68
/// `GET /{base}/api/v1/application-uldata/list`
69
324
pub async fn get_list(
70
324
    State(state): State<AppState>,
71
324
    Extension(token_info): Extension<GetTokenInfoData>,
72
324
    Query(query): Query<request::GetListQuery>,
73
324
) -> impl IntoResponse {
74
    const FN_NAME: &'static str = "get_list";
75

            
76
324
    let cond_query = request::GetCountQuery {
77
324
        unit: query.unit.clone(),
78
324
        device: query.device.clone(),
79
324
        network: match query.network.as_ref() {
80
312
            None => None,
81
12
            Some(network) => Some(network.to_lowercase()),
82
        },
83
324
        addr: match query.addr.as_ref() {
84
316
            None => None,
85
8
            Some(addr) => Some(addr.to_lowercase()),
86
        },
87
324
        profile: query.profile.clone(),
88
324
        tfield: query.tfield.clone(),
89
324
        tstart: query.tstart,
90
324
        tend: query.tend,
91
    };
92
324
    let unit_cond = match get_unit_cond(FN_NAME, &token_info, query.unit.as_ref(), &state).await {
93
16
        Err(e) => return Ok(e),
94
308
        Ok(cond) => cond,
95
    };
96
308
    let cond = match get_list_cond(&cond_query, &unit_cond).await {
97
60
        Err(e) => return Err(e),
98
248
        Ok(cond) => cond,
99
    };
100
248
    let sort_cond = match get_sort_cond(&query.sort) {
101
20
        Err(e) => return Err(e),
102
228
        Ok(cond) => cond,
103
    };
104
228
    let opts = ListOptions {
105
228
        cond: &cond,
106
228
        offset: query.offset,
107
228
        limit: match query.limit {
108
192
            None => Some(LIST_LIMIT_DEFAULT),
109
36
            Some(limit) => Some(limit),
110
        },
111
228
        sort: Some(sort_cond.as_slice()),
112
228
        cursor_max: Some(LIST_CURSOR_MAX),
113
    };
114

            
115
228
    let (list, cursor) = match state.model.application_uldata().list(&opts, None).await {
116
        Err(e) => {
117
            error!("[{}] list error: {}", FN_NAME, e);
118
            return Err(ErrResp::ErrDb(Some(e.to_string())));
119
        }
120
228
        Ok((list, cursor)) => match cursor {
121
200
            None => match query.format.as_ref() {
122
                Some(request::ListFormat::Array) => {
123
4
                    return Ok(Json(list_transform(&list)).into_response());
124
                }
125
                Some(request::ListFormat::Csv) => {
126
4
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
127
                    {
128
                        Err(e) => {
129
                            let e = format!("transform CSV error: {}", e);
130
                            return Err(ErrResp::ErrUnknown(Some(e)));
131
                        }
132
4
                        Ok(bytes) => bytes,
133
4
                    };
134
4
                    return Ok((
135
4
                        [
136
4
                            (header::CONTENT_TYPE, ContentType::CSV),
137
4
                            (
138
4
                                header::CONTENT_DISPOSITION,
139
4
                                "attachment;filename=application-uldata.csv",
140
4
                            ),
141
4
                        ],
142
4
                        bytes,
143
4
                    )
144
4
                        .into_response());
145
                }
146
                _ => {
147
192
                    return Ok(Json(response::GetList {
148
192
                        data: list_transform(&list),
149
192
                    })
150
192
                    .into_response());
151
                }
152
            },
153
28
            Some(_) => (list, cursor),
154
28
        },
155
28
    };
156
28

            
157
28
    let query_format = query.format.clone();
158
28
    let body = Body::from_stream(async_stream::stream! {
159
28
        let cond_query = request::GetCountQuery {
160
28
            unit: query.unit.clone(),
161
28
            device: query.device.clone(),
162
28
            network: query.network.clone(),
163
28
            addr: query.addr.clone(),
164
28
            profile: query.profile.clone(),
165
28
            tfield: query.tfield.clone(),
166
28
            tstart: query.tstart,
167
28
            tend: query.tend,
168
28
        };
169
28
        let cond = match get_list_cond(&cond_query, &unit_cond).await {
170
28
            Err(_) => return,
171
28
            Ok(cond) => cond,
172
28
        };
173
28
        let opts = ListOptions {
174
28
            cond: &cond,
175
28
            offset: query.offset,
176
28
            limit: match query.limit {
177
28
                None => Some(LIST_LIMIT_DEFAULT),
178
28
                Some(limit) => Some(limit),
179
28
            },
180
28
            sort: Some(sort_cond.as_slice()),
181
28
            cursor_max: Some(LIST_CURSOR_MAX),
182
28
        };
183
28

            
184
28
        let mut list = list;
185
28
        let mut cursor = cursor;
186
28
        let mut is_first = true;
187
28
        loop {
188
28
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
189
28
            is_first = false;
190
28
            if cursor.is_none() {
191
28
                break;
192
28
            }
193
28
            let (_list, _cursor) = match state.model.application_uldata().list(&opts, cursor).await {
194
28
                Err(_) => break,
195
28
                Ok((list, cursor)) => (list, cursor),
196
28
            };
197
28
            list = _list;
198
28
            cursor = _cursor;
199
28
        }
200
28
    });
201
8
    match query_format {
202
4
        Some(request::ListFormat::Csv) => Ok((
203
4
            [
204
4
                (header::CONTENT_TYPE, ContentType::CSV),
205
4
                (
206
4
                    header::CONTENT_DISPOSITION,
207
4
                    "attachment;filename=application-uldata.csv",
208
4
                ),
209
4
            ],
210
4
            body,
211
4
        )
212
4
            .into_response()),
213
24
        _ => Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response()),
214
    }
215
324
}
216

            
217
536
async fn get_list_cond<'a>(
218
536
    query: &'a request::GetCountQuery,
219
536
    unit_cond: &'a Option<String>,
220
536
) -> Result<ListQueryCond<'a>, ErrResp> {
221
536
    let mut cond = ListQueryCond {
222
536
        unit_id: match unit_cond.as_ref() {
223
200
            None => None,
224
336
            Some(unit_id) => Some(unit_id.as_str()),
225
        },
226
536
        ..Default::default()
227
    };
228
536
    if let Some(device_id) = query.device.as_ref() {
229
40
        if device_id.len() > 0 {
230
40
            cond.device_id = Some(device_id.as_str());
231
40
        }
232
496
    }
233
536
    if let Some(network_code) = query.network.as_ref() {
234
24
        if network_code.len() > 0 {
235
24
            cond.network_code = Some(network_code.as_str());
236
24
        }
237
512
    }
238
536
    if let Some(network_addr) = query.addr.as_ref() {
239
16
        if network_addr.len() > 0 {
240
16
            cond.network_addr = Some(network_addr.as_str());
241
16
        }
242
520
    }
243
536
    if let Some(profile) = query.profile.as_ref() {
244
16
        if profile.len() > 0 {
245
16
            cond.profile = Some(profile.as_str());
246
16
        }
247
520
    }
248
536
    if let Some(start) = query.tstart.as_ref() {
249
288
        match query.tfield.as_ref() {
250
48
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
251
240
            Some(tfield) => match tfield.as_str() {
252
240
                "proc" => cond.proc_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
253
120
                "pub" => cond.pub_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
254
72
                "time" => cond.time_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
255
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
256
            },
257
        }
258
248
    }
259
464
    if let Some(end) = query.tend.as_ref() {
260
120
        match query.tfield.as_ref() {
261
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
262
96
            Some(tfield) => match tfield.as_str() {
263
96
                "proc" => cond.proc_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
264
72
                "pub" => cond.pub_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
265
48
                "time" => cond.time_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
266
24
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
267
            },
268
        }
269
344
    }
270

            
271
416
    Ok(cond)
272
536
}
273

            
274
248
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
275
248
    match sort_args.as_ref() {
276
144
        None => Ok(vec![SortCond {
277
144
            key: SortKey::Proc,
278
144
            asc: false,
279
144
        }]),
280
104
        Some(args) => {
281
104
            let mut args = args.split(",");
282
104
            let mut sort_cond = vec![];
283
224
            while let Some(arg) = args.next() {
284
140
                let mut cond = arg.split(":");
285
140
                let key = match cond.next() {
286
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
287
140
                    Some(field) => match field {
288
140
                        "proc" => SortKey::Proc,
289
60
                        "pub" => SortKey::Pub,
290
48
                        "time" => SortKey::Time,
291
40
                        "network" => SortKey::NetworkCode,
292
24
                        "addr" => SortKey::NetworkAddr,
293
                        _ => {
294
8
                            return Err(ErrResp::ErrParam(Some(format!(
295
8
                                "invalid sort key {}",
296
8
                                field
297
8
                            ))));
298
                        }
299
                    },
300
                };
301
132
                let asc = match cond.next() {
302
4
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
303
128
                    Some(asc) => match asc {
304
128
                        "asc" => true,
305
32
                        "desc" => false,
306
                        _ => {
307
4
                            return Err(ErrResp::ErrParam(Some(format!(
308
4
                                "invalid sort asc {}",
309
4
                                asc
310
4
                            ))));
311
                        }
312
                    },
313
                };
314
124
                if cond.next().is_some() {
315
4
                    return Err(ErrResp::ErrParam(Some(
316
4
                        "invalid sort condition".to_string(),
317
4
                    )));
318
120
                }
319
120
                sort_cond.push(SortCond { key, asc });
320
            }
321
84
            Ok(sort_cond)
322
        }
323
    }
324
248
}
325

            
326
196
fn list_transform(list: &Vec<ApplicationUlData>) -> Vec<response::GetListData> {
327
196
    let mut ret = vec![];
328
576
    for item in list.iter() {
329
576
        ret.push(data_transform(&item));
330
576
    }
331
196
    ret
332
196
}
333

            
334
64
fn list_transform_bytes(
335
64
    list: &Vec<ApplicationUlData>,
336
64
    with_start: bool,
337
64
    with_end: bool,
338
64
    format: Option<&request::ListFormat>,
339
64
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
340
64
    let mut build_str = match with_start {
341
32
        false => "".to_string(),
342
12
        true => match format {
343
4
            Some(request::ListFormat::Array) => "[".to_string(),
344
            Some(request::ListFormat::Csv) => {
345
8
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
346
8
                format!("{}{}", bom, CSV_FIELDS)
347
            }
348
20
            _ => "{\"data\":[".to_string(),
349
        },
350
    };
351
64
    let mut is_first = with_start;
352

            
353
3740
    for item in list {
354
860
        match format {
355
            Some(request::ListFormat::Csv) => {
356
440
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
357
440
                writer.serialize(data_transform_csv(item))?;
358
440
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
359
            }
360
            _ => {
361
3236
                if is_first {
362
24
                    is_first = false;
363
3212
                } else {
364
3212
                    build_str.push(',');
365
3212
                }
366
3236
                let json_str = match serde_json::to_string(&data_transform(item)) {
367
                    Err(e) => return Err(Box::new(e)),
368
3236
                    Ok(str) => str,
369
3236
                };
370
3236
                build_str += json_str.as_str();
371
            }
372
        }
373
    }
374

            
375
64
    if with_end {
376
32
        build_str += match format {
377
4
            Some(request::ListFormat::Array) => "]",
378
8
            Some(request::ListFormat::Csv) => "",
379
20
            _ => "]}",
380
        }
381
32
    }
382
64
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
383
64
}
384

            
385
3812
fn data_transform(data: &ApplicationUlData) -> response::GetListData {
386
3812
    response::GetListData {
387
3812
        data_id: data.data_id.clone(),
388
3812
        proc: strings::time_str(&data.proc),
389
3812
        publish: strings::time_str(&data.publish),
390
3812
        unit_code: data.unit_code.clone(),
391
3812
        network_code: data.network_code.clone(),
392
3812
        network_addr: data.network_addr.clone(),
393
3812
        unit_id: data.unit_id.clone(),
394
3812
        device_id: data.device_id.clone(),
395
3812
        time: strings::time_str(&data.time),
396
3812
        profile: data.profile.clone(),
397
3812
        data: data.data.clone(),
398
3812
        extension: data.extension.clone(),
399
3812
    }
400
3812
}
401

            
402
440
fn data_transform_csv(data: &ApplicationUlData) -> response::GetListCsvData {
403
440
    response::GetListCsvData {
404
440
        data_id: data.data_id.clone(),
405
440
        proc: strings::time_str(&data.proc),
406
440
        publish: strings::time_str(&data.publish),
407
440
        unit_code: match data.unit_code.as_ref() {
408
416
            None => "".to_string(),
409
24
            Some(unit_code) => unit_code.clone(),
410
        },
411
440
        network_code: data.network_code.clone(),
412
440
        network_addr: data.network_addr.clone(),
413
440
        unit_id: data.unit_id.clone(),
414
440
        device_id: data.device_id.clone(),
415
440
        time: strings::time_str(&data.time),
416
440
        profile: data.profile.clone(),
417
440
        data: data.data.clone(),
418
440
        extension: match data.extension.as_ref() {
419
416
            None => "".to_string(),
420
24
            Some(extension) => match serde_json::to_string(extension) {
421
                Err(_) => "".to_string(),
422
24
                Ok(extension) => extension,
423
            },
424
        },
425
    }
426
440
}