1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    body::{Body, Bytes},
13
    extract::State,
14
    http::{header, StatusCode},
15
    response::IntoResponse,
16
    Extension,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json::{self, Map};
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27
    Queue,
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, time_str},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
40
        lib::{check_unit, gen_mgr_key},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        application::{
51
            self, Cursor as ApplicationCursor, ListOptions as ApplicationListOpts,
52
            ListQueryCond as ApplicationCond,
53
        },
54
        device, device_route, dldata_buffer,
55
        network::{
56
            self, Cursor as NetworkCursor, ListOptions as NetworkListOpts,
57
            ListQueryCond as NetworkCond,
58
        },
59
        network_route,
60
        unit::{
61
            Cursor, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey, Unit,
62
            UpdateQueryCond, Updates,
63
        },
64
        Cache,
65
    },
66
};
67

            
68
21
#[derive(Deserialize, Serialize)]
69
#[serde(tag = "operation")]
70
enum RecvCtrlMsg {
71
    #[serde(rename = "del-unit")]
72
    DelUnit { new: CtrlDelUnit },
73
}
74

            
75
#[derive(Serialize)]
76
#[serde(untagged)]
77
enum SendCtrlMsg {
78
    DelUnit { operation: String, new: CtrlDelUnit },
79
}
80

            
81
/// Namespace type for control message operation names (see its associated constants).
struct CtrlMsgOp;
82

            
83
21
#[derive(Deserialize, Serialize)]
84
struct CtrlDelUnit {
85
    #[serde(rename = "unitId")]
86
    unit_id: String,
87
    #[serde(rename = "unitCode")]
88
    unit_code: String,
89
}
90

            
91
/// Stateless handler type passed as both handler arguments when the control sender queue is
/// created in `new_ctrl_sender`.
struct CtrlSenderHandler;
92

            
93
struct CtrlReceiverHandler {
94
    cache: Option<Arc<dyn Cache>>,
95
}
96

            
97
impl CtrlMsgOp {
98
    const DEL_UNIT: &'static str = "del-unit";
99
}
100

            
101
/// Page size applied by the list API when the request does not specify `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as `cursor_max` in every list query issued from this module.
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument passed to `strings::random_id` when generating a new unit ID.
const ID_RAND_LEN: usize = 8;
/// Name of the control queue; also the key used to register the receiver in the state.
const CTRL_QUEUE_NAME: &str = "unit";
105

            
106
/// Initialize channels.
107
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
108
    const FN_NAME: &'static str = "init";
109

            
110
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
111
15
    {
112
15
        state
113
15
            .ctrl_receivers
114
15
            .lock()
115
15
            .unwrap()
116
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
117
15
    }
118
15

            
119
15
    let ctrl_sender = { state.ctrl_senders.unit.lock().unwrap().clone() };
120
    // Wait for connected.
121
1381
    for _ in 0..500 {
122
1381
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
123
15
            break;
124
1366
        }
125
2712
        time::sleep(Duration::from_millis(10)).await;
126
    }
127
15
    if ctrl_sender.status() != Status::Connected {
128
        error!(
129
            "[{}] {} control sender not connected",
130
            FN_NAME, CTRL_QUEUE_NAME
131
        );
132
        return Err(Box::new(IoError::new(
133
            ErrorKind::NotConnected,
134
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
135
        )));
136
15
    }
137
15
    if q.status() != Status::Connected {
138
        error!(
139
            "[{}] {} control receiver not connected",
140
            FN_NAME, CTRL_QUEUE_NAME
141
        );
142
        return Err(Box::new(IoError::new(
143
            ErrorKind::NotConnected,
144
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
145
        )));
146
15
    }
147
15

            
148
15
    Ok(())
149
15
}
150

            
151
/// Create control channel sender queue.
152
15
pub fn new_ctrl_sender(
153
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
154
15
    config: &CfgCtrl,
155
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
156
15
    let url = match config.url.as_ref() {
157
        None => {
158
            return Err(Box::new(IoError::new(
159
                ErrorKind::InvalidInput,
160
                "empty control url",
161
            )))
162
        }
163
15
        Some(url) => match Url::parse(url.as_str()) {
164
            Err(e) => return Err(Box::new(e)),
165
15
            Ok(url) => url,
166
15
        },
167
15
    };
168
15

            
169
15
    match mq::control::new(
170
15
        conn_pool.clone(),
171
15
        &url,
172
15
        config.prefetch,
173
15
        CTRL_QUEUE_NAME,
174
15
        false,
175
15
        Arc::new(CtrlSenderHandler {}),
176
15
        Arc::new(CtrlSenderHandler {}),
177
15
    ) {
178
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
179
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
180
    }
181
15
}
182

            
183
/// Create control channel receiver queue.
184
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
185
15
    let url = match config.url.as_ref() {
186
        None => {
187
            return Err(Box::new(IoError::new(
188
                ErrorKind::InvalidInput,
189
                "empty control url",
190
            )))
191
        }
192
15
        Some(url) => match Url::parse(url.as_str()) {
193
            Err(e) => return Err(Box::new(e)),
194
15
            Ok(url) => url,
195
15
        },
196
15
    };
