1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    Extension,
13
    body::{Body, Bytes},
14
    extract::State,
15
    http::{StatusCode, header},
16
    response::IntoResponse,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json::{self, Map};
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    Queue,
27
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, time_str},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{ErrReq, State as AppState, middleware::GetTokenInfoData},
40
        lib::{check_unit, gen_mgr_key},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        Cache,
51
        application::{
52
            self, Cursor as ApplicationCursor, ListOptions as ApplicationListOpts,
53
            ListQueryCond as ApplicationCond,
54
        },
55
        device, device_route, dldata_buffer,
56
        network::{
57
            self, Cursor as NetworkCursor, ListOptions as NetworkListOpts,
58
            ListQueryCond as NetworkCond,
59
        },
60
        network_route,
61
        unit::{
62
            Cursor, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey, Unit,
63
            UpdateQueryCond, Updates,
64
        },
65
    },
66
};
67

            
68
#[derive(Deserialize, Serialize)]
69
#[serde(tag = "operation")]
70
enum RecvCtrlMsg {
71
    #[serde(rename = "del-unit")]
72
    DelUnit { new: CtrlDelUnit },
73
}
74

            
75
/// Outgoing counterpart of `RecvCtrlMsg`.
///
/// Serialized untagged, so the variant's fields (`operation`, `new`) become the top-level
/// keys of the emitted JSON object.
#[derive(Serialize)]
76
#[serde(untagged)]
77
enum SendCtrlMsg {
78
    /// Carries the operation name and the `CtrlDelUnit` payload.
    DelUnit { operation: String, new: CtrlDelUnit },
79
}
80

            
81
/// Namespace type for control message operation names (see its associated constants).
struct CtrlMsgOp;
82

            
83
#[derive(Deserialize, Serialize)]
84
struct CtrlDelUnit {
85
    #[serde(rename = "unitId")]
86
    unit_id: String,
87
    #[serde(rename = "unitCode")]
88
    unit_code: String,
89
}
90

            
91
/// Stateless handler type passed to `mq::control::new` when creating the control sender queue.
struct CtrlSenderHandler;
92

            
93
/// Handler type passed to `mq::control::new` when creating the control receiver queue.
struct CtrlReceiverHandler {
94
    /// Clone of `state.cache` taken when the receiver is created; `None` when the state has no cache.
    cache: Option<Arc<dyn Cache>>,
95
}
96

            
97
impl CtrlMsgOp {
98
    /// Operation name of the delete-unit control message (same string as the
    /// `RecvCtrlMsg::DelUnit` serde rename).
    const DEL_UNIT: &'static str = "del-unit";
99
}
100

            
101
/// Page size applied to list queries when the request gives no `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// `cursor_max` value handed to the model's list queries.
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument passed to `strings::random_id` when generating a unit ID.
const ID_RAND_LEN: usize = 8;
/// Name of the control queue; also the key used to register the receiver in the state.
const CTRL_QUEUE_NAME: &str = "unit";
105

            
106
/// Initialize channels.
107
30
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
108
    const FN_NAME: &'static str = "init";
109

            
110
30
    let q = new_ctrl_receiver(state, ctrl_conf)?;
111
30
    {
112
30
        state
113
30
            .ctrl_receivers
114
30
            .lock()
115
30
            .unwrap()
116
30
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
117
30
    }
118
30

            
119
30
    let ctrl_sender = { state.ctrl_senders.unit.lock().unwrap().clone() };
120
    // Wait for connected.
121
2760
    for _ in 0..500 {
122
2760
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
123
30
            break;
124
2730
        }
125
2730
        time::sleep(Duration::from_millis(10)).await;
126
    }
127
30
    if ctrl_sender.status() != Status::Connected {
128
        error!(
129
            "[{}] {} control sender not connected",
130
            FN_NAME, CTRL_QUEUE_NAME
131
        );
132
        return Err(Box::new(IoError::new(
133
            ErrorKind::NotConnected,
134
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
135
        )));
136
30
    }
137
30
    if q.status() != Status::Connected {
138
        error!(
139
            "[{}] {} control receiver not connected",
140
            FN_NAME, CTRL_QUEUE_NAME
141
        );
142
        return Err(Box::new(IoError::new(
143
            ErrorKind::NotConnected,
144
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
145
        )));
146
30
    }
147
30

            
148
30
    Ok(())
