1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use axum::{
11
    body::{Body, Bytes},
12
    extract::State,
13
    http::{header, StatusCode},
14
    response::IntoResponse,
15
    Extension,
16
};
17
use chrono::Utc;
18
use log::{debug, error, info, warn};
19
use serde::{Deserialize, Serialize};
20
use serde_json::{self, Map};
21
use tokio::time;
22
use url::Url;
23

            
24
use general_mq::{
25
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
26
    Queue,
27
};
28
use sylvia_iot_corelib::{
29
    constants::ContentType,
30
    err::ErrResp,
31
    http::{Json, Path, Query},
32
    role::Role,
33
    strings::{self, hex_addr_to_u128, time_str, u128_to_addr},
34
};
35

            
36
use super::{
37
    super::{
38
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
39
        lib::{check_device, check_unit, gen_mgr_key},
40
    },
41
    request, response,
42
};
43
use crate::{
44
    libs::{
45
        config::BrokerCtrl as CfgCtrl,
46
        mq::{self, Connection},
47
    },
48
    models::{
49
        device::{
50
            self, Device, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey,
51
            UpdateQueryCond, Updates,
52
        },
53
        device_route, dldata_buffer,
54
        network::{Network, QueryCond as NetworkQueryCond},
55
        Cache,
56
    },
57
};
58

            
59
#[derive(Deserialize, Serialize)]
60
#[serde(tag = "operation")]
61
enum RecvCtrlMsg {
62
    #[serde(rename = "del-device")]
63
    DelDevice { new: CtrlDelDevice },
64
    #[serde(rename = "del-device-bulk")]
65
    DelDeviceBulk { new: CtrlDelDeviceBulk },
66
}
67

            
68
/// Control message inside broker cluster.
69
#[derive(Serialize)]
70
#[serde(untagged)]
71
enum SendCtrlMsg {
72
    DelDevice {
73
        operation: String,
74
        new: CtrlDelDevice,
75
    },
76
    DelDeviceBulk {
77
        operation: String,
78
        new: CtrlDelDeviceBulk,
79
    },
80
}
81

            
82
struct CtrlMsgOp;
83

            
84
#[derive(Deserialize, Serialize)]
85
struct CtrlDelDevice {
86
    #[serde(rename = "unitId")]
87
    unit_id: String,
88
    #[serde(rename = "unitCode")]
89
    unit_code: Option<String>,
90
    #[serde(rename = "networkId")]
91
    network_id: String,
92
    #[serde(rename = "networkCode")]
93
    network_code: String,
94
    #[serde(rename = "networkAddr")]
95
    network_addr: String,
96
    #[serde(rename = "deviceId")]
97
    device_id: String,
98
}
99

            
100
#[derive(Deserialize, Serialize)]
101
struct CtrlDelDeviceBulk {
102
    #[serde(rename = "unitId")]
103
    unit_id: String,
104
    #[serde(rename = "unitCode")]
105
    unit_code: Option<String>,
106
    #[serde(rename = "networkId")]
107
    network_id: String,
108
    #[serde(rename = "networkCode")]
109
    network_code: String,
110
    #[serde(rename = "networkAddrs")]
111
    network_addrs: Vec<String>,
112
    #[serde(rename = "deviceIds")]
113
    device_ids: Vec<String>,
114
}
115

            
116
struct CtrlSenderHandler {
117
    cache: Option<Arc<dyn Cache>>,
118
}
119

            
120
struct CtrlReceiverHandler {
121
    cache: Option<Arc<dyn Cache>>,
122
}
123

            
124
/// Control message from broker to network servers.
125
#[derive(Serialize)]
126
#[serde(untagged)]
127
enum SendNetCtrlMsg {
128
    AddDevice {
129
        time: String,
130
        operation: String,
131
        new: NetCtrlAddr,
132
    },
133
    AddDeviceBulk {
134
        time: String,
135
        operation: String,
136
        new: NetCtrlAddrs,
137
    },
138
    AddDeviceRange {
139
        time: String,
140
        operation: String,
141
        new: NetCtrlAddrRange,
142
    },
143
    DelDevice {
144
        time: String,
145
        operation: String,
146
        new: NetCtrlAddr,
147
    },
148
    DelDeviceBulk {
149
        time: String,
150
        operation: String,
151
        new: NetCtrlAddrs,
152
    },
153
    DelDeviceRange {
154
        time: String,
155
        operation: String,
156
        new: NetCtrlAddrRange,
157
    },
158
}
159

            
160
struct NetCtrlMsgOp;
161

            
162
/// Shared structure to keep simple design.
163
#[derive(Serialize)]
164
struct NetCtrlAddr {
165
    #[serde(rename = "networkAddr")]
166
    network_addr: String,
167
}
168

            
169
/// Shared structure to keep simple design.
170
#[derive(Serialize)]
171
struct NetCtrlAddrs {
172
    #[serde(rename = "networkAddrs")]
173
    network_addrs: Vec<String>,
174
}
175

            
176
/// Shared structure to keep simple design.
177
#[derive(Serialize)]
178
struct NetCtrlAddrRange {
179
    #[serde(rename = "startAddr")]
180
    pub start_addr: String,
181
    #[serde(rename = "endAddr")]
182
    pub end_addr: String,
183
}
184

            
185
impl CtrlMsgOp {
186
    const DEL_DEVICE: &'static str = "del-device";
187
    const DEL_DEVICE_BULK: &'static str = "del-device-bulk";
188
}
189

            
190
impl NetCtrlMsgOp {
191
    const ADD_DEVICE: &'static str = "add-device";
192
    const ADD_DEVICE_BULK: &'static str = "add-device-bulk";
193
    const ADD_DEVICE_RANGE: &'static str = "add-device-range";
194
    const DEL_DEVICE: &'static str = "del-device";
195
    const DEL_DEVICE_BULK: &'static str = "del-device-bulk";
196
    const DEL_DEVICE_RANGE: &'static str = "del-device-range";
197
}
198

            
199
const BULK_MAX: usize = 1024;
200
const LIST_LIMIT_DEFAULT: u64 = 100;
201
const LIST_CURSOR_MAX: u64 = 100;
202
const ID_RAND_LEN: usize = 8;
203
const CTRL_QUEUE_NAME: &'static str = "device";
204

            
205
/// Initialize channels.
206
30
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
207
    const FN_NAME: &'static str = "init";
208

            
209
30
    let q = new_ctrl_receiver(state, ctrl_conf)?;
210
30
    {
211
30
        state
212
30
            .ctrl_receivers
213
30
            .lock()
214
30
            .unwrap()
215
30
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
216
30
    }
217
30

            
218
30
    let ctrl_sender = { state.ctrl_senders.device.lock().unwrap().clone() };
219
    // Wait for connected.
220
2762
    for _ in 0..500 {
221
2762
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
222
30
            break;
223
2732
        }
224
2732
        time::sleep(Duration::from_millis(10)).await;
225
    }
226
30
    if ctrl_sender.status() != Status::Connected {
227
        error!(
228
            "[{}] {} control sender not connected",
229
            FN_NAME, CTRL_QUEUE_NAME
230
        );
231
        return Err(Box::new(IoError::new(
232
            ErrorKind::NotConnected,
233
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
234
        )));
235
30
    }
236
30
    if q.status() != Status::Connected {
237
        error!(
238
            "[{}] {} control receiver not connected",
239
            FN_NAME, CTRL_QUEUE_NAME
240
        );
241
        return Err(Box::new(IoError::new(
242
            ErrorKind::NotConnected,
243
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
244
        )));
245
30
    }
246
30

            
247
30
    Ok(())
248
30
}
249

            
250
/// Create control channel sender queue.
251
30
pub fn new_ctrl_sender(
252
30
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
253
30
    config: &CfgCtrl,
254
30
    cache: Option<Arc<dyn Cache>>,
255
30
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
256
30
    let url = match config.url.as_ref() {
257
        None => {
258
            return Err(Box::new(IoError::new(
259
                ErrorKind::InvalidInput,
260
                "empty control url",
261
            )))
262
        }
263
30
        Some(url) => match Url::parse(url.as_str()) {
264
            Err(e) => return Err(Box::new(e)),
265
30
            Ok(url) => url,
266
30
        },
267
30
    };
268
30

            
269
30
    match mq::control::new(
270
30
        conn_pool.clone(),
271
30
        &url,
272
30
        config.prefetch,
273
30
        CTRL_QUEUE_NAME,
274
30
        false,
275
30
        Arc::new(CtrlSenderHandler {
276
30
            cache: cache.clone(),
277
30
        }),
278
30
        Arc::new(CtrlSenderHandler { cache }),
279
30
    ) {
280
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
281
30
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
282
    }
283
30
}
284

            
285
/// Create control channel receiver queue.
286
30
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
287
30
    let url = match config.url.as_ref() {
288
        None => {
289
            return Err(Box::new(IoError::new(
290
                ErrorKind::InvalidInput,
291
                "empty control url",
292
            )))
293
        }
294
30
        Some(url) => match Url::parse(url.as_str()) {
295
            Err(e) => return Err(Box::new(e)),
296
30
            Ok(url) => url,
297
30
        },
298
30
    };
