1
use std::{collections::HashMap, error::Error as StdError};
2

            
3
use axum::{
4
    body::{Body, Bytes},
5
    extract::State,
6
    http::{header, StatusCode},
7
    response::IntoResponse,
8
    Extension,
9
};
10
use log::error;
11
use serde_json;
12

            
13
use sylvia_iot_corelib::{
14
    constants::ContentType,
15
    err::ErrResp,
16
    http::{Json, Path, Query},
17
    role::Role,
18
    strings::time_str,
19
};
20

            
21
use super::{
22
    super::{
23
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
24
        lib::check_unit,
25
    },
26
    request, response,
27
};
28
use crate::models::dldata_buffer::{
29
    DlDataBuffer, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey,
30
};
31

            
32
/// Page size used by the list API when the request carries no `limit` query parameter.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value handed to the model layer as `cursor_max` in the list options.
const LIST_CURSOR_MAX: u64 = 100;
34

            
35
/// `GET /{base}/api/v1/dldata-buffer/count`
36
135
pub async fn get_dldata_buffer_count(
37
135
    State(state): State<AppState>,
38
135
    Extension(token_info): Extension<GetTokenInfoData>,
39
135
    Query(query): Query<request::GetDlDataBufferCountQuery>,
40
135
) -> impl IntoResponse {
41
    const FN_NAME: &'static str = "get_dldata_buffer_count";
42

            
43
135
    let user_id = token_info.user_id.as_str();
44
135
    let roles = &token_info.roles;
45
135

            
46
135
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
47
69
        match query.unit.as_ref() {
48
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
49
66
            Some(unit_id) => {
50
66
                if unit_id.len() == 0 {
51
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
52
63
                }
53
            }
54
        }
55
66
    }
56
129
    let unit_cond = match query.unit.as_ref() {
57
24
        None => None,
58
105
        Some(unit_id) => match unit_id.len() {
59
6
            0 => None,
60
            _ => {
61
198
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
62
                    None => {
63
9
                        return Err(ErrResp::Custom(
64
9
                            ErrReq::UNIT_NOT_EXIST.0,
65
9
                            ErrReq::UNIT_NOT_EXIST.1,
66
9
                            None,
67
9
                        ))
68
                    }
69
90
                    Some(_) => Some(unit_id.as_str()),
70
                }
71
            }
72
        },
73
    };
74
120
    let cond = ListQueryCond {
75
120
        unit_id: unit_cond,
76
120
        application_id: match query.application.as_ref() {
77
75
            None => None,
78
45
            Some(application) => match application.len() {
79
3
                0 => None,
80
42
                _ => Some(application.as_ref()),
81
            },
82
        },
83
120
        network_id: match query.network.as_ref() {
84
75
            None => None,
85
45
            Some(network_id) => match network_id.len() {
86
3
                0 => None,
87
42
                _ => Some(network_id.as_ref()),
88
            },
89
        },
90
120
        device_id: match query.device.as_ref() {
91
99
            None => None,
92
21
            Some(device_id) => match device_id.len() {
93
3
                0 => None,
94
18
                _ => Some(device_id.as_ref()),
95
            },
96
        },
97
120
        ..Default::default()
98
120
    };
99
240
    match state.model.dldata_buffer().count(&cond).await {
100
        Err(e) => {
101
            error!("[{}] count error: {}", FN_NAME, e);
102
            Err(ErrResp::ErrDb(Some(e.to_string())))
103
        }
104
120
        Ok(count) => Ok(Json(response::GetDlDataBufferCount {
105
120
            data: response::GetCountData { count },
106
120
        })),
107
    }
108
135
}
109

            
110
/// `GET /{base}/api/v1/dldata-buffer/list`
111
231
pub async fn get_dldata_buffer_list(
112
231
    State(state): State<AppState>,
113
231
    Extension(token_info): Extension<GetTokenInfoData>,
114
231
    Query(query): Query<request::GetDlDataBufferListQuery>,
115
231
) -> impl IntoResponse {
116
    const FN_NAME: &'static str = "get_dldata_buffer_list";
117

            
118
231
    let user_id = token_info.user_id.as_str();
119
231
    let roles = &token_info.roles;
120
231

            
121
231
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
122
69
        match query.unit.as_ref() {
123
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
124
66
            Some(unit_id) => {
125
66
                if unit_id.len() == 0 {
126
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
127
63
                }
128
            }
129
        }
130
162
    }