149
30
}
150

            
151
/// Create control channel sender queue.
152
30
pub fn new_ctrl_sender(
153
30
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
154
30
    config: &CfgCtrl,
155
30
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
156
30
    let url = match config.url.as_ref() {
157
        None => {
158
            return Err(Box::new(IoError::new(
159
                ErrorKind::InvalidInput,
160
                "empty control url",
161
            )));
162
        }
163
30
        Some(url) => match Url::parse(url.as_str()) {
164
            Err(e) => return Err(Box::new(e)),
165
30
            Ok(url) => url,
166
30
        },
167
30
    };
168
30

            
169
30
    match mq::control::new(
170
30
        conn_pool.clone(),
171
30
        &url,
172
30
        config.prefetch,
173
30
        CTRL_QUEUE_NAME,
174
30
        false,
175
30
        Arc::new(CtrlSenderHandler {}),
176
30
        Arc::new(CtrlSenderHandler {}),
177
30
    ) {
178
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
179
30
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
180
    }
181
30
}
182

            
183
/// Create control channel receiver queue.
184
30
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
185
30
    let url = match config.url.as_ref() {
186
        None => {
187
            return Err(Box::new(IoError::new(
188
                ErrorKind::InvalidInput,
189
                "empty control url",
190
            )));
191
        }
192
30
        Some(url) => match Url::parse(url.as_str()) {
193
            Err(e) => return Err(Box::new(e)),
194
30
            Ok(url) => url,
195
30
        },
196
30
    };
197
30
    let handler = Arc::new(CtrlReceiverHandler {
198
30
        cache: state.cache.clone(),
199
30
    });
200
30
    match mq::control::new(
201
30
        state.mq_conns.clone(),
202
30
        &url,
203
30
        config.prefetch,
204
30
        CTRL_QUEUE_NAME,
205
30
        true,
206
30
        handler.clone(),
207
30
        handler,
208
30
    ) {
209
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
210
30
        Ok(q) => Ok(q),
211
    }
212
30
}
213

            
214
/// `POST /{base}/api/v1/unit`
215
90
pub async fn post_unit(
216
90
    State(state): State<AppState>,
217
90
    Extension(token_info): Extension<GetTokenInfoData>,
218
90
    Json(body): Json<request::PostUnitBody>,
219
90
) -> impl IntoResponse {
220
    const FN_NAME: &'static str = "post_unit";
221

            
222
90
    let user_id = token_info.user_id.as_str();
223
90
    let roles = &token_info.roles;
224
90
    let token = token_info.token.as_str();
225
90

            
226
90
    let code = body.data.code.to_lowercase();
227
90
    if !strings::is_code(code.as_str()) {
228
6
        return Err(ErrResp::ErrParam(Some(
229
6
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
230
6
        )));
231
84
    }
232
84
    let owner_id = match body.data.owner_id.as_ref() {
233
48
        None => user_id,
234
36
        Some(owner_id) => {
235
36
            if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
236
30
                if owner_id.len() == 0 {
237
6
                    return Err(ErrResp::ErrParam(Some(
238
6
                        "`ownerId` must with at least one character".to_string(),
239
6
                    )));
240
24
                }
241
24
                match check_user(FN_NAME, token, owner_id.as_str(), &state).await {
242
                    Err(e) => return Err(e),
243
24
                    Ok(result) => match result {
244
                        false => {
245
6
                            return Err(ErrResp::Custom(
246
6
                                ErrReq::OWNER_NOT_EXIST.0,
247
6
                                ErrReq::OWNER_NOT_EXIST.1,
248
6
                                None,
249
6
                            ));
250
                        }
251
18
                        true => owner_id.as_str(),
252
                    },
253
                }
254
            } else {
255
6
                user_id
256
            }
257
        }
258
    };
259
72
    if let Some(info) = body.data.info.as_ref() {
260
12
        for (k, _) in info.iter() {
261
12
            if k.len() == 0 {
262
6
                return Err(ErrResp::ErrParam(Some(
263
6
                    "`info` key must not be empty".to_string(),
264
6
                )));
265
6
            }
266
        }
267
60
    }
268

            
269
66
    let cond = QueryCond {
270
66
        code: Some(code.as_str()),
271
66
        ..Default::default()
272
66
    };
273
66
    match state.model.unit().get(&cond).await {
274
        Err(e) => {
275
            error!("[{}] get error: {}", FN_NAME, e);
276
            return Err(ErrResp::ErrDb(Some(e.to_string())));
277
        }
278
66
        Ok(unit) => match unit {
279
60
            None => (),
280
            Some(_) => {
281
6
                return Err(ErrResp::Custom(
282
6
                    ErrReq::UNIT_EXIST.0,
283
6
                    ErrReq::UNIT_EXIST.1,
284
6
                    None,
285
6
                ));
286
            }
287
        },
288
    }
289

            
290
60
    let now = Utc::now();
291
60
    let unit = Unit {
292
60
        unit_id: strings::random_id(&now, ID_RAND_LEN),
293
60
        code,
294
60
        created_at: now,
295
60
        modified_at: now,
296
60
        owner_id: owner_id.to_string(),
297
60
        member_ids: vec![owner_id.to_string()],
298
60
        name: match body.data.name.as_ref() {
299
54
            None => "".to_string(),
300
6
            Some(name) => name.clone(),
301
        },
302
60
        info: match body.data.info.as_ref() {
303
54
            None => Map::new(),
304
6
            Some(info) => info.clone(),
305
        },
306
    };
307
60
    if let Err(e) = state.model.unit().add(&unit).await {
308
        error!("[{}] add error: {}", FN_NAME, e);
309
        return Err(ErrResp::ErrDb(Some(e.to_string())));
310
60
    }