299
30
    let handler = Arc::new(CtrlReceiverHandler {
300
30
        cache: state.cache.clone(),
301
30
    });
302
30
    match mq::control::new(
303
30
        state.mq_conns.clone(),
304
30
        &url,
305
30
        config.prefetch,
306
30
        CTRL_QUEUE_NAME,
307
30
        true,
308
30
        handler.clone(),
309
30
        handler,
310
30
    ) {
311
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
312
30
        Ok(q) => Ok(q),
313
    }
314
30
}
315

            
316
/// `POST /{base}/api/v1/device`
317
210
pub async fn post_device(
318
210
    State(state): State<AppState>,
319
210
    Extension(token_info): Extension<GetTokenInfoData>,
320
210
    Json(mut body): Json<request::PostDeviceBody>,
321
210
) -> impl IntoResponse {
322
    const FN_NAME: &'static str = "post_device";
323

            
324
210
    let user_id = token_info.user_id.as_str();
325
210
    let roles = &token_info.roles;
326
210

            
327
210
    body.data.network_addr = body.data.network_addr.trim().to_lowercase();
328
210
    if body.data.unit_id.len() == 0 {
329
6
        return Err(ErrResp::ErrParam(Some(
330
6
            "`unitId` must with at least one character".to_string(),
331
6
        )));
332
204
    } else if body.data.network_id.len() == 0 {
333
6
        return Err(ErrResp::ErrParam(Some(
334
6
            "`networkId` must with at least one character".to_string(),
335
6
        )));
336
198
    } else if body.data.network_addr.len() == 0 {
337
6
        return Err(ErrResp::ErrParam(Some(
338
6
            "`networkAddr` must with at least one character".to_string(),
339
6
        )));
340
192
    }
341
192
    if let Some(profile) = body.data.profile.as_ref() {
342
30
        let profile = profile.to_lowercase();
343
30
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
344
6
            return Err(ErrResp::ErrParam(Some(
345
6
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
346
6
            )));
347
24
        }
348
162
    }
349
186
    let unit_id = body.data.unit_id.as_str();
350
186
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
351
        None => {
352
12
            return Err(ErrResp::Custom(
353
12
                ErrReq::UNIT_NOT_EXIST.0,
354
12
                ErrReq::UNIT_NOT_EXIST.1,
355
12
                None,
356
12
            ));
357
        }
358
174
        Some(unit) => unit,
359
174
    };
360
174
    let network_id = body.data.network_id.as_str();
361
174
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
362
        None => {
363
18
            return Err(ErrResp::Custom(
364
18
                ErrReq::NETWORK_NOT_EXIST.0,
365
18
                ErrReq::NETWORK_NOT_EXIST.1,
366
18
                None,
367
18
            ));
368
        }
369
156
        Some(network) => network,
370
156
    };
371
156
    let network_addr = body.data.network_addr.as_str();
372
156
    if check_addr(FN_NAME, network_id, network_addr, &state)
373
156
        .await?
374
156
        .is_some()
375
    {
376
12
        return Err(ErrResp::Custom(
377
12
            ErrReq::NETWORK_ADDR_EXIST.0,
378
12
            ErrReq::NETWORK_ADDR_EXIST.1,
379
12
            None,
380
12
        ));
381
144
    }
382
144

            
383
144
    let now = Utc::now();
384
144
    let device_id = strings::random_id(&now, ID_RAND_LEN);
385
144
    let device = Device {
386
144
        device_id: device_id.clone(),
387
144
        unit_id: unit.unit_id,
388
144
        unit_code: match network.unit_id.as_ref() {
389
42
            None => None,
390
102
            Some(_) => Some(unit.code),
391
        },
392
144
        network_id: network.network_id,
393
144
        network_code: network.code.clone(),
394
144
        network_addr: body.data.network_addr,
395
144
        created_at: now,
396
144
        modified_at: now,
397
144
        profile: match body.data.profile.as_ref() {
398
120
            None => "".to_string(),
399
24
            Some(profile) => profile.to_lowercase(),
400
        },
401
144
        name: match body.data.name.as_ref() {
402
138
            None => "".to_string(),
403
6
            Some(name) => name.clone(),
404
        },
405
144
        info: match body.data.info.as_ref() {
406
138
            None => Map::new(),
407
6
            Some(info) => info.clone(),
408
        },
409
    };
410
144
    if let Err(e) = state.model.device().add(&device).await {
411
        error!("[{}] add error: {}", FN_NAME, e);
412
        return Err(ErrResp::ErrDb(Some(e.to_string())));
413
144
    }
414
144

            
415
144
    // Clear the not-exist device in cache.
416
144
    if state.cache.is_some() {
417
48
        let msg = SendCtrlMsg::DelDevice {
418
48
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
419
48
            new: CtrlDelDevice {
420
48
                unit_id: device.unit_id,
421
48
                unit_code: device.unit_code,
422
48
                network_id: device.network_id,
423
48
                network_code: device.network_code,
424
48
                network_addr: device.network_addr.clone(),
425
48
                device_id: device.device_id,
426
48
            },
427
48
        };
428
48
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
429
96
    }
430

            
431
    // Send message to the device's network server.
432
144
    let mgr_key = match network.unit_code.as_ref() {
433
42
        None => gen_mgr_key("", network.code.as_str()),
434
102
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
435
    };
436
144
    let msg_op = NetCtrlMsgOp::ADD_DEVICE;
437
144
    let msg = SendNetCtrlMsg::AddDevice {
438
144
        time: time_str(&Utc::now()),
439
144
        operation: msg_op.to_string(),
440
144
        new: NetCtrlAddr {
441
144
            network_addr: device.network_addr,
442
144
        },
443
144
    };
444
144
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
445

            
446
144
    Ok(Json(response::PostDevice {
447
144
        data: response::PostDeviceData { device_id },
448
144
    }))
449
210
}
450

            
451
/// `POST /{base}/api/v1/device/bulk`
452
126
pub async fn post_device_bulk(
453
126
    State(state): State<AppState>,
454
126
    Extension(token_info): Extension<GetTokenInfoData>,
455
126
    Json(mut body): Json<request::PostDeviceBulkBody>,
456
126
) -> impl IntoResponse {
457
    const FN_NAME: &'static str = "post_device_bulk";
458

            
459
126
    let user_id = token_info.user_id.as_str();
460
126
    let roles = &token_info.roles;
461
126

            
462
126
    if body.data.unit_id.len() == 0 {
463
6
        return Err(ErrResp::ErrParam(Some(
464
6
            "`unitId` must with at least one character".to_string(),
465
6
        )));
466
120
    } else if body.data.network_id.len() == 0 {
467
6
        return Err(ErrResp::ErrParam(Some(
468
6
            "`networkId` must with at least one character".to_string(),
469
6
        )));
470
114
    } else if body.data.network_addrs.len() == 0 {
471
6
        return Err(ErrResp::ErrParam(Some(
472
6
            "`networkAddrs` must with at least one address".to_string(),
473
6
        )));
474
108
    } else if body.data.network_addrs.len() > BULK_MAX {
475
6
        return Err(ErrResp::ErrParam(Some(format!(
476
6
            "`networkAddrs` cannot more than {}",
477
6
            BULK_MAX
478
6
        ))));
479
102
    }
480
102
    if let Some(profile) = body.data.profile.as_ref() {
481
18
        let profile = profile.to_lowercase();
482
18
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
483
6
            return Err(ErrResp::ErrParam(Some(
484
6
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
485
6
            )));
486
12
        }
487
84
    }
488
96
    let mut addrs = vec![];
489
36924
    for addr in body.data.network_addrs.iter() {
490
36924
        let addr = addr.trim().to_lowercase();
491
36924
        if addr.len() == 0 {
492
6
            return Err(ErrResp::ErrParam(Some(
493
6
                "`networkAddrs` must be non-empty address array".to_string(),
494
6
            )));
495
36918
        }
496
36918
        addrs.push(addr);
497
    }
498
90
    body.data.network_addrs = addrs;
499
90
    let unit_id = body.data.unit_id.as_str();
500
90
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
501
        None => {
502
12
            return Err(ErrResp::Custom(
503
12
                ErrReq::UNIT_NOT_EXIST.0,
504
12
                ErrReq::UNIT_NOT_EXIST.1,
505
12
                None,
506
12
            ));
507
        }
508
78
        Some(unit) => unit,
509
78
    };
510
78
    let network_id = body.data.network_id.as_str();
