1
use std::error::Error as StdError;
2

            
3
use actix_web::{
4
    http::header::{self, HeaderValue},
5
    web::{self, Bytes},
6
    HttpMessage, HttpRequest, HttpResponse, Responder, ResponseError,
7
};
8
use chrono::{TimeZone, Utc};
9
use csv::WriterBuilder;
10
use log::error;
11
use serde_json;
12

            
13
use sylvia_iot_corelib::{err::ErrResp, role::Role, strings};
14

            
15
use super::{
16
    super::{
17
        super::{middleware::FullTokenInfo, ErrReq, State},
18
        get_unit_inner,
19
    },
20
    request, response,
21
};
22
use crate::models::network_uldata::{ListOptions, ListQueryCond, NetworkUlData, SortCond, SortKey};
23

            
24
const LIST_LIMIT_DEFAULT: u64 = 100;
25
const LIST_CURSOR_MAX: u64 = 100;
26
const CSV_FIELDS: &'static str =
27
    "dataId,proc,unitCode,networkCode,networkAddr,unitId,deviceId,time,profile,data,extension\n";
28

            
29
/// `GET /{base}/api/v1/network-uldata/count`
30
96
pub async fn get_count(
31
96
    req: HttpRequest,
32
96
    query: web::Query<request::GetCountQuery>,
33
96
    state: web::Data<State>,
34
96
) -> impl Responder {
35
96
    const FN_NAME: &'static str = "get_count";
36
96

            
37
96
    let mut query: request::GetCountQuery = (*query).clone();
38
96
    if let Some(network) = query.network {
39
6
        query.network = Some(network.to_lowercase());
40
90
    }
41
96
    if let Some(addr) = query.addr {
42
4
        query.addr = Some(addr.to_lowercase());
43
92
    }
44

            
45
96
    let unit_cond = match get_unit_cond(FN_NAME, &req, query.unit.as_ref(), &state).await {
46
8
        Err(e) => return e,
47
88
        Ok(cond) => cond,
48
    };
49
88
    let cond = match get_list_cond(&query, &unit_cond).await {
50
30
        Err(e) => return e.error_response(),
51
58
        Ok(cond) => cond,
52
58
    };
53
116
    match state.model.network_uldata().count(&cond).await {
54
        Err(e) => {
55
            error!("[{}] count error: {}", FN_NAME, e);
56
            ErrResp::ErrDb(Some(e.to_string())).error_response()
57
        }
58
58
        Ok(count) => HttpResponse::Ok().json(response::GetCount {
59
58
            data: response::GetCountData { count },
60
58
        }),
61
    }
62
96
}
63

            
64
/// `GET /{base}/api/v1/network-uldata/list`
65
146
pub async fn get_list(
66
146
    req: HttpRequest,
67
146
    query: web::Query<request::GetListQuery>,
68
146
    state: web::Data<State>,
69
146
) -> impl Responder {
70
    const FN_NAME: &'static str = "get_list";
71

            
72
146
    let cond_query = request::GetCountQuery {
73
146
        unit: query.unit.clone(),
74
146
        device: query.device.clone(),
75
146
        network: match query.network.as_ref() {
76
140
            None => None,
77
6
            Some(network) => Some(network.to_lowercase()),
78
        },
79
146
        addr: match query.addr.as_ref() {
80
142
            None => None,
81
4
            Some(addr) => Some(addr.to_lowercase()),
82
        },
83
146
        profile: query.profile.clone(),
84
146
        tfield: query.tfield.clone(),
85
146
        tstart: query.tstart,
86
146
        tend: query.tend,
87
    };
88
146
    let unit_cond = match get_unit_cond(FN_NAME, &req, query.unit.as_ref(), &state).await {
89
8
        Err(e) => return Ok(e),
90
138
        Ok(cond) => cond,
91
    };
92
138
    let cond = match get_list_cond(&cond_query, &unit_cond).await {
93
30
        Err(e) => return Err(e),
94
108
        Ok(cond) => cond,
95
    };
96
108
    let sort_cond = match get_sort_cond(&query.sort) {
97
10
        Err(e) => return Err(e),
98
98
        Ok(cond) => cond,
99
    };
100
98
    let opts = ListOptions {
101
98
        cond: &cond,
102
98
        offset: query.offset,
103
98
        limit: match query.limit {
104
80
            None => Some(LIST_LIMIT_DEFAULT),
105
18
            Some(limit) => Some(limit),
106
        },
107
98
        sort: Some(sort_cond.as_slice()),
108
98
        cursor_max: Some(LIST_CURSOR_MAX),
109
    };
110

            
111
196
    let (list, cursor) = match state.model.network_uldata().list(&opts, None).await {
112
        Err(e) => {
113
            error!("[{}] list error: {}", FN_NAME, e);
114
            return Err(ErrResp::ErrDb(Some(e.to_string())));
115
        }
116
98
        Ok((list, cursor)) => match cursor {
117
84
            None => match query.format.as_ref() {
118
                Some(request::ListFormat::Array) => {
119
2
                    return Ok(HttpResponse::Ok().json(list_transform(&list)))
120
                }
121
                Some(request::ListFormat::Csv) => {
122
2
                    let bytes = match list_transform_bytes(&list, true, true, query.format.as_ref())
123
                    {
124
                        Err(e) => {
125
                            return Err(ErrResp::ErrUnknown(Some(format!(
126
                                "transform CSV error: {}",
127
                                e
128
                            ))))
129
                        }
130
2
                        Ok(bytes) => bytes,
131
2
                    };
132
2
                    return Ok(HttpResponse::Ok()
133
2
                        .insert_header((header::CONTENT_TYPE, "text/csv"))
134
2
                        .insert_header((
135
2
                            header::CONTENT_DISPOSITION,
136
2
                            "attachment;filename=network-uldata.csv",
137
2
                        ))
138
2
                        .body(bytes));
139
                }
140
                _ => {
141
80
                    return Ok(HttpResponse::Ok().json(response::GetList {
142
80
                        data: list_transform(&list),
143
80
                    }))
144
                }
145
            },
146
14
            Some(_) => (list, cursor),
147
14
        },
148
14
    };
149
14

            
150
14
    // TODO: detect client disconnect
151
14
    let query_format = query.format.clone();
152
14
    let stream = async_stream::stream! {
153
14
        let query = query.0.clone();
154
14
        let cond_query = request::GetCountQuery {
155
14
            unit: query.unit.clone(),
156
14
            device: query.device.clone(),
157
14
            network: query.network.clone(),
158
14
            addr: query.addr.clone(),
159
14
            profile: query.profile.clone(),
160
14
            tfield: query.tfield.clone(),
161
14
            tstart: query.tstart,
162
14
            tend: query.tend,
163
14
        };
164
14
        let cond = match get_list_cond(&cond_query, &unit_cond).await {
165
            Err(_) => return,
166
14
            Ok(cond) => cond,
167
        };
168
14
        let opts = ListOptions {
169
14
            cond: &cond,
170
14
            offset: query.offset,
171
14
            limit: match query.limit {
172
4
                None => Some(LIST_LIMIT_DEFAULT),
173
10
                Some(limit) => Some(limit),
174
            },
175
14
            sort: Some(sort_cond.as_slice()),
176
14
            cursor_max: Some(LIST_CURSOR_MAX),
177
14
        };
178
14

            
179
14
        let mut list = list;
180
14
        let mut cursor = cursor;
181
14
        let mut is_first = true;
182
        loop {
183
30
            yield list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
184
30
            is_first = false;
185
30
            if cursor.is_none() {
186
14
                break;
187
16
            }
188
22
            let (_list, _cursor) = match state.model.network_uldata().list(&opts, cursor).await {
189
                Err(_) => break,
190
16
                Ok((list, cursor)) => (list, cursor),
191
16
            };
192
16
            list = _list;
193
16
            cursor = _cursor;
194
        }
195
    };
196
4
    match query_format {
197
2
        Some(request::ListFormat::Csv) => Ok(HttpResponse::Ok()
198
2
            .insert_header((header::CONTENT_TYPE, "text/csv"))
199
2
            .insert_header((
200
2
                header::CONTENT_DISPOSITION,
201
2
                "attachment;filename=network-uldata.csv",
202
2
            ))
203
2
            .streaming(stream)),
204
12
        _ => Ok(HttpResponse::Ok().streaming(stream)),
205
    }
206
146
}
207

            
208
240
async fn get_list_cond<'a>(
209
240
    query: &'a request::GetCountQuery,