311
60
    Ok(Json(response::PostUnit {
312
60
        data: response::PostUnitData {
313
60
            unit_id: unit.unit_id,
314
60
        },
315
60
    }))
316
90
}
317

            
318
/// `GET /{base}/api/v1/unit/count`
319
90
pub async fn get_unit_count(
320
90
    State(state): State<AppState>,
321
90
    Extension(token_info): Extension<GetTokenInfoData>,
322
90
    Query(query): Query<request::GetUnitCountQuery>,
323
90
) -> impl IntoResponse {
324
    const FN_NAME: &'static str = "get_unit_count";
325

            
326
90
    let user_id = token_info.user_id.as_str();
327
90
    let roles = &token_info.roles;
328
90

            
329
90
    let mut owner_cond = None;
330
90
    let mut member_cond = None;
331
90
    let mut code_contains_cond = None;
332
90
    if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
333
30
        if let Some(owner) = query.owner.as_ref() {
334
6
            if owner.len() > 0 {
335
6
                owner_cond = Some(owner.as_str());
336
6
            }
337
24
        }
338
30
        if let Some(member) = query.member.as_ref() {
339
12
            if member.len() > 0 {
340
12
                member_cond = Some(member.as_str());
341
12
            }
342
18
        }
343
60
    } else {
344
60
        member_cond = Some(user_id);
345
60
    }
346
90
    if let Some(contains) = query.contains.as_ref() {
347
18
        if contains.len() > 0 {
348
18
            code_contains_cond = Some(contains.as_str());
349
18
        }
350
72
    }
351
90
    let cond = ListQueryCond {
352
90
        owner_id: owner_cond,
353
90
        member_id: member_cond,
354
90
        code_contains: code_contains_cond,
355
90
        ..Default::default()
356
90
    };
357
90
    match state.model.unit().count(&cond).await {
358
        Err(e) => {
359
            error!("[{}] count error: {}", FN_NAME, e);
360
            Err(ErrResp::ErrDb(Some(e.to_string())))
361
        }
362
90
        Ok(count) => Ok(Json(response::GetUnitCount {
363
90
            data: response::GetCountData { count },
364
90
        })),
365
    }
