1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use axum::{
11
    body::{Body, Bytes},
12
    extract::State,
13
    http::{header, StatusCode},
14
    response::IntoResponse,
15
    Extension,
16
};
17
use chrono::Utc;
18
use log::{debug, error, info, warn};
19
use serde::{Deserialize, Serialize};
20
use serde_json::{self, Map};
21
use tokio::time;
22
use url::Url;
23

            
24
use general_mq::{
25
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
26
    Queue,
27
};
28
use sylvia_iot_corelib::{
29
    constants::ContentType,
30
    err::ErrResp,
31
    http::{Json, Path, Query},
32
    role::Role,
33
    strings::{self, hex_addr_to_u128, time_str, u128_to_addr},
34
};
35

            
36
use super::{
37
    super::{
38
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
39
        lib::{check_device, check_unit, gen_mgr_key},
40
    },
41
    request, response,
42
};
43
use crate::{
44
    libs::{
45
        config::BrokerCtrl as CfgCtrl,
46
        mq::{self, Connection},
47
    },
48
    models::{
49
        device::{
50
            self, Device, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey,
51
            UpdateQueryCond, Updates,
52
        },
53
        device_route, dldata_buffer,
54
        network::{Network, QueryCond as NetworkQueryCond},
55
        Cache,
56
    },
57
};
58

            
59
#[derive(Deserialize, Serialize)]
60
#[serde(tag = "operation")]
61
enum RecvCtrlMsg {
62
    #[serde(rename = "del-device")]
63
    DelDevice { new: CtrlDelDevice },
64
    #[serde(rename = "del-device-bulk")]
65
    DelDeviceBulk { new: CtrlDelDeviceBulk },
66
}
67

            
68
/// Control message inside broker cluster.
69
#[derive(Serialize)]
70
#[serde(untagged)]
71
enum SendCtrlMsg {
72
    DelDevice {
73
        operation: String,
74
        new: CtrlDelDevice,
75
    },
76
    DelDeviceBulk {
77
        operation: String,
78
        new: CtrlDelDeviceBulk,
79
    },
80
}
81

            
82
/// Namespace type for the `operation` strings of broker cluster control messages
/// (see the associated constants in `impl CtrlMsgOp`).
struct CtrlMsgOp;
83

            
84
#[derive(Deserialize, Serialize)]
85
struct CtrlDelDevice {
86
    #[serde(rename = "unitId")]
87
    unit_id: String,
88
    #[serde(rename = "unitCode")]
89
    unit_code: Option<String>,
90
    #[serde(rename = "networkId")]
91
    network_id: String,
92
    #[serde(rename = "networkCode")]
93
    network_code: String,
94
    #[serde(rename = "networkAddr")]
95
    network_addr: String,
96
    #[serde(rename = "deviceId")]
97
    device_id: String,
98
}
99

            
100
#[derive(Deserialize, Serialize)]
101
struct CtrlDelDeviceBulk {
102
    #[serde(rename = "unitId")]
103
    unit_id: String,
104
    #[serde(rename = "unitCode")]
105
    unit_code: Option<String>,
106
    #[serde(rename = "networkId")]
107
    network_id: String,
108
    #[serde(rename = "networkCode")]
109
    network_code: String,
110
    #[serde(rename = "networkAddrs")]
111
    network_addrs: Vec<String>,
112
    #[serde(rename = "deviceIds")]
113
    device_ids: Vec<String>,
114
}
115

            
116
struct CtrlSenderHandler {
117
    cache: Option<Arc<dyn Cache>>,
118
}
119

            
120
struct CtrlReceiverHandler {
121
    cache: Option<Arc<dyn Cache>>,
122
}
123

            
124
/// Control message from broker to network servers.
125
#[derive(Serialize)]
126
#[serde(untagged)]
127
enum SendNetCtrlMsg {
128
    AddDevice {
129
        time: String,
130
        operation: String,
131
        new: NetCtrlAddr,
132
    },
133
    AddDeviceBulk {
134
        time: String,
135
        operation: String,
136
        new: NetCtrlAddrs,
137
    },
138
    AddDeviceRange {
139
        time: String,
140
        operation: String,
141
        new: NetCtrlAddrRange,
142
    },
143
    DelDevice {
144
        time: String,
145
        operation: String,
146
        new: NetCtrlAddr,
147
    },
148
    DelDeviceBulk {
149
        time: String,
150
        operation: String,
151
        new: NetCtrlAddrs,
152
    },
153
    DelDeviceRange {
154
        time: String,
155
        operation: String,
156
        new: NetCtrlAddrRange,
157
    },
158
}
159

            
160
/// Namespace type for the `operation` strings of control messages sent to network
/// servers (see the associated constants in `impl NetCtrlMsgOp`).
struct NetCtrlMsgOp;
161

            
162
/// Shared structure to keep simple design.
163
#[derive(Serialize)]
164
struct NetCtrlAddr {
165
    #[serde(rename = "networkAddr")]
166
    network_addr: String,
167
}
168

            
169
/// Shared structure to keep simple design.
170
#[derive(Serialize)]
171
struct NetCtrlAddrs {
172
    #[serde(rename = "networkAddrs")]
173
    network_addrs: Vec<String>,
174
}
175

            
176
/// Shared structure to keep simple design.
177
#[derive(Serialize)]
178
struct NetCtrlAddrRange {
179
    #[serde(rename = "startAddr")]
180
    pub start_addr: String,
181
    #[serde(rename = "endAddr")]
182
    pub end_addr: String,
183
}
184

            
185
impl CtrlMsgOp {
186
    const DEL_DEVICE: &'static str = "del-device";
187
    const DEL_DEVICE_BULK: &'static str = "del-device-bulk";
188
}
189

            
190
impl NetCtrlMsgOp {
191
    const ADD_DEVICE: &'static str = "add-device";
192
    const ADD_DEVICE_BULK: &'static str = "add-device-bulk";
193
    const ADD_DEVICE_RANGE: &'static str = "add-device-range";
194
    const DEL_DEVICE: &'static str = "del-device";
195
    const DEL_DEVICE_BULK: &'static str = "del-device-bulk";
196
    const DEL_DEVICE_RANGE: &'static str = "del-device-range";
197
}
198

            
199
/// Upper bound of addresses accepted by one bulk or range request.
const BULK_MAX: usize = 1024;
/// NOTE(review): presumably the default page size of the list API; its use is
/// not visible in this part of the file.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// NOTE(review): presumably the batch size used when walking a list cursor; its
/// use is not visible in this part of the file.
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument passed to `strings::random_id` when generating device IDs.
const ID_RAND_LEN: usize = 8;
/// Name of the control queue handled by this module; also the key under which
/// the receiver is stored in `ctrl_receivers`.
const CTRL_QUEUE_NAME: &str = "device";
204

            
205
/// Initialize channels.
206
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
207
    const FN_NAME: &'static str = "init";
208

            
209
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
210
15
    {
211
15
        state
212
15
            .ctrl_receivers
213
15
            .lock()
214
15
            .unwrap()
215
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
216
15
    }
217
15

            
218
15
    let ctrl_sender = { state.ctrl_senders.device.lock().unwrap().clone() };
219
    // Wait for connected.
220
1382
    for _ in 0..500 {
221
1382
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
222
15
            break;
223
1367
        }
224
1367
        time::sleep(Duration::from_millis(10)).await;
225
    }
226
15
    if ctrl_sender.status() != Status::Connected {
227
        error!(
228
            "[{}] {} control sender not connected",
229
            FN_NAME, CTRL_QUEUE_NAME
230
        );
231
        return Err(Box::new(IoError::new(
232
            ErrorKind::NotConnected,
233
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
234
        )));
235
15
    }
236
15
    if q.status() != Status::Connected {
237
        error!(
238
            "[{}] {} control receiver not connected",
239
            FN_NAME, CTRL_QUEUE_NAME
240
        );
241
        return Err(Box::new(IoError::new(
242
            ErrorKind::NotConnected,
243
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
244
        )));
245
15
    }
246
15

            
247
15
    Ok(())