197
15
    let handler = Arc::new(CtrlReceiverHandler {
198
15
        cache: state.cache.clone(),
199
15
    });
200
15
    match mq::control::new(
201
15
        state.mq_conns.clone(),
202
15
        &url,
203
15
        config.prefetch,
204
15
        CTRL_QUEUE_NAME,
205
15
        true,
206
15
        handler.clone(),
207
15
        handler,
208
15
    ) {
209
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
210
15
        Ok(q) => Ok(q),
211
    }
212
15
}
213

            
214
/// `POST /{base}/api/v1/unit`
215
45
pub async fn post_unit(
216
45
    State(state): State<AppState>,
217
45
    Extension(token_info): Extension<GetTokenInfoData>,
218
45
    Json(body): Json<request::PostUnitBody>,
219
45
) -> impl IntoResponse {
220
    const FN_NAME: &'static str = "post_unit";
221

            
222
45
    let user_id = token_info.user_id.as_str();
223
45
    let roles = &token_info.roles;
224
45
    let token = token_info.token.as_str();
225
45

            
226
45
    let code = body.data.code.to_lowercase();
227
45
    if !strings::is_code(code.as_str()) {
228
3
        return Err(ErrResp::ErrParam(Some(
229
3
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
230
3
        )));
231
42
    }
232
42
    let owner_id = match body.data.owner_id.as_ref() {
233
24
        None => user_id,
234
18
        Some(owner_id) => {
235
18
            if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
236
15
                if owner_id.len() == 0 {
237
3
                    return Err(ErrResp::ErrParam(Some(
238
3
                        "`ownerId` must with at least one character".to_string(),
239
3
                    )));
240
12
                }
241
24
                match check_user(FN_NAME, token, owner_id.as_str(), &state).await {
242
                    Err(e) => return Err(e),
243
12
                    Ok(result) => match result {
244
                        false => {
245
3
                            return Err(ErrResp::Custom(
246
3
                                ErrReq::OWNER_NOT_EXIST.0,
247
3
                                ErrReq::OWNER_NOT_EXIST.1,
248
3
                                None,
249
3
                            ))
250
                        }
251
9
                        true => owner_id.as_str(),
252
                    },
253
                }
254
            } else {
255
3
                user_id
256
            }
257
        }
258
    };
259
36
    if let Some(info) = body.data.info.as_ref() {
260
6
        for (k, _) in info.iter() {
261
6
            if k.len() == 0 {
262
3
                return Err(ErrResp::ErrParam(Some(
263
3
                    "`info` key must not be empty".to_string(),
264
3
                )));
265
3
            }
266
        }
267
30
    }
268

            
269
33
    let cond = QueryCond {
270
33
        code: Some(code.as_str()),
271
33
        ..Default::default()
272
33
    };
273
66
    match state.model.unit().get(&cond).await {
274
        Err(e) => {
275
            error!("[{}] get error: {}", FN_NAME, e);
276
            return Err(ErrResp::ErrDb(Some(e.to_string())));
277
        }
278
33
        Ok(unit) => match unit {
279
30
            None => (),
280
            Some(_) => {
281
3
                return Err(ErrResp::Custom(
282
3
                    ErrReq::UNIT_EXIST.0,
283
3
                    ErrReq::UNIT_EXIST.1,
284
3
                    None,
285
3
                ))
286
            }
287
        },
288
    }
289

            
290
30
    let now = Utc::now();
291
30
    let unit = Unit {
292
30
        unit_id: strings::random_id(&now, ID_RAND_LEN),
293
30
        code,
294
30
        created_at: now,
295
30
        modified_at: now,
296
30
        owner_id: owner_id.to_string(),
297
30
        member_ids: vec![owner_id.to_string()],
298
30
        name: match body.data.name.as_ref() {
299
27
            None => "".to_string(),
300
3
            Some(name) => name.clone(),
301
        },
302
30
        info: match body.data.info.as_ref() {
303
27
            None => Map::new(),
304
3
            Some(info) => info.clone(),
305
        },
306
    };
307
59
    if let Err(e) = state.model.unit().add(&unit).await {
308
        error!("[{}] add error: {}", FN_NAME, e);
309
        return Err(ErrResp::ErrDb(Some(e.to_string())));
310
30
    }
311
30
    Ok(Json(response::PostUnit {
312
30
        data: response::PostUnitData {
313
30
            unit_id: unit.unit_id,
314
30
        },
315
30
    }))