511
78
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
512
        None => {
513
18
            return Err(ErrResp::Custom(
514
18
                ErrReq::NETWORK_NOT_EXIST.0,
515
18
                ErrReq::NETWORK_NOT_EXIST.1,
516
18
                None,
517
18
            ));
518
        }
519
60
        Some(network) => network,
520
60
    };
521
60

            
522
60
    let mut devices = vec![];
523
36888
    for network_addr in body.data.network_addrs.iter() {
524
36888
        let now = Utc::now();
525
36888
        let device = Device {
526
36888
            device_id: strings::random_id(&now, ID_RAND_LEN),
527
36888
            unit_id: unit.unit_id.clone(),
528
36888
            unit_code: match network.unit_id.as_ref() {
529
18438
                None => None,
530
18450
                Some(_) => Some(unit.code.clone()),
531
            },
532
36888
            network_id: network.network_id.clone(),
533
36888
            network_code: network.code.clone(),
534
36888
            network_addr: network_addr.clone(),
535
36888
            created_at: now,
536
36888
            modified_at: now,
537
36888
            profile: match body.data.profile.as_ref() {
538
24600
                None => "".to_string(),
539
12288
                Some(profile) => profile.to_lowercase(),
540
            },
541
36888
            name: network_addr.clone(),
542
36888
            info: Map::new(),
543
36888
        };
544
36888
        devices.push(device);
545
    }
546
60
    if let Err(e) = state.model.device().add_bulk(&devices).await {
547
        error!("[{}] add error: {}", FN_NAME, e);
548
        return Err(ErrResp::ErrDb(Some(e.to_string())));
549
60
    }
550
60

            
551
60
    if state.cache.is_some() {
552
20
        let msg = SendCtrlMsg::DelDeviceBulk {
553
20
            operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
554
20
            new: CtrlDelDeviceBulk {
555
20
                unit_id: unit.unit_id,
556
20
                unit_code: network.unit_code.clone(),
557
20
                network_id: network.network_id,
558
20
                network_code: network.code.clone(),
559
20
                network_addrs: body.data.network_addrs.clone(),
560
12296
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
561
20
            },
562
20
        };
563
20
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
564
40
    }
565

            
566
    // Send message to the device's network server.
567
60
    let mgr_key = match network.unit_code.as_ref() {
568
24
        None => gen_mgr_key("", network.code.as_str()),
569
36
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
570
    };
571
60
    let msg_op = NetCtrlMsgOp::ADD_DEVICE_BULK;
572
60
    let msg = SendNetCtrlMsg::AddDeviceBulk {
573
60
        time: time_str(&Utc::now()),
574
60
        operation: msg_op.to_string(),
575
60
        new: NetCtrlAddrs {
576
60
            network_addrs: body.data.network_addrs,
577
60
        },
578
60
    };
579
60
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
580

            
581
60
    Ok(StatusCode::NO_CONTENT)
582
126
}
583

            
584
/// `POST /{base}/api/v1/device/bulk-delete`
585
78
pub async fn post_device_bulk_del(
586
78
    State(state): State<AppState>,
587
78
    Extension(token_info): Extension<GetTokenInfoData>,
588
78
    Json(mut body): Json<request::PostDeviceBulkBody>,
589
78
) -> impl IntoResponse {
590
    const FN_NAME: &'static str = "post_device_bulk_del";
591

            
592
78
    let user_id = token_info.user_id.as_str();
593
78
    let roles = &token_info.roles;
594
78

            
595
78
    if body.data.unit_id.len() == 0 {
596
6
        return Err(ErrResp::ErrParam(Some(
597
6
            "`unitId` must with at least one character".to_string(),
598
6
        )));
599
72
    } else if body.data.network_id.len() == 0 {
600
6
        return Err(ErrResp::ErrParam(Some(
601
6
            "`networkId` must with at least one character".to_string(),
602
6
        )));
603
66
    } else if body.data.network_addrs.len() == 0 {
604
6
        return Err(ErrResp::ErrParam(Some(
605
6
            "`networkAddrs` must with at least one address".to_string(),
606
6
        )));
607
60
    } else if body.data.network_addrs.len() > BULK_MAX {
608
6
        return Err(ErrResp::ErrParam(Some(format!(
609
6
            "`networkAddrs` cannot more than {}",
610
6
            BULK_MAX
611
6
        ))));
612
54
    }
613
54
    let mut addrs = vec![];
614
12330
    for addr in body.data.network_addrs.iter() {
615
12330
        let addr = addr.trim().to_lowercase();
616
12330
        if addr.len() == 0 {
617
6
            return Err(ErrResp::ErrParam(Some(
618
6
                "`networkAddrs` must be non-empty address array".to_string(),
619
6
            )));
620
12324
        }
621
12324
        addrs.push(addr);
622
    }
623
48
    body.data.network_addrs = addrs;
624
48
    let unit_id = body.data.unit_id.as_str();
625
48
    if check_unit(FN_NAME, user_id, roles, unit_id, true, &state)
626
48
        .await?
627
48
        .is_none()
628
    {
629
12
        return Err(ErrResp::Custom(
630
12
            ErrReq::UNIT_NOT_EXIST.0,
631
12
            ErrReq::UNIT_NOT_EXIST.1,
632
12
            None,
633
12
        ));
634
36
    }
635
36
    let network_id = body.data.network_id.as_str();
636
36
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
637
        None => {
638
18
            return Err(ErrResp::Custom(
639
18
                ErrReq::NETWORK_NOT_EXIST.0,
640
18
                ErrReq::NETWORK_NOT_EXIST.1,
641
18
                None,
642
18
            ));
643
        }
644
18
        Some(network) => network,
645
18
    };
646
18

            
647
18
    del_device_rsc_bulk(FN_NAME, &body.data, &network, &state).await?;
648

            
649
    // Send message to the device's network server.
650
18
    let mgr_key = match network.unit_code.as_ref() {
651
6
        None => gen_mgr_key("", network.code.as_str()),
652
12
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
653
    };
654
18
    let msg_op = NetCtrlMsgOp::DEL_DEVICE_BULK;
655
18
    let msg = SendNetCtrlMsg::DelDeviceBulk {
656
18
        time: time_str(&Utc::now()),
657
18
        operation: msg_op.to_string(),
658
18
        new: NetCtrlAddrs {
659
18
            network_addrs: body.data.network_addrs,
660
18
        },
661
18
    };
662
18
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
663

            
664
18
    Ok(StatusCode::NO_CONTENT)
665
78
}
666

            
667
/// `POST /{base}/api/v1/device/range`
668
156
pub async fn post_device_range(
669
156
    State(state): State<AppState>,
670
156
    Extension(token_info): Extension<GetTokenInfoData>,
671
156
    Json(mut body): Json<request::PostDeviceRangeBody>,
672
156
) -> impl IntoResponse {
673
    const FN_NAME: &'static str = "post_device_range";
674

            
675
156
    let user_id = token_info.user_id.as_str();
676
156
    let roles = &token_info.roles;
677
156

            
678
156
    body.data.start_addr = body.data.start_addr.trim().to_lowercase();
679
156
    body.data.end_addr = body.data.end_addr.trim().to_lowercase();
680
156
    if body.data.unit_id.len() == 0 {
681
6
        return Err(ErrResp::ErrParam(Some(
682
6
            "`unitId` must with at least one character".to_string(),
683
6
        )));
684
150
    } else if body.data.network_id.len() == 0 {
685
6
        return Err(ErrResp::ErrParam(Some(
686
6
            "`networkId` must with at least one character".to_string(),
687
6
        )));
688
144
    } else if body.data.start_addr.len() == 0
689
138
        || body.data.start_addr.len() != body.data.end_addr.len()
690
    {
691
18
        return Err(ErrResp::ErrParam(Some(
692
18
            "`startAddr` and `endAddr` must have at least one character and with the same length"
693
18
                .to_string(),
694
18
        )));
695
126
    }
696
126
    if let Some(profile) = body.data.profile.as_ref() {
697
18
        let profile = profile.to_lowercase();
698
18
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
699
6
            return Err(ErrResp::ErrParam(Some(
700
6
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
701
6
            )));
702
12
        }
703
12
        body.data.profile = Some(profile);
704
108
    }
705
120
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
706
12
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
707
108
        Ok(addr) => addr,
708
    };
709
108
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
710
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
711
102
        Ok(addr) => addr,
712
102
    };
713
102
    if start_addr > end_addr {
714
6
        return Err(ErrResp::ErrParam(Some(
715
6
            "`startAddr` cannot larger than `endAddr`".to_string(),
716
6
        )));
717
96
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
718
6
        return Err(ErrResp::ErrParam(Some(format!(
719
6
            "numbers between `startAddr` and `endAddr` cannot more than {}",
720
6
            BULK_MAX
721
6
        ))));
722
90
    }