248
15
}
249

            
250
/// Create control channel sender queue.
251
15
pub fn new_ctrl_sender(
252
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
253
15
    config: &CfgCtrl,
254
15
    cache: Option<Arc<dyn Cache>>,
255
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
256
15
    let url = match config.url.as_ref() {
257
        None => {
258
            return Err(Box::new(IoError::new(
259
                ErrorKind::InvalidInput,
260
                "empty control url",
261
            )))
262
        }
263
15
        Some(url) => match Url::parse(url.as_str()) {
264
            Err(e) => return Err(Box::new(e)),
265
15
            Ok(url) => url,
266
15
        },
267
15
    };
268
15

            
269
15
    match mq::control::new(
270
15
        conn_pool.clone(),
271
15
        &url,
272
15
        config.prefetch,
273
15
        CTRL_QUEUE_NAME,
274
15
        false,
275
15
        Arc::new(CtrlSenderHandler {
276
15
            cache: cache.clone(),
277
15
        }),
278
15
        Arc::new(CtrlSenderHandler { cache }),
279
15
    ) {
280
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
281
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
282
    }
283
15
}
284

            
285
/// Create control channel receiver queue.
286
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
287
15
    let url = match config.url.as_ref() {
288
        None => {
289
            return Err(Box::new(IoError::new(
290
                ErrorKind::InvalidInput,
291
                "empty control url",
292
            )))
293
        }
294
15
        Some(url) => match Url::parse(url.as_str()) {
295
            Err(e) => return Err(Box::new(e)),
296
15
            Ok(url) => url,
297
15
        },
298
15
    };
299
15
    let handler = Arc::new(CtrlReceiverHandler {
300
15
        cache: state.cache.clone(),
301
15
    });
302
15
    match mq::control::new(
303
15
        state.mq_conns.clone(),
304
15
        &url,
305
15
        config.prefetch,
306
15
        CTRL_QUEUE_NAME,
307
15
        true,
308
15
        handler.clone(),
309
15
        handler,
310
15
    ) {
311
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
312
15
        Ok(q) => Ok(q),
313
    }
314
15
}
315

            
316
/// `POST /{base}/api/v1/device`
317
105
pub async fn post_device(
318
105
    State(state): State<AppState>,
319
105
    Extension(token_info): Extension<GetTokenInfoData>,
320
105
    Json(mut body): Json<request::PostDeviceBody>,
321
105
) -> impl IntoResponse {
322
    const FN_NAME: &'static str = "post_device";
323

            
324
105
    let user_id = token_info.user_id.as_str();
325
105
    let roles = &token_info.roles;
326
105

            
327
105
    body.data.network_addr = body.data.network_addr.trim().to_lowercase();
328
105
    if body.data.unit_id.len() == 0 {
329
3
        return Err(ErrResp::ErrParam(Some(
330
3
            "`unitId` must with at least one character".to_string(),
331
3
        )));
332
102
    } else if body.data.network_id.len() == 0 {
333
3
        return Err(ErrResp::ErrParam(Some(
334
3
            "`networkId` must with at least one character".to_string(),
335
3
        )));
336
99
    } else if body.data.network_addr.len() == 0 {
337
3
        return Err(ErrResp::ErrParam(Some(
338
3
            "`networkAddr` must with at least one character".to_string(),
339
3
        )));
340
96
    }
341
96
    if let Some(profile) = body.data.profile.as_ref() {
342
15
        let profile = profile.to_lowercase();
343
15
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
344
3
            return Err(ErrResp::ErrParam(Some(
345
3
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
346
3
            )));
347
12
        }
348
81
    }
349
93
    let unit_id = body.data.unit_id.as_str();
350
93
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
351
        None => {
352
6
            return Err(ErrResp::Custom(
353
6
                ErrReq::UNIT_NOT_EXIST.0,
354
6
                ErrReq::UNIT_NOT_EXIST.1,
355
6
                None,
356
6
            ));
357
        }
358
87
        Some(unit) => unit,
359
87
    };
360
87
    let network_id = body.data.network_id.as_str();
361
87
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
362
        None => {
363
9
            return Err(ErrResp::Custom(
364
9
                ErrReq::NETWORK_NOT_EXIST.0,
365
9
                ErrReq::NETWORK_NOT_EXIST.1,
366
9
                None,
367
9
            ));
368
        }
369
78
        Some(network) => network,
370
78
    };
371
78
    let network_addr = body.data.network_addr.as_str();
372
78
    if check_addr(FN_NAME, network_id, network_addr, &state)
373
78
        .await?
374
78
        .is_some()
375
    {
376
6
        return Err(ErrResp::Custom(
377
6
            ErrReq::NETWORK_ADDR_EXIST.0,
378
6
            ErrReq::NETWORK_ADDR_EXIST.1,
379
6
            None,
380
6
        ));
381
72
    }
382
72

            
383
72
    let now = Utc::now();
384
72
    let device_id = strings::random_id(&now, ID_RAND_LEN);
385
72
    let device = Device {
386
72
        device_id: device_id.clone(),
387
72
        unit_id: unit.unit_id,
388
72
        unit_code: match network.unit_id.as_ref() {
389
21
            None => None,
390
51
            Some(_) => Some(unit.code),
391
        },
392
72
        network_id: network.network_id,
393
72
        network_code: network.code.clone(),
394
72
        network_addr: body.data.network_addr,
395
72
        created_at: now,
396
72
        modified_at: now,
397
72
        profile: match body.data.profile.as_ref() {
398
60
            None => "".to_string(),
399
12
            Some(profile) => profile.to_lowercase(),
400
        },
401
72
        name: match body.data.name.as_ref() {
402
69
            None => "".to_string(),
403
3
            Some(name) => name.clone(),
404
        },
405
72
        info: match body.data.info.as_ref() {
406
69
            None => Map::new(),
407
3
            Some(info) => info.clone(),
408
        },
409
    };
410
72
    if let Err(e) = state.model.device().add(&device).await {
411
        error!("[{}] add error: {}", FN_NAME, e);
412
        return Err(ErrResp::ErrDb(Some(e.to_string())));
413
72
    }
414
72

            
415
72
    // Clear the not-exist device in cache.
416
72
    if state.cache.is_some() {
417
24
        let msg = SendCtrlMsg::DelDevice {
418
24
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
419
24
            new: CtrlDelDevice {
420
24
                unit_id: device.unit_id,
421
24
                unit_code: device.unit_code,
422
24
                network_id: device.network_id,
423
24
                network_code: device.network_code,
424
24
                network_addr: device.network_addr.clone(),
425
24
                device_id: device.device_id,
426
24
            },
427
24
        };
428
24
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
429
48
    }
430

            
431
    // Send message to the device's network server.
432
72
    let mgr_key = match network.unit_code.as_ref() {
433
21
        None => gen_mgr_key("", network.code.as_str()),
434
51
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
435
    };
436
72
    let msg_op = NetCtrlMsgOp::ADD_DEVICE;
437
72
    let msg = SendNetCtrlMsg::AddDevice {
438
72
        time: time_str(&Utc::now()),
439
72
        operation: msg_op.to_string(),
440
72
        new: NetCtrlAddr {
441
72
            network_addr: device.network_addr,
442
72
        },
443
72
    };
444
72
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
445

            
446
72
    Ok(Json(response::PostDevice {
447
72
        data: response::PostDeviceData { device_id },
448
72
    }))
449
105
}
450

            
451
/// `POST /{base}/api/v1/device/bulk`
452
63
pub async fn post_device_bulk(
453
63
    State(state): State<AppState>,
454
63
    Extension(token_info): Extension<GetTokenInfoData>,
455
63
    Json(mut body): Json<request::PostDeviceBulkBody>,
456
63
) -> impl IntoResponse {
457
    const FN_NAME: &'static str = "post_device_bulk";
458

            
459
63
    let user_id = token_info.user_id.as_str();
460
63
    let roles = &token_info.roles;
461
63

            
462
63
    if body.data.unit_id.len() == 0 {
463
3
        return Err(ErrResp::ErrParam(Some(
464
3
            "`unitId` must with at least one character".to_string(),
465
3
        )));
466
60
    } else if body.data.network_id.len() == 0 {
467
3
        return Err(ErrResp::ErrParam(Some(
468
3
            "`networkId` must with at least one character".to_string(),
469
3
        )));
470
57
    } else if body.data.network_addrs.len() == 0 {
471
3
        return Err(ErrResp::ErrParam(Some(
472
3
            "`networkAddrs` must with at least one address".to_string(),
473
3
        )));
474
54
    } else if body.data.network_addrs.len() > BULK_MAX {
475
3
        return Err(ErrResp::ErrParam(Some(format!(
476
3
            "`networkAddrs` cannot more than {}",
477
3
            BULK_MAX
478
3
        ))));
479
51
    }
480
51
    if let Some(profile) = body.data.profile.as_ref() {
481
9
        let profile = profile.to_lowercase();
482
9
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
483
3
            return Err(ErrResp::ErrParam(Some(
484
3
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
485
3
            )));
486
6
        }
487
42
    }
488
48
    let mut addrs = vec![];
489
18462
    for addr in body.data.network_addrs.iter() {
490
18462
        let addr = addr.trim().to_lowercase();
491
18462
        if addr.len() == 0 {
492
3
            return Err(ErrResp::ErrParam(Some(
493
3
                "`networkAddrs` must be non-empty address array".to_string(),
494
3
            )));
495
18459
        }
496
18459
        addrs.push(addr);
497
    }