366
90
}
367

            
368
/// `GET /{base}/api/v1/unit/list`
369
228
pub async fn get_unit_list(
370
228
    State(state): State<AppState>,
371
228
    Extension(token_info): Extension<GetTokenInfoData>,
372
228
    Query(query): Query<request::GetUnitListQuery>,
373
228
) -> impl IntoResponse {
374
    const FN_NAME: &'static str = "get_unit_list";
375

            
376
228
    let user_id = token_info.user_id;
377
228
    let roles = token_info.roles;
378
228

            
379
228
    let mut owner_cond = None;
380
228
    let mut member_cond = None;
381
228
    let mut code_contains_cond = None;
382
228
    if Role::is_role(&roles, Role::ADMIN) || Role::is_role(&roles, Role::MANAGER) {
383
168
        if let Some(owner) = query.owner.as_ref() {
384
48
            if owner.len() > 0 {
385
48
                owner_cond = Some(owner.as_str());
386
48
            }
387
120
        }
388
168
        if let Some(member) = query.member.as_ref() {
389
54
            if member.len() > 0 {
390
54
                member_cond = Some(member.as_str());
391
54
            }
392
114
        }
393
60
    } else {
394
60
        member_cond = Some(user_id.as_str());
395
60
    }
396
228
    if let Some(contains) = query.contains.as_ref() {
397
60
        if contains.len() > 0 {
398
60
            code_contains_cond = Some(contains.as_str());
399
60
        }
400
168
    }
401
228
    let cond = ListQueryCond {
402
228
        owner_id: owner_cond,
403
228
        member_id: member_cond,
404
228
        code_contains: code_contains_cond,
405
228
        ..Default::default()
406
228
    };
407
228
    let sort_cond = get_sort_cond(&query.sort)?;
408
198
    let opts = ListOptions {
409
198
        cond: &cond,
410
198
        offset: query.offset,
411
198
        limit: match query.limit {
412
156
            None => Some(LIST_LIMIT_DEFAULT),
413
42
            Some(limit) => match limit {
414
12
                0 => None,
415
30
                _ => Some(limit),
416
            },
417
        },
418
198
        sort: Some(sort_cond.as_slice()),
419
198
        cursor_max: Some(LIST_CURSOR_MAX),
420
    };
421

            
422
198
    let (list, cursor) = match state.model.unit().list(&opts, None).await {
423
        Err(e) => {
424
            error!("[{}] list error: {}", FN_NAME, e);
425
            return Err(ErrResp::ErrDb(Some(e.to_string())));
426
        }
427
198
        Ok((list, cursor)) => match cursor {
428
6
            None => match query.format {
429
                Some(request::ListFormat::Array) => {
430
6
                    return Ok(Json(unit_list_transform(&list)).into_response());
431
                }
432
                _ => {
433
156
                    return Ok(Json(response::GetUnitList {
434
156
                        data: unit_list_transform(&list),
435
156
                    })
436
156
                    .into_response());
437
                }
438
            },
439
36
            Some(_) => (list, cursor),
440
36
        },
441
36
    };
442
36

            
443
36
    let body = Body::from_stream(async_stream::stream! {
444
36
        let mut owner_cond = None;
445
36
        let mut member_cond = Some(user_id.as_str());
446
36
        let mut code_contains_cond = None;
447
36
        if Role::is_role(&roles, Role::ADMIN) || Role::is_role(&roles, Role::MANAGER) {
448
36
            if let Some(owner) = query.owner.as_ref() {
449
36
                if owner.len() > 0 {
450
36
                    owner_cond = Some(owner.as_str());
451
36
                }
452
36
            }
453
36
            if let Some(member) = query.member.as_ref() {
454
36
                if member.len() > 0{
455
36
                    member_cond = Some(member.as_str());
456
36
                }
457
36
            }
458
36
        } else {
459
36
            member_cond = Some(user_id.as_str());
460
36
        }
461
36
        if let Some(contains) = query.contains.as_ref() {
462
36
            if contains.len() > 0 {
463
36
                code_contains_cond = Some(contains.as_str());
464
36
            }
465
36
        }
466
36
        let cond = ListQueryCond {
467
36
            owner_id: owner_cond,
468
36
            member_id: member_cond,
469
36
            code_contains: code_contains_cond,
470
36
            ..Default::default()
471
36
        };
472
36
        let opts = ListOptions {
473
36
            cond: &cond,
474
36
            offset: query.offset,
475
36
            limit: match query.limit {
476
36
                None => Some(LIST_LIMIT_DEFAULT),
477
36
                Some(limit) => match limit {
478
36
                    0 => None,
479
36
                    _ => Some(limit),
480
36
                },
481
36
            },
482
36
            sort: Some(sort_cond.as_slice()),
483
36
            cursor_max: Some(LIST_CURSOR_MAX),
484
36
        };
485
36

            
486
36
        let mut list = list;
487
36
        let mut cursor = cursor;
488
36
        let mut is_first = true;
489
36
        loop {
490
36
            yield unit_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
491
36
            is_first = false;
492
36
            if cursor.is_none() {
493
36
                break;
494
36
            }
495
36
            let (_list, _cursor) = match state.model.unit().list(&opts, cursor).await {
496
36
                Err(_) => break,
497
36
                Ok((list, cursor)) => (list, cursor),
498
36
            };
499
36
            list = _list;
500
36
            cursor = _cursor;
501
36
        }
502
36
    });
503
36
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
504
228
}
505

            
506
/// `GET /{base}/api/v1/unit/{unitId}`
507
42
pub async fn get_unit(
508
42
    State(state): State<AppState>,
509
42
    Extension(token_info): Extension<GetTokenInfoData>,
510
42
    Path(param): Path<request::UnitIdPath>,
511
42
) -> impl IntoResponse {
512
    const FN_NAME: &'static str = "get_unit";
513

            
514
42
    let user_id = token_info.user_id.as_str();
515
42
    let roles = &token_info.roles;
516
42
    let unit_id = param.unit_id.as_str();
517
42

            
518
42
    match check_unit(FN_NAME, user_id, roles, unit_id, false, &state).await? {
519
24
        None => Err(ErrResp::ErrNotFound(None)),
520
18
        Some(unit) => Ok(Json(response::GetUnit {
521
18
            data: unit_transform(&unit),
522
18
        })),
523
    }
524
42
}
525

            
526
/// `PATCH /{base}/api/v1/unit/{unitId}`
527
84
pub async fn patch_unit(
528
84
    State(state): State<AppState>,
529
84
    Extension(token_info): Extension<GetTokenInfoData>,
530
84
    Path(param): Path<request::UnitIdPath>,
531
84
    Json(mut body): Json<request::PatchUnitBody>,
532
84
) -> impl IntoResponse {
533
    const FN_NAME: &'static str = "patch_unit";
534

            
535
84
    let user_id = token_info.user_id.as_str();
536
84
    let roles = &token_info.roles;
537
84
    let unit_id = param.unit_id.as_str();
538
84
    let token = &token_info.token;
539

            
540
    // To check if the unit is for the owner.
541
84
    let target_unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
542
12
        None => return Err(ErrResp::ErrNotFound(None)),
543
72
        Some(unit) => unit,
544
    };
545

            
546
72
    let updates = get_updates(FN_NAME, token, &state, &mut body.data, roles, &target_unit).await?;
547
30
    let cond = UpdateQueryCond { unit_id };
548
30
    if let Err(e) = state.model.unit().update(&cond, &updates).await {
549
        error!("[{}] update error: {}", FN_NAME, e);
550
        return Err(ErrResp::ErrDb(Some(e.to_string())));
551
30
    }
552
30
    Ok(StatusCode::NO_CONTENT)
