1
use std::error::Error as StdError;
2

            
3
use actix_web::{
4
    http::header::{self, HeaderValue},
5
    web::{self, Bytes},
6
    HttpMessage, HttpRequest, HttpResponse, Responder, ResponseError,
7
};
8
use chrono::{TimeZone, Utc};
9
use csv::WriterBuilder;
10
use log::error;
11
use serde_json;
12

            
13
use sylvia_iot_corelib::{err::ErrResp, role::Role, strings};
14

            
15
use super::{
16
    super::{
17
        super::{middleware::FullTokenInfo, ErrReq, State},
18
        get_unit_inner,
19
    },
20
    request, response,
21
};
22
use crate::models::application_uldata::{
23
    ApplicationUlData, ListOptions, ListQueryCond, SortCond, SortKey,
24
};
25

            
26
/// Page size used by `get_list` when the request does not carry a `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as `cursor_max` in the `ListOptions` built by `get_list`.
const LIST_CURSOR_MAX: u64 = 100;
/// Header line (terminated by a newline) emitted before the rows of a CSV response.
const CSV_FIELDS: &str =
    "dataId,proc,pub,unitCode,networkCode,networkAddr,unitId,deviceId,time,profile,data,extension\n";
30

            
31
/// `GET /{base}/api/v1/application-uldata/count`
32
108
pub async fn get_count(
33
108
    req: HttpRequest,
34
108
    query: web::Query<request::GetCountQuery>,
35
108
    state: web::Data<State>,
36
108
) -> impl Responder {
37
108
    const FN_NAME: &'static str = "get_count";
38
108

            
39
108
    let mut query: request::GetCountQuery = (*query).clone();
40
108
    if let Some(network) = query.network {
41
6
        query.network = Some(network.to_lowercase());
42
102
    }
43
108
    if let Some(addr) = query.addr {
44
4
        query.addr = Some(addr.to_lowercase());
45
104
    }
46

            
47
108
    let unit_cond = match get_unit_cond(FN_NAME, &req, query.unit.as_ref(), &state).await {
48
8
        Err(e) => return e,
49
100
        Ok(cond) => cond,
50
    };
51
100
    let cond = match get_list_cond(&query, &unit_cond).await {
52
30
        Err(e) => return e.error_response(),
53
70
        Ok(cond) => cond,
54
70
    };
55
140
    match state.model.application_uldata().count(&cond).await {
56
        Err(e) => {
57
            error!("[{}] count error: {}", FN_NAME, e);
58
            ErrResp::ErrDb(Some(e.to_string())).error_response()
59
        }
60
70
        Ok(count) => HttpResponse::Ok().json(response::GetCount {
61
70
            data: response::GetCountData { count },
62
70
        }),
63
    }
64
108
}
65

            
66
/// `GET /{base}/api/v1/application-uldata/list`
67
162
pub async fn get_list(
68
162
    req: HttpRequest,
69
162
    query: web::Query<request::GetListQuery>,
70
162
    state: web::Data<State>,
71
162
) -> impl Responder {
72
    const FN_NAME: &'static str = "get_list";
73

            
74
162
    let cond_query = request::GetCountQuery {
75
162
        unit: query.unit.clone(),
76
162
        device: query.device.clone(),
77
162
        network: match query.network.as_ref() {
78
156
            None => None,
79
6
            Some(network) => Some(network.to_lowercase()),
80
        },
81
162
        addr: match query.addr.as_ref() {
82
158
            None => None,
83
4
            Some(addr) => Some(addr.to_lowercase()),
84
        },
85
162
        profile: query.profile.clone(),
86
162
        tfield: query.tfield.clone(),
87
162
        tstart: query.tstart,
88
162
        tend: query.tend,
89
    };
90
162
    let unit_cond = match get_unit_cond(FN_NAME, &req, query.unit.as_ref(), &state).await {
91
8
        Err(e) => return Ok(e),
92
154
        Ok(cond) => cond,
93
    };
94
154
    let cond = match get_list_cond(&cond_query, &unit_cond).await {
95
30
        Err(e) => return Err(e),
96
124
        Ok(cond) => cond,
97
    };
98
124
    let sort_cond = match get_sort_cond(&query.sort) {
99
10
        Err(e) => return Err(e),
100
114
        Ok(cond) => cond,
101
    };
102
114
    let opts = ListOptions {
103
114
        cond: &cond,
104
114
        offset: query.offset,
105
114
        limit: match query.limit {
106
96
            None => Some(LIST_LIMIT_DEFAULT),
107
18
            Some(limit) => Some(limit),
108
        },
109
114
        sort: Some(sort_cond.as_slice()),
110
114
        cursor_max: Some(LIST_CURSOR_MAX),
111
    };
112

            
113
228
    let (list, cursor) = match state.model.application_uldata().list(&opts, None).await {
114
        Err(e) => {
115
            error!("[{}] list error: {}", FN_NAME, e);
116
            return Err(ErrResp::ErrDb(Some(e.to_string())));
117
        }
118
114
        Ok((list, cursor)) => match cursor {
119
100
            None => match query.format.as_ref() {
120
                Some(request::ListFormat::Array) => {
121
2
                    return Ok(HttpResponse::Ok().json(list_transform(&list)))
122
                }
123
                Some(request::ListFormat::Csv) => {
124
2
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
125
                    {
126
                        Err(e) => {
127
                            return Err(ErrResp::ErrUnknown(Some(format!(
128
                                "transform CSV error: {}",
129
                                e
130
                            ))))
131
                        }
132
2
                        Ok(bytes) => bytes,
133
2
                    };
134
2
                    return Ok(HttpResponse::Ok()
135
2
                        .insert_header((header::CONTENT_TYPE, "text/csv"))
136
2
                        .insert_header((
137
2
                            header::CONTENT_DISPOSITION,
138
2
                            "attachment;filename=application-uldata.csv",
139
2
                        ))
140
2
                        .body(bytes));
141
                }
142
                _ => {
143
96
                    return Ok(HttpResponse::Ok().json(response::GetList {
144
96
                        data: list_transform(&list),
145
96
                    }))
146
                }
147
            },
148
14
            Some(_) => (list, cursor),
149
14
        },
150
14
    };
151
14

            
152
14
    // TODO: detect client disconnect
153
14
    let query_format = query.format.clone();
154
14
    let stream = async_stream::stream! {
155
14
        let query = query.0.clone();
156
14
        let cond_query = request::GetCountQuery {
157
14
            unit: query.unit.clone(),
158
14
            device: query.device.clone(),
159
14
            network: query.network.clone(),
160
14
            addr: query.addr.clone(),
161
14
            profile: query.profile.clone(),
162
14
            tfield: query.tfield.clone(),
163
14
            tstart: query.tstart,
164
14
            tend: query.tend,
165
14
        };
166
14
        let cond = match get_list_cond(&cond_query, &unit_cond).await {
167
            Err(_) => return,
168
14
            Ok(cond) => cond,
169
        };
170
14
        let opts = ListOptions {
171
14
            cond: &cond,
172
14
            offset: query.offset,
173
14
            limit: match query.limit {
174
4
                None => Some(LIST_LIMIT_DEFAULT),
175
10
                Some(limit) => Some(limit),
176
            },
177
14
            sort: Some(sort_cond.as_slice()),
178
14
            cursor_max: Some(LIST_CURSOR_MAX),
179
14
        };
180
14

            
181
14
        let mut list = list;
182
14
        let mut cursor = cursor;
183
14
        let mut is_first = true;
184
        loop {
185
30
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
186
30
            is_first = false;
187
30
            if cursor.is_none() {
188
14
                break;
189
16
            }
190
22
            let (_list, _cursor) = match state.model.application_uldata().list(&opts, cursor).await {
191
                Err(_) => break,
192
16
                Ok((list, cursor)) => (list, cursor),
193
16
            };
194
16
            list = _list;
195
16
            cursor = _cursor;
196
        }
197
    };
198
4
    match query_format {
199
2
        Some(request::ListFormat::Csv) => Ok(HttpResponse::Ok()
200
2
            .insert_header((header::CONTENT_TYPE, "text/csv"))
201
2
            .insert_header((
202
2
                header::CONTENT_DISPOSITION,
203
2
                "attachment;filename=application-uldata.csv",
204
2
            ))
205
2
            .streaming(stream)),
206
12
        _ => Ok(HttpResponse::Ok().streaming(stream)),
207
    }
208
162
}
209

            
210
268
async fn get_list_cond<'a>(
211
268
    query: &'a request::GetCountQuery,
