1
use std::{collections::HashMap, error::Error as StdError};
2

            
3
use axum::{
4
    body::{Body, Bytes},
5
    extract::State,
6
    http::{header, StatusCode},
7
    response::IntoResponse,
8
    Extension,
9
};
10
use log::error;
11
use serde_json;
12

            
13
use sylvia_iot_corelib::{
14
    constants::ContentType,
15
    err::ErrResp,
16
    http::{Json, Path, Query},
17
    role::Role,
18
    strings::time_str,
19
};
20

            
21
use super::{
22
    super::{
23
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
24
        lib::check_unit,
25
    },
26
    request, response,
27
};
28
use crate::models::dldata_buffer::{
29
    DlDataBuffer, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey,
30
};
31

            
32
const LIST_LIMIT_DEFAULT: u64 = 100;
33
const LIST_CURSOR_MAX: u64 = 100;
34

            
35
/// `GET /{base}/api/v1/dldata-buffer/count`
36
270
pub async fn get_dldata_buffer_count(
37
270
    State(state): State<AppState>,
38
270
    Extension(token_info): Extension<GetTokenInfoData>,
39
270
    Query(query): Query<request::GetDlDataBufferCountQuery>,
40
270
) -> impl IntoResponse {
41
    const FN_NAME: &'static str = "get_dldata_buffer_count";
42

            
43
270
    let user_id = token_info.user_id.as_str();
44
270
    let roles = &token_info.roles;
45
270

            
46
270
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
47
138
        match query.unit.as_ref() {
48
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
49
132
            Some(unit_id) => {
50
132
                if unit_id.len() == 0 {
51
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
52
126
                }
53
            }
54
        }
55
132
    }
56
258
    let unit_cond = match query.unit.as_ref() {
57
48
        None => None,
58
210
        Some(unit_id) => match unit_id.len() {
59
12
            0 => None,
60
            _ => {
61
198
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
62
                    None => {
63
18
                        return Err(ErrResp::Custom(
64
18
                            ErrReq::UNIT_NOT_EXIST.0,
65
18
                            ErrReq::UNIT_NOT_EXIST.1,
66
18
                            None,
67
18
                        ))
68
                    }
69
180
                    Some(_) => Some(unit_id.as_str()),
70
                }
71
            }
72
        },
73
    };
74
240
    let cond = ListQueryCond {
75
240
        unit_id: unit_cond,
76
240
        application_id: match query.application.as_ref() {
77
150
            None => None,
78
90
            Some(application) => match application.len() {
79
6
                0 => None,
80
84
                _ => Some(application.as_ref()),
81
            },
82
        },
83
240
        network_id: match query.network.as_ref() {
84
150
            None => None,
85
90
            Some(network_id) => match network_id.len() {
86
6
                0 => None,
87
84
                _ => Some(network_id.as_ref()),
88
            },
89
        },
90
240
        device_id: match query.device.as_ref() {
91
198
            None => None,
92
42
            Some(device_id) => match device_id.len() {
93
6
                0 => None,
94
36
                _ => Some(device_id.as_ref()),
95
            },
96
        },
97
240
        ..Default::default()
98
240
    };
99
240
    match state.model.dldata_buffer().count(&cond).await {
100
        Err(e) => {
101
            error!("[{}] count error: {}", FN_NAME, e);
102
            Err(ErrResp::ErrDb(Some(e.to_string())))
103
        }
104
240
        Ok(count) => Ok(Json(response::GetDlDataBufferCount {
105
240
            data: response::GetCountData { count },
106
240
        })),
107
    }
108
270
}
109

            
110
/// `GET /{base}/api/v1/dldata-buffer/list`
111
462
pub async fn get_dldata_buffer_list(
112
462
    State(state): State<AppState>,
113
462
    Extension(token_info): Extension<GetTokenInfoData>,
114
462
    Query(query): Query<request::GetDlDataBufferListQuery>,
115
462
) -> impl IntoResponse {
116
    const FN_NAME: &'static str = "get_dldata_buffer_list";
117

            
118
462
    let user_id = token_info.user_id.as_str();
119
462
    let roles = &token_info.roles;
120
462

            
121
462
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
122
138
        match query.unit.as_ref() {
123
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
124
132
            Some(unit_id) => {
125
132
                if unit_id.len() == 0 {
126
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
127
126
                }
128
            }
129
        }
130
324
    }