131
225
    let unit_cond = match query.unit.as_ref() {
132
108
        None => None,
133
117
        Some(unit_id) => match unit_id.len() {
134
9
            0 => None,
135
            _ => {
136
216
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
137
                    None => {
138
9
                        return Err(ErrResp::Custom(
139
9
                            ErrReq::UNIT_NOT_EXIST.0,
140
9
                            ErrReq::UNIT_NOT_EXIST.1,
141
9
                            None,
142
9
                        ))
143
                    }
144
99
                    Some(_) => Some(unit_id.as_str()),
145
                }
146
            }
147
        },
148
    };
149
216
    let cond = ListQueryCond {
150
216
        unit_id: unit_cond,
151
216
        application_id: match query.application.as_ref() {
152
159
            None => None,
153
57
            Some(application) => match application.len() {
154
6
                0 => None,
155
51
                _ => Some(application.as_ref()),
156
            },
157
        },
158
216
        network_id: match query.network.as_ref() {
159
159
            None => None,
160
57
            Some(network_id) => match network_id.len() {
161
6
                0 => None,
162
51
                _ => Some(network_id.as_ref()),
163
            },
164
        },
165
216
        device_id: match query.device.as_ref() {
166
192
            None => None,
167
24
            Some(device_id) => match device_id.len() {
168
6
                0 => None,
169
18
                _ => Some(device_id.as_ref()),
170
            },
171
        },
172
216
        ..Default::default()
173
    };
174
216
    let sort_cond = get_sort_cond(&query.sort)?;
175
201
    let opts = ListOptions {
176
201
        cond: &cond,
177
201
        offset: query.offset,
178
201
        limit: match query.limit {
179
177
            None => Some(LIST_LIMIT_DEFAULT),
180
24
            Some(limit) => match limit {
181
9
                0 => None,
182
15
                _ => Some(limit),
183
            },
184
        },
185
201
        sort: Some(sort_cond.as_slice()),
186
201
        cursor_max: Some(LIST_CURSOR_MAX),
187
    };
188

            
189
402
    let (list, cursor) = match state.model.dldata_buffer().list(&opts, None).await {
190
        Err(e) => {
191
            error!("[{}] list error: {}", FN_NAME, e);
192
            return Err(ErrResp::ErrDb(Some(e.to_string())));
193
        }
194
201
        Ok((list, cursor)) => match cursor {
195
3
            None => match query.format {
196
                Some(request::ListFormat::Array) => {
197
3
                    return Ok(Json(data_list_transform(&list)).into_response())
198
                }
199
                _ => {
200
159
                    return Ok(Json(response::GetDlDataBufferList {
201
159
                        data: data_list_transform(&list),
202
159
                    })
203
159
                    .into_response())
204
                }
205
            },
206
39
            Some(_) => (list, cursor),
207
39
        },
208
39
    };