723
90

            
724
90
    let unit_id = body.data.unit_id.as_str();
725
90
    let unit = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
726
        None => {
727
12
            return Err(ErrResp::Custom(
728
12
                ErrReq::UNIT_NOT_EXIST.0,
729
12
                ErrReq::UNIT_NOT_EXIST.1,
730
12
                None,
731
12
            ));
732
        }
733
78
        Some(unit) => unit,
734
78
    };
735
78
    let network_id = body.data.network_id.as_str();
736
78
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
737
        None => {
738
18
            return Err(ErrResp::Custom(
739
18
                ErrReq::NETWORK_NOT_EXIST.0,
740
18
                ErrReq::NETWORK_NOT_EXIST.1,
741
18
                None,
742
18
            ));
743
        }
744
60
        Some(network) => network,
745
60
    };
746
60

            
747
60
    let mut devices = vec![];
748
60
    end_addr += 1;
749
60
    let addr_len = body.data.start_addr.len();
750
36888
    for addr in start_addr..end_addr {
751
36888
        let now = Utc::now();
752
36888
        let network_addr = u128_to_addr(addr, addr_len);
753
36888
        let device = Device {
754
36888
            device_id: strings::random_id(&now, ID_RAND_LEN),
755
36888
            unit_id: unit.unit_id.clone(),
756
36888
            unit_code: match network.unit_id.as_ref() {
757
18438
                None => None,
758
18450
                Some(_) => Some(unit.code.clone()),
759
            },
760
36888
            network_id: network.network_id.clone(),
761
36888
            network_code: network.code.clone(),
762
36888
            network_addr: network_addr.clone(),
763
36888
            created_at: now,
764
36888
            modified_at: now,
765
36888
            profile: match body.data.profile.as_ref() {
766
24600
                None => "".to_string(),
767
12288
                Some(profile) => profile.clone(),
768
            },
769
36888
            name: network_addr,
770
36888
            info: Map::new(),
771
36888
        };
772
36888
        devices.push(device);
773
    }
774
60
    if let Err(e) = state.model.device().add_bulk(&devices).await {
775
        error!("[{}] add error: {}", FN_NAME, e);
776
        return Err(ErrResp::ErrDb(Some(e.to_string())));
777
60
    }
778
60

            
779
60
    if state.cache.is_some() {
780
20
        let msg = SendCtrlMsg::DelDeviceBulk {
781
20
            operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
782
20
            new: CtrlDelDeviceBulk {
783
20
                unit_id: unit.unit_id,
784
20
                unit_code: network.unit_code.clone(),
785
20
                network_id: network.network_id,
786
20
                network_code: network.code.clone(),
787
12296
                network_addrs: devices.iter().map(|x| x.network_addr.clone()).collect(),
788
12296
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
789
20
            },
790
20
        };
791
20
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
792
40
    }
793

            
794
    // Send message to the device's network server.
795
60
    let mgr_key = match network.unit_code.as_ref() {
796
24
        None => gen_mgr_key("", network.code.as_str()),
797
36
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
798
    };
799
60
    let msg_op = NetCtrlMsgOp::ADD_DEVICE_RANGE;
800
60
    let msg = SendNetCtrlMsg::AddDeviceRange {
801
60
        time: time_str(&Utc::now()),
802
60
        operation: msg_op.to_string(),
803
60
        new: NetCtrlAddrRange {
804
60
            start_addr: body.data.start_addr,
805
60
            end_addr: body.data.end_addr,
806
60
        },
807
60
    };
808
60
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
809

            
810
60
    Ok(StatusCode::NO_CONTENT)
811
156
}
812

            
813
/// `POST /{base}/api/v1/device/range-delete`
814
108
pub async fn post_device_range_del(
815
108
    State(state): State<AppState>,
816
108
    Extension(token_info): Extension<GetTokenInfoData>,
817
108
    Json(mut body): Json<request::PostDeviceRangeBody>,
818
108
) -> impl IntoResponse {
819
    const FN_NAME: &'static str = "post_device_range_del";
820

            
821
108
    let user_id = token_info.user_id.as_str();
822
108
    let roles = &token_info.roles;
823
108

            
824
108
    body.data.start_addr = body.data.start_addr.trim().to_lowercase();
825
108
    body.data.end_addr = body.data.end_addr.trim().to_lowercase();
826
108
    if body.data.unit_id.len() == 0 {
827
6
        return Err(ErrResp::ErrParam(Some(
828
6
            "`unitId` must with at least one character".to_string(),
829
6
        )));
830
102
    } else if body.data.network_id.len() == 0 {
831
6
        return Err(ErrResp::ErrParam(Some(
832
6
            "`networkId` must with at least one character".to_string(),
833
6
        )));
834
96
    } else if body.data.start_addr.len() == 0
835
90
        || body.data.start_addr.len() != body.data.end_addr.len()
836
    {
837
18
        return Err(ErrResp::ErrParam(Some(
838
18
            "`startAddr` and `endAddr` must have at least one character and with the same length"
839
18
                .to_string(),
840
18
        )));
841
78
    }
842
78
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
843
12
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
844
66
        Ok(addr) => addr,
845
    };
846
66
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
847
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
848
60
        Ok(addr) => addr,
849
60
    };
850
60
    if start_addr > end_addr {
851
6
        return Err(ErrResp::ErrParam(Some(
852
6
            "`startAddr` cannot larger than `endAddr`".to_string(),
853
6
        )));
854
54
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
855
6
        return Err(ErrResp::ErrParam(Some(format!(
856
6
            "numbers between `startAddr` and `endAddr` cannot more than {}",
857
6
            BULK_MAX
858
6
        ))));
859
48
    }
860
48

            
861
48
    let unit_id = body.data.unit_id.as_str();
862
48
    if check_unit(FN_NAME, user_id, roles, unit_id, true, &state)
863
48
        .await?
864
48
        .is_none()
865
    {
866
12
        return Err(ErrResp::Custom(
867
12
            ErrReq::UNIT_NOT_EXIST.0,
868
12
            ErrReq::UNIT_NOT_EXIST.1,
869
12
            None,
870
12
        ));
871
36
    }
872
36
    let network_id = body.data.network_id.as_str();
873
36
    let network = match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
874
        None => {
875
18
            return Err(ErrResp::Custom(
876
18
                ErrReq::NETWORK_NOT_EXIST.0,
877
18
                ErrReq::NETWORK_NOT_EXIST.1,
878
18
                None,
879
18
            ));
880
        }
881
18
        Some(network) => network,
882
18
    };
883
18

            
884
18
    let mut network_addrs = vec![];
885
18
    end_addr += 1;
886
18
    let addr_len = body.data.start_addr.len();
887
12294
    for addr in start_addr..end_addr {
888
12294
        network_addrs.push(u128_to_addr(addr, addr_len));
889
12294
    }
890
18
    let rm_cond = request::PostDeviceBulkData {
891
18
        unit_id: body.data.unit_id.clone(),
892
18
        network_id: body.data.network_id.clone(),
893
18
        network_addrs,
894
18
        profile: None,
895
18
    };
896
18

            
897
18
    del_device_rsc_bulk(FN_NAME, &rm_cond, &network, &state).await?;
898

            
899
    // Send message to the device's network server.
900
18
    let mgr_key = match network.unit_code.as_ref() {
901
6
        None => gen_mgr_key("", network.code.as_str()),
902
12
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
903
    };
904
18
    let msg_op = NetCtrlMsgOp::DEL_DEVICE_RANGE;
905
18
    let msg = SendNetCtrlMsg::DelDeviceRange {
906
18
        time: time_str(&Utc::now()),
907
18
        operation: msg_op.to_string(),
908
18
        new: NetCtrlAddrRange {
909
18
            start_addr: body.data.start_addr,
910
18
            end_addr: body.data.end_addr,
911
18
        },
912
18
    };
913
18
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
914

            
915
18
    Ok(StatusCode::NO_CONTENT)
916
108
}
917

            
918
/// `GET /{base}/api/v1/device/count`
919
210
pub async fn get_device_count(
920
210
    State(state): State<AppState>,
921
210
    Extension(token_info): Extension<GetTokenInfoData>,
922
210
    Query(mut query): Query<request::GetDeviceCountQuery>,
923
210
) -> impl IntoResponse {
924
    const FN_NAME: &'static str = "get_device_count";
925

            
926
210
    let user_id = token_info.user_id.as_str();
927
210
    let roles = &token_info.roles;
928
210

            
929
210
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
930
108
        match query.unit.as_ref() {
931
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
932
102
            Some(unit_id) => {
933
102
                if unit_id.len() == 0 {
934
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
935
96
                }
936
            }
937
        }
938
102
    }