131
450
    let unit_cond = match query.unit.as_ref() {
132
216
        None => None,
133
234
        Some(unit_id) => match unit_id.len() {
134
18
            0 => None,
135
            _ => {
136
216
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
137
                    None => {
138
18
                        return Err(ErrResp::Custom(
139
18
                            ErrReq::UNIT_NOT_EXIST.0,
140
18
                            ErrReq::UNIT_NOT_EXIST.1,
141
18
                            None,
142
18
                        ))
143
                    }
144
198
                    Some(_) => Some(unit_id.as_str()),
145
                }
146
            }
147
        },
148
    };
149
432
    let cond = ListQueryCond {
150
432
        unit_id: unit_cond,
151
432
        application_id: match query.application.as_ref() {
152
318
            None => None,
153
114
            Some(application) => match application.len() {
154
12
                0 => None,
155
102
                _ => Some(application.as_ref()),
156
            },
157
        },
158
432
        network_id: match query.network.as_ref() {
159
318
            None => None,
160
114
            Some(network_id) => match network_id.len() {
161
12
                0 => None,
162
102
                _ => Some(network_id.as_ref()),
163
            },
164
        },
165
432
        device_id: match query.device.as_ref() {
166
384
            None => None,
167
48
            Some(device_id) => match device_id.len() {
168
12
                0 => None,
169
36
                _ => Some(device_id.as_ref()),
170
            },
171
        },
172
432
        ..Default::default()
173
    };
174
432
    let sort_cond = get_sort_cond(&query.sort)?;
175
402
    let opts = ListOptions {
176
402
        cond: &cond,
177
402
        offset: query.offset,
178
402
        limit: match query.limit {
179
354
            None => Some(LIST_LIMIT_DEFAULT),
180
48
            Some(limit) => match limit {
181
18
                0 => None,
182
30
                _ => Some(limit),
183
            },
184
        },
185
402
        sort: Some(sort_cond.as_slice()),
186
402
        cursor_max: Some(LIST_CURSOR_MAX),
187
    };
188

            
189
402
    let (list, cursor) = match state.model.dldata_buffer().list(&opts, None).await {
190
        Err(e) => {
191
            error!("[{}] list error: {}", FN_NAME, e);
192
            return Err(ErrResp::ErrDb(Some(e.to_string())));
193
        }
194
402
        Ok((list, cursor)) => match cursor {
195
6
            None => match query.format {
196
                Some(request::ListFormat::Array) => {
197
6
                    return Ok(Json(data_list_transform(&list)).into_response())
198
                }
199
                _ => {
200
318
                    return Ok(Json(response::GetDlDataBufferList {
201
318
                        data: data_list_transform(&list),
202
318
                    })
203
318
                    .into_response())
204
                }
205
            },
206
78
            Some(_) => (list, cursor),
207
78
        },
208
78
    };