498
45
    body.data.network_addrs = addrs;
499
45
    let unit_id = body.data.unit_id.as_str();
500
45
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
501
        None => {
502
6
            return Err(ErrResp::Custom(
503
6
                ErrReq::UNIT_NOT_EXIST.0,
504
6
                ErrReq::UNIT_NOT_EXIST.1,
505
6
                None,
506
6
            ));
507
        }
508
39
        Some(unit) => unit,
509
39
    };
510
39
    let network_id = body.data.network_id.as_str();
511
39
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
512
        None => {
513
9
            return Err(ErrResp::Custom(
514
9
                ErrReq::NETWORK_NOT_EXIST.0,
515
9
                ErrReq::NETWORK_NOT_EXIST.1,
516
9
                None,
517
9
            ));
518
        }
519
30
        Some(network) => network,
520
30
    };
521
30

            
522
30
    let mut devices = vec![];
523
18444
    for network_addr in body.data.network_addrs.iter() {
524
18444
        let now = Utc::now();
525
18444
        let device = Device {
526
18444
            device_id: strings::random_id(&now, ID_RAND_LEN),
527
18444
            unit_id: unit.unit_id.clone(),
528
18444
            unit_code: match network.unit_id.as_ref() {
529
9219
                None => None,
530
9225
                Some(_) => Some(unit.code.clone()),
531
            },
532
18444
            network_id: network.network_id.clone(),
533
18444
            network_code: network.code.clone(),
534
18444
            network_addr: network_addr.clone(),
535
18444
            created_at: now,
536
18444
            modified_at: now,
537
18444
            profile: match body.data.profile.as_ref() {
538
12300
                None => "".to_string(),
539
6144
                Some(profile) => profile.to_lowercase(),
540
            },
541
18444
            name: network_addr.clone(),
542
18444
            info: Map::new(),
543
18444
        };
544
18444
        devices.push(device);
545
    }
546
30
    if let Err(e) = state.model.device().add_bulk(&devices).await {
547
        error!("[{}] add error: {}", FN_NAME, e);
548
        return Err(ErrResp::ErrDb(Some(e.to_string())));
549
30
    }
550
30

            
551
30
    if state.cache.is_some() {
552
10
        let msg = SendCtrlMsg::DelDeviceBulk {
553
10
            operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
554
10
            new: CtrlDelDeviceBulk {
555
10
                unit_id: unit.unit_id,
556
10
                unit_code: network.unit_code.clone(),
557
10
                network_id: network.network_id,
558
10
                network_code: network.code.clone(),
559
10
                network_addrs: body.data.network_addrs.clone(),
560
6148
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
561
10
            },
562
10
        };
563
10
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
564
20
    }
565

            
566
    // Send message to the device's network server.
567
30
    let mgr_key = match network.unit_code.as_ref() {
568
12
        None => gen_mgr_key("", network.code.as_str()),
569
18
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
570
    };
571
30
    let msg_op = NetCtrlMsgOp::ADD_DEVICE_BULK;
572
30
    let msg = SendNetCtrlMsg::AddDeviceBulk {
573
30
        time: time_str(&Utc::now()),
574
30
        operation: msg_op.to_string(),
575
30
        new: NetCtrlAddrs {
576
30
            network_addrs: body.data.network_addrs,
577
30
        },
578
30
    };
579
30
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
580

            
581
30
    Ok(StatusCode::NO_CONTENT)
582
63
}
583

            
584
/// `POST /{base}/api/v1/device/bulk-delete`
585
39
pub async fn post_device_bulk_del(
586
39
    State(state): State<AppState>,
587
39
    Extension(token_info): Extension<GetTokenInfoData>,
588
39
    Json(mut body): Json<request::PostDeviceBulkBody>,
589
39
) -> impl IntoResponse {
590
    const FN_NAME: &'static str = "post_device_bulk_del";
591

            
592
39
    let user_id = token_info.user_id.as_str();
593
39
    let roles = &token_info.roles;
594
39

            
595
39
    if body.data.unit_id.len() == 0 {
596
3
        return Err(ErrResp::ErrParam(Some(
597
3
            "`unitId` must with at least one character".to_string(),
598
3
        )));
599
36
    } else if body.data.network_id.len() == 0 {
600
3
        return Err(ErrResp::ErrParam(Some(
601
3
            "`networkId` must with at least one character".to_string(),
602
3
        )));
603
33
    } else if body.data.network_addrs.len() == 0 {
604
3
        return Err(ErrResp::ErrParam(Some(
605
3
            "`networkAddrs` must with at least one address".to_string(),
606
3
        )));
607
30
    } else if body.data.network_addrs.len() > BULK_MAX {
608
3
        return Err(ErrResp::ErrParam(Some(format!(
609
3
            "`networkAddrs` cannot more than {}",
610
3
            BULK_MAX
611
3
        ))));
612
27
    }
613
27
    let mut addrs = vec![];
614
6165
    for addr in body.data.network_addrs.iter() {
615
6165
        let addr = addr.trim().to_lowercase();
616
6165
        if addr.len() == 0 {
617
3
            return Err(ErrResp::ErrParam(Some(
618
3
                "`networkAddrs` must be non-empty address array".to_string(),
619
3
            )));
620
6162
        }
621
6162
        addrs.push(addr);
622
    }
623
24
    body.data.network_addrs = addrs;
624
24
    let unit_id = body.data.unit_id.as_str();
625
24
    if check_unit(FN_NAME, user_id, roles, unit_id, true, &state)
626
24
        .await?
627
24
        .is_none()
628
    {
629
6
        return Err(ErrResp::Custom(
630
6
            ErrReq::UNIT_NOT_EXIST.0,
631
6
            ErrReq::UNIT_NOT_EXIST.1,
632
6
            None,
633
6
        ));
634
18
    }
635
18
    let network_id = body.data.network_id.as_str();
636
18
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
637
        None => {
638
9
            return Err(ErrResp::Custom(
639
9
                ErrReq::NETWORK_NOT_EXIST.0,
640
9
                ErrReq::NETWORK_NOT_EXIST.1,
641
9
                None,
642
9
            ));
643
        }
644
9
        Some(network) => network,
645
9
    };
646
9

            
647
9
    del_device_rsc_bulk(FN_NAME, &body.data, &network, &state).await?;
648

            
649
    // Send message to the device's network server.
650
9
    let mgr_key = match network.unit_code.as_ref() {
651
3
        None => gen_mgr_key("", network.code.as_str()),
652
6
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
653
    };
654
9
    let msg_op = NetCtrlMsgOp::DEL_DEVICE_BULK;
655
9
    let msg = SendNetCtrlMsg::DelDeviceBulk {
656
9
        time: time_str(&Utc::now()),
657
9
        operation: msg_op.to_string(),
658
9
        new: NetCtrlAddrs {
659
9
            network_addrs: body.data.network_addrs,
660
9
        },
661
9
    };
662
9
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
663

            
664
9
    Ok(StatusCode::NO_CONTENT)
665
39
}
666

            
667
/// `POST /{base}/api/v1/device/range`
668
78
pub async fn post_device_range(
669
78
    State(state): State<AppState>,
670
78
    Extension(token_info): Extension<GetTokenInfoData>,
671
78
    Json(mut body): Json<request::PostDeviceRangeBody>,
672
78
) -> impl IntoResponse {
673
    const FN_NAME: &'static str = "post_device_range";
674

            
675
78
    let user_id = token_info.user_id.as_str();
676
78
    let roles = &token_info.roles;
677
78

            
678
78
    body.data.start_addr = body.data.start_addr.trim().to_lowercase();
679
78
    body.data.end_addr = body.data.end_addr.trim().to_lowercase();
680
78
    if body.data.unit_id.len() == 0 {
681
3
        return Err(ErrResp::ErrParam(Some(
682
3
            "`unitId` must with at least one character".to_string(),
683
3
        )));
684
75
    } else if body.data.network_id.len() == 0 {
685
3
        return Err(ErrResp::ErrParam(Some(
686
3
            "`networkId` must with at least one character".to_string(),
687
3
        )));
688
72
    } else if body.data.start_addr.len() == 0
689
69
        || body.data.start_addr.len() != body.data.end_addr.len()
690
    {
691
9
        return Err(ErrResp::ErrParam(Some(
692
9
            "`startAddr` and `endAddr` must have at least one character and with the same length"
693
9
                .to_string(),
694
9
        )));
695
63
    }
696
63
    if let Some(profile) = body.data.profile.as_ref() {
697
9
        let profile = profile.to_lowercase();
698
9
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
699
3
            return Err(ErrResp::ErrParam(Some(
700
3
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
701
3
            )));
702
6
        }
703
6
        body.data.profile = Some(profile);
704
54
    }