939
198
    let unit_cond = match query.unit.as_ref() {
940
36
        None => None,
941
162
        Some(unit_id) => match unit_id.len() {
942
12
            0 => None,
943
            _ => {
944
150
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
945
                    None => {
946
18
                        return Err(ErrResp::Custom(
947
18
                            ErrReq::UNIT_NOT_EXIST.0,
948
18
                            ErrReq::UNIT_NOT_EXIST.1,
949
18
                            None,
950
18
                        ))
951
                    }
952
132
                    Some(_) => Some(unit_id.as_str()),
953
                }
954
            }
955
        },
956
    };
957
180
    if let Some(addr) = query.addr.as_ref() {
958
54
        query.addr = Some(addr.trim().to_lowercase());
959
126
    }
960
180
    if let Some(profile) = query.profile.as_ref() {
961
24
        query.profile = Some((*profile).to_lowercase());
962
156
    }
963
180
    let mut name_contains_cond = None;
964
180
    if let Some(contains) = query.contains.as_ref() {
965
18
        if contains.len() > 0 {
966
18
            name_contains_cond = Some(contains.as_str());
967
18
        }
968
162
    }
969
180
    let cond = ListQueryCond {
970
180
        unit_id: unit_cond,
971
180
        network_code: match query.network.as_ref() {
972
132
            None => None,
973
48
            Some(network) => match network.len() {
974
6
                0 => None,
975
42
                _ => Some(network.as_ref()),
976
            },
977
        },
978
180
        network_addr: match query.addr.as_ref() {
979
126
            None => None,
980
54
            Some(addr) => match addr.len() {
981
6
                0 => None,
982
48
                _ => Some(addr.as_ref()),
983
            },
984
        },
985
180
        profile: match query.profile.as_ref() {
986
156
            None => None,
987
24
            Some(profile) => match profile.len() {
988
6
                0 => None,
989
18
                _ => Some(profile.as_str()),
990
            },
991
        },
992
180
        name_contains: name_contains_cond,
993
180
        ..Default::default()
994
180
    };
995
180
    match state.model.device().count(&cond).await {
996
        Err(e) => {
997
            error!("[{}] count error: {}", FN_NAME, e);
998
            Err(ErrResp::ErrDb(Some(e.to_string())))
999
        }
180
        Ok(count) => Ok(Json(response::GetDeviceCount {
180
            data: response::GetCountData { count },
180
        })),
    }
210
}
/// `GET /{base}/api/v1/device/list`
438
pub async fn get_device_list(
438
    State(state): State<AppState>,
438
    Extension(token_info): Extension<GetTokenInfoData>,
438
    Query(mut query): Query<request::GetDeviceListQuery>,
438
) -> impl IntoResponse {
    const FN_NAME: &'static str = "get_device_list";
438
    let user_id = token_info.user_id.as_str();
438
    let roles = &token_info.roles;
438

            
438
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
108
        match query.unit.as_ref() {
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
102
            Some(unit_id) => {
102
                if unit_id.len() == 0 {
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
96
                }
            }
        }
330
    }
426
    let unit_cond = match query.unit.as_ref() {
246
        None => None,
180
        Some(unit_id) => match unit_id.len() {
18
            0 => None,
            _ => {
162
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
                    None => {
18
                        return Err(ErrResp::Custom(
18
                            ErrReq::UNIT_NOT_EXIST.0,
18
                            ErrReq::UNIT_NOT_EXIST.1,
18
                            None,
18
                        ))
                    }
144
                    Some(_) => Some(unit_id.as_str()),
                }
            }
        },
    };
408
    if let Some(addr) = query.addr.as_ref() {
60
        query.addr = Some(addr.trim().to_lowercase());
348
    }
408
    if let Some(profile) = query.profile.as_ref() {
24
        query.profile = Some((*profile).to_lowercase());
384
    }
408
    let mut name_contains_cond = None;
408
    if let Some(contains) = query.contains.as_ref() {
102
        if contains.len() > 0 {
102
            name_contains_cond = Some(contains.as_str());
102
        }
306
    }
408
    let cond = ListQueryCond {
408
        unit_id: unit_cond,
408
        network_code: match query.network.as_ref() {
330
            None => None,
78
            Some(network) => match network.len() {
12
                0 => None,
66
                _ => Some(network.as_ref()),
            },
        },
408
        network_addr: match query.addr.as_ref() {
348
            None => None,
60
            Some(addr) => match addr.len() {
12
                0 => None,
48
                _ => Some(addr.as_ref()),
            },
        },
408
        profile: match query.profile.as_ref() {
384
            None => None,
24
            Some(profile) => match profile.len() {
6
                0 => None,
18
                _ => Some(profile.as_str()),
            },
        },
408
        name_contains: name_contains_cond,
408
        ..Default::default()
    };
408
    let sort_cond = get_sort_cond(&query.sort)?;
378
    let opts = ListOptions {
378
        cond: &cond,
378
        offset: query.offset,
378
        limit: match query.limit {
318
            None => Some(LIST_LIMIT_DEFAULT),
60
            Some(limit) => match limit {
30
                0 => None,
30
                _ => Some(limit),
            },
        },
378
        sort: Some(sort_cond.as_slice()),
378
        cursor_max: Some(LIST_CURSOR_MAX),
    };