210
240
    unit_cond: &'a Option<String>,
211
240
) -> Result<ListQueryCond<'a>, ErrResp> {
212
240
    let mut cond = ListQueryCond {
213
240
        unit_id: match unit_cond.as_ref() {
214
96
            None => None,
215
144
            Some(unit_id) => Some(unit_id.as_str()),
216
        },
217
240
        ..Default::default()
218
    };
219
240
    if let Some(device_id) = query.device.as_ref() {
220
20
        if device_id.len() > 0 {
221
20
            cond.device_id = Some(device_id.as_str());
222
20
        }
223
220
    }
224
240
    if let Some(network_code) = query.network.as_ref() {
225
12
        if network_code.len() > 0 {
226
12
            cond.network_code = Some(network_code.as_str());
227
12
        }
228
228
    }
229
240
    if let Some(network_addr) = query.addr.as_ref() {
230
8
        if network_addr.len() > 0 {
231
8
            cond.network_addr = Some(network_addr.as_str());
232
8
        }
233
232
    }
234
240
    if let Some(profile) = query.profile.as_ref() {
235
8
        if profile.len() > 0 {
236
8
            cond.profile = Some(profile.as_str());
237
8
        }
238
232
    }
239
240
    if let Some(start) = query.tstart.as_ref() {
240
120
        match query.tfield.as_ref() {
241
24
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
242
96
            Some(tfield) => match tfield.as_str() {
243
96
                "proc" => cond.proc_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
244
36
                "time" => cond.time_gte = Some(Utc.timestamp_nanos(*start * 1000000)),
245
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
246
            },
247
        }
248
120
    }