316
45
}
317

            
318
/// `GET /{base}/api/v1/unit/count`
319
45
pub async fn get_unit_count(
320
45
    State(state): State<AppState>,
321
45
    Extension(token_info): Extension<GetTokenInfoData>,
322
45
    Query(query): Query<request::GetUnitCountQuery>,
323
45
) -> impl IntoResponse {
324
    const FN_NAME: &'static str = "get_unit_count";
325

            
326
45
    let user_id = token_info.user_id.as_str();
327
45
    let roles = &token_info.roles;
328
45

            
329
45
    let mut owner_cond = None;
330
45
    let mut member_cond = None;
331
45
    let mut code_contains_cond = None;
332
45
    if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
333
15
        if let Some(owner) = query.owner.as_ref() {
334
3
            if owner.len() > 0 {
335
3
                owner_cond = Some(owner.as_str());
336
3
            }
337
12
        }
338
15
        if let Some(member) = query.member.as_ref() {
339
6
            if member.len() > 0 {
340
6
                member_cond = Some(member.as_str());
341
6
            }
342
9
        }
343
30
    } else {
344
30
        member_cond = Some(user_id);
345
30
    }
346
45
    if let Some(contains) = query.contains.as_ref() {
347
9
        if contains.len() > 0 {
348
9
            code_contains_cond = Some(contains.as_str());
349
9
        }
350
36
    }
351
45
    let cond = ListQueryCond {
352
45
        owner_id: owner_cond,
353
45
        member_id: member_cond,
354
45
        code_contains: code_contains_cond,
355
45
        ..Default::default()
356
45
    };
357
90
    match state.model.unit().count(&cond).await {
358
        Err(e) => {
359
            error!("[{}] count error: {}", FN_NAME, e);
360
            Err(ErrResp::ErrDb(Some(e.to_string())))
361
        }
362
45
        Ok(count) => Ok(Json(response::GetUnitCount {
363
45
            data: response::GetCountData { count },
364
45
        })),
365
    }