553
84
}
554

            
555
/// `DELETE /{base}/api/v1/unit/{unitId}`
556
48
pub async fn delete_unit(
557
48
    State(state): State<AppState>,
558
48
    Extension(token_info): Extension<GetTokenInfoData>,
559
48
    Path(param): Path<request::UnitIdPath>,
560
48
) -> impl IntoResponse {
561
    const FN_NAME: &'static str = "delete_unit";
562

            
563
48
    let user_id = token_info.user_id.as_str();
564
48
    let roles = &token_info.roles;
565
48
    let unit_id = param.unit_id.as_str();
566

            
567
    // To check if the unit is for the owner.
568
48
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await {
569
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
570
48
        Ok(unit) => match unit {
571
18
            None => return Ok(StatusCode::NO_CONTENT),
572
30
            Some(unit) => unit,
573
30
        },
574
30
    };
575
30

            
576
30
    del_unit_rsc(FN_NAME, unit_id, unit.code.as_str(), &state).await?;
577
30
    send_del_ctrl_message(FN_NAME, unit, &state).await?;
578

            
579
30
    Ok(StatusCode::NO_CONTENT)
580
48
}
581

            
582
/// `DELETE /{base}/api/v1/unit/user/{userId}`
583
18
pub async fn delete_unit_user(
584
18
    State(state): State<AppState>,
585
18
    Path(param): Path<request::UserIdPath>,
586
18
) -> impl IntoResponse {
587
    const FN_NAME: &'static str = "delete_unit_user";
588

            
589
18
    let cond = ListQueryCond {
590
18
        owner_id: Some(param.user_id.as_str()),
591
18
        ..Default::default()
592
18
    };
593
18
    let opts = ListOptions {
594
18
        cond: &cond,
595
18
        offset: None,
596
18
        limit: None,
597
18
        sort: None,
598
18
        cursor_max: Some(LIST_CURSOR_MAX),
599
18
    };
600
18
    let mut cursor: Option<Box<dyn Cursor>> = None;
601
18
    let mut rm_units = vec![];
602
    loop {
603
18
        match state.model.unit().list(&opts, cursor).await {
604
            Err(e) => {
605
                error!("[{}] list error: {}", FN_NAME, e);
606
                return Err(ErrResp::ErrDb(Some(e.to_string())));
607
            }
608
18
            Ok((mut list, _cursor)) => {
609
18
                rm_units.append(&mut list);
610
18
                if _cursor.is_none() {
611
18
                    break;
612
                }
613
                cursor = _cursor;
614
            }
615
        }
616
    }
617
30
    for unit in rm_units {
618
12
        del_unit_rsc(FN_NAME, unit.unit_id.as_str(), unit.code.as_str(), &state).await?;
619
12
        send_del_ctrl_message(FN_NAME, unit, &state).await?;
620
    }
621

            
622
18
    Ok(StatusCode::NO_CONTENT)
623
18
}
624

            
625
228
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
626
228
    match sort_args.as_ref() {
627
150
        None => Ok(vec![SortCond {
628
150
            key: SortKey::Code,
629
150
            asc: true,
630
150
        }]),
631
78
        Some(args) => {
632
78
            let mut args = args.split(",");
633
78
            let mut sort_cond = vec![];
634
132
            while let Some(arg) = args.next() {
635
84
                let mut cond = arg.split(":");
636
84
                let key = match cond.next() {
637
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
638
84
                    Some(field) => match field {
639
84
                        "code" => SortKey::Code,
640
72
                        "created" => SortKey::CreatedAt,
641
42
                        "modified" => SortKey::ModifiedAt,
642
30
                        "name" => SortKey::Name,
643
                        _ => {
644
12
                            return Err(ErrResp::ErrParam(Some(format!(
645
12
                                "invalid sort key {}",
646
12
                                field
647
12
                            ))));
648
                        }
649
                    },
650
                };
651
72
                let asc = match cond.next() {
652
6
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
653
66
                    Some(asc) => match asc {
654
66
                        "asc" => true,
655
30
                        "desc" => false,
656
                        _ => {
657
6
                            return Err(ErrResp::ErrParam(Some(format!(
658
6
                                "invalid sort asc {}",
659
6
                                asc
660
6
                            ))));
661
                        }
662
                    },
663
                };
664
60
                if cond.next().is_some() {
665
6
                    return Err(ErrResp::ErrParam(Some(
666
6
                        "invalid sort condition".to_string(),
667
6
                    )));
668
54
                }
669
54
                sort_cond.push(SortCond { key, asc });
670
            }
671
48
            Ok(sort_cond)
672
        }
673
    }
674
228
}
675

            
676
72
async fn get_updates<'a>(
677
72
    fn_name: &str,
678
72
    token: &str,
679
72
    state: &AppState,
680
72
    body: &'a mut request::PatchUnitData,
681
72
    roles: &HashMap<String, bool>,
682
72
    target_unit: &Unit,
683
72
) -> Result<Updates<'a>, ErrResp> {
684
72
    let mut updates = Updates {
685
72
        ..Default::default()
686
72
    };
687
72
    let mut count = 0;
688
72
    if Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
689
60
        let mut target_owner_id = target_unit.owner_id.as_str();