212
268
    unit_cond: &'a Option<String>,
213
268
) -> Result<ListQueryCond<'a>, ErrResp> {
214
268
    let mut cond = ListQueryCond {
215
268
        unit_id: match unit_cond.as_ref() {
216
100
            None => None,
217
168
            Some(unit_id) => Some(unit_id.as_str()),
218
        },
219
268
        ..Default::default()
220
    };
221
268
    if let Some(device_id) = query.device.as_ref() {
222
20
        if device_id.len() > 0 {
223
20
            cond.device_id = Some(device_id.as_str());
224
20
        }
225
248
    }
226
268
    if let Some(network_code) = query.network.as_ref() {
227
12
        if network_code.len() > 0 {
228
12
            cond.network_code = Some(network_code.as_str());
229
12
        }
230
256
    }
231
268
    if let Some(network_addr) = query.addr.as_ref() {
232
8
        if network_addr.len() > 0 {
233
8
            cond.network_addr = Some(network_addr.as_str());
234
8
        }
235
260
    }
236
268
    if let Some(profile) = query.profile.as_ref() {
237
8
        if profile.len() > 0 {
238
8
            cond.profile = Some(profile.as_str());
239
8
        }
240
260
    }
241
268
    if let Some(start) = query.tstart.as_ref() {
242
144
        match query.tfield.as_ref() {
243
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
244
120
            Some(tfield) => match tfield.as_str() {
245
120
                "proc" => cond.proc_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
246
60
                "pub" => cond.pub_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
247
36
                "time" => cond.time_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
248
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
249
            },
250
        }
251
124
    }