366
45
}
367

            
368
/// `GET /{base}/api/v1/unit/list`
369
114
pub async fn get_unit_list(
370
114
    State(state): State<AppState>,
371
114
    Extension(token_info): Extension<GetTokenInfoData>,
372
114
    Query(query): Query<request::GetUnitListQuery>,
373
114
) -> impl IntoResponse {
374
    const FN_NAME: &'static str = "get_unit_list";
375

            
376
114
    let user_id = token_info.user_id;
377
114
    let roles = token_info.roles;
378
114

            
379
114
    let mut owner_cond = None;
380
114
    let mut member_cond = None;
381
114
    let mut code_contains_cond = None;
382
114
    if Role::is_role(&roles, Role::ADMIN) || Role::is_role(&roles, Role::MANAGER) {
383
84
        if let Some(owner) = query.owner.as_ref() {
384
24
            if owner.len() > 0 {
385
24
                owner_cond = Some(owner.as_str());
386
24
            }
387
60
        }
388
84
        if let Some(member) = query.member.as_ref() {
389
27
            if member.len() > 0 {
390
27
                member_cond = Some(member.as_str());
391
27
            }
392
57
        }
393
30
    } else {
394
30
        member_cond = Some(user_id.as_str());
395
30
    }
396
114
    if let Some(contains) = query.contains.as_ref() {
397
30
        if contains.len() > 0 {
398
30
            code_contains_cond = Some(contains.as_str());
399
30
        }
400
84
    }
401
114
    let cond = ListQueryCond {
402
114
        owner_id: owner_cond,
403
114
        member_id: member_cond,
404
114
        code_contains: code_contains_cond,
405
114
        ..Default::default()
406
114
    };
407
114
    let sort_cond = get_sort_cond(&query.sort)?;
408
99
    let opts = ListOptions {
409
99
        cond: &cond,
410
99
        offset: query.offset,
411
99
        limit: match query.limit {
412
78
            None => Some(LIST_LIMIT_DEFAULT),
413
21
            Some(limit) => match limit {
414
6
                0 => None,
415
15
                _ => Some(limit),
416
            },
417
        },
418
99
        sort: Some(sort_cond.as_slice()),
419
99
        cursor_max: Some(LIST_CURSOR_MAX),
420
    };
421

            
422
198
    let (list, cursor) = match state.model.unit().list(&opts, None).await {
423
        Err(e) => {
424
            error!("[{}] list error: {}", FN_NAME, e);
425
            return Err(ErrResp::ErrDb(Some(e.to_string())));
426
        }
427
99
        Ok((list, cursor)) => match cursor {
428
3
            None => match query.format {
429
                Some(request::ListFormat::Array) => {
430
3
                    return Ok(Json(unit_list_transform(&list)).into_response())
431
                }
432
                _ => {
433
78
                    return Ok(Json(response::GetUnitList {
434
78
                        data: unit_list_transform(&list),
435
78
                    })
436
78
                    .into_response())
437
                }
438
            },
439
18
            Some(_) => (list, cursor),
440
18
        },
441
18
    };
442
18

            
443
18
    let body = Body::from_stream(async_stream::stream! {
444
18
        let mut owner_cond = None;
445
18
        let mut member_cond = Some(user_id.as_str());
446
18
        let mut code_contains_cond = None;
447
18
        if Role::is_role(&roles, Role::ADMIN) || Role::is_role(&roles, Role::MANAGER) {
448
18
            if let Some(owner) = query.owner.as_ref() {
449
18
                if owner.len() > 0 {
450
18
                    owner_cond = Some(owner.as_str());
451
18
                }
452
18
            }
453
18
            if let Some(member) = query.member.as_ref() {
454
18
                if member.len() > 0{
455
18
                    member_cond = Some(member.as_str());
456
18
                }
457
18
            }
458
18
        } else {
459
18
            member_cond = Some(user_id.as_str());
460
18
        }
461
18
        if let Some(contains) = query.contains.as_ref() {
462
18
            if contains.len() > 0 {
463
18
                code_contains_cond = Some(contains.as_str());
464
18
            }
465
18
        }
466
18
        let cond = ListQueryCond {
467
18
            owner_id: owner_cond,
468
18
            member_id: member_cond,
469
18
            code_contains: code_contains_cond,
470
18
            ..Default::default()
471
18
        };
472
18
        let opts = ListOptions {
473
18
            cond: &cond,
474
18
            offset: query.offset,
475
18
            limit: match query.limit {
476
18
                None => Some(LIST_LIMIT_DEFAULT),
477
18
                Some(limit) => match limit {
478
18
                    0 => None,
479
18
                    _ => Some(limit),
480
18
                },
481
18
            },
482
18
            sort: Some(sort_cond.as_slice()),
483
18
            cursor_max: Some(LIST_CURSOR_MAX),
484
18
        };
485
18

            
486
18
        let mut list = list;
487
18
        let mut cursor = cursor;
488
18
        let mut is_first = true;
489
18
        loop {
490
18
            yield unit_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
491
18
            is_first = false;
492
18
            if cursor.is_none() {
493
18
                break;
494
18
            }
495
18
            let (_list, _cursor) = match state.model.unit().list(&opts, cursor).await {
496
18
                Err(_) => break,
497
18
                Ok((list, cursor)) => (list, cursor),
498
18
            };
499
18
            list = _list;
500
18
            cursor = _cursor;
501
18
        }
502
18
    });
503
18
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
504
114
}
505

            
506
/// `GET /{base}/api/v1/unit/{unitId}`
507
21
pub async fn get_unit(
508
21
    State(state): State<AppState>,
509
21
    Extension(token_info): Extension<GetTokenInfoData>,
510
21
    Path(param): Path<request::UnitIdPath>,
511
21
) -> impl IntoResponse {
512
    const FN_NAME: &'static str = "get_unit";
513

            
514
21
    let user_id = token_info.user_id.as_str();
515
21
    let roles = &token_info.roles;
516
21
    let unit_id = param.unit_id.as_str();
517
21

            
518
42
    match check_unit(FN_NAME, user_id, roles, unit_id, false, &state).await? {
519
12
        None => Err(ErrResp::ErrNotFound(None)),
520
9
        Some(unit) => Ok(Json(response::GetUnit {
521
9
            data: unit_transform(&unit),
522
9
        })),
523
    }
524
21
}
525

            
526
/// `PATCH /{base}/api/v1/unit/{unitId}`
527
42
pub async fn patch_unit(
528
42
    State(state): State<AppState>,
529
42
    Extension(token_info): Extension<GetTokenInfoData>,
530
42
    Path(param): Path<request::UnitIdPath>,
531
42
    Json(mut body): Json<request::PatchUnitBody>,
532
42
) -> impl IntoResponse {
533
    const FN_NAME: &'static str = "patch_unit";
534

            
535
42
    let user_id = token_info.user_id.as_str();
536
42
    let roles = &token_info.roles;
537
42
    let unit_id = param.unit_id.as_str();
538
42
    let token = &token_info.token;
539

            
540
    // To check if the unit is for the owner.
541
84
    let target_unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
542
6
        None => return Err(ErrResp::ErrNotFound(None)),
543
36
        Some(unit) => unit,
544
    };
545

            
546
36
    let updates = get_updates(FN_NAME, token, &state, &mut body.data, roles, &target_unit).await?;
547
15
    let cond = UpdateQueryCond { unit_id };
548
30
    if let Err(e) = state.model.unit().update(&cond, &updates).await {
549
        error!("[{}] update error: {}", FN_NAME, e);
550
        return Err(ErrResp::ErrDb(Some(e.to_string())));
551
15
    }
552
15
    Ok(StatusCode::NO_CONTENT)