705
60
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
706
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
707
54
        Ok(addr) => addr,
708
    };
709
54
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
710
3
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
711
51
        Ok(addr) => addr,
712
51
    };
713
51
    if start_addr > end_addr {
714
3
        return Err(ErrResp::ErrParam(Some(
715
3
            "`startAddr` cannot larger than `endAddr`".to_string(),
716
3
        )));
717
48
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
718
3
        return Err(ErrResp::ErrParam(Some(format!(
719
3
            "numbers between `startAddr` and `endAddr` cannot more than {}",
720
3
            BULK_MAX
721
3
        ))));
722
45
    }
723
45

            
724
45
    let unit_id = body.data.unit_id.as_str();
725
45
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
726
        None => {
727
6
            return Err(ErrResp::Custom(
728
6
                ErrReq::UNIT_NOT_EXIST.0,
729
6
                ErrReq::UNIT_NOT_EXIST.1,
730
6
                None,
731
6
            ));
732
        }
733
39
        Some(unit) => unit,
734
39
    };
735
39
    let network_id = body.data.network_id.as_str();
736
39
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
737
        None => {
738
9
            return Err(ErrResp::Custom(
739
9
                ErrReq::NETWORK_NOT_EXIST.0,
740
9
                ErrReq::NETWORK_NOT_EXIST.1,
741
9
                None,
742
9
            ));
743
        }
744
30
        Some(network) => network,
745
30
    };
746
30

            
747
30
    let mut devices = vec![];
748
30
    end_addr += 1;
749
30
    let addr_len = body.data.start_addr.len();
750
18444
    for addr in start_addr..end_addr {
751
18444
        let now = Utc::now();
752
18444
        let network_addr = u128_to_addr(addr, addr_len);
753
18444
        let device = Device {
754
18444
            device_id: strings::random_id(&now, ID_RAND_LEN),
755
18444
            unit_id: unit.unit_id.clone(),
756
18444
            unit_code: match network.unit_id.as_ref() {
757
9219
                None => None,
758
9225
                Some(_) => Some(unit.code.clone()),
759
            },
760
18444
            network_id: network.network_id.clone(),
761
18444
            network_code: network.code.clone(),
762
18444
            network_addr: network_addr.clone(),
763
18444
            created_at: now,
764
18444
            modified_at: now,
765
18444
            profile: match body.data.profile.as_ref() {
766
12300
                None => "".to_string(),
767
6144
                Some(profile) => profile.clone(),
768
            },
769
18444
            name: network_addr,
770
18444
            info: Map::new(),
771
18444
        };
772
18444
        devices.push(device);
773
    }
774
30
    if let Err(e) = state.model.device().add_bulk(&devices).await {
775
        error!("[{}] add error: {}", FN_NAME, e);
776
        return Err(ErrResp::ErrDb(Some(e.to_string())));
777
30
    }
778
30

            
779
30
    if state.cache.is_some() {
780
10
        let msg = SendCtrlMsg::DelDeviceBulk {
781
10
            operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
782
10
            new: CtrlDelDeviceBulk {
783
10
                unit_id: unit.unit_id,
784
10
                unit_code: network.unit_code.clone(),
785
10
                network_id: network.network_id,
786
10
                network_code: network.code.clone(),
787
6148
                network_addrs: devices.iter().map(|x| x.network_addr.clone()).collect(),
788
6148
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
789
10
            },
790
10
        };
791
10
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
792
20
    }
793

            
794
    // Send message to the device's network server.
795
30
    let mgr_key = match network.unit_code.as_ref() {
796
12
        None => gen_mgr_key("", network.code.as_str()),
797
18
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
798
    };
799
30
    let msg_op = NetCtrlMsgOp::ADD_DEVICE_RANGE;
800
30
    let msg = SendNetCtrlMsg::AddDeviceRange {
801
30
        time: time_str(&Utc::now()),
802
30
        operation: msg_op.to_string(),
803
30
        new: NetCtrlAddrRange {
804
30
            start_addr: body.data.start_addr,
805
30
            end_addr: body.data.end_addr,
806
30
        },
807
30
    };
808
30
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
809

            
810
30
    Ok(StatusCode::NO_CONTENT)
811
78
}
812

            
813
/// `POST /{base}/api/v1/device/range-delete`
814
54
pub async fn post_device_range_del(
815
54
    State(state): State<AppState>,
816
54
    Extension(token_info): Extension<GetTokenInfoData>,
817
54
    Json(mut body): Json<request::PostDeviceRangeBody>,
818
54
) -> impl IntoResponse {
819
    const FN_NAME: &'static str = "post_device_range_del";
820

            
821
54
    let user_id = token_info.user_id.as_str();
822
54
    let roles = &token_info.roles;
823
54

            
824
54
    body.data.start_addr = body.data.start_addr.trim().to_lowercase();
825
54
    body.data.end_addr = body.data.end_addr.trim().to_lowercase();
826
54
    if body.data.unit_id.len() == 0 {
827
3
        return Err(ErrResp::ErrParam(Some(
828
3
            "`unitId` must with at least one character".to_string(),
829
3
        )));
830
51
    } else if body.data.network_id.len() == 0 {
831
3
        return Err(ErrResp::ErrParam(Some(
832
3
            "`networkId` must with at least one character".to_string(),
833
3
        )));
834
48
    } else if body.data.start_addr.len() == 0
835
45
        || body.data.start_addr.len() != body.data.end_addr.len()
836
    {
837
9
        return Err(ErrResp::ErrParam(Some(
838
9
            "`startAddr` and `endAddr` must have at least one character and with the same length"
839
9
                .to_string(),
840
9
        )));
841
39
    }
842
39
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
843
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
844
33
        Ok(addr) => addr,
845
    };
846
33
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
847
3
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
848
30
        Ok(addr) => addr,
849
30
    };
850
30
    if start_addr > end_addr {
851
3
        return Err(ErrResp::ErrParam(Some(
852
3
            "`startAddr` cannot larger than `endAddr`".to_string(),
853
3
        )));
854
27
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
855
3
        return Err(ErrResp::ErrParam(Some(format!(
856
3
            "numbers between `startAddr` and `endAddr` cannot more than {}",
857
3
            BULK_MAX
858
3
        ))));
859
24
    }
860
24

            
861
24
    let unit_id = body.data.unit_id.as_str();
862
24
    if check_unit(FN_NAME, user_id, roles, unit_id, true, &state)
863
24
        .await?
864
24
        .is_none()
865
    {
866
6
        return Err(ErrResp::Custom(
867
6
            ErrReq::UNIT_NOT_EXIST.0,
868
6
            ErrReq::UNIT_NOT_EXIST.1,
869
6
            None,
870
6
        ));
871
18
    }
872
18
    let network_id = body.data.network_id.as_str();
873
18
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
874
        None => {
875
9
            return Err(ErrResp::Custom(
876
9
                ErrReq::NETWORK_NOT_EXIST.0,
877
9
                ErrReq::NETWORK_NOT_EXIST.1,
878
9
                None,
879
9
            ));
880
        }
881
9
        Some(network) => network,
882
9
    };
883
9

            
884
9
    let mut network_addrs = vec![];
885
9
    end_addr += 1;
886
9
    let addr_len = body.data.start_addr.len();
887
6147
    for addr in start_addr..end_addr {
888
6147
        network_addrs.push(u128_to_addr(addr, addr_len));
889
6147
    }
890
9
    let rm_cond = request::PostDeviceBulkData {
891
9
        unit_id: body.data.unit_id.clone(),
892
9
        network_id: body.data.network_id.clone(),
893
9
        network_addrs,
894
9
        profile: None,
895
9
    };
896
9

            
897
9
    del_device_rsc_bulk(FN_NAME, &rm_cond, &network, &state).await?;
898

            
899
    // Send message to the device's network server.
900
9
    let mgr_key = match network.unit_code.as_ref() {
901
3
        None => gen_mgr_key("", network.code.as_str()),
902
6
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
903
    };
904
9
    let msg_op = NetCtrlMsgOp::DEL_DEVICE_RANGE;
905
9
    let msg = SendNetCtrlMsg::DelDeviceRange {
906
9
        time: time_str(&Utc::now()),
907
9
        operation: msg_op.to_string(),
908
9
        new: NetCtrlAddrRange {
909
9
            start_addr: body.data.start_addr,
910
9
            end_addr: body.data.end_addr,
911
9
        },
912
9
    };
913
9
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
914

            
915
9
    Ok(StatusCode::NO_CONTENT)