209
78

            
210
78
    let body = Body::from_stream(async_stream::stream! {
211
78
        let unit_cond = match query.unit.as_ref() {
212
78
            None => None,
213
78
            Some(unit_id) => match unit_id.len() {
214
78
                0 => None,
215
78
                _ => Some(unit_id.as_str()),
216
78
            },
217
78
        };
218
78
        let cond = ListQueryCond {
219
78
            unit_id: unit_cond,
220
78
            application_id: match query.application.as_ref() {
221
78
                None => None,
222
78
                Some(application) => match application.len() {
223
78
                    0 => None,
224
78
                    _ => Some(application.as_ref())
225
78
                },
226
78
            },
227
78
            network_id: match query.network.as_ref() {
228
78
                None => None,
229
78
                Some(network_id) => match network_id.len() {
230
78
                    0 => None,
231
78
                    _ => Some(network_id.as_ref())
232
78
                },
233
78
            },
234
78
            device_id: match query.device.as_ref() {
235
78
                None => None,
236
78
                Some(device_id) => match device_id.len() {
237
78
                    0 => None,
238
78
                    _ => Some(device_id.as_ref())
239
78
                },
240
78
            },
241
78
            ..Default::default()
242
78
        };
243
78
        let opts = ListOptions {
244
78
            cond: &cond,
245
78
            offset: query.offset,
246
78
            limit: match query.limit {
247
78
                None => Some(LIST_LIMIT_DEFAULT),
248
78
                Some(limit) => match limit {
249
78
                    0 => None,
250
78
                    _ => Some(limit),
251
78
                },
252
78
            },
253
78
            sort: Some(sort_cond.as_slice()),
254
78
            cursor_max: Some(LIST_CURSOR_MAX),
255
78
        };
256
78

            
257
78
        let mut list = list;
258
78
        let mut cursor = cursor;
259
78
        let mut is_first = true;
260
78
        loop {
261
78
            yield data_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
262
78
            is_first = false;
263
78
            if cursor.is_none() {
264
78
                break;
265
78
            }
266
78
            let (_list, _cursor) = match state.model.dldata_buffer().list(&opts, cursor).await {
267
78
                Err(_) => break,
268
78
                Ok((list, cursor)) => (list, cursor),
269
78
            };
270
78
            list = _list;
271
78
            cursor = _cursor;
272
78
        }
273
78
    });
274
78
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
275
462
}
276

            
277
/// `DELETE /{base}/api/v1/dldata-buffer/{dataId}`
278
18
pub async fn delete_dldata_buffer(
279
18
    State(state): State<AppState>,
280
18
    Extension(token_info): Extension<GetTokenInfoData>,
281
18
    Path(param): Path<request::DataIdPath>,
282
18
) -> impl IntoResponse {
283
    const FN_NAME: &'static str = "delete_dldata_buffer";
284

            
285
18
    let user_id = token_info.user_id.as_str();
286
18
    let roles = &token_info.roles;
287
18
    let data_id = param.data_id.as_str();
288
18

            
289
18
    // To check if the dldata buffer is for the user.
290
18
    match check_data(FN_NAME, data_id, user_id, true, roles, &state).await? {
291
12
        None => return Ok(StatusCode::NO_CONTENT),
292
6
        Some(_) => (),
293
6
    }
294
6

            
295
6
    let cond = QueryCond {
296
6
        data_id: Some(data_id),
297
6
        ..Default::default()
298
6
    };
299
6
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
300
        error!("[{}] del error: {}", FN_NAME, e);
301
        return Err(ErrResp::ErrDb(Some(e.to_string())));
302
6
    }
303
6

            
304
6
    Ok(StatusCode::NO_CONTENT)
305
18
}
306

            
307
432
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
308
432
    match sort_args.as_ref() {
309
342
        None => Ok(vec![
310
342
            SortCond {
311
342
                key: SortKey::ApplicationCode,
312
342
                asc: true,
313
342
            },
314
342
            SortCond {
315
342
                key: SortKey::CreatedAt,
316
342
                asc: false,
317
342
            },
318
342
        ]),
319
90
        Some(args) => {
320
90
            let mut args = args.split(",");
321
90
            let mut sort_cond = vec![];
322
204
            while let Some(arg) = args.next() {
323
144
                let mut cond = arg.split(":");
324
144
                let key = match cond.next() {
325
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
326
144
                    Some(field) => match field {
327
144
                        "application" => SortKey::ApplicationCode,
328
90
                        "created" => SortKey::CreatedAt,
329
48
                        "expired" => SortKey::ExpiredAt,
330
                        _ => {
331
12
                            return Err(ErrResp::ErrParam(Some(format!(
332
12
                                "invalid sort key {}",
333
12
                                field
334
12
                            ))))
335
                        }
336
                    },
337
                };
338
132
                let asc = match cond.next() {
339
6
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
340
126
                    Some(asc) => match asc {
341
126
                        "asc" => true,
342
60
                        "desc" => false,
343
                        _ => {
344
6
                            return Err(ErrResp::ErrParam(Some(format!(
345
6
                                "invalid sort asc {}",
346
6
                                asc
347
6
                            ))))
348
                        }
349
                    },
350
                };
351
120
                if cond.next().is_some() {
352
6
                    return Err(ErrResp::ErrParam(Some(
353
6
                        "invalid sort condition".to_string(),
354
6
                    )));
355
114
                }
356
114
                sort_cond.push(SortCond { key, asc });
357
            }
358
60
            Ok(sort_cond)
359
        }
360
    }