252
232
    if let Some(end) = query.tend.as_ref() {
253
60
        match query.tfield.as_ref() {
254
12
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
255
48
            Some(tfield) => match tfield.as_str() {
256
48
                "proc" => cond.proc_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
257
36
                "pub" => cond.pub_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
258
24
                "time" => cond.time_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
259
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
260
            },
261
        }
262
172
    }
263

            
264
208
    Ok(cond)
265
268
}
266

            
267
270
async fn get_unit_cond(
268
270
    fn_name: &str,
269
270
    req: &HttpRequest,
270
270
    query_unit: Option<&String>,
271
270
    state: &web::Data<State>,
272
270
) -> Result<Option<String>, HttpResponse> {
273
270
    let token_info = match req.extensions_mut().get::<FullTokenInfo>() {
274
        None => {
275
            error!("[{}] token not found", fn_name);
276
            return Err(
277
                ErrResp::ErrUnknown(Some("token info not found".to_string())).error_response(),
278
            );
279
        }
280
270
        Some(token_info) => token_info.clone(),
281
270
    };
282
270
    let broker_base = state.broker_base.as_str();
283
270
    let client = state.client.clone();
284
270

            
285
270
    match query_unit {
286
        None => {
287
82
            if !Role::is_role(&token_info.info.roles, Role::ADMIN)
288
82
                && !Role::is_role(&token_info.info.roles, Role::MANAGER)
289
            {
290
8
                return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())).error_response());
291
74
            }
292
74
            Ok(None)
293
        }
294
188
        Some(unit_id) => match unit_id.len() {
295
12
            0 => Ok(None),
296
            _ => {
297
176
                let token = match HeaderValue::from_str(token_info.token.as_str()) {
298
                    Err(e) => {
299
                        error!("[{}] get token error: {}", fn_name, e);
300
                        return Err(ErrResp::ErrUnknown(Some(format!("get token error: {}", e)))
301
                            .error_response());
302
                    }
303
176
                    Ok(value) => value,
304
176
                };
305
181
                match get_unit_inner(fn_name, &client, broker_base, unit_id, &token).await {
306
                    Err(e) => {
307
                        error!("[{}] get unit error", fn_name);
308
                        return Err(e);
309
                    }
310
176
                    Ok(unit) => match unit {
311
                        None => {
312
8
                            return Err(ErrResp::Custom(
313
8
                                ErrReq::UNIT_NOT_EXIST.0,
314
8
                                ErrReq::UNIT_NOT_EXIST.1,
315
8
                                None,
316
8
                            )
317
8
                            .error_response())
318
                        }
319
168
                        Some(_) => Ok(Some(unit_id.clone())),
320
                    },
321
                }
322
            }
323
        },
324
    }
325
270
}
326

            
327
124
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
328
124
    match sort_args.as_ref() {
329
72
        None => Ok(vec![SortCond {
330
72
            key: SortKey::Proc,
331
72
            asc: false,
332
72
        }]),
333
52
        Some(args) => {
334
52
            let mut args = args.split(",");
335
52
            let mut sort_cond = vec![];
336
112
            while let Some(arg) = args.next() {
337
70
                let mut cond = arg.split(":");
338
70
                let key = match cond.next() {
339
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
340
70
                    Some(field) => match field {
341
70
                        "proc" => SortKey::Proc,
342
30
                        "pub" => SortKey::Pub,
343
24
                        "time" => SortKey::Time,
344
20
                        "network" => SortKey::NetworkCode,
345
12
                        "addr" => SortKey::NetworkAddr,
346
                        _ => {
347
4
                            return Err(ErrResp::ErrParam(Some(format!(
348
4
                                "invalid sort key {}",
349
4
                                field
350
4
                            ))))
351
                        }
352
                    },
353
                };
354
66
                let asc = match cond.next() {
355
2
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
356
64
                    Some(asc) => match asc {
357
64
                        "asc" => true,
358
16
                        "desc" => false,
359
                        _ => {
360
2
                            return Err(ErrResp::ErrParam(Some(format!(
361
2
                                "invalid sort asc {}",
362
2
                                asc
363
2
                            ))))
364
                        }
365
                    },
366
                };
367
62
                if cond.next().is_some() {
368
2
                    return Err(ErrResp::ErrParam(Some(
369
2
                        "invalid sort condition".to_string(),
370
2
                    )));
371
60
                }
372
60
                sort_cond.push(SortCond { key, asc });
373
            }
374
42
            Ok(sort_cond)
375
        }
376
    }