916
54
}
917

            
918
/// `GET /{base}/api/v1/device/count`
919
105
pub async fn get_device_count(
920
105
    State(state): State<AppState>,
921
105
    Extension(token_info): Extension<GetTokenInfoData>,
922
105
    Query(mut query): Query<request::GetDeviceCountQuery>,
923
105
) -> impl IntoResponse {
924
    const FN_NAME: &'static str = "get_device_count";
925

            
926
105
    let user_id = token_info.user_id.as_str();
927
105
    let roles = &token_info.roles;
928
105

            
929
105
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
930
54
        match query.unit.as_ref() {
931
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
932
51
            Some(unit_id) => {
933
51
                if unit_id.len() == 0 {
934
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
935
48
                }
936
            }
937
        }
938
51
    }
939
99
    let unit_cond = match query.unit.as_ref() {
940
18
        None => None,
941
81
        Some(unit_id) => match unit_id.len() {
942
6
            0 => None,
943
            _ => {
944
75
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
945
                    None => {
946
9
                        return Err(ErrResp::Custom(
947
9
                            ErrReq::UNIT_NOT_EXIST.0,
948
9
                            ErrReq::UNIT_NOT_EXIST.1,
949
9
                            None,
950
9
                        ))
951
                    }
952
66
                    Some(_) => Some(unit_id.as_str()),
953
                }
954
            }
955
        },
956
    };
957
90
    if let Some(addr) = query.addr.as_ref() {
958
27
        query.addr = Some(addr.trim().to_lowercase());
959
63
    }
960
90
    if let Some(profile) = query.profile.as_ref() {
961
12
        query.profile = Some((*profile).to_lowercase());
962
78
    }
963
90
    let mut name_contains_cond = None;
964
90
    if let Some(contains) = query.contains.as_ref() {
965
9
        if contains.len() > 0 {
966
9
            name_contains_cond = Some(contains.as_str());
967
9
        }
968
81
    }
969
90
    let cond = ListQueryCond {
970
90
        unit_id: unit_cond,
971
90
        network_code: match query.network.as_ref() {
972
66
            None => None,
973
24
            Some(network) => match network.len() {
974
3
                0 => None,
975
21
                _ => Some(network.as_ref()),
976
            },
977
        },
978
90
        network_addr: match query.addr.as_ref() {
979
63
            None => None,
980
27
            Some(addr) => match addr.len() {
981
3
                0 => None,
982
24
                _ => Some(addr.as_ref()),
983
            },
984
        },
985
90
        profile: match query.profile.as_ref() {
986
78
            None => None,
987
12
            Some(profile) => match profile.len() {
988
3
                0 => None,
989
9
                _ => Some(profile.as_str()),
990
            },
991
        },
992
90
        name_contains: name_contains_cond,
993
90
        ..Default::default()
994
90
    };
995
90
    match state.model.device().count(&cond).await {
996
        Err(e) => {
997
            error!("[{}] count error: {}", FN_NAME, e);
998
            Err(ErrResp::ErrDb(Some(e.to_string())))
999
        }
90
        Ok(count) => Ok(Json(response::GetDeviceCount {
90
            data: response::GetCountData { count },
90
        })),
    }
105
}
/// `GET /{base}/api/v1/device/list`
219
pub async fn get_device_list(
219
    State(state): State<AppState>,
219
    Extension(token_info): Extension<GetTokenInfoData>,
219
    Query(mut query): Query<request::GetDeviceListQuery>,
219
) -> impl IntoResponse {
    const FN_NAME: &'static str = "get_device_list";
219
    let user_id = token_info.user_id.as_str();
219
    let roles = &token_info.roles;
219

            
219
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
54
        match query.unit.as_ref() {
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
51
            Some(unit_id) => {
51
                if unit_id.len() == 0 {
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
48
                }
            }
        }
165
    }
213
    let unit_cond = match query.unit.as_ref() {
123
        None => None,
90
        Some(unit_id) => match unit_id.len() {
9
            0 => None,
            _ => {
81
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
                    None => {
9
                        return Err(ErrResp::Custom(
9
                            ErrReq::UNIT_NOT_EXIST.0,
9
                            ErrReq::UNIT_NOT_EXIST.1,
9
                            None,
9
                        ))
                    }
72
                    Some(_) => Some(unit_id.as_str()),
                }
            }
        },
    };
204
    if let Some(addr) = query.addr.as_ref() {
30
        query.addr = Some(addr.trim().to_lowercase());
174
    }
204
    if let Some(profile) = query.profile.as_ref() {
12
        query.profile = Some((*profile).to_lowercase());
192
    }
204
    let mut name_contains_cond = None;
204
    if let Some(contains) = query.contains.as_ref() {
51
        if contains.len() > 0 {
51
            name_contains_cond = Some(contains.as_str());
51
        }
153
    }
204
    let cond = ListQueryCond {
204
        unit_id: unit_cond,
204
        network_code: match query.network.as_ref() {
165
            None => None,
39
            Some(network) => match network.len() {
6
                0 => None,
33
                _ => Some(network.as_ref()),
            },
        },
204
        network_addr: match query.addr.as_ref() {
174
            None => None,
30
            Some(addr) => match addr.len() {
6
                0 => None,
24
                _ => Some(addr.as_ref()),
            },
        },
204
        profile: match query.profile.as_ref() {
192
            None => None,
12
            Some(profile) => match profile.len() {
3
                0 => None,
9
                _ => Some(profile.as_str()),
            },
        },
204
        name_contains: name_contains_cond,
204
        ..Default::default()
    };
204
    let sort_cond = get_sort_cond(&query.sort)?;
189
    let opts = ListOptions {
189
        cond: &cond,
189
        offset: query.offset,
189
        limit: match query.limit {
159
            None => Some(LIST_LIMIT_DEFAULT),
30
            Some(limit) => match limit {
15
                0 => None,
15
                _ => Some(limit),
            },
        },
189
        sort: Some(sort_cond.as_slice()),
189
        cursor_max: Some(LIST_CURSOR_MAX),
    };
189
    let (list, cursor) = match state.model.device().list(&opts, None).await {
        Err(e) => {
            error!("[{}] list error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
189
        Ok((list, cursor)) => match cursor {
3
            None => match query.format {
                Some(request::ListFormat::Array) => {
3
                    return Ok(Json(device_list_transform(&list)).into_response())
                }
                _ => {
147
                    return Ok(Json(response::GetDeviceList {
147
                        data: device_list_transform(&list),
147
                    })
147
                    .into_response())
                }
            },
39
            Some(_) => (list, cursor),
39
        },
39
    };
39

            
39
    let body = Body::from_stream(async_stream::stream! {
39
        let unit_cond = match query.unit.as_ref() {
39
            None => None,
39
            Some(unit_id) => match unit_id.len() {
39
                0 => None,
39
                _ => Some(unit_id.as_str()),
39
            },
39
        };
39
        let mut name_contains_cond = None;
39
        if let Some(contains) = query.contains.as_ref() {
39
            if contains.len() > 0 {
39
                name_contains_cond = Some(contains.as_str());
39
            }
39
        }
39
        let cond = ListQueryCond {
39
            unit_id: unit_cond,
39
            network_code: match query.network.as_ref() {
39
                None => None,
39
                Some(network) => match network.len() {
39
                    0 => None,
39
                    _ => Some(network.as_ref())
39
                },
39
            },
39
            network_addr: match query.addr.as_ref() {
39
                None => None,
39
                Some(addr) => match addr.len() {
39
                    0 => None,
39
                    _ => Some(addr.as_ref())
39
                },
39
            },
39
            name_contains: name_contains_cond,
39
            ..Default::default()
39
        };
39
        let opts = ListOptions {
39
            cond: &cond,
39
            offset: query.offset,
39
            limit: match query.limit {
39
                None => Some(LIST_LIMIT_DEFAULT),
39
                Some(limit) => match limit {
39
                    0 => None,
39
                    _ => Some(limit),
39
                },
39
            },
39
            sort: Some(sort_cond.as_slice()),
39
            cursor_max: Some(LIST_CURSOR_MAX),
39
        };
39

            
39
        let mut list = list;
39
        let mut cursor = cursor;
39
        let mut is_first = true;
39
        loop {
39
            yield device_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
39
            is_first = false;
39
            if cursor.is_none() {
39
                break;
39
            }
39
            let (_list, _cursor) = match state.model.device().list(&opts, cursor).await {
39
                Err(_) => break,
39
                Ok((list, cursor)) => (list, cursor),
39
            };
39
            list = _list;
39
            cursor = _cursor;
39
        }
39
    });
39
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
219
}
/// `GET /{base}/api/v1/device/{deviceId}`
45
pub async fn get_device(
45
    State(state): State<AppState>,
45
    Extension(token_info): Extension<GetTokenInfoData>,
45
    Path(param): Path<request::DeviceIdPath>,
45
) -> impl IntoResponse {
    const FN_NAME: &'static str = "get_device";
45
    let user_id = token_info.user_id.as_str();
45
    let roles = &token_info.roles;
45
    let device_id = param.device_id.as_str();
45

            
45
    match check_device(FN_NAME, device_id, user_id, false, roles, &state).await? {
15
        None => Err(ErrResp::ErrNotFound(None)),
30
        Some(device) => Ok(Json(response::GetDevice {
30
            data: device_transform(&device),
30
        })),
    }
45
}
/// `PATCH /{base}/api/v1/device/{deviceId}`
93
pub async fn patch_device(
93
    State(state): State<AppState>,
93
    Extension(token_info): Extension<GetTokenInfoData>,
93
    Path(param): Path<request::DeviceIdPath>,
93
    Json(mut body): Json<request::PatchDeviceBody>,
93
) -> impl IntoResponse {
    const FN_NAME: &'static str = "patch_device";
93
    let user_id = token_info.user_id.as_str();
93
    let roles = &token_info.roles;
93
    let device_id = param.device_id.as_str();
93
    if let Some(network_id) = body.data.network_id.as_ref() {
27
        if network_id.len() == 0 {
3
            return Err(ErrResp::ErrParam(Some(
3
                "`networkId` must with at least one character".to_string(),
3
            )));
24
        }
66
    }
90
    if let Some(network_addr) = body.data.network_addr.as_ref() {
30
        let network_addr = network_addr.trim().to_lowercase();
30
        if network_addr.len() == 0 {
3
            return Err(ErrResp::ErrParam(Some(
3
                "`networkAddr` must with at least one character".to_string(),
3
            )));
27
        }
27
        body.data.network_addr = Some(network_addr);
60
    }
87
    if let Some(profile) = body.data.profile.as_ref() {
42
        let profile = profile.to_lowercase();
42
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
3
            return Err(ErrResp::ErrParam(Some(
3
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
3
            )));
39
        }
39
        body.data.profile = Some(profile);
45
    }
84
    let mut updates = get_updates(&mut body.data).await?;
    // To check if the device is for the user.
75
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await? {
21
        None => return Err(ErrResp::ErrNotFound(None)),
54
        Some(device) => device,
54
    };
54
    let unit_id = device.unit_id.as_str();
54
    let network = match updates.network.as_ref() {
30
        None => None,
24
        Some((network_id, _)) => {
24
            match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
                None => {
9
                    return Err(ErrResp::Custom(
9
                        ErrReq::NETWORK_NOT_EXIST.0,
9
                        ErrReq::NETWORK_NOT_EXIST.1,
9
                        None,
9
                    ));
                }
15
                Some(network) => Some(network),
            }
        }
    };