553
42
}
554

            
555
/// `DELETE /{base}/api/v1/unit/{unitId}`
556
24
pub async fn delete_unit(
557
24
    State(state): State<AppState>,
558
24
    Extension(token_info): Extension<GetTokenInfoData>,
559
24
    Path(param): Path<request::UnitIdPath>,
560
24
) -> impl IntoResponse {
561
    const FN_NAME: &'static str = "delete_unit";
562

            
563
24
    let user_id = token_info.user_id.as_str();
564
24
    let roles = &token_info.roles;
565
24
    let unit_id = param.unit_id.as_str();
566

            
567
    // To check if the unit is for the owner.
568
48
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await {
569
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
570
24
        Ok(unit) => match unit {
571
9
            None => return Ok(StatusCode::NO_CONTENT),
572
15
            Some(unit) => unit,
573
15
        },
574
15
    };
575
15

            
576
270
    del_unit_rsc(FN_NAME, unit_id, unit.code.as_str(), &state).await?;
577
15
    send_del_ctrl_message(FN_NAME, unit, &state).await?;
578

            
579
15
    Ok(StatusCode::NO_CONTENT)
580
24
}
581

            
582
/// `DELETE /{base}/api/v1/unit/user/{userId}`
583
9
pub async fn delete_unit_user(
584
9
    State(state): State<AppState>,
585
9
    Path(param): Path<request::UserIdPath>,
586
9
) -> impl IntoResponse {
587
    const FN_NAME: &'static str = "delete_unit_user";
588

            
589
9
    let cond = ListQueryCond {
590
9
        owner_id: Some(param.user_id.as_str()),
591
9
        ..Default::default()
592
9
    };
593
9
    let opts = ListOptions {
594
9
        cond: &cond,
595
9
        offset: None,
596
9
        limit: None,
597
9
        sort: None,
598
9
        cursor_max: Some(LIST_CURSOR_MAX),
599
9
    };
600
9
    let mut cursor: Option<Box<dyn Cursor>> = None;
601
9
    let mut rm_units = vec![];
602
    loop {
603
18
        match state.model.unit().list(&opts, cursor).await {
604
            Err(e) => {
605
                error!("[{}] list error: {}", FN_NAME, e);
606
                return Err(ErrResp::ErrDb(Some(e.to_string())));
607
            }
608
9
            Ok((mut list, _cursor)) => {
609
9
                rm_units.append(&mut list);
610
9
                if _cursor.is_none() {
611
9
                    break;
612
                }
613
                cursor = _cursor;
614
            }
615
        }
616
    }
617
15
    for unit in rm_units {
618
108
        del_unit_rsc(FN_NAME, unit.unit_id.as_str(), unit.code.as_str(), &state).await?;
619
6
        send_del_ctrl_message(FN_NAME, unit, &state).await?;
620
    }
621

            
622
9
    Ok(StatusCode::NO_CONTENT)
623
9
}
624

            
625
114
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
626
114
    match sort_args.as_ref() {
627
75
        None => Ok(vec![SortCond {
628
75
            key: SortKey::Code,
629
75
            asc: true,
630
75
        }]),
631
39
        Some(args) => {
632
39
            let mut args = args.split(",");
633
39
            let mut sort_cond = vec![];
634
66
            while let Some(arg) = args.next() {
635
42
                let mut cond = arg.split(":");
636
42
                let key = match cond.next() {
637
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
638
42
                    Some(field) => match field {
639
42
                        "code" => SortKey::Code,
640
36
                        "created" => SortKey::CreatedAt,
641
21
                        "modified" => SortKey::ModifiedAt,
642
15
                        "name" => SortKey::Name,
643
                        _ => {
644
6
                            return Err(ErrResp::ErrParam(Some(format!(
645
6
                                "invalid sort key {}",
646
6
                                field
647
6
                            ))))
648
                        }
649
                    },
650
                };
651
36
                let asc = match cond.next() {
652
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
653
33
                    Some(asc) => match asc {
654
33
                        "asc" => true,
655
15
                        "desc" => false,
656
                        _ => {
657
3
                            return Err(ErrResp::ErrParam(Some(format!(
658
3
                                "invalid sort asc {}",
659
3
                                asc
660
3
                            ))))
661
                        }
662
                    },
663
                };
664
30
                if cond.next().is_some() {
665
3
                    return Err(ErrResp::ErrParam(Some(
666
3
                        "invalid sort condition".to_string(),
667
3
                    )));
668
27
                }
669
27
                sort_cond.push(SortCond { key, asc });
670
            }
671
24
            Ok(sort_cond)
672
        }
673
    }
674
114
}
675

            
676
36
async fn get_updates<'a>(
677
36
    fn_name: &str,
678
36
    token: &str,
679
36
    state: &AppState,
680
36
    body: &'a mut request::PatchUnitData,
681
36
    roles: &HashMap<String, bool>,
682
36
    target_unit: &Unit,
683
36
) -> Result<Updates<'a>, ErrResp> {
684
36
    let mut updates = Updates {
685
36
        ..Default::default()
686
36
    };
687
36
    let mut count = 0;
688
36
    if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
689
30
        let mut target_owner_id = target_unit.owner_id.as_str();