377
124
}
378

            
379
98
fn list_transform(list: &Vec<ApplicationUlData>) -> Vec<response::GetListData> {
380
98
    let mut ret = vec![];
381
288
    for item in list.iter() {
382
288
        ret.push(data_transform(&item));
383
288
    }
384
98
    ret
385
98
}
386

            
387
32
fn list_transform_bytes(
388
32
    list: &Vec<ApplicationUlData>,
389
32
    with_start: bool,
390
32
    with_end: bool,
391
32
    format: Option<&request::ListFormat>,
392
32
) -> Result<Bytes, Box<dyn StdError>> {
393
32
    let mut build_str = match with_start {
394
16
        false => "".to_string(),
395
6
        true => match format {
396
2
            Some(request::ListFormat::Array) => "[".to_string(),
397
            Some(request::ListFormat::Csv) => {
398
4
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
399
4
                format!("{}{}", bom, CSV_FIELDS)
400
            }
401
10
            _ => "{\"data\":[".to_string(),
402
        },
403
    };
404
32
    let mut is_first = with_start;
405

            
406
1870
    for item in list {
407
430
        match format {
408
            Some(request::ListFormat::Csv) => {
409
220
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
410
220
                writer.serialize(data_transform_csv(item))?;
411
220
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
412
            }
413
            _ => {
414
1618
                if is_first {
415
12
                    is_first = false;
416
1606
                } else {
417
1606
                    build_str.push(',');
418
1606
                }
419
1618
                let json_str = match serde_json::to_string(&data_transform(item)) {
420
                    Err(e) => return Err(Box::new(e)),
421
1618
                    Ok(str) => str,
422
1618
                };
423
1618
                build_str += json_str.as_str();
424
            }
425
        }
426
    }
427

            
428
32
    if with_end {
429
16
        build_str += match format {
430
2
            Some(request::ListFormat::Array) => "]",
431
4
            Some(request::ListFormat::Csv) => "",
432
10
            _ => "]}",
433
        }
434
16
    }
435
32
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
436
32
}
437

            
438
1906
fn data_transform(data: &ApplicationUlData) -> response::GetListData {
439
1906
    response::GetListData {
440
1906
        data_id: data.data_id.clone(),
441
1906
        proc: strings::time_str(&data.proc),
442
1906
        publish: strings::time_str(&data.publish),
443
1906
        unit_code: data.unit_code.clone(),
444
1906
        network_code: data.network_code.clone(),
445
1906
        network_addr: data.network_addr.clone(),
446
1906
        unit_id: data.unit_id.clone(),
447
1906
        device_id: data.device_id.clone(),
448
1906
        time: strings::time_str(&data.time),
449
1906
        profile: data.profile.clone(),
450
1906
        data: data.data.clone(),
451
1906
        extension: data.extension.clone(),
452
1906
    }
453
1906
}
454

            
455
220
fn data_transform_csv(data: &ApplicationUlData) -> response::GetListCsvData {
456
220
    response::GetListCsvData {
457
220
        data_id: data.data_id.clone(),
458
220
        proc: strings::time_str(&data.proc),
459
220
        publish: strings::time_str(&data.publish),
460
220
        unit_code: match data.unit_code.as_ref() {
461
208
            None => "".to_string(),
462
12
            Some(unit_code) => unit_code.clone(),
463
        },
464
220
        network_code: data.network_code.clone(),
465
220
        network_addr: data.network_addr.clone(),
466
220
        unit_id: data.unit_id.clone(),
467
220
        device_id: data.device_id.clone(),
468
220
        time: strings::time_str(&data.time),
469
220
        profile: data.profile.clone(),
470
220
        data: data.data.clone(),
471
220
        extension: match data.extension.as_ref() {
472
208
            None => "".to_string(),
473
12
            Some(extension) => match serde_json::to_string(extension) {
474
                Err(_) => "".to_string(),
475
12
                Ok(extension) => extension,
476
            },
477
        },
478
    }
479
220
}