45
    if let Some(network) = network.as_ref() {
15
        updates.network = Some((network.network_id.as_str(), network.code.as_str()));
30
    }
45
    if let Some(network_addr) = updates.network_addr {
21
        let network_id = match updates.network {
9
            None => device.network_id.as_str(),
12
            Some((network_id, _)) => network_id,
        };
21
        if check_addr(FN_NAME, network_id, network_addr, &state)
21
            .await?
21
            .is_some()
        {
3
            return Err(ErrResp::Custom(
3
                ErrReq::NETWORK_ADDR_EXIST.0,
3
                ErrReq::NETWORK_ADDR_EXIST.1,
3
                None,
3
            ));
18
        }
24
    }
42
    let cond = UpdateQueryCond { device_id };
42
    if let Err(e) = state.model.device().update(&cond, &updates).await {
        error!("[{}] update error: {}", FN_NAME, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
42
    }
42
    if let Some(profile) = updates.profile {
27
        let cond = device_route::UpdateQueryCond { device_id };
27
        let updates = device_route::Updates {
27
            profile: Some(profile),
27
            modified_at: Some(Utc::now()),
27
            ..Default::default()
27
        };
27
        if let Err(e) = state.model.device_route().update(&cond, &updates).await {
            error!("[{}] update device route error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
27
        }
15
    }
    // Delete device cache to update profile.
42
    if state.cache.is_some() {
14
        let msg = SendCtrlMsg::DelDevice {
14
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
14
            new: CtrlDelDevice {
14
                unit_id: device.unit_id.clone(),
14
                unit_code: device.unit_code.clone(),
14
                network_id: device.network_id.clone(),
14
                network_code: device.network_code.clone(),
14
                network_addr: device.network_addr.clone(),
14
                device_id: device.device_id.clone(),
14
            },
14
        };
14
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
14
        let msg = SendCtrlMsg::DelDevice {
14
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
14
            new: CtrlDelDevice {
14
                unit_id: device.unit_id,
14
                unit_code: device.unit_code.clone(),
14
                network_id: match updates.network.as_ref() {
10
                    None => device.network_id,
4
                    Some((network_id, _)) => network_id.to_string(),
                },
14
                network_code: match updates.network.as_ref() {
10
                    None => device.network_code.clone(),
4
                    Some((_, network_code)) => network_code.to_string(),
                },
14
                network_addr: match updates.network_addr.as_ref() {
8
                    None => device.network_addr.clone(),
6
                    Some(addr) => addr.to_string(),
                },
14
                device_id: device.device_id,
14
            },
14
        };
14
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
28
    }
    // Send message to the device's network server if device network or address is changed.
42
    if updates.network.is_some() || updates.network_addr.is_some() {
21
        let now = Utc::now();
21

            
21
        let msg_op = NetCtrlMsgOp::DEL_DEVICE;
21
        let msg = SendNetCtrlMsg::DelDevice {
21
            time: time_str(&now),
21
            operation: msg_op.to_string(),
21
            new: NetCtrlAddr {
21
                network_addr: device.network_addr.clone(),
21
            },
21
        };
21
        let mgr_key = match device.unit_code.as_ref() {
3
            None => gen_mgr_key("", device.network_code.as_str()),
18
            Some(code) => gen_mgr_key(code.as_str(), device.network_code.as_str()),
        };
21
        let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
21
        let msg_op = NetCtrlMsgOp::ADD_DEVICE;
21
        let msg = SendNetCtrlMsg::AddDevice {
21
            time: time_str(&now),
21
            operation: msg_op.to_string(),
21
            new: NetCtrlAddr {
21
                network_addr: match updates.network_addr {
3
                    None => device.network_addr,
18
                    Some(addr) => addr.to_string(),
                },
            },
        };
21
        let mgr_key = match network.as_ref() {
9
            None => mgr_key,
12
            Some(network) => match network.unit_code.as_ref() {
6
                None => gen_mgr_key("", network.code.as_str()),
6
                Some(code) => gen_mgr_key(code.as_str(), network.code.as_str()),
            },
        };
21
        let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
21
    }
42
    Ok(StatusCode::NO_CONTENT)
93
}
/// `DELETE /{base}/api/v1/device/{deviceId}`
54
pub async fn delete_device(
54
    State(state): State<AppState>,
54
    Extension(token_info): Extension<GetTokenInfoData>,
54
    Path(param): Path<request::DeviceIdPath>,
54
) -> impl IntoResponse {
    const FN_NAME: &'static str = "delete_device";
54
    let user_id = token_info.user_id.as_str();
54
    let roles = &token_info.roles;
54
    let device_id = param.device_id.as_str();
    // To check if the device is for the user.
54
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await {
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
54
        Ok(device) => match device {
24
            None => return Ok(StatusCode::NO_CONTENT),
30
            Some(device) => device,
30
        },
30
    };
30

            
30
    del_device_rsc(FN_NAME, device_id, &state).await?;
30
    if state.cache.is_some() {
10
        let msg = SendCtrlMsg::DelDevice {
10
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
10
            new: CtrlDelDevice {
10
                unit_id: device.unit_id,
10
                unit_code: device.unit_code.clone(),
10
                network_id: device.network_id,
10
                network_code: device.network_code.clone(),
10
                network_addr: device.network_addr.clone(),
10
                device_id: device.device_id,
10
            },
10
        };
10
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
20
    }
    // Send message to the device's network server.
30
    let mgr_key = match device.unit_code.as_ref() {
12
        None => gen_mgr_key("", device.network_code.as_str()),
18
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), device.network_code.as_str()),
    };
30
    let msg_op = NetCtrlMsgOp::DEL_DEVICE;
30
    let msg = SendNetCtrlMsg::DelDevice {
30
        time: time_str(&Utc::now()),
30
        operation: msg_op.to_string(),
30
        new: NetCtrlAddr {
30
            network_addr: device.network_addr,
30
        },
30
    };
30
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
30
    Ok(StatusCode::NO_CONTENT)