690
30
        if let Some(owner_id) = body.owner_id.as_ref() {
691
15
            if owner_id.len() == 0 {
692
3
                return Err(ErrResp::ErrParam(Some(
693
3
                    "`ownerId` must with at least one character".to_string(),
694
3
                )));
695
12
            } else if !check_user(fn_name, token, owner_id.as_str(), state).await? {
696
3
                return Err(ErrResp::Custom(
697
3
                    ErrReq::OWNER_NOT_EXIST.0,
698
3
                    ErrReq::OWNER_NOT_EXIST.1,
699
3
                    None,
700
3
                ));
701
9
            }
702
9
            target_owner_id = owner_id.as_str();
703
9
            updates.owner_id = Some(owner_id.as_str());
704
9
            count += 1;
705
9

            
706
9
            if body.member_ids.is_none() && !target_unit.member_ids.contains(owner_id) {
707
3
                body.member_ids = Some(target_unit.member_ids.clone());
708
6
            }
709
15
        }
710
24
        if let Some(member_ids) = body.member_ids.as_mut() {
711
15
            member_ids.sort();
712
15
            member_ids.dedup();
713
15
            let mut found_owner = false;
714
15
            for v in member_ids.iter() {
715
15
                if v.len() == 0 {
716
3
                    return Err(ErrResp::ErrParam(Some(
717
3
                        "`memberIds` item must with at least one character".to_string(),
718
3
                    )));
719
12
                } else if !check_user(fn_name, token, v.as_str(), state).await? {
720
3
                    return Err(ErrResp::Custom(
721
3
                        ErrReq::MEMBER_NOT_EXIST.0,
722
3
                        ErrReq::MEMBER_NOT_EXIST.1,
723
3
                        None,
724
3
                    ));
725
9
                }
726
9
                if v.as_str().cmp(target_owner_id) == Ordering::Equal {
727
3
                    found_owner = true;
728
6
                }
729
            }
730
9
            if !found_owner {
731
6
                member_ids.push(target_owner_id.to_string());
732
6
            }
733
9
            updates.member_ids = Some(member_ids);
734
9
            count += 1;
735
9
        }
736
6
    }
737
24
    if let Some(name) = body.name.as_ref() {
738
12
        updates.name = Some(name.as_str());
739
12
        count += 1;
740
12
    }
741
24
    if let Some(info) = body.info.as_ref() {
742
15
        for (k, _) in info.iter() {
743
9
            if k.len() == 0 {
744
3
                return Err(ErrResp::ErrParam(Some(
745
3
                    "`info` key must not be empty".to_string(),
746
3
                )));
747
6
            }
748
        }
749
12
        updates.info = Some(info);
750
12
        count += 1;
751
9
    }
752

            
753
21
    if count == 0 {
754
6
        return Err(ErrResp::ErrParam(Some(
755
6
            "at least one parameter".to_string(),
756
6
        )));
757
15
    }
758
15
    updates.modified_at = Some(Utc::now());
759
15
    Ok(updates)