361
432
}
362

            
363
/// To check if the user ID can access the dldata buffer. Choose `only_owner` to check if the user
364
/// is the unit owner or one of unit members.
365
///
366
/// # Errors
367
///
368
/// Returns OK if the device is found or not. Otherwise errors will be returned.
369
18
async fn check_data(
370
18
    fn_name: &str,
371
18
    data_id: &str,
372
18
    user_id: &str,
373
18
    only_owner: bool, // to check if this `user_id` is the owner.
374
18
    roles: &HashMap<String, bool>,
375
18
    state: &AppState,
376
18
) -> Result<Option<DlDataBuffer>, ErrResp> {
377
18
    let data = match state.model.dldata_buffer().get(data_id).await {
378
        Err(e) => {
379
            error!("[{}] get error: {}", fn_name, e);
380
            return Err(ErrResp::ErrDb(Some(e.to_string())));
381
        }
382
18
        Ok(data) => match data {
383
6
            None => return Ok(None),
384
12
            Some(data) => data,
385
12
        },
386
12
    };
387
12
    let unit_id = data.unit_id.as_str();
388
12
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
389
6
        None => Ok(None),
390
6
        Some(_) => Ok(Some(data)),
391
    }
392
18
}
393

            
394
324
fn data_list_transform(list: &Vec<DlDataBuffer>) -> Vec<response::GetDlDataBufferData> {
395
324
    let mut ret = vec![];
396
894
    for data in list.iter() {
397
894
        ret.push(data_transform(&data));
398
894
    }
399
324
    ret
400
324
}
401

            
402
168
fn data_list_transform_bytes(
403
168
    list: &Vec<DlDataBuffer>,
404
168
    with_start: bool,
405
168
    with_end: bool,
406
168
    format: Option<&request::ListFormat>,
407
168
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
408
168
    let mut build_str = match with_start {
409
90
        false => "".to_string(),
410
6
        true => match format {
411
6
            Some(request::ListFormat::Array) => "[".to_string(),
412
72
            _ => "{\"data\":[".to_string(),
413
        },
414
    };
415
168
    let mut is_first = with_start;
416

            
417
9834
    for item in list {
418
9666
        if is_first {
419
78
            is_first = false;
420
9588
        } else {
421
9588
            build_str.push(',');
422
9588
        }
423
9666
        let json_str = match serde_json::to_string(&data_transform(item)) {
424
            Err(e) => return Err(Box::new(e)),
425
9666
            Ok(str) => str,
426
9666
        };
427
9666
        build_str += json_str.as_str();
428
    }
429

            
430
168
    if with_end {
431
78
        build_str += match format {
432
6
            Some(request::ListFormat::Array) => "]",
433
72
            _ => "]}",
434
        }
435
90
    }
436
168
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
437
168
}
438

            
439
10560
fn data_transform(data: &DlDataBuffer) -> response::GetDlDataBufferData {
440
10560
    response::GetDlDataBufferData {
441
10560
        data_id: data.data_id.clone(),
442
10560
        unit_id: data.unit_id.clone(),
443
10560
        application_id: data.application_id.clone(),
444
10560
        application_code: data.application_code.clone(),
445
10560
        device_id: data.device_id.clone(),
446
10560
        network_id: data.network_id.clone(),
447
10560
        network_addr: data.network_addr.clone(),
448
10560
        created_at: time_str(&data.created_at),
449
10560
        expired_at: time_str(&data.expired_at),
450
10560
    }
451
10560
}