54
}
204
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
204
    match sort_args.as_ref() {
153
        None => Ok(vec![
153
            SortCond {
153
                key: SortKey::NetworkCode,
153
                asc: true,
153
            },
153
            SortCond {
153
                key: SortKey::NetworkAddr,
153
                asc: true,
153
            },
153
        ]),
51
        Some(args) => {
51
            let mut args = args.split(",");
51
            let mut sort_cond = vec![];
102
            while let Some(arg) = args.next() {
66
                let mut cond = arg.split(":");
66
                let key = match cond.next() {
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
66
                    Some(field) => match field {
66
                        "network" => SortKey::NetworkCode,
54
                        "addr" => SortKey::NetworkAddr,
36
                        "created" => SortKey::CreatedAt,
21
                        "modified" => SortKey::ModifiedAt,
15
                        "profile" => SortKey::Profile,
15
                        "name" => SortKey::Name,
                        _ => {
6
                            return Err(ErrResp::ErrParam(Some(format!(
6
                                "invalid sort key {}",
6
                                field
6
                            ))))
                        }
                    },
                };
60
                let asc = match cond.next() {
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
57
                    Some(asc) => match asc {
57
                        "asc" => true,
27
                        "desc" => false,
                        _ => {
3
                            return Err(ErrResp::ErrParam(Some(format!(
3
                                "invalid sort asc {}",
3
                                asc
3
                            ))))
                        }
                    },
                };
54
                if cond.next().is_some() {
3
                    return Err(ErrResp::ErrParam(Some(
3
                        "invalid sort condition".to_string(),
3
                    )));
51
                }
51
                sort_cond.push(SortCond { key, asc });
            }
36
            Ok(sort_cond)
        }
    }
204
}
84
async fn get_updates<'a>(body: &'a mut request::PatchDeviceData) -> Result<Updates<'a>, ErrResp> {
84
    let mut updates = Updates {
84
        ..Default::default()
84
    };
84
    let mut count = 0;
84
    if let Some(network_id) = body.network_id.as_ref() {
24
        updates.network = Some((network_id.as_str(), ""));
24
        count += 1;
60
    }
84
    if let Some(network_addr) = body.network_addr.as_ref() {
27
        updates.network_addr = Some(network_addr.as_str());
27
        count += 1;
57
    }
84
    if let Some(profile) = body.profile.as_ref() {
39
        updates.profile = Some(profile.as_str());
39
        count += 1;
45
    }
84
    if let Some(name) = body.name.as_ref() {
60
        updates.name = Some(name.as_str());
60
        count += 1;
60
    }
84
    if let Some(info) = body.info.as_ref() {
60
        for (k, _) in info.iter() {
27
            if k.len() == 0 {
3
                return Err(ErrResp::ErrParam(Some(
3
                    "`info` key must not be empty".to_string(),
3
                )));
24
            }
        }
57
        updates.info = Some(info);
57
        count += 1;
24
    }
81
    if count == 0 {
6
        return Err(ErrResp::ErrParam(Some(
6
            "at least one parameter".to_string(),
6
        )));
75
    }
75
    updates.modified_at = Some(Utc::now());
75
    Ok(updates)
84
}
/// To check if the network is exists for the unit. Public network can be matched for admin or
/// manager roles.
///
/// # Errors
///
/// Returns OK if the network is found or not. Otherwise errors will be returned.
225
async fn check_network(
225
    fn_name: &str,
225
    unit_id: &str,
225
    network_id: &str,
225
    roles: &HashMap<String, bool>,
225
    state: &AppState,
225
) -> Result<Option<Network>, ErrResp> {
225
    let cond = NetworkQueryCond {
225
        network_id: Some(network_id),
225
        ..Default::default()
225
    };
225
    let network = match state.model.network().get(&cond).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
225
        Ok(network) => match network {
18
            None => return Ok(None),
207
            Some(network) => network,
207
        },
207
    };
207
    match network.unit_id.as_ref() {
78
        None => match Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
18
            false => Ok(None),
60
            true => Ok(Some(network)),
        },
129
        Some(id) => match id.as_str() == unit_id {
18
            false => Ok(None),
111
            true => Ok(Some(network)),
        },
    }
225
}
/// To check if the address is exists for the network.
///
/// # Errors
///
/// Returns OK if the device is found or not. Otherwise errors will be returned.
99
async fn check_addr(
99
    fn_name: &str,
99
    network_id: &str,
99
    network_addr: &str,
99
    state: &AppState,
99
) -> Result<Option<Device>, ErrResp> {
99
    let cond = ListQueryCond {
99
        network_id: Some(network_id),
99
        network_addr: Some(network_addr),
99
        ..Default::default()
99
    };
99
    let opts = ListOptions {
99
        cond: &cond,
99
        offset: None,
99
        limit: None,
99
        sort: None,
99
        cursor_max: None,
99
    };
99
    match state.model.device().list(&opts, None).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            Err(ErrResp::ErrDb(Some(e.to_string())))
        }
99
        Ok((mut list, _)) => Ok(list.pop()),
    }
99
}
150
fn device_list_transform(list: &Vec<Device>) -> Vec<response::GetDeviceData> {
150
    let mut ret = vec![];
594
    for device in list.iter() {
594
        ret.push(device_transform(&device));
594
    }
150
    ret
150
}
96
fn device_list_transform_bytes(
96
    list: &Vec<Device>,
96
    with_start: bool,
96
    with_end: bool,
96
    format: Option<&request::ListFormat>,
96
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
96
    let mut build_str = match with_start {
57
        false => "".to_string(),
3
        true => match format {
3
            Some(request::ListFormat::Array) => "[".to_string(),
36
            _ => "{\"data\":[".to_string(),
        },
    };
96
    let mut is_first = with_start;
6147
    for item in list {
6051
        if is_first {
39
            is_first = false;
6012
        } else {
6012
            build_str.push(',');
6012
        }
6051
        let json_str = match serde_json::to_string(&device_transform(item)) {
            Err(e) => return Err(Box::new(e)),
6051
            Ok(str) => str,
6051
        };
6051
        build_str += json_str.as_str();
    }
96
    if with_end {
39
        build_str += match format {
3
            Some(request::ListFormat::Array) => "]",
36
            _ => "]}",
        }
57
    }
96
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
96
}
6675
fn device_transform(device: &Device) -> response::GetDeviceData {
6675
    response::GetDeviceData {
6675
        device_id: device.device_id.clone(),
6675
        unit_id: device.unit_id.clone(),
6675
        unit_code: device.unit_code.clone(),
6675
        network_id: device.network_id.clone(),
6675
        network_code: device.network_code.clone(),
6675
        network_addr: device.network_addr.clone(),
6675
        created_at: time_str(&device.created_at),
6675
        modified_at: time_str(&device.modified_at),
6675
        profile: device.profile.clone(),
6675
        name: device.name.clone(),
6675
        info: device.info.clone(),
6675
    }
6675
}
30
async fn del_device_rsc(fn_name: &str, device_id: &str, state: &AppState) -> Result<(), ErrResp> {
30
    let cond = device_route::QueryCond {
30
        device_id: Some(device_id),
30
        ..Default::default()
30
    };
30
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
30
    }
30

            
30
    let cond = dldata_buffer::QueryCond {
30
        device_id: Some(device_id),
30
        ..Default::default()
30
    };
30
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
30
    }
30

            
30
    let cond = QueryCond {
30
        device_id: Some(device_id),
30
        ..Default::default()
30
    };
30
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
30
    }
30

            
30
    Ok(())