690
60
        if let Some(owner_id) = body.owner_id.as_ref() {
691
30
            if owner_id.len() == 0 {
692
6
                return Err(ErrResp::ErrParam(Some(
693
6
                    "`ownerId` must with at least one character".to_string(),
694
6
                )));
695
24
            } else if !check_user(fn_name, token, owner_id.as_str(), state).await? {
696
6
                return Err(ErrResp::Custom(
697
6
                    ErrReq::OWNER_NOT_EXIST.0,
698
6
                    ErrReq::OWNER_NOT_EXIST.1,
699
6
                    None,
700
6
                ));
701
18
            }
702
18
            target_owner_id = owner_id.as_str();
703
18
            updates.owner_id = Some(owner_id.as_str());
704
18
            count += 1;
705
18

            
706
18
            if body.member_ids.is_none() && !target_unit.member_ids.contains(owner_id) {
707
6
                body.member_ids = Some(target_unit.member_ids.clone());
708
12
            }
709
30
        }
710
48
        if let Some(member_ids) = body.member_ids.as_mut() {
711
30
            member_ids.sort();
712
30
            member_ids.dedup();
713
30
            let mut found_owner = false;
714
30
            for v in member_ids.iter() {
715
30
                if v.len() == 0 {
716
6
                    return Err(ErrResp::ErrParam(Some(
717
6
                        "`memberIds` item must with at least one character".to_string(),
718
6
                    )));
719
24
                } else if !check_user(fn_name, token, v.as_str(), state).await? {
720
6
                    return Err(ErrResp::Custom(
721
6
                        ErrReq::MEMBER_NOT_EXIST.0,
722
6
                        ErrReq::MEMBER_NOT_EXIST.1,
723
6
                        None,
724
6
                    ));
725
18
                }
726
18
                if v.as_str().cmp(target_owner_id) == Ordering::Equal {
727
6
                    found_owner = true;
728
12
                }
729
            }
730
18
            if !found_owner {
731
12
                member_ids.push(target_owner_id.to_string());
732
12
            }
733
18
            updates.member_ids = Some(member_ids);
734
18
            count += 1;
735
18
        }
736
12
    }
737
48
    if let Some(name) = body.name.as_ref() {
738
24
        updates.name = Some(name.as_str());
739
24
        count += 1;
740
24
    }
741
48
    if let Some(info) = body.info.as_ref() {
742
30
        for (k, _) in info.iter() {
743
18
            if k.len() == 0 {
744
6
                return Err(ErrResp::ErrParam(Some(
745
6
                    "`info` key must not be empty".to_string(),
746
6
                )));
747
12
            }
748
        }
749
24
        updates.info = Some(info);
750
24
        count += 1;
751
18
    }
752

            
753
42
    if count == 0 {
754
12
        return Err(ErrResp::ErrParam(Some(
755
12
            "at least one parameter".to_string(),
756
12
        )));
757
30
    }
758
30
    updates.modified_at = Some(Utc::now());
759
30
    Ok(updates)