760
36
}
761

            
762
/// Use the Bearer token to check if the user ID is exist.
763
///
764
/// # Errors
765
///
766
/// Returns OK if status code is 200/404. Otherwise errors will be returned.
767
36
async fn check_user(
768
36
    fn_name: &str,
769
36
    token: &str,
770
36
    owner_id: &str,
771
36
    state: &AppState,
772
36
) -> Result<bool, ErrResp> {
773
36
    let uri = format!("{}/api/v1/user/{}", state.auth_base.as_str(), owner_id);
774
36
    let req = match state
775
36
        .client
776
36
        .request(reqwest::Method::GET, uri)
777
36
        .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", token))
778
36
        .build()
779
    {
780
        Err(e) => {
781
            error!("[{}] create request error: {}", fn_name, e);
782
            return Err(ErrResp::ErrRsc(Some(format!(
783
                "create request error: {}",
784
                e
785
            ))));
786
        }
787
36
        Ok(req) => req,
788
36
    };
789
48
    match state.client.execute(req).await {
790
        Err(e) => {
791
            error!("[{}] request owner check error: {}", fn_name, e);
792
            return Err(ErrResp::ErrIntMsg(Some(format!(
793
                "check owner error: {}",
794
                e
795
            ))));
796
        }
797
36
        Ok(resp) => match resp.status() {
798
            reqwest::StatusCode::UNAUTHORIZED => {
799
                return Err(ErrResp::ErrAuth(None));
800
            }
801
27
            reqwest::StatusCode::OK => return Ok(true),
802
9
            reqwest::StatusCode::NOT_FOUND => return Ok(false),
803
            _ => {
804
                error!(
805
                    "[{}] check owner with status code: {}",
806
                    fn_name,
807
                    resp.status()
808
                );
809
                return Err(ErrResp::ErrIntMsg(Some(format!(
810
                    "check owner with status code: {}",
811
                    resp.status()
812
                ))));
813
            }
814
        },
815
    }
816
36
}
817

            
818
81
fn unit_list_transform(list: &Vec<Unit>) -> Vec<response::GetUnitData> {
819
81
    let mut ret = vec![];
820
243
    for unit in list.iter() {
821
243
        ret.push(unit_transform(&unit));
822
243
    }
823
81
    ret
824
81
}
825

            
826
39
fn unit_list_transform_bytes(
827
39
    list: &Vec<Unit>,
828
39
    with_start: bool,
829
39
    with_end: bool,
830
39
    format: Option<&request::ListFormat>,
831
39
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
832
39
    let mut build_str = match with_start {
833
21
        false => "".to_string(),
834
3
        true => match format {
835
3
            Some(request::ListFormat::Array) => "[".to_string(),
836
15
            _ => "{\"data\":[".to_string(),
837
        },
838
    };
839
39
    let mut is_first = with_start;
840

            
841
2466
    for item in list {
842
2427
        if is_first {
843
18
            is_first = false;
844
2409
        } else {
845
2409
            build_str.push(',');
846
2409
        }
847
2427
        let json_str = match serde_json::to_string(&unit_transform(item)) {
848
            Err(e) => return Err(Box::new(e)),
849
2427
            Ok(str) => str,
850
2427
        };
851
2427
        build_str += json_str.as_str();
852
    }
853

            
854
39
    if with_end {
855
18
        build_str += match format {
856
3
            Some(request::ListFormat::Array) => "]",
857
15
            _ => "]}",
858
        }
859
21
    }
860
39
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
861
39
}
862

            
863
2679
fn unit_transform(unit: &Unit) -> response::GetUnitData {
864
2679
    response::GetUnitData {
865
2679
        unit_id: unit.unit_id.clone(),
866
2679
        code: unit.code.clone(),
867
2679
        created_at: time_str(&unit.created_at),
868
2679
        modified_at: time_str(&unit.modified_at),
869
2679
        owner_id: unit.owner_id.clone(),
870
2679
        member_ids: unit.member_ids.clone(),
871
2679
        name: unit.name.clone(),
872
2679
        info: unit.info.clone(),
873
2679
    }
874
2679
}
875

            
876
21
async fn del_unit_rsc(
877
21
    fn_name: &str,
878
21
    unit_id: &str,
879
21
    unit_code: &str,
880
21
    state: &AppState,
881
21
) -> Result<(), ErrResp> {
882
21
    let cond = ApplicationCond {
883
21
        unit_id: Some(unit_id),
884
21
        ..Default::default()
885
21
    };
886
21
    let opts = ApplicationListOpts {
887
21
        cond: &cond,
888
21
        offset: None,
889
21
        limit: None,
890
21
        sort: None,
891
21
        cursor_max: Some(LIST_CURSOR_MAX),
892
21
    };
893
21
    let mut cursor: Option<Box<dyn ApplicationCursor>> = None;
894
21
    let mut rm_mgrs = vec![];
895
    loop {
896
42
        match state.model.application().list(&opts, cursor).await {
897
            Err(e) => {
898
                error!("[{}] list application error: {}", fn_name, e);
899
                return Err(ErrResp::ErrDb(Some(e.to_string())));
900
            }
901
21
            Ok((list, _cursor)) => {
902
33
                for app in list {
903
12
                    let key = gen_mgr_key(unit_code, app.code.as_str());
904
12
                    match { state.application_mgrs.lock().unwrap().remove(&key) } {
905
12
                        None => error!("[{}] get no application manager {}", fn_name, key),
906
                        Some(old_mgr) => {
907
                            rm_mgrs.push(old_mgr);
908
                        }
909
                    }
910
                }
911
21
                if _cursor.is_none() {
912
21
                    break;
913
                }
914
                cursor = _cursor;
915
            }
916
        }
917
    }
918
21
    for mgr in rm_mgrs {
919
        if let Err(e) = mgr.close().await {
920
            error!(
921
                "[{}] close old application manager {} error: {}",
922
                fn_name,
923
                mgr.name(),
924
                e
925
            );
926
        }
927
    }
928

            
929
21
    let cond = NetworkCond {
930
21
        unit_id: Some(Some(unit_id)),
931
21
        ..Default::default()
932
21
    };
933
21
    let opts = NetworkListOpts {
934
21
        cond: &cond,
935
21
        offset: None,
936
21
        limit: None,
937
21
        sort: None,
938
21
        cursor_max: Some(LIST_CURSOR_MAX),
939
21
    };
940
21
    let mut cursor: Option<Box<dyn NetworkCursor>> = None;
941
21
    let mut rm_mgrs = vec![];
942
    loop {
943
42
        match state.model.network().list(&opts, cursor).await {
944
            Err(e) => {
945
                error!("[{}] list network error: {}", fn_name, e);
946
                return Err(ErrResp::ErrDb(Some(e.to_string())));
947
            }
948
21
            Ok((list, _cursor)) => {
949
33
                for app in list {
950
12
                    let key = gen_mgr_key(unit_code, app.code.as_str());
951
12
                    match { state.network_mgrs.lock().unwrap().remove(&key) } {
952
12
                        None => error!("[{}] get no network manager {}", fn_name, key),
953
                        Some(old_mgr) => {
954
                            rm_mgrs.push(old_mgr);
955
                        }
956
                    }
957
                }
958
21
                if _cursor.is_none() {
959
21
                    break;
960
                }
961
                cursor = _cursor;
962
            }
963
        }
964
    }
965
21
    for mgr in rm_mgrs {
966
        if let Err(e) = mgr.close().await {
967
            error!(
968
                "[{}] close old network manager {} error: {}",
969
                fn_name,
970
                mgr.name(),
971
                e
972
            );
973
        }
974
    }
975

            
976
21
    let cond = network_route::QueryCond {
977
21
        unit_id: Some(unit_id),
978
21
        ..Default::default()
979
21
    };
980
42
    if let Err(e) = state.model.network_route().del(&cond).await {
981
        error!("[{}] del network_route error: {}", fn_name, e);
982
        return Err(ErrResp::ErrDb(Some(e.to_string())));
983
21
    }
984
21

            
985
21
    let cond = device_route::QueryCond {
986
21
        unit_id: Some(unit_id),
987
21
        ..Default::default()
988
21
    };
989
42
    if let Err(e) = state.model.device_route().del(&cond).await {
990
        error!("[{}] del device_route error: {}", fn_name, e);
991
        return Err(ErrResp::ErrDb(Some(e.to_string())));
992
21
    }
993
21

            
994
21
    let cond = dldata_buffer::QueryCond {
995
21
        unit_id: Some(unit_id),
996
21
        ..Default::default()
997
21
    };
998
42
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
999
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
21
    }