209
39

            
210
39
    let body = Body::from_stream(async_stream::stream! {
211
39
        let unit_cond = match query.unit.as_ref() {
212
39
            None => None,
213
39
            Some(unit_id) => match unit_id.len() {
214
39
                0 => None,
215
39
                _ => Some(unit_id.as_str()),
216
39
            },
217
39
        };
218
39
        let cond = ListQueryCond {
219
39
            unit_id: unit_cond,
220
39
            application_id: match query.application.as_ref() {
221
39
                None => None,
222
39
                Some(application) => match application.len() {
223
39
                    0 => None,
224
39
                    _ => Some(application.as_ref())
225
39
                },
226
39
            },
227
39
            network_id: match query.network.as_ref() {
228
39
                None => None,
229
39
                Some(network_id) => match network_id.len() {
230
39
                    0 => None,
231
39
                    _ => Some(network_id.as_ref())
232
39
                },
233
39
            },
234
39
            device_id: match query.device.as_ref() {
235
39
                None => None,
236
39
                Some(device_id) => match device_id.len() {
237
39
                    0 => None,
238
39
                    _ => Some(device_id.as_ref())
239
39
                },
240
39
            },
241
39
            ..Default::default()
242
39
        };
243
39
        let opts = ListOptions {
244
39
            cond: &cond,
245
39
            offset: query.offset,
246
39
            limit: match query.limit {
247
39
                None => Some(LIST_LIMIT_DEFAULT),
248
39
                Some(limit) => match limit {
249
39
                    0 => None,
250
39
                    _ => Some(limit),
251
39
                },
252
39
            },
253
39
            sort: Some(sort_cond.as_slice()),
254
39
            cursor_max: Some(LIST_CURSOR_MAX),
255
39
        };
256
39

            
257
39
        let mut list = list;
258
39
        let mut cursor = cursor;
259
39
        let mut is_first = true;
260
39
        loop {
261
39
            yield data_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
262
39
            is_first = false;
263
39
            if cursor.is_none() {
264
39
                break;
265
39
            }
266
39
            let (_list, _cursor) = match state.model.dldata_buffer().list(&opts, cursor).await {
267
39
                Err(_) => break,
268
39
                Ok((list, cursor)) => (list, cursor),
269
39
            };
270
39
            list = _list;
271
39
            cursor = _cursor;
272
39
        }
273
39
    });
274
39
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
275
231
}
276

            
277
/// `DELETE /{base}/api/v1/dldata-buffer/{dataId}`
278
9
pub async fn delete_dldata_buffer(
279
9
    State(state): State<AppState>,
280
9
    Extension(token_info): Extension<GetTokenInfoData>,
281
9
    Path(param): Path<request::DataIdPath>,
282
9
) -> impl IntoResponse {
283
    const FN_NAME: &'static str = "delete_dldata_buffer";
284

            
285
9
    let user_id = token_info.user_id.as_str();
286
9
    let roles = &token_info.roles;
287
9
    let data_id = param.data_id.as_str();
288
9

            
289
9
    // To check if the dldata buffer is for the user.
290
30
    match check_data(FN_NAME, data_id, user_id, true, roles, &state).await? {
291
6
        None => return Ok(StatusCode::NO_CONTENT),
292
3
        Some(_) => (),
293
3
    }
294
3

            
295
3
    let cond = QueryCond {
296
3
        data_id: Some(data_id),
297
3
        ..Default::default()
298
3
    };
299
6
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
300
        error!("[{}] del error: {}", FN_NAME, e);
301
        return Err(ErrResp::ErrDb(Some(e.to_string())));
302
3
    }
303
3

            
304
3
    Ok(StatusCode::NO_CONTENT)
305
9
}
306

            
307
216
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
308
216
    match sort_args.as_ref() {
309
171
        None => Ok(vec![
310
171
            SortCond {
311
171
                key: SortKey::ApplicationCode,
312
171
                asc: true,
313
171
            },
314
171
            SortCond {
315
171
                key: SortKey::CreatedAt,
316
171
                asc: false,
317
171
            },
318
171
        ]),
319
45
        Some(args) => {
320
45
            let mut args = args.split(",");
321
45
            let mut sort_cond = vec![];
322
102
            while let Some(arg) = args.next() {
323
72
                let mut cond = arg.split(":");
324
72
                let key = match cond.next() {
325
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
326
72
                    Some(field) => match field {
327
72
                        "application" => SortKey::ApplicationCode,
328
45
                        "created" => SortKey::CreatedAt,
329
24
                        "expired" => SortKey::ExpiredAt,
330
                        _ => {
331
6
                            return Err(ErrResp::ErrParam(Some(format!(
332
6
                                "invalid sort key {}",
333
6
                                field
334
6
                            ))))
335
                        }
336
                    },
337
                };
338
66
                let asc = match cond.next() {
339
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
340
63
                    Some(asc) => match asc {
341
63
                        "asc" => true,
342
30
                        "desc" => false,
343
                        _ => {
344
3
                            return Err(ErrResp::ErrParam(Some(format!(
345
3
                                "invalid sort asc {}",
346
3
                                asc
347
3
                            ))))
348
                        }
349
                    },
350
                };
351
60
                if cond.next().is_some() {
352
3
                    return Err(ErrResp::ErrParam(Some(
353
3
                        "invalid sort condition".to_string(),
354
3
                    )));
355
57
                }
356
57
                sort_cond.push(SortCond { key, asc });
357
            }
358
30
            Ok(sort_cond)
359
        }
360
    }