378
    let (list, cursor) = match state.model.device().list(&opts, None).await {
        Err(e) => {
            error!("[{}] list error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
378
        Ok((list, cursor)) => match cursor {
6
            None => match query.format {
                Some(request::ListFormat::Array) => {
6
                    return Ok(Json(device_list_transform(&list)).into_response())
                }
                _ => {
294
                    return Ok(Json(response::GetDeviceList {
294
                        data: device_list_transform(&list),
294
                    })
294
                    .into_response())
                }
            },
78
            Some(_) => (list, cursor),
78
        },
78
    };
78

            
78
    let body = Body::from_stream(async_stream::stream! {
78
        let unit_cond = match query.unit.as_ref() {
78
            None => None,
78
            Some(unit_id) => match unit_id.len() {
78
                0 => None,
78
                _ => Some(unit_id.as_str()),
78
            },
78
        };
78
        let mut name_contains_cond = None;
78
        if let Some(contains) = query.contains.as_ref() {
78
            if contains.len() > 0 {
78
                name_contains_cond = Some(contains.as_str());
78
            }
78
        }
78
        let cond = ListQueryCond {
78
            unit_id: unit_cond,
78
            network_code: match query.network.as_ref() {
78
                None => None,
78
                Some(network) => match network.len() {
78
                    0 => None,
78
                    _ => Some(network.as_ref())
78
                },
78
            },
78
            network_addr: match query.addr.as_ref() {
78
                None => None,
78
                Some(addr) => match addr.len() {
78
                    0 => None,
78
                    _ => Some(addr.as_ref())
78
                },
78
            },
78
            name_contains: name_contains_cond,
78
            ..Default::default()
78
        };
78
        let opts = ListOptions {
78
            cond: &cond,
78
            offset: query.offset,
78
            limit: match query.limit {
78
                None => Some(LIST_LIMIT_DEFAULT),
78
                Some(limit) => match limit {
78
                    0 => None,
78
                    _ => Some(limit),
78
                },
78
            },
78
            sort: Some(sort_cond.as_slice()),
78
            cursor_max: Some(LIST_CURSOR_MAX),
78
        };
78

            
78
        let mut list = list;
78
        let mut cursor = cursor;
78
        let mut is_first = true;
78
        loop {
78
            yield device_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
78
            is_first = false;
78
            if cursor.is_none() {
78
                break;
78
            }
78
            let (_list, _cursor) = match state.model.device().list(&opts, cursor).await {
78
                Err(_) => break,
78
                Ok((list, cursor)) => (list, cursor),
78
            };
78
            list = _list;
78
            cursor = _cursor;
78
        }
78
    });
78
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
438
}
/// `GET /{base}/api/v1/device/{deviceId}`
90
pub async fn get_device(
90
    State(state): State<AppState>,
90
    Extension(token_info): Extension<GetTokenInfoData>,
90
    Path(param): Path<request::DeviceIdPath>,
90
) -> impl IntoResponse {
    const FN_NAME: &'static str = "get_device";
90
    let user_id = token_info.user_id.as_str();
90
    let roles = &token_info.roles;
90
    let device_id = param.device_id.as_str();
90

            
90
    match check_device(FN_NAME, device_id, user_id, false, roles, &state).await? {
30
        None => Err(ErrResp::ErrNotFound(None)),
60
        Some(device) => Ok(Json(response::GetDevice {
60
            data: device_transform(&device),
60
        })),
    }
90
}
/// `PATCH /{base}/api/v1/device/{deviceId}`
186
pub async fn patch_device(
186
    State(state): State<AppState>,
186
    Extension(token_info): Extension<GetTokenInfoData>,
186
    Path(param): Path<request::DeviceIdPath>,
186
    Json(mut body): Json<request::PatchDeviceBody>,
186
) -> impl IntoResponse {
    const FN_NAME: &'static str = "patch_device";
186
    let user_id = token_info.user_id.as_str();
186
    let roles = &token_info.roles;
186
    let device_id = param.device_id.as_str();
186
    if let Some(network_id) = body.data.network_id.as_ref() {
54
        if network_id.len() == 0 {
6
            return Err(ErrResp::ErrParam(Some(
6
                "`networkId` must with at least one character".to_string(),
6
            )));
48
        }
132
    }
180
    if let Some(network_addr) = body.data.network_addr.as_ref() {
60
        let network_addr = network_addr.trim().to_lowercase();
60
        if network_addr.len() == 0 {
6
            return Err(ErrResp::ErrParam(Some(
6
                "`networkAddr` must with at least one character".to_string(),
6
            )));
54
        }
54
        body.data.network_addr = Some(network_addr);
120
    }
174
    if let Some(profile) = body.data.profile.as_ref() {
84
        let profile = profile.to_lowercase();
84
        if profile.len() > 0 && !strings::is_code(profile.as_str()) {
6
            return Err(ErrResp::ErrParam(Some(
6
                "`profile` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
6
            )));
78
        }
78
        body.data.profile = Some(profile);
90
    }
168
    let mut updates = get_updates(&mut body.data).await?;
    // To check if the device is for the user.
150
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await? {
42
        None => return Err(ErrResp::ErrNotFound(None)),
108
        Some(device) => device,
108
    };
108
    let unit_id = device.unit_id.as_str();
108
    let network = match updates.network.as_ref() {
60
        None => None,
48
        Some((network_id, _)) => {
48
            match check_network(FN_NAME, unit_id, network_id, roles, &state).await? {
                None => {
18
                    return Err(ErrResp::Custom(
18
                        ErrReq::NETWORK_NOT_EXIST.0,
18
                        ErrReq::NETWORK_NOT_EXIST.1,
18
                        None,
18
                    ));
                }
30
                Some(network) => Some(network),
            }
        }
    };
90
    if let Some(network) = network.as_ref() {
30
        updates.network = Some((network.network_id.as_str(), network.code.as_str()));
60
    }
90
    if let Some(network_addr) = updates.network_addr {
42
        let network_id = match updates.network {
18
            None => device.network_id.as_str(),
24
            Some((network_id, _)) => network_id,
        };
42
        if check_addr(FN_NAME, network_id, network_addr, &state)
42
            .await?
42
            .is_some()
        {
6
            return Err(ErrResp::Custom(
6
                ErrReq::NETWORK_ADDR_EXIST.0,
6
                ErrReq::NETWORK_ADDR_EXIST.1,
6
                None,
6
            ));
36
        }
48
    }
84
    let cond = UpdateQueryCond { device_id };
84
    if let Err(e) = state.model.device().update(&cond, &updates).await {
        error!("[{}] update error: {}", FN_NAME, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
84
    }
84
    if let Some(profile) = updates.profile {
54
        let cond = device_route::UpdateQueryCond { device_id };
54
        let updates = device_route::Updates {
54
            profile: Some(profile),
54
            modified_at: Some(Utc::now()),
54
            ..Default::default()
54
        };
54
        if let Err(e) = state.model.device_route().update(&cond, &updates).await {
            error!("[{}] update device route error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
54
        }
30
    }
    // Delete device cache to update profile.
84
    if state.cache.is_some() {
28
        let msg = SendCtrlMsg::DelDevice {
28
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
28
            new: CtrlDelDevice {
28
                unit_id: device.unit_id.clone(),
28
                unit_code: device.unit_code.clone(),
28
                network_id: device.network_id.clone(),
28
                network_code: device.network_code.clone(),
28
                network_addr: device.network_addr.clone(),
28
                device_id: device.device_id.clone(),
28
            },
28
        };
28
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
28
        let msg = SendCtrlMsg::DelDevice {
28
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
28
            new: CtrlDelDevice {
28
                unit_id: device.unit_id,
28
                unit_code: device.unit_code.clone(),
28
                network_id: match updates.network.as_ref() {
20
                    None => device.network_id,
8
                    Some((network_id, _)) => network_id.to_string(),
                },
28
                network_code: match updates.network.as_ref() {
20
                    None => device.network_code.clone(),
8
                    Some((_, network_code)) => network_code.to_string(),
                },
28
                network_addr: match updates.network_addr.as_ref() {
16
                    None => device.network_addr.clone(),
12
                    Some(addr) => addr.to_string(),
                },
28
                device_id: device.device_id,
28
            },
28
        };
28
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
56
    }
    // Send message to the device's network server if device network or address is changed.
84
    if updates.network.is_some() || updates.network_addr.is_some() {
42
        let now = Utc::now();
42

            
42
        let msg_op = NetCtrlMsgOp::DEL_DEVICE;
42
        let msg = SendNetCtrlMsg::DelDevice {
42
            time: time_str(&now),
42
            operation: msg_op.to_string(),
42
            new: NetCtrlAddr {
42
                network_addr: device.network_addr.clone(),
42
            },
42
        };
42
        let mgr_key = match device.unit_code.as_ref() {
6
            None => gen_mgr_key("", device.network_code.as_str()),
36
            Some(code) => gen_mgr_key(code.as_str(), device.network_code.as_str()),
        };
42
        let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
42
        let msg_op = NetCtrlMsgOp::ADD_DEVICE;
42
        let msg = SendNetCtrlMsg::AddDevice {
42
            time: time_str(&now),
42
            operation: msg_op.to_string(),
42
            new: NetCtrlAddr {
42
                network_addr: match updates.network_addr {
6
                    None => device.network_addr,
36
                    Some(addr) => addr.to_string(),
                },
            },
        };
42
        let mgr_key = match network.as_ref() {
18
            None => mgr_key,
24
            Some(network) => match network.unit_code.as_ref() {
12
                None => gen_mgr_key("", network.code.as_str()),
12
                Some(code) => gen_mgr_key(code.as_str(), network.code.as_str()),
            },
        };
42
        let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
42
    }
84
    Ok(StatusCode::NO_CONTENT)
186
}
/// `DELETE /{base}/api/v1/device/{deviceId}`
108
pub async fn delete_device(
108
    State(state): State<AppState>,
108
    Extension(token_info): Extension<GetTokenInfoData>,
108
    Path(param): Path<request::DeviceIdPath>,
108
) -> impl IntoResponse {
    const FN_NAME: &'static str = "delete_device";
108
    let user_id = token_info.user_id.as_str();
108
    let roles = &token_info.roles;
108
    let device_id = param.device_id.as_str();
    // To check if the device is for the user.
108
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await {
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
108
        Ok(device) => match device {
48
            None => return Ok(StatusCode::NO_CONTENT),
60
            Some(device) => device,
60
        },
60
    };
60

            
60
    del_device_rsc(FN_NAME, device_id, &state).await?;
60
    if state.cache.is_some() {
20
        let msg = SendCtrlMsg::DelDevice {
20
            operation: CtrlMsgOp::DEL_DEVICE.to_string(),
20
            new: CtrlDelDevice {
20
                unit_id: device.unit_id,
20
                unit_code: device.unit_code.clone(),
20
                network_id: device.network_id,
20
                network_code: device.network_code.clone(),
20
                network_addr: device.network_addr.clone(),
20
                device_id: device.device_id,
20
            },
20
        };
20
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
40
    }
    // Send message to the device's network server.
60
    let mgr_key = match device.unit_code.as_ref() {
24
        None => gen_mgr_key("", device.network_code.as_str()),
36
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), device.network_code.as_str()),
    };
60
    let msg_op = NetCtrlMsgOp::DEL_DEVICE;
60
    let msg = SendNetCtrlMsg::DelDevice {
60
        time: time_str(&Utc::now()),
60
        operation: msg_op.to_string(),
60
        new: NetCtrlAddr {
60
            network_addr: device.network_addr,
60
        },
60
    };
60
    let _ = send_net_ctrl_message(FN_NAME, &msg, msg_op, &state, &mgr_key).await;
60
    Ok(StatusCode::NO_CONTENT)
108
}
408
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
408
    match sort_args.as_ref() {
306
        None => Ok(vec![
306
            SortCond {
306
                key: SortKey::NetworkCode,
306
                asc: true,
306
            },
306
            SortCond {
306
                key: SortKey::NetworkAddr,
306
                asc: true,
306
            },
306
        ]),
102
        Some(args) => {
102
            let mut args = args.split(",");
102
            let mut sort_cond = vec![];
204
            while let Some(arg) = args.next() {
132
                let mut cond = arg.split(":");
132
                let key = match cond.next() {
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
132
                    Some(field) => match field {
132
                        "network" => SortKey::NetworkCode,
108
                        "addr" => SortKey::NetworkAddr,
72
                        "created" => SortKey::CreatedAt,
42
                        "modified" => SortKey::ModifiedAt,
30
                        "profile" => SortKey::Profile,
30
                        "name" => SortKey::Name,
                        _ => {
12
                            return Err(ErrResp::ErrParam(Some(format!(
12
                                "invalid sort key {}",
12
                                field
12
                            ))))
                        }
                    },
                };
120
                let asc = match cond.next() {
6
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
114
                    Some(asc) => match asc {
114
                        "asc" => true,
54
                        "desc" => false,
                        _ => {
6
                            return Err(ErrResp::ErrParam(Some(format!(
6
                                "invalid sort asc {}",
6
                                asc
6
                            ))))
                        }
                    },
                };
108
                if cond.next().is_some() {
6
                    return Err(ErrResp::ErrParam(Some(
6
                        "invalid sort condition".to_string(),
6
                    )));
102
                }
102
                sort_cond.push(SortCond { key, asc });
            }
72
            Ok(sort_cond)
        }
    }
408
}
168
async fn get_updates<'a>(body: &'a mut request::PatchDeviceData) -> Result<Updates<'a>, ErrResp> {
168
    let mut updates = Updates {
168
        ..Default::default()
168
    };