21

            
21
    let cond = device::QueryCond {
21
        unit_id: Some(unit_id),
21
        ..Default::default()
21
    };
42
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
21
    }
21

            
21
    let cond = network::QueryCond {
21
        unit_id: Some(Some(unit_id)),
21
        ..Default::default()
21
    };
42
    if let Err(e) = state.model.network().del(&cond).await {
        error!("[{}] del network error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
21
    }
21

            
21
    let cond = application::QueryCond {
21
        unit_id: Some(unit_id),
21
        ..Default::default()
21
    };
42
    if let Err(e) = state.model.application().del(&cond).await {
        error!("[{}] del application error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
21
    }
21

            
21
    let cond = QueryCond {
21
        unit_id: Some(unit_id),
21
        ..Default::default()
21
    };
42
    if let Err(e) = state.model.unit().del(&cond).await {
        error!("[{}] del unit error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
21
    }
21

            
21
    Ok(())
21
}
/// Send delete control message.
21
async fn send_del_ctrl_message(fn_name: &str, unit: Unit, state: &AppState) -> Result<(), ErrResp> {
21
    if state.cache.is_some() {
7
        let msg = SendCtrlMsg::DelUnit {
7
            operation: CtrlMsgOp::DEL_UNIT.to_string(),
7
            new: CtrlDelUnit {
7
                unit_id: unit.unit_id,
7
                unit_code: unit.code,
7
            },
7
        };
7
        let payload = match serde_json::to_vec(&msg) {
            Err(e) => {
                error!(
                    "[{}] marshal JSON for {} error: {}",
                    fn_name,
                    CtrlMsgOp::DEL_UNIT,
                    e
                );
                return Err(ErrResp::ErrRsc(Some(format!(
                    "marshal control message error: {}",
                    e
                ))));
            }
7
            Ok(payload) => payload,
7
        };
7
        let ctrl_sender = { state.ctrl_senders.unit.lock().unwrap().clone() };
7
        if let Err(e) = ctrl_sender.send_msg(payload).await {
            error!(
                "[{}] send control message for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_UNIT,
                e
            );
            return Err(ErrResp::ErrIntMsg(Some(format!(
                "send control message error: {}",
                e
            ))));
7
        }
14
    }
21
    Ok(())
21
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
36
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
36
        let queue_name = queue.name();
36
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
21
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
72
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    /// Intentionally empty: the queue and the message are ignored, and the
    /// message is dropped without being acknowledged.
    ///
    /// NOTE(review): presumably the sender-side queue never delivers messages
    /// to this handler — confirm against the queue setup.
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
30
        let queue_name = queue.name();
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
7
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
7
        let queue_name = queue.name();
7
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
7
            Ok(msg) => msg,
7
        };
7
        match ctrl_msg {
7
            RecvCtrlMsg::DelUnit { new } => {
7
                if let Some(cache) = self.cache.as_ref() {
7
                    let cond = device_route::DelCacheQueryCond {
7
                        unit_code: new.unit_code.as_str(),
7
                        network_code: None,
7
                        network_addr: None,
7
                    };
7
                    if let Err(e) = cache.device_route().del_dldata(&cond).await {
                        error!(
                            "[{}] {} delete device route cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
7
                        debug!("[{}] {} delete device route cache", FN_NAME, queue_name);
                    }
7
                    let cond = device_route::DelCachePubQueryCond {
7
                        unit_id: new.unit_id.as_str(),
7
                        device_id: None,
7
                    };
7
                    if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                        error!(
                            "[{}] {} delete device route cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
7
                        debug!("[{}] {} delete device route cache", FN_NAME, queue_name);
                    }
                }
7
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
7
                }
            }
        }
14
    }
}