361
216
}
362

            
363
/// To check if the user ID can access the dldata buffer. Choose `only_owner` to check if the user
364
/// is the unit owner or one of unit members.
365
///
366
/// # Errors
367
///
368
/// Returns OK if the device is found or not. Otherwise errors will be returned.
369
9
async fn check_data(
370
9
    fn_name: &str,
371
9
    data_id: &str,
372
9
    user_id: &str,
373
9
    only_owner: bool, // to check if this `user_id` is the owner.
374
9
    roles: &HashMap<String, bool>,
375
9
    state: &AppState,
376
9
) -> Result<Option<DlDataBuffer>, ErrResp> {
377
18
    let data = match state.model.dldata_buffer().get(data_id).await {
378
        Err(e) => {
379
            error!("[{}] get error: {}", fn_name, e);
380
            return Err(ErrResp::ErrDb(Some(e.to_string())));
381
        }
382
9
        Ok(data) => match data {
383
3
            None => return Ok(None),
384
6
            Some(data) => data,
385
6
        },
386
6
    };
387
6
    let unit_id = data.unit_id.as_str();
388
12
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
389
3
        None => Ok(None),
390
3
        Some(_) => Ok(Some(data)),
391
    }
392
9
}
393

            
394
162
fn data_list_transform(list: &Vec<DlDataBuffer>) -> Vec<response::GetDlDataBufferData> {
395
162
    let mut ret = vec![];
396
447
    for data in list.iter() {
397
447
        ret.push(data_transform(&data));
398
447
    }
399
162
    ret
400
162
}
401

            
402
84
fn data_list_transform_bytes(
403
84
    list: &Vec<DlDataBuffer>,
404
84
    with_start: bool,
405
84
    with_end: bool,
406
84
    format: Option<&request::ListFormat>,
407
84
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
408
84
    let mut build_str = match with_start {
409
45
        false => "".to_string(),
410
3
        true => match format {
411
3
            Some(request::ListFormat::Array) => "[".to_string(),
412
36
            _ => "{\"data\":[".to_string(),
413
        },
414
    };
415
84
    let mut is_first = with_start;
416

            
417
4917
    for item in list {
418
4833
        if is_first {
419
39
            is_first = false;
420
4794
        } else {
421
4794
            build_str.push(',');
422
4794
        }
423
4833
        let json_str = match serde_json::to_string(&data_transform(item)) {
424
            Err(e) => return Err(Box::new(e)),
425
4833
            Ok(str) => str,
426
4833
        };
427
4833
        build_str += json_str.as_str();
428
    }
429

            
430
84
    if with_end {
431
39
        build_str += match format {
432
3
            Some(request::ListFormat::Array) => "]",
433
36
            _ => "]}",
434
        }
435
45
    }
436
84
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
437
84
}
438

            
439
5280
fn data_transform(data: &DlDataBuffer) -> response::GetDlDataBufferData {
440
5280
    response::GetDlDataBufferData {
441
5280
        data_id: data.data_id.clone(),
442
5280
        unit_id: data.unit_id.clone(),
443
5280
        application_id: data.application_id.clone(),
444
5280
        application_code: data.application_code.clone(),
445
5280
        device_id: data.device_id.clone(),
446
5280
        network_id: data.network_id.clone(),
447
5280
        network_addr: data.network_addr.clone(),
448
5280
        created_at: time_str(&data.created_at),
449
5280
        expired_at: time_str(&data.expired_at),
450
5280
    }
451
5280
}