30
}
18
async fn del_device_rsc_bulk(
18
    fn_name: &str,
18
    rm_cond: &request::PostDeviceBulkData,
18
    network: &Network,
18
    state: &AppState,
18
) -> Result<(), ErrResp> {
12294
    let addrs: Vec<&str> = rm_cond.network_addrs.iter().map(|x| x.as_str()).collect();
18
    let ctrl_msg = match state.cache.as_ref() {
12
        None => None,
        Some(_) => {
6
            let cond = ListQueryCond {
6
                unit_id: Some(rm_cond.unit_id.as_str()),
6
                network_id: Some(rm_cond.network_id.as_str()),
6
                network_addrs: Some(&addrs),
6
                ..Default::default()
6
            };
6
            let opts = ListOptions {
6
                cond: &cond,
6
                offset: None,
6
                limit: None,
6
                sort: None,
6
                cursor_max: None,
6
            };
6
            let devices = match state.model.device().list(&opts, None).await {
                Err(e) => {
                    error!("[{}] list device for cache error: {}", fn_name, e);
                    return Err(ErrResp::ErrDb(Some(e.to_string())));
                }
6
                Ok((list, _)) => list,
6
            };
6
            Some(SendCtrlMsg::DelDeviceBulk {
6
                operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
6
                new: CtrlDelDeviceBulk {
6
                    unit_id: rm_cond.unit_id.clone(),
6
                    unit_code: match network.unit_code.as_ref() {
2
                        None => None,
4
                        Some(code) => Some(code.clone()),
                    },
6
                    network_id: network.network_id.clone(),
6
                    network_code: network.code.clone(),
6
                    network_addrs: rm_cond.network_addrs.clone(),
4098
                    device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
6
                },
6
            })
        }
    };
18
    let cond = device_route::QueryCond {
18
        unit_id: Some(rm_cond.unit_id.as_str()),
18
        network_id: Some(rm_cond.network_id.as_str()),
18
        network_addrs: Some(&addrs),
18
        ..Default::default()
18
    };
18
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = dldata_buffer::QueryCond {
18
        unit_id: Some(rm_cond.unit_id.as_str()),
18
        network_id: Some(rm_cond.network_id.as_str()),
18
        network_addrs: Some(&addrs),
18
        ..Default::default()
18
    };
18
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = QueryCond {
18
        unit_id: Some(rm_cond.unit_id.as_str()),
18
        network_id: Some(rm_cond.network_id.as_str()),
18
        network_addrs: Some(&addrs),
18
        ..Default::default()
18
    };
18
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18
    if let Some(msg) = ctrl_msg.as_ref() {
6
        send_del_ctrl_message(fn_name, msg, state).await?;
12
    }
18
    Ok(())
18
}
/// Send delete control message.
88
async fn send_del_ctrl_message(
88
    fn_name: &str,
88
    msg: &SendCtrlMsg,
88
    state: &AppState,
88
) -> Result<(), ErrResp> {
88
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!(
                "[{}] marshal JSON for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_DEVICE,
                e
            );
            return Err(ErrResp::ErrRsc(Some(format!(
                "marshal control message error: {}",
                e
            ))));
        }
88
        Ok(payload) => payload,
88
    };
88
    let ctrl_sender = { state.ctrl_senders.device.lock().unwrap().clone() };
88
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name,
            CtrlMsgOp::DEL_DEVICE,
            e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "send control message error: {}",
            e
        ))));
88
    }
88

            
88
    Ok(())
88
}
/// Send network control message.
222
async fn send_net_ctrl_message(
222
    fn_name: &str,
222
    msg: &SendNetCtrlMsg,
222
    msg_op: &str,
222
    state: &AppState,
222
    mgr_key: &str,
222
) -> Result<(), ErrResp> {
86
    let mgr = {
222
        match state.network_mgrs.lock().unwrap().get_mut(mgr_key) {
136
            None => return Ok(()),
86
            Some(mgr) => mgr.clone(),
86
        }
86
    };
86
    match serde_json::to_vec(msg) {
        Err(e) => warn!("[{}] marshal {} error: {}", fn_name, msg_op, e),
86
        Ok(payload) => match mgr.send_ctrl(payload).await {
9
            Err(e) => warn!("[{}] send {} error: {}", fn_name, msg_op, e),
77
            Ok(_) => (),
        },
    }
86
    Ok(())
222
}
/// Clear the device and relative cache.
11
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
11
    if let Err(e) = cache.device().clear().await {
        error!(
            "[{}] {} clear device cache error: {}",
            fn_name, queue_name, e
        );
11
    }
11
    if let Err(e) = cache.device_route().clear().await {
        error!(
            "[{}] {} clear device route cache error: {}",
            fn_name, queue_name, e
        );
11
    }
11
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
34
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
34
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
34
        if let Some(cache) = self.cache.as_ref() {
5
            clear_cache(FN_NAME, queue_name, cache).await;
29
        }
34
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
19
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
68
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    /// No-op: a message delivered to this handler is neither processed nor acknowledged here.
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
30
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
30
        if let Some(cache) = self.cache.as_ref() {
6
            clear_cache(FN_NAME, queue_name, cache).await;
24
        }
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
88
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
88
        let queue_name = queue.name();
88
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
88
            Ok(msg) => msg,
88
        };
88
        match ctrl_msg {
62
            RecvCtrlMsg::DelDevice { new } => {
62
                if let Some(cache) = self.cache.as_ref() {
62
                    let unit_code = match new.unit_code.as_ref() {
19
                        None => "",
43
                        Some(unit_code) => unit_code.as_str(),
                    };
62
                    let network_code = new.network_code.as_str();
62
                    let network_addr = new.network_addr.as_str();
62
                    let cond = device::DelCacheQueryCond {
62
                        unit_code,
62
                        network_code: Some(network_code),
62
                        network_addr: Some(network_addr),
62
                    };
62
                    if let Err(e) = cache.device().del(&cond).await {
                        error!(
                            "[{}] {} delete device cache {}.{}.{} error: {}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr, e
                        );
                    } else {
62
                        debug!(
                            "[{}] {} delete device cache {}.{}.{}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr
                        );
                    }
62
                    let device_id = new.device_id.as_str();
62
                    if let Err(e) = cache.device_route().del_uldata(device_id).await {
                        error!(
                            "[{}] {} delete device route cache uldata {} error: {}",
                            FN_NAME, queue_name, device_id, e
                        );
                    } else {
62
                        debug!(
                            "[{}] {} delete device route cache uldata {}",
                            FN_NAME, queue_name, device_id
                        );
                    }
62
                    let cond = device_route::DelCacheQueryCond {
62
                        unit_code,
62
                        network_code: Some(network_code),
62
                        network_addr: Some(network_addr),
62
                    };
62
                    if let Err(e) = cache.device_route().del_dldata(&cond).await {
                        error!(
                            "[{}] {} delete device route cache dldata {}.{}.{} error: {}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr, e
                        );
                    } else {
62
                        debug!(
                            "[{}] {} delete device route cache dldata {}.{}.{}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr
                        );
                    }
62
                    let unit_id = new.unit_id.as_str();
62
                    let cond = device_route::DelCachePubQueryCond {
62
                        unit_id,
62
                        device_id: Some(device_id),
62
                    };
62
                    if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                        error!(
                            "[{}] {} delete device route cache dldata_pub {}.{} error: {}",
                            FN_NAME, queue_name, unit_id, device_id, e
                        );
                    } else {
62
                        debug!(
                            "[{}] {} delete device route cache dldata_pub {}.{}",
                            FN_NAME, queue_name, unit_id, device_id
                        );
                    }
                }
            }
26
            RecvCtrlMsg::DelDeviceBulk { new } => {
26
                if let Some(cache) = self.cache.as_ref() {
16394
                    for device_id in new.device_ids.iter() {
16394
                        let device_id = device_id.as_str();
16394
                        if let Err(e) = cache.device_route().del_uldata(device_id).await {
                            error!(
                                "[{}] {} delete bulk device route cache uldata {} error: {}",
                                FN_NAME, queue_name, device_id, e
                            );
                        } else {
16394
                            debug!(
                                "[{}] {} delete bulk device route cache uldata {}",
                                FN_NAME, queue_name, device_id
                            );
                        }
16394
                        let unit_id = new.unit_id.as_str();
16394
                        let cond = device_route::DelCachePubQueryCond {
16394
                            unit_id,
16394
                            device_id: Some(device_id),
16394
                        };
16394
                        if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                            error!(
                                "[{}] {} delete bulk device route cache dldata_pub {}.{} error: {}",
                                FN_NAME, queue_name, unit_id, device_id, e
                            );
                        } else {
16394
                            debug!(
                                "[{}] {} delete bulk device route cache dldata_pub {}.{}",
                                FN_NAME, queue_name, unit_id, device_id
                            );
                        }
                    }
26
                    let unit_code = match new.unit_code.as_ref() {
10
                        None => "",
16
                        Some(unit_code) => unit_code.as_str(),
                    };
26
                    let network_code = new.network_code.as_str();
16394
                    for network_addr in new.network_addrs.iter() {
16394
                        let network_addr = network_addr.as_str();
16394
                        let cond = device::DelCacheQueryCond {
16394
                            unit_code,
16394
                            network_code: Some(network_code),
16394
                            network_addr: Some(network_addr),
16394
                        };
16394
                        if let Err(e) = cache.device().del(&cond).await {
                            error!(
                                "[{}] {} delete bulk device cache {}.{}.{} error: {}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr, e
                            );
                        } else {
16394
                            debug!(
                                "[{}] {} delete bulk device cache {}.{}.{}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr
                            );
                        }
16394
                        let cond = device_route::DelCacheQueryCond {
16394
                            unit_code,
16394
                            network_code: Some(network_code),
16394
                            network_addr: Some(network_addr),
16394
                        };
16394
                        if let Err(e) = cache.device_route().del_dldata(&cond).await {
                            error!(
                                "[{}] {} delete device route cache dldata {}.{}.{} error: {}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr, e
                            );
                        } else {
16394
                            debug!(
                                "[{}] {} delete device route cache dldata {}.{}.{}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr
                            );
                        }
                    }
                }
            }
        }
88
        if let Err(e) = msg.ack().await {
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
88
        }
176
    }
}