249
204
    if let Some(end) = query.tend.as_ref() {
250
48
        match query.tfield.as_ref() {
251
12
            None => return Err(ErrResp::ErrParam(Some("missing `tfield`".to_string()))),
252
36
            Some(tfield) => match tfield.as_str() {
253
36
                "proc" => cond.proc_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
254
24
                "time" => cond.time_lte = Some(Utc.timestamp_nanos(*end * 1000000)),
255
12
                _ => return Err(ErrResp::ErrParam(Some("invalid `tfield`".to_string()))),
256
            },
257
        }
258
156
    }
259

            
260
180
    Ok(cond)
261
240
}
262

            
263
242
async fn get_unit_cond(
264
242
    fn_name: &str,
265
242
    req: &HttpRequest,
266
242
    query_unit: Option<&String>,
267
242
    state: &web::Data<State>,
268
242
) -> Result<Option<String>, HttpResponse> {
269
242
    let token_info = match req.extensions_mut().get::<FullTokenInfo>() {
270
        None => {
271
            error!("[{}] token not found", fn_name);
272
            return Err(
273
                ErrResp::ErrUnknown(Some("token info not found".to_string())).error_response(),
274
            );
275
        }
276
242
        Some(token_info) => token_info.clone(),
277
242
    };
278
242
    let broker_base = state.broker_base.as_str();
279
242
    let client = state.client.clone();
280
242

            
281
242
    match query_unit {
282
        None => {
283
78
            if !Role::is_role(&token_info.info.roles, Role::ADMIN)
284
78
                && !Role::is_role(&token_info.info.roles, Role::MANAGER)
285
            {
286
8
                return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())).error_response());
287
70
            }
288
70
            Ok(None)
289
        }
290
164
        Some(unit_id) => match unit_id.len() {
291
12
            0 => Ok(None),
292
            _ => {
293
152
                let token = match HeaderValue::from_str(token_info.token.as_str()) {
294
                    Err(e) => {
295
                        error!("[{}] get token error: {}", fn_name, e);
296
                        return Err(ErrResp::ErrUnknown(Some(format!("get token error: {}", e)))
297
                            .error_response());
298
                    }
299
152
                    Ok(value) => value,
300
152
                };
301
157
                match get_unit_inner(fn_name, &client, broker_base, unit_id, &token).await {
302
                    Err(e) => {
303
                        error!("[{}] get unit error", fn_name);
304
                        return Err(e);
305
                    }
306
152
                    Ok(unit) => match unit {
307
                        None => {
308
8
                            return Err(ErrResp::Custom(
309
8
                                ErrReq::UNIT_NOT_EXIST.0,
310
8
                                ErrReq::UNIT_NOT_EXIST.1,
311
8
                                None,
312
8
                            )
313
8
                            .error_response())
314
                        }
315
144
                        Some(_) => Ok(Some(unit_id.clone())),
316
                    },
317
                }
318
            }
319
        },
320
    }
321
242
}
322

            
323
108
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
324
108
    match sort_args.as_ref() {
325
60
        None => Ok(vec![SortCond {
326
60
            key: SortKey::Proc,
327
60
            asc: false,
328
60
        }]),
329
48
        Some(args) => {
330
48
            let mut args = args.split(",");
331
48
            let mut sort_cond = vec![];
332
104
            while let Some(arg) = args.next() {
333
66
                let mut cond = arg.split(":");
334
66
                let key = match cond.next() {
335
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
336
66
                    Some(field) => match field {
337
66
                        "proc" => SortKey::Proc,
338
26
                        "time" => SortKey::Time,
339
20
                        "network" => SortKey::NetworkCode,
340
12
                        "addr" => SortKey::NetworkAddr,
341
                        _ => {
342
4
                            return Err(ErrResp::ErrParam(Some(format!(
343
4
                                "invalid sort key {}",
344
4
                                field
345
4
                            ))))
346
                        }
347
                    },
348
                };
349
62
                let asc = match cond.next() {
350
2
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
351
60
                    Some(asc) => match asc {
352
60
                        "asc" => true,
353
14
                        "desc" => false,
354
                        _ => {
355
2
                            return Err(ErrResp::ErrParam(Some(format!(
356
2
                                "invalid sort asc {}",
357
2
                                asc
358
2
                            ))))
359
                        }
360
                    },
361
                };
362
58
                if cond.next().is_some() {
363
2
                    return Err(ErrResp::ErrParam(Some(
364
2
                        "invalid sort condition".to_string(),
365
2
                    )));
366
56
                }
367
56
                sort_cond.push(SortCond { key, asc });
368
            }
369
38
            Ok(sort_cond)
370
        }
371
    }