168
    let mut count = 0;
168
    if let Some(network_id) = body.network_id.as_ref() {
48
        updates.network = Some((network_id.as_str(), ""));
48
        count += 1;
120
    }
168
    if let Some(network_addr) = body.network_addr.as_ref() {
54
        updates.network_addr = Some(network_addr.as_str());
54
        count += 1;
114
    }
168
    if let Some(profile) = body.profile.as_ref() {
78
        updates.profile = Some(profile.as_str());
78
        count += 1;
90
    }
168
    if let Some(name) = body.name.as_ref() {
120
        updates.name = Some(name.as_str());
120
        count += 1;
120
    }
168
    if let Some(info) = body.info.as_ref() {
120
        for (k, _) in info.iter() {
54
            if k.len() == 0 {
6
                return Err(ErrResp::ErrParam(Some(
6
                    "`info` key must not be empty".to_string(),
6
                )));
48
            }
        }
114
        updates.info = Some(info);
114
        count += 1;
48
    }
162
    if count == 0 {
12
        return Err(ErrResp::ErrParam(Some(
12
            "at least one parameter".to_string(),
12
        )));
150
    }
150
    updates.modified_at = Some(Utc::now());
150
    Ok(updates)
168
}
/// To check if the network is exists for the unit. Public network can be matched for admin or
/// manager roles.
///
/// # Errors
///
/// Returns OK if the network is found or not. Otherwise errors will be returned.
450
async fn check_network(
450
    fn_name: &str,
450
    unit_id: &str,
450
    network_id: &str,
450
    roles: &HashMap<String, bool>,
450
    state: &AppState,
450
) -> Result<Option<Network>, ErrResp> {
450
    let cond = NetworkQueryCond {
450
        network_id: Some(network_id),
450
        ..Default::default()
450
    };
450
    let network = match state.model.network().get(&cond).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
450
        Ok(network) => match network {
36
            None => return Ok(None),
414
            Some(network) => network,
414
        },
414
    };
414
    match network.unit_id.as_ref() {
156
        None => match Role::is_role(roles, Role::ADMIN) || Role::is_role(roles, Role::MANAGER) {
36
            false => Ok(None),
120
            true => Ok(Some(network)),
        },
258
        Some(id) => match id.as_str() == unit_id {
36
            false => Ok(None),
222
            true => Ok(Some(network)),
        },
    }
450
}
/// To check if the address is exists for the network.
///
/// # Errors
///
/// Returns OK if the device is found or not. Otherwise errors will be returned.
198
async fn check_addr(
198
    fn_name: &str,
198
    network_id: &str,
198
    network_addr: &str,
198
    state: &AppState,
198
) -> Result<Option<Device>, ErrResp> {
198
    let cond = ListQueryCond {
198
        network_id: Some(network_id),
198
        network_addr: Some(network_addr),
198
        ..Default::default()
198
    };
198
    let opts = ListOptions {
198
        cond: &cond,
198
        offset: None,
198
        limit: None,
198
        sort: None,
198
        cursor_max: None,
198
    };
198
    match state.model.device().list(&opts, None).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            Err(ErrResp::ErrDb(Some(e.to_string())))
        }
198
        Ok((mut list, _)) => Ok(list.pop()),
    }
198
}
300
fn device_list_transform(list: &Vec<Device>) -> Vec<response::GetDeviceData> {
300
    let mut ret = vec![];
1188
    for device in list.iter() {
1188
        ret.push(device_transform(&device));
1188
    }
300
    ret
300
}
192
fn device_list_transform_bytes(
192
    list: &Vec<Device>,
192
    with_start: bool,
192
    with_end: bool,
192
    format: Option<&request::ListFormat>,
192
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
192
    let mut build_str = match with_start {
114
        false => "".to_string(),
6
        true => match format {
6
            Some(request::ListFormat::Array) => "[".to_string(),
72
            _ => "{\"data\":[".to_string(),
        },
    };
192
    let mut is_first = with_start;
12294
    for item in list {
12102
        if is_first {
78
            is_first = false;
12024
        } else {
12024
            build_str.push(',');
12024
        }
12102
        let json_str = match serde_json::to_string(&device_transform(item)) {
            Err(e) => return Err(Box::new(e)),
12102
            Ok(str) => str,
12102
        };
12102
        build_str += json_str.as_str();
    }
192
    if with_end {
78
        build_str += match format {
6
            Some(request::ListFormat::Array) => "]",
72
            _ => "]}",
        }
114
    }
192
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
192
}
13350
fn device_transform(device: &Device) -> response::GetDeviceData {
13350
    response::GetDeviceData {
13350
        device_id: device.device_id.clone(),
13350
        unit_id: device.unit_id.clone(),
13350
        unit_code: device.unit_code.clone(),
13350
        network_id: device.network_id.clone(),
13350
        network_code: device.network_code.clone(),
13350
        network_addr: device.network_addr.clone(),
13350
        created_at: time_str(&device.created_at),
13350
        modified_at: time_str(&device.modified_at),
13350
        profile: device.profile.clone(),
13350
        name: device.name.clone(),
13350
        info: device.info.clone(),
13350
    }
13350
}
60
async fn del_device_rsc(fn_name: &str, device_id: &str, state: &AppState) -> Result<(), ErrResp> {
60
    let cond = device_route::QueryCond {
60
        device_id: Some(device_id),
60
        ..Default::default()
60
    };
60
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
60
    }
60

            
60
    let cond = dldata_buffer::QueryCond {
60
        device_id: Some(device_id),
60
        ..Default::default()
60
    };
60
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
60
    }
60

            
60
    let cond = QueryCond {
60
        device_id: Some(device_id),
60
        ..Default::default()
60
    };
60
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
60
    }
60

            
60
    Ok(())
60
}
36
async fn del_device_rsc_bulk(
36
    fn_name: &str,
36
    rm_cond: &request::PostDeviceBulkData,
36
    network: &Network,
36
    state: &AppState,
36
) -> Result<(), ErrResp> {
24588
    let addrs: Vec<&str> = rm_cond.network_addrs.iter().map(|x| x.as_str()).collect();
36
    let ctrl_msg = match state.cache.as_ref() {
24
        None => None,
        Some(_) => {
12
            let cond = ListQueryCond {
12
                unit_id: Some(rm_cond.unit_id.as_str()),
12
                network_id: Some(rm_cond.network_id.as_str()),
12
                network_addrs: Some(&addrs),
12
                ..Default::default()
12
            };
12
            let opts = ListOptions {
12
                cond: &cond,
12
                offset: None,
12
                limit: None,
12
                sort: None,
12
                cursor_max: None,
12
            };
12
            let devices = match state.model.device().list(&opts, None).await {
                Err(e) => {
                    error!("[{}] list device for cache error: {}", fn_name, e);
                    return Err(ErrResp::ErrDb(Some(e.to_string())));
                }
12
                Ok((list, _)) => list,
12
            };
12
            Some(SendCtrlMsg::DelDeviceBulk {
12
                operation: CtrlMsgOp::DEL_DEVICE_BULK.to_string(),
12
                new: CtrlDelDeviceBulk {
12
                    unit_id: rm_cond.unit_id.clone(),
12
                    unit_code: match network.unit_code.as_ref() {
4
                        None => None,
8
                        Some(code) => Some(code.clone()),
                    },
12
                    network_id: network.network_id.clone(),
12
                    network_code: network.code.clone(),
12
                    network_addrs: rm_cond.network_addrs.clone(),
8196
                    device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
12
                },
12
            })
        }
    };
36
    let cond = device_route::QueryCond {
36
        unit_id: Some(rm_cond.unit_id.as_str()),
36
        network_id: Some(rm_cond.network_id.as_str()),
36
        network_addrs: Some(&addrs),
36
        ..Default::default()
36
    };
36
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
36
    }