760
72
}
761

            
762
/// Use the Bearer token to check if the user ID is exist.
763
///
764
/// # Errors
765
///
766
/// Returns OK if status code is 200/404. Otherwise errors will be returned.
767
72
async fn check_user(
768
72
    fn_name: &str,
769
72
    token: &str,
770
72
    owner_id: &str,
771
72
    state: &AppState,
772
72
) -> Result<bool, ErrResp> {
773
72
    let uri = format!("{}/api/v1/user/{}", state.auth_base.as_str(), owner_id);
774
72
    let req = match state
775
72
        .client
776
72
        .request(reqwest::Method::GET, uri)
777
72
        .header(reqwest::header::AUTHORIZATION, format!("Bearer {}", token))
778
72
        .build()
779
    {
780
        Err(e) => {
781
            error!("[{}] create request error: {}", fn_name, e);
782
            return Err(ErrResp::ErrRsc(Some(format!(
783
                "create request error: {}",
784
                e
785
            ))));
786
        }
787
72
        Ok(req) => req,
788
72
    };
789
72
    match state.client.execute(req).await {
790
        Err(e) => {
791
            error!("[{}] request owner check error: {}", fn_name, e);
792
            return Err(ErrResp::ErrIntMsg(Some(format!(
793
                "check owner error: {}",
794
                e
795
            ))));
796
        }
797
72
        Ok(resp) => match resp.status() {
798
            reqwest::StatusCode::UNAUTHORIZED => {
799
                return Err(ErrResp::ErrAuth(None));
800
            }
801
54
            reqwest::StatusCode::OK => return Ok(true),
802
18
            reqwest::StatusCode::NOT_FOUND => return Ok(false),
803
            _ => {
804
                error!(
805
                    "[{}] check owner with status code: {}",
806
                    fn_name,
807
                    resp.status()
808
                );
809
                return Err(ErrResp::ErrIntMsg(Some(format!(
810
                    "check owner with status code: {}",
811
                    resp.status()
812
                ))));
813
            }
814
        },
815
    }
816
72
}
817

            
818
162
fn unit_list_transform(list: &Vec<Unit>) -> Vec<response::GetUnitData> {
819
162
    let mut ret = vec![];
820
486
    for unit in list.iter() {
821
486
        ret.push(unit_transform(&unit));
822
486
    }
823
162
    ret
824
162
}
825

            
826
78
fn unit_list_transform_bytes(
827
78
    list: &Vec<Unit>,
828
78
    with_start: bool,
829
78
    with_end: bool,
830
78
    format: Option<&request::ListFormat>,
831
78
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
832
78
    let mut build_str = match with_start {
833
42
        false => "".to_string(),
834
6
        true => match format {
835
6
            Some(request::ListFormat::Array) => "[".to_string(),
836
30
            _ => "{\"data\":[".to_string(),
837
        },
838
    };
839
78
    let mut is_first = with_start;
840

            
841
4932
    for item in list {
842
4854
        if is_first {
843
36
            is_first = false;
844
4818
        } else {
845
4818
            build_str.push(',');
846
4818
        }
847
4854
        let json_str = match serde_json::to_string(&unit_transform(item)) {
848
            Err(e) => return Err(Box::new(e)),
849
4854
            Ok(str) => str,
850
4854
        };
851
4854
        build_str += json_str.as_str();
852
    }
853

            
854
78
    if with_end {
855
36
        build_str += match format {
856
6
            Some(request::ListFormat::Array) => "]",
857
30
            _ => "]}",
858
        }
859
42
    }
860
78
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
861
78
}
862

            
863
5358
fn unit_transform(unit: &Unit) -> response::GetUnitData {
864
5358
    response::GetUnitData {
865
5358
        unit_id: unit.unit_id.clone(),
866
5358
        code: unit.code.clone(),
867
5358
        created_at: time_str(&unit.created_at),
868
5358
        modified_at: time_str(&unit.modified_at),
869
5358
        owner_id: unit.owner_id.clone(),
870
5358
        member_ids: unit.member_ids.clone(),
871
5358
        name: unit.name.clone(),
872
5358
        info: unit.info.clone(),
873
5358
    }
874
5358
}
875

            
876
42
async fn del_unit_rsc(
877
42
    fn_name: &str,
878
42
    unit_id: &str,
879
42
    unit_code: &str,
880
42
    state: &AppState,
881
42
) -> Result<(), ErrResp> {
882
42
    let cond = ApplicationCond {
883
42
        unit_id: Some(unit_id),
884
42
        ..Default::default()
885
42
    };
886
42
    let opts = ApplicationListOpts {
887
42
        cond: &cond,
888
42
        offset: None,
889
42
        limit: None,
890
42
        sort: None,
891
42
        cursor_max: Some(LIST_CURSOR_MAX),
892
42
    };
893
42
    let mut cursor: Option<Box<dyn ApplicationCursor>> = None;
894
42
    let mut rm_mgrs = vec![];
895
    loop {
896
42
        match state.model.application().list(&opts, cursor).await {
897
            Err(e) => {
898
                error!("[{}] list application error: {}", fn_name, e);
899
                return Err(ErrResp::ErrDb(Some(e.to_string())));
900
            }
901
42
            Ok((list, _cursor)) => {
902
66
                for app in list {
903
24
                    let key = gen_mgr_key(unit_code, app.code.as_str());
904
24
                    match { state.application_mgrs.lock().unwrap().remove(&key) } {
905
24
                        None => error!("[{}] get no application manager {}", fn_name, key),
906
                        Some(old_mgr) => {
907
                            rm_mgrs.push(old_mgr);
908
                        }
909
                    }
910
                }
911
42
                if _cursor.is_none() {
912
42
                    break;
913
                }
914
                cursor = _cursor;
915
            }
916
        }
917
    }
918
42
    for mgr in rm_mgrs {
919
        if let Err(e) = mgr.close().await {
920
            error!(
921
                "[{}] close old application manager {} error: {}",
922
                fn_name,
923
                mgr.name(),
924
                e
925
            );
926
        }
927
    }
928

            
929
42
    let cond = NetworkCond {
930
42
        unit_id: Some(Some(unit_id)),
931
42
        ..Default::default()
932
42
    };
933
42
    let opts = NetworkListOpts {
934
42
        cond: &cond,
935
42
        offset: None,
936
42
        limit: None,
937
42
        sort: None,
938
42
        cursor_max: Some(LIST_CURSOR_MAX),
939
42
    };
940
42
    let mut cursor: Option<Box<dyn NetworkCursor>> = None;
941
42
    let mut rm_mgrs = vec![];
942
    loop {
943
42
        match state.model.network().list(&opts, cursor).await {
944
            Err(e) => {
945
                error!("[{}] list network error: {}", fn_name, e);
946
                return Err(ErrResp::ErrDb(Some(e.to_string())));
947
            }
948
42
            Ok((list, _cursor)) => {
949
66
                for app in list {
950
24
                    let key = gen_mgr_key(unit_code, app.code.as_str());
951
24
                    match { state.network_mgrs.lock().unwrap().remove(&key) } {
952
24
                        None => error!("[{}] get no network manager {}", fn_name, key),
953
                        Some(old_mgr) => {
954
                            rm_mgrs.push(old_mgr);
955
                        }
956
                    }
957
                }
958
42
                if _cursor.is_none() {
959
42
                    break;
960
                }
961
                cursor = _cursor;
962
            }
963
        }
964
    }
965
42
    for mgr in rm_mgrs {
966
        if let Err(e) = mgr.close().await {
967
            error!(
968
                "[{}] close old network manager {} error: {}",
969
                fn_name,
970
                mgr.name(),
971
                e
972
            );
973
        }
974
    }
975

            
976
42
    let cond = network_route::QueryCond {
977
42
        unit_id: Some(unit_id),
978
42
        ..Default::default()
979
42
    };
980
42
    if let Err(e) = state.model.network_route().del(&cond).await {
981
        error!("[{}] del network_route error: {}", fn_name, e);
982
        return Err(ErrResp::ErrDb(Some(e.to_string())));
983
42
    }
984
42

            
985
42
    let cond = device_route::QueryCond {
986
42
        unit_id: Some(unit_id),
987
42
        ..Default::default()
988
42
    };
989
42
    if let Err(e) = state.model.device_route().del(&cond).await {
990
        error!("[{}] del device_route error: {}", fn_name, e);
991
        return Err(ErrResp::ErrDb(Some(e.to_string())));
992
42
    }
993
42

            
994
42
    let cond = dldata_buffer::QueryCond {
995
42
        unit_id: Some(unit_id),
996
42
        ..Default::default()
997
42
    };
998
42
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
999
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42

            
42
    let cond = device::QueryCond {
42
        unit_id: Some(unit_id),
42
        ..Default::default()
42
    };
42
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42

            
42
    let cond = network::QueryCond {
42
        unit_id: Some(Some(unit_id)),
42
        ..Default::default()
42
    };
42
    if let Err(e) = state.model.network().del(&cond).await {
        error!("[{}] del network error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42

            
42
    let cond = application::QueryCond {
42
        unit_id: Some(unit_id),
42
        ..Default::default()
42
    };
42
    if let Err(e) = state.model.application().del(&cond).await {
        error!("[{}] del application error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42

            
42
    let cond = QueryCond {
42
        unit_id: Some(unit_id),
42
        ..Default::default()
42
    };
42
    if let Err(e) = state.model.unit().del(&cond).await {
        error!("[{}] del unit error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42

            
42
    Ok(())
42
}
/// Send delete control message.
42
async fn send_del_ctrl_message(fn_name: &str, unit: Unit, state: &AppState) -> Result<(), ErrResp> {
42
    if state.cache.is_some() {
14
        let msg = SendCtrlMsg::DelUnit {
14
            operation: CtrlMsgOp::DEL_UNIT.to_string(),
14
            new: CtrlDelUnit {
14
                unit_id: unit.unit_id,
14
                unit_code: unit.code,
14
            },
14
        };
14
        let payload = match serde_json::to_vec(&msg) {
            Err(e) => {
                error!(
                    "[{}] marshal JSON for {} error: {}",
                    fn_name,
                    CtrlMsgOp::DEL_UNIT,
                    e
                );
                return Err(ErrResp::ErrRsc(Some(format!(
                    "marshal control message error: {}",
                    e
                ))));
            }
14
            Ok(payload) => payload,
14
        };
14
        let ctrl_sender = { state.ctrl_senders.unit.lock().unwrap().clone() };
14
        if let Err(e) = ctrl_sender.send_msg(payload).await {
            error!(
                "[{}] send control message for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_UNIT,
                e
            );
            return Err(ErrResp::ErrIntMsg(Some(format!(
                "send control message error: {}",
                e
            ))));
14
        }
28
    }
42
    Ok(())
42
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
70
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
70
        let queue_name = queue.name();
70
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
40
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
140
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    /// No-op: the message is dropped without being read or acknowledged.
    ///
    /// NOTE(review): presumably the sender side is never expected to receive messages and
    /// this impl only satisfies the `MessageHandler` requirement - confirm.
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
60
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
60
        let queue_name = queue.name();
60
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
30
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
120
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
14
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
14
        let queue_name = queue.name();
14
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
14
            Ok(msg) => msg,
14
        };
14
        match ctrl_msg {
14
            RecvCtrlMsg::DelUnit { new } => {
14
                if let Some(cache) = self.cache.as_ref() {
14
                    let cond = device_route::DelCacheQueryCond {
14
                        unit_code: new.unit_code.as_str(),
14
                        network_code: None,
14
                        network_addr: None,
14
                    };
14
                    if let Err(e) = cache.device_route().del_dldata(&cond).await {
                        error!(
                            "[{}] {} delete device route cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
14
                        debug!("[{}] {} delete device route cache", FN_NAME, queue_name);
                    }
14
                    let cond = device_route::DelCachePubQueryCond {
14
                        unit_id: new.unit_id.as_str(),
14
                        device_id: None,
14
                    };
14
                    if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                        error!(
                            "[{}] {} delete device route cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
14
                        debug!("[{}] {} delete device route cache", FN_NAME, queue_name);
                    }
                }
14
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
14
                }
            }
        }
28
    }
}