372
108
}
373

            
374
82
fn list_transform(list: &Vec<NetworkUlData>) -> Vec<response::GetListData> {
375
82
    let mut ret = vec![];
376
226
    for item in list.iter() {
377
226
        ret.push(data_transform(&item));
378
226
    }
379
82
    ret
380
82
}
381

            
382
32
fn list_transform_bytes(
383
32
    list: &Vec<NetworkUlData>,
384
32
    with_start: bool,
385
32
    with_end: bool,
386
32
    format: Option<&request::ListFormat>,
387
32
) -> Result<Bytes, Box<dyn StdError>> {
388
32
    let mut build_str = match with_start {
389
16
        false => "".to_string(),
390
6
        true => match format {
391
2
            Some(request::ListFormat::Array) => "[".to_string(),
392
            Some(request::ListFormat::Csv) => {
393
4
                let bom = String::from_utf8(vec![0xEF, 0xBB, 0xBF])?;
394
4
                format!("{}{}", bom, CSV_FIELDS)
395
            }
396
10
            _ => "{\"data\":[".to_string(),
397
        },
398
    };
399
32
    let mut is_first = with_start;
400

            
401
1870
    for item in list {
402
430
        match format {
403
            Some(request::ListFormat::Csv) => {
404
220
                let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
405
220
                writer.serialize(data_transform_csv(item))?;
406
220
                build_str += String::from_utf8(writer.into_inner()?)?.as_str();
407
            }
408
            _ => {
409
1618
                if is_first {
410
12
                    is_first = false;
411
1606
                } else {
412
1606
                    build_str.push(',');
413
1606
                }
414
1618
                let json_str = match serde_json::to_string(&data_transform(item)) {
415
                    Err(e) => return Err(Box::new(e)),
416
1618
                    Ok(str) => str,
417
1618
                };
418
1618
                build_str += json_str.as_str();
419
            }
420
        }
421
    }
422

            
423
32
    if with_end {
424
16
        build_str += match format {
425
2
            Some(request::ListFormat::Array) => "]",
426
4
            Some(request::ListFormat::Csv) => "",
427
10
            _ => "]}",
428
        }
429
16
    }
430
32
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
431
32
}
432

            
433
1844
fn data_transform(data: &NetworkUlData) -> response::GetListData {
434
1844
    response::GetListData {
435
1844
        data_id: data.data_id.clone(),
436
1844
        proc: strings::time_str(&data.proc),
437
1844
        unit_code: data.unit_code.clone(),
438
1844
        network_code: data.network_code.clone(),
439
1844
        network_addr: data.network_addr.clone(),
440
1844
        unit_id: data.unit_id.clone(),
441
1844
        device_id: data.device_id.clone(),
442
1844
        time: strings::time_str(&data.time),
443
1844
        profile: data.profile.clone(),
444
1844
        data: data.data.clone(),
445
1844
        extension: data.extension.clone(),
446
1844
    }
447
1844
}
448

            
449
220
fn data_transform_csv(data: &NetworkUlData) -> response::GetListCsvData {
450
220
    response::GetListCsvData {
451
220
        data_id: data.data_id.clone(),
452
220
        proc: strings::time_str(&data.proc),
453
220
        unit_code: match data.unit_code.as_ref() {
454
208
            None => "".to_string(),
455
12
            Some(unit_code) => unit_code.clone(),
456
        },
457
220
        network_code: data.network_code.clone(),
458
220
        network_addr: data.network_addr.clone(),
459
220
        unit_id: match data.unit_id.as_ref() {
460
208
            None => "".to_string(),
461
12
            Some(unit_id) => unit_id.clone(),
462
        },
463
220
        device_id: match data.device_id.as_ref() {
464
208
            None => "".to_string(),
465
12
            Some(device_id) => device_id.clone(),
466
        },
467
220
        time: strings::time_str(&data.time),
468
220
        profile: data.profile.clone(),
469
220
        data: data.data.clone(),
470
220
        extension: match data.extension.as_ref() {
471
208
            None => "".to_string(),
472
12
            Some(extension) => match serde_json::to_string(extension) {
473
                Err(_) => "".to_string(),
474
12
                Ok(extension) => extension,
475
            },
476
        },
477
    }
478
220
}