36

            
36
    let cond = dldata_buffer::QueryCond {
36
        unit_id: Some(rm_cond.unit_id.as_str()),
36
        network_id: Some(rm_cond.network_id.as_str()),
36
        network_addrs: Some(&addrs),
36
        ..Default::default()
36
    };
36
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
36
    }
36

            
36
    let cond = QueryCond {
36
        unit_id: Some(rm_cond.unit_id.as_str()),
36
        network_id: Some(rm_cond.network_id.as_str()),
36
        network_addrs: Some(&addrs),
36
        ..Default::default()
36
    };
36
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
36
    }
36
    if let Some(msg) = ctrl_msg.as_ref() {
12
        send_del_ctrl_message(fn_name, msg, state).await?;
24
    }
36
    Ok(())
36
}
/// Send delete control message.
176
async fn send_del_ctrl_message(
176
    fn_name: &str,
176
    msg: &SendCtrlMsg,
176
    state: &AppState,
176
) -> Result<(), ErrResp> {
176
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!(
                "[{}] marshal JSON for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_DEVICE,
                e
            );
            return Err(ErrResp::ErrRsc(Some(format!(
                "marshal control message error: {}",
                e
            ))));
        }
176
        Ok(payload) => payload,
176
    };
176
    let ctrl_sender = { state.ctrl_senders.device.lock().unwrap().clone() };
176
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name,
            CtrlMsgOp::DEL_DEVICE,
            e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "send control message error: {}",
            e
        ))));
176
    }
176

            
176
    Ok(())
176
}
/// Send network control message.
444
async fn send_net_ctrl_message(
444
    fn_name: &str,
444
    msg: &SendNetCtrlMsg,
444
    msg_op: &str,
444
    state: &AppState,
444
    mgr_key: &str,
444
) -> Result<(), ErrResp> {
180
    let mgr = {
444
        match state.network_mgrs.lock().unwrap().get_mut(mgr_key) {
264
            None => return Ok(()),
180
            Some(mgr) => mgr.clone(),
180
        }
180
    };
180
    match serde_json::to_vec(msg) {
        Err(e) => warn!("[{}] marshal {} error: {}", fn_name, msg_op, e),
180
        Ok(payload) => match mgr.send_ctrl(payload).await {
            Err(e) => warn!("[{}] send {} error: {}", fn_name, msg_op, e),
180
            Ok(_) => (),
        },
    }
180
    Ok(())
444
}
/// Clear the device and relative cache.
24
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
24
    if let Err(e) = cache.device().clear().await {
        error!(
            "[{}] {} clear device cache error: {}",
            fn_name, queue_name, e
        );
24
    }
24
    if let Err(e) = cache.device_route().clear().await {
        error!(
            "[{}] {} clear device route cache error: {}",
            fn_name, queue_name, e
        );
24
    }
24
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
76
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
76
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
76
        if let Some(cache) = self.cache.as_ref() {
12
            clear_cache(FN_NAME, queue_name, cache).await;
64
        }
76
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
46
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
152
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
60
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
60
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
60
        if let Some(cache) = self.cache.as_ref() {
12
            clear_cache(FN_NAME, queue_name, cache).await;
48
        }
60
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
30
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
120
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
176
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
176
        let queue_name = queue.name();
176
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
176
            Ok(msg) => msg,
176
        };
176
        match ctrl_msg {
124
            RecvCtrlMsg::DelDevice { new } => {
124
                if let Some(cache) = self.cache.as_ref() {
124
                    let unit_code = match new.unit_code.as_ref() {
38
                        None => "",
86
                        Some(unit_code) => unit_code.as_str(),
                    };
124
                    let network_code = new.network_code.as_str();
124
                    let network_addr = new.network_addr.as_str();
124
                    let cond = device::DelCacheQueryCond {
124
                        unit_code,
124
                        network_code: Some(network_code),
124
                        network_addr: Some(network_addr),
124
                    };
124
                    if let Err(e) = cache.device().del(&cond).await {
                        error!(
                            "[{}] {} delete device cache {}.{}.{} error: {}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr, e
                        );
                    } else {
124
                        debug!(
                            "[{}] {} delete device cache {}.{}.{}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr
                        );
                    }
124
                    let device_id = new.device_id.as_str();
124
                    if let Err(e) = cache.device_route().del_uldata(device_id).await {
                        error!(
                            "[{}] {} delete device route cache uldata {} error: {}",
                            FN_NAME, queue_name, device_id, e
                        );
                    } else {
124
                        debug!(
                            "[{}] {} delete device route cache uldata {}",
                            FN_NAME, queue_name, device_id
                        );
                    }
124
                    let cond = device_route::DelCacheQueryCond {
124
                        unit_code,
124
                        network_code: Some(network_code),
124
                        network_addr: Some(network_addr),
124
                    };
124
                    if let Err(e) = cache.device_route().del_dldata(&cond).await {
                        error!(
                            "[{}] {} delete device route cache dldata {}.{}.{} error: {}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr, e
                        );
                    } else {
124
                        debug!(
                            "[{}] {} delete device route cache dldata {}.{}.{}",
                            FN_NAME, queue_name, unit_code, network_code, network_addr
                        );
                    }
124
                    let unit_id = new.unit_id.as_str();
124
                    let cond = device_route::DelCachePubQueryCond {
124
                        unit_id,
124
                        device_id: Some(device_id),
124
                    };
124
                    if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                        error!(
                            "[{}] {} delete device route cache dldata_pub {}.{} error: {}",
                            FN_NAME, queue_name, unit_id, device_id, e
                        );
                    } else {
124
                        debug!(
                            "[{}] {} delete device route cache dldata_pub {}.{}",
                            FN_NAME, queue_name, unit_id, device_id
                        );
                    }
                }
            }
52
            RecvCtrlMsg::DelDeviceBulk { new } => {
52
                if let Some(cache) = self.cache.as_ref() {
32788
                    for device_id in new.device_ids.iter() {
32788
                        let device_id = device_id.as_str();
32788
                        if let Err(e) = cache.device_route().del_uldata(device_id).await {
                            error!(
                                "[{}] {} delete bulk device route cache uldata {} error: {}",
                                FN_NAME, queue_name, device_id, e
                            );
                        } else {
32788
                            debug!(
                                "[{}] {} delete bulk device route cache uldata {}",
                                FN_NAME, queue_name, device_id
                            );
                        }
32788
                        let unit_id = new.unit_id.as_str();
32788
                        let cond = device_route::DelCachePubQueryCond {
32788
                            unit_id,
32788
                            device_id: Some(device_id),
32788
                        };
32788
                        if let Err(e) = cache.device_route().del_dldata_pub(&cond).await {
                            error!(
                                "[{}] {} delete bulk device route cache dldata_pub {}.{} error: {}",
                                FN_NAME, queue_name, unit_id, device_id, e
                            );
                        } else {
32788
                            debug!(
                                "[{}] {} delete bulk device route cache dldata_pub {}.{}",
                                FN_NAME, queue_name, unit_id, device_id
                            );
                        }
                    }
52
                    let unit_code = match new.unit_code.as_ref() {
20
                        None => "",
32
                        Some(unit_code) => unit_code.as_str(),
                    };
52
                    let network_code = new.network_code.as_str();
32788
                    for network_addr in new.network_addrs.iter() {
32788
                        let network_addr = network_addr.as_str();
32788
                        let cond = device::DelCacheQueryCond {
32788
                            unit_code,
32788
                            network_code: Some(network_code),
32788
                            network_addr: Some(network_addr),
32788
                        };
32788
                        if let Err(e) = cache.device().del(&cond).await {
                            error!(
                                "[{}] {} delete bulk device cache {}.{}.{} error: {}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr, e
                            );
                        } else {
32788
                            debug!(
                                "[{}] {} delete bulk device cache {}.{}.{}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr
                            );
                        }
32788
                        let cond = device_route::DelCacheQueryCond {
32788
                            unit_code,
32788
                            network_code: Some(network_code),
32788
                            network_addr: Some(network_addr),
32788
                        };
32788
                        if let Err(e) = cache.device_route().del_dldata(&cond).await {
                            error!(
                                "[{}] {} delete device route cache dldata {}.{}.{} error: {}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr, e
                            );
                        } else {
32788
                            debug!(
                                "[{}] {} delete device route cache dldata {}.{}.{}",
                                FN_NAME, queue_name, unit_code, network_code, network_addr
                            );
                        }
                    }
                }
            }
        }
176
        if let Err(e) = msg.ack().await {
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
176
        }
352
    }
}