1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    body::{Body, Bytes},
13
    extract::State,
14
    http::{header, StatusCode},
15
    response::IntoResponse,
16
    Extension,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json;
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27
    Queue,
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, hex_addr_to_u128, time_str, u128_to_addr},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
40
        lib::{check_application, check_device, check_network, check_unit},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        device::{ListOptions as DeviceListOptions, ListQueryCond as DeviceListQueryCond},
51
        device_route::{DeviceRoute, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey},
52
        Cache,
53
    },
54
};
55

            
56
#[derive(Deserialize, Serialize)]
57
#[serde(tag = "operation")]
58
enum RecvCtrlMsg {
59
    #[serde(rename = "del-device-route")]
60
    DelDeviceRoute { new: CtrlDelDeviceRoute },
61
    #[serde(rename = "del-device-route-bulk")]
62
    DelDeviceRouteBulk { new: CtrlDelDeviceRouteBulk },
63
}
64

            
65
#[derive(Serialize)]
66
#[serde(untagged)]
67
enum SendCtrlMsg {
68
    DelDeviceRoute {
69
        operation: String,
70
        new: CtrlDelDeviceRoute,
71
    },
72
    DelDeviceRouteBulk {
73
        operation: String,
74
        new: CtrlDelDeviceRouteBulk,
75
    },
76
}
77

            
78
/// Namespace type for the control message operation names (see its `impl` for the constants).
struct CtrlMsgOp;
79

            
80
#[derive(Deserialize, Serialize)]
81
struct CtrlDelDeviceRoute {
82
    #[serde(rename = "deviceId")]
83
    device_id: String,
84
}
85

            
86
#[derive(Deserialize, Serialize)]
87
struct CtrlDelDeviceRouteBulk {
88
    #[serde(rename = "deviceIds")]
89
    device_ids: Vec<String>,
90
}
91

            
92
struct CtrlSenderHandler {
93
    cache: Option<Arc<dyn Cache>>,
94
}
95

            
96
struct CtrlReceiverHandler {
97
    cache: Option<Arc<dyn Cache>>,
98
}
99

            
100
impl CtrlMsgOp {
101
    const DEL_DEVICE_ROUTE: &'static str = "del-device-route";
102
    const DEL_DEVICE_ROUTE_BULK: &'static str = "del-device-route-bulk";
103
}
104

            
105
/// Maximum number of network addresses accepted by one bulk/range request.
const BULK_MAX: usize = 1024;
/// Default list limit (used outside the section shown here — TODO confirm usage).
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Maximum list cursor size (used outside the section shown here — TODO confirm usage).
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument passed to `strings::random_id` when generating route IDs.
const ID_RAND_LEN: usize = 12;
/// Name of the control queue, also used as the key in `state.ctrl_receivers`.
const CTRL_QUEUE_NAME: &str = "device-route";
110

            
111
/// Initialize channels.
112
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
113
    const FN_NAME: &'static str = "init";
114

            
115
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
116
15
    {
117
15
        state
118
15
            .ctrl_receivers
119
15
            .lock()
120
15
            .unwrap()
121
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
122
15
    }
123
15

            
124
15
    let ctrl_sender = { state.ctrl_senders.device_route.lock().unwrap().clone() };
125
    // Wait for connected.
126
1383
    for _ in 0..500 {
127
1383
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
128
15
            break;
129
1368
        }
130
1368
        time::sleep(Duration::from_millis(10)).await;
131
    }
132
15
    if ctrl_sender.status() != Status::Connected {
133
        error!(
134
            "[{}] {} control sender not connected",
135
            FN_NAME, CTRL_QUEUE_NAME
136
        );
137
        return Err(Box::new(IoError::new(
138
            ErrorKind::NotConnected,
139
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
140
        )));
141
15
    }
142
15
    if q.status() != Status::Connected {
143
        error!(
144
            "[{}] {} control receiver not connected",
145
            FN_NAME, CTRL_QUEUE_NAME
146
        );
147
        return Err(Box::new(IoError::new(
148
            ErrorKind::NotConnected,
149
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
150
        )));
151
15
    }
152
15

            
153
15
    Ok(())
154
15
}
155

            
156
/// Create control channel sender queue.
157
15
pub fn new_ctrl_sender(
158
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
159
15
    config: &CfgCtrl,
160
15
    cache: Option<Arc<dyn Cache>>,
161
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
162
15
    let url = match config.url.as_ref() {
163
        None => {
164
            return Err(Box::new(IoError::new(
165
                ErrorKind::InvalidInput,
166
                "empty control url",
167
            )))
168
        }
169
15
        Some(url) => match Url::parse(url.as_str()) {
170
            Err(e) => return Err(Box::new(e)),
171
15
            Ok(url) => url,
172
15
        },
173
15
    };
174
15

            
175
15
    match mq::control::new(
176
15
        conn_pool.clone(),
177
15
        &url,
178
15
        config.prefetch,
179
15
        CTRL_QUEUE_NAME,
180
15
        false,
181
15
        Arc::new(CtrlSenderHandler {
182
15
            cache: cache.clone(),
183
15
        }),
184
15
        Arc::new(CtrlSenderHandler { cache }),
185
15
    ) {
186
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
187
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
188
    }
189
15
}
190

            
191
/// Create control channel receiver queue.
192
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
193
15
    let url = match config.url.as_ref() {
194
        None => {
195
            return Err(Box::new(IoError::new(
196
                ErrorKind::InvalidInput,
197
                "empty control url",
198
            )))
199
        }
200
15
        Some(url) => match Url::parse(url.as_str()) {
201
            Err(e) => return Err(Box::new(e)),
202
15
            Ok(url) => url,
203
15
        },
204
15
    };
205
15
    let handler = Arc::new(CtrlReceiverHandler {
206
15
        cache: state.cache.clone(),
207
15
    });
208
15
    match mq::control::new(
209
15
        state.mq_conns.clone(),
210
15
        &url,
211
15
        config.prefetch,
212
15
        CTRL_QUEUE_NAME,
213
15
        true,
214
15
        handler.clone(),
215
15
        handler,
216
15
    ) {
217
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
218
15
        Ok(q) => Ok(q),
219
    }
220
15
}
221

            
222
/// `POST /{base}/api/v1/device-route`
223
81
pub async fn post_device_route(
224
81
    State(state): State<AppState>,
225
81
    Extension(token_info): Extension<GetTokenInfoData>,
226
81
    Json(body): Json<request::PostDeviceRouteBody>,
227
81
) -> impl IntoResponse {
228
    const FN_NAME: &'static str = "post_device_route";
229

            
230
81
    let user_id = token_info.user_id.as_str();
231
81
    let roles = &token_info.roles;
232
81

            
233
81
    if body.data.device_id.len() == 0 {
234
3
        return Err(ErrResp::ErrParam(Some(
235
3
            "`deviceId` must with at least one character".to_string(),
236
3
        )));
237
78
    } else if body.data.application_id.len() == 0 {
238
3
        return Err(ErrResp::ErrParam(Some(
239
3
            "`applicationId` must with at least one character".to_string(),
240
3
        )));
241
75
    }
242
75
    let device_id = body.data.device_id.as_str();
243
75
    let application_id = body.data.application_id.as_str();
244
75
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await? {
245
        None => {
246
9
            return Err(ErrResp::Custom(
247
9
                ErrReq::DEVICE_NOT_EXIST.0,
248
9
                ErrReq::DEVICE_NOT_EXIST.1,
249
9
                None,
250
9
            ))
251
        }
252
66
        Some(device) => device,
253
    };
254
60
    let application =
255
66
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
256
            None => {
257
6
                return Err(ErrResp::Custom(
258
6
                    ErrReq::APPLICATION_NOT_EXIST.0,
259
6
                    ErrReq::APPLICATION_NOT_EXIST.1,
260
6
                    None,
261
6
                ))
262
            }
263
60
            Some(application) => application,
264
60
        };
265
60
    if device.unit_id.as_str().cmp(application.unit_id.as_str()) != Ordering::Equal {
266
6
        return Err(ErrResp::Custom(
267
6
            ErrReq::UNIT_NOT_MATCH.0,
268
6
            ErrReq::UNIT_NOT_MATCH.1,
269
6
            None,
270
6
        ));
271
54
    }
272
54
    let cond = ListQueryCond {
273
54
        application_id: Some(application_id),
274
54
        device_id: Some(device_id),
275
54
        ..Default::default()
276
54
    };
277
54
    let opts = ListOptions {
278
54
        cond: &cond,
279
54
        offset: None,
280
54
        limit: Some(1),
281
54
        sort: None,
282
54
        cursor_max: None,
283
54
    };
284
54
    match state.model.device_route().list(&opts, None).await {
285
        Err(e) => {
286
            error!("[{}] get error: {}", FN_NAME, e);
287
            return Err(ErrResp::ErrDb(Some(e.to_string())));
288
        }
289
54
        Ok((list, _)) => match list.len() {
290
48
            0 => (),
291
            _ => {
292
6
                return Err(ErrResp::Custom(
293
6
                    ErrReq::ROUTE_EXIST.0,
294
6
                    ErrReq::ROUTE_EXIST.1,
295
6
                    None,
296
6
                ))
297
            }
298
        },
299
    }
300

            
301
48
    let now = Utc::now();
302
48
    let route_id = strings::random_id(&now, ID_RAND_LEN);
303
48
    let route = DeviceRoute {
304
48
        route_id: route_id.clone(),
305
48
        unit_id: application.unit_id,
306
48
        unit_code: application.unit_code,
307
48
        application_id: application.application_id,
308
48
        application_code: application.code,
309
48
        device_id: device.device_id,
310
48
        network_id: device.network_id,
311
48
        network_code: device.network_code,
312
48
        network_addr: device.network_addr,
313
48
        profile: device.profile,
314
48
        created_at: now,
315
48
        modified_at: now,
316
48
    };
317
48
    if let Err(e) = state.model.device_route().add(&route).await {
318
        error!("[{}] add error: {}", FN_NAME, e);
319
        return Err(ErrResp::ErrDb(Some(e.to_string())));
320
48
    }
321
48

            
322
48
    // Clear the denied route in cache.
323
48
    if state.cache.is_some() {
324
16
        let msg = SendCtrlMsg::DelDeviceRoute {
325
16
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE.to_string(),
326
16
            new: CtrlDelDeviceRoute {
327
16
                device_id: route.device_id,
328
16
            },
329
16
        };
330
16
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
331
32
    }
332

            
333
48
    Ok(Json(response::PostDeviceRoute {
334
48
        data: response::PostDeviceRouteData { route_id },
335
48
    }))
336
81
}
337

            
338
/// `POST /{base}/api/v1/device-route/bulk`
339
57
pub async fn post_device_route_bulk(
340
57
    State(state): State<AppState>,
341
57
    Extension(token_info): Extension<GetTokenInfoData>,
342
57
    Json(mut body): Json<request::PostDeviceRouteBulkBody>,
343
57
) -> impl IntoResponse {
344
    const FN_NAME: &'static str = "post_device_route_bulk";
345

            
346
57
    let user_id = token_info.user_id.as_str();
347
57
    let roles = &token_info.roles;
348
57

            
349
57
    if body.data.application_id.len() == 0 {
350
3
        return Err(ErrResp::ErrParam(Some(
351
3
            "`applicationId` must with at least one character".to_string(),
352
3
        )));
353
54
    } else if body.data.network_id.len() == 0 {
354
3
        return Err(ErrResp::ErrParam(Some(
355
3
            "`networkId` must with at least one character".to_string(),
356
3
        )));
357
51
    } else if body.data.network_addrs.len() == 0 {
358
3
        return Err(ErrResp::ErrParam(Some(
359
3
            "`networkAddrs` must with at least one address".to_string(),
360
3
        )));
361
48
    } else if body.data.network_addrs.len() > BULK_MAX {
362
3
        return Err(ErrResp::ErrParam(Some(format!(
363
3
            "`networkAddrs` cannot more than {}",
364
3
            BULK_MAX
365
3
        ))));
366
45
    }
367
45
    let mut addrs = vec![];
368
24597
    for addr in body.data.network_addrs.iter() {
369
24597
        if addr.len() == 0 {
370
3
            return Err(ErrResp::ErrParam(Some(
371
3
                "`networkAddrs` must be non-empty address array".to_string(),
372
3
            )));
373
24594
        }
374
24594
        addrs.push(addr.to_lowercase());
375
    }
376
42
    body.data.network_addrs = addrs;
377
42
    let application_id = body.data.application_id.as_str();
378
36
    let application =
379
42
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
380
            None => {
381
6
                return Err(ErrResp::Custom(
382
6
                    ErrReq::APPLICATION_NOT_EXIST.0,
383
6
                    ErrReq::APPLICATION_NOT_EXIST.1,
384
6
                    None,
385
6
                ))
386
            }
387
36
            Some(application) => application,
388
36
        };
389
36
    let network_id = body.data.network_id.as_str();
390
36
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
391
        None => {
392
6
            return Err(ErrResp::Custom(
393
6
                ErrReq::NETWORK_NOT_EXIST.0,
394
6
                ErrReq::NETWORK_NOT_EXIST.1,
395
6
                None,
396
6
            ));
397
        }
398
30
        Some(network) => network,
399
30
    };
400
30

            
401
24582
    let addrs: Vec<&str> = body.data.network_addrs.iter().map(|x| x.as_str()).collect();
402
30
    let cond = DeviceListQueryCond {
403
30
        unit_id: match network.unit_id.as_ref() {
404
15
            None => None,
405
15
            Some(unit_id) => Some(unit_id.as_str()),
406
        },
407
30
        network_id: Some(network_id),
408
30
        network_addrs: Some(&addrs),
409
30
        ..Default::default()
410
30
    };
411
30
    let opts = DeviceListOptions {
412
30
        cond: &cond,
413
30
        offset: None,
414
30
        limit: None,
415
30
        sort: None,
416
30
        cursor_max: None,
417
30
    };
418
30
    let devices = match state.model.device().list(&opts, None).await {
419
        Err(e) => {
420
            error!("[{}] list device error: {}", FN_NAME, e);
421
            return Err(ErrResp::ErrDb(Some(e.to_string())));
422
        }
423
30
        Ok((list, _)) => list,
424
30
    };
425
30
    if devices.len() == 0 {
426
6
        return Err(ErrResp::Custom(
427
6
            ErrReq::DEVICE_NOT_EXIST.0,
428
6
            ErrReq::DEVICE_NOT_EXIST.1,
429
6
            None,
430
6
        ));
431
24
    }
432
24

            
433
24
    let mut routes = vec![];
434
24576
    for device in devices.iter() {
435
24576
        let now = Utc::now();
436
24576
        let route = DeviceRoute {
437
24576
            route_id: strings::random_id(&now, ID_RAND_LEN),
438
24576
            unit_id: application.unit_id.clone(),
439
24576
            unit_code: application.unit_code.clone(),
440
24576
            application_id: application.application_id.clone(),
441
24576
            application_code: application.code.clone(),
442
24576
            network_id: network.network_id.clone(),
443
24576
            network_code: network.code.clone(),
444
24576
            network_addr: device.network_addr.clone(),
445
24576
            device_id: device.device_id.clone(),
446
24576
            profile: device.profile.clone(),
447
24576
            created_at: now,
448
24576
            modified_at: now,
449
24576
        };
450
24576
        routes.push(route);
451
24576
    }
452
24
    if let Err(e) = state.model.device_route().add_bulk(&routes).await {
453
        error!("[{}] add error: {}", FN_NAME, e);
454
        return Err(ErrResp::ErrDb(Some(e.to_string())));
455
24
    }
456
24

            
457
24
    if state.cache.is_some() {
458
8
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
459
8
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
460
8
            new: CtrlDelDeviceRouteBulk {
461
8192
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
462
8
            },
463
8
        };
464
8
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
465
16
    }
466

            
467
24
    Ok(StatusCode::NO_CONTENT)
468
57
}
469

            
470
/// `POST /{base}/api/v1/device-route/bulk-delete`
471
33
pub async fn post_device_route_bulk_del(
472
33
    State(state): State<AppState>,
473
33
    Extension(token_info): Extension<GetTokenInfoData>,
474
33
    Json(mut body): Json<request::PostDeviceRouteBulkBody>,
475
33
) -> impl IntoResponse {
476
    const FN_NAME: &'static str = "post_device_route_bulk_del";
477

            
478
33
    let user_id = token_info.user_id.as_str();
479
33
    let roles = &token_info.roles;
480
33

            
481
33
    if body.data.application_id.len() == 0 {
482
3
        return Err(ErrResp::ErrParam(Some(
483
3
            "`applicationId` must with at least one character".to_string(),
484
3
        )));
485
30
    } else if body.data.network_id.len() == 0 {
486
3
        return Err(ErrResp::ErrParam(Some(
487
3
            "`networkId` must with at least one character".to_string(),
488
3
        )));
489
27
    } else if body.data.network_addrs.len() == 0 {
490
3
        return Err(ErrResp::ErrParam(Some(
491
3
            "`networkAddrs` must with at least one address".to_string(),
492
3
        )));
493
24
    } else if body.data.network_addrs.len() > BULK_MAX {
494
3
        return Err(ErrResp::ErrParam(Some(format!(
495
3
            "`networkAddrs` cannot more than {}",
496
3
            BULK_MAX
497
3
        ))));
498
21
    }
499
21
    let mut addrs = vec![];
500
6159
    for addr in body.data.network_addrs.iter() {
501
6159
        if addr.len() == 0 {
502
3
            return Err(ErrResp::ErrParam(Some(
503
3
                "`networkAddrs` must be non-empty address array".to_string(),
504
3
            )));
505
6156
        }
506
6156
        addrs.push(addr.to_lowercase());
507
    }
508
18
    body.data.network_addrs = addrs;
509
18
    let application_id = body.data.application_id.as_str();
510
18
    if check_application(FN_NAME, application_id, user_id, true, roles, &state)
511
18
        .await?
512
18
        .is_none()
513
    {
514
6
        return Err(ErrResp::Custom(
515
6
            ErrReq::APPLICATION_NOT_EXIST.0,
516
6
            ErrReq::APPLICATION_NOT_EXIST.1,
517
6
            None,
518
6
        ));
519
12
    }
520
12
    let network_id = body.data.network_id.as_str();
521
12
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
522
        None => {
523
6
            return Err(ErrResp::Custom(
524
6
                ErrReq::NETWORK_NOT_EXIST.0,
525
6
                ErrReq::NETWORK_NOT_EXIST.1,
526
6
                None,
527
6
            ));
528
        }
529
6
        Some(network) => network,
530
6
    };
531
6

            
532
6144
    let addrs: Vec<&str> = body.data.network_addrs.iter().map(|x| x.as_str()).collect();
533
6
    let cond = QueryCond {
534
6
        application_id: Some(body.data.application_id.as_str()),
535
6
        network_id: Some(body.data.network_id.as_str()),
536
6
        network_addrs: Some(&addrs),
537
6
        ..Default::default()
538
6
    };
539
6
    if let Err(e) = state.model.device_route().del(&cond).await {
540
        error!("[{}] del error: {}", FN_NAME, e);
541
        return Err(ErrResp::ErrDb(Some(e.to_string())));
542
6
    }
543
6

            
544
6
    if state.cache.is_some() {
545
2
        let cond = DeviceListQueryCond {
546
2
            unit_id: match network.unit_id.as_ref() {
547
1
                None => None,
548
1
                Some(unit_id) => Some(unit_id.as_str()),
549
            },
550
2
            network_id: Some(network_id),
551
2
            network_addrs: Some(&addrs),
552
2
            ..Default::default()
553
2
        };
554
2
        let opts = DeviceListOptions {
555
2
            cond: &cond,
556
2
            offset: None,
557
2
            limit: None,
558
2
            sort: None,
559
2
            cursor_max: None,
560
2
        };
561
2
        let devices = match state.model.device().list(&opts, None).await {
562
            Err(e) => {
563
                error!("[{}] list device error: {}", FN_NAME, e);
564
                return Err(ErrResp::ErrDb(Some(e.to_string())));
565
            }
566
2
            Ok((list, _)) => list,
567
2
        };
568
2
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
569
2
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
570
2
            new: CtrlDelDeviceRouteBulk {
571
2048
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
572
2
            },
573
2
        };
574
2
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
575
4
    }
576

            
577
6
    Ok(StatusCode::NO_CONTENT)
578
33
}
579

            
580
/// `POST /{base}/api/v1/device-route/range`
581
72
pub async fn post_device_route_range(
582
72
    State(state): State<AppState>,
583
72
    Extension(token_info): Extension<GetTokenInfoData>,
584
72
    Json(body): Json<request::PostDeviceRouteRangeBody>,
585
72
) -> impl IntoResponse {
586
    const FN_NAME: &'static str = "post_device_route_range";
587

            
588
72
    let user_id = token_info.user_id.as_str();
589
72
    let roles = &token_info.roles;
590
72

            
591
72
    if body.data.application_id.len() == 0 {
592
3
        return Err(ErrResp::ErrParam(Some(
593
3
            "`applicationId` must with at least one character".to_string(),
594
3
        )));
595
69
    } else if body.data.network_id.len() == 0 {
596
3
        return Err(ErrResp::ErrParam(Some(
597
3
            "`networkId` must with at least one character".to_string(),
598
3
        )));
599
66
    } else if body.data.start_addr.len() != body.data.end_addr.len() {
600
9
        return Err(ErrResp::ErrParam(Some(
601
9
            "`startAddr` and `endAddr` must have the same length".to_string(),
602
9
        )));
603
57
    }
604
57
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
605
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
606
51
        Ok(addr) => addr,
607
    };
608
51
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
609
3
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
610
48
        Ok(addr) => addr,
611
48
    };
612
48
    if start_addr > end_addr {
613
3
        return Err(ErrResp::ErrParam(Some(
614
3
            "`startAddr` cannot larger than `endAddr`".to_string(),
615
3
        )));
616
45
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
617
3
        return Err(ErrResp::ErrParam(Some(format!(
618
3
            "numbers between `startAddr` and `endAddr` cannot more than {}",
619
3
            BULK_MAX
620
3
        ))));
621
42
    }
622
42

            
623
42
    let application_id = body.data.application_id.as_str();
624
36
    let application =
625
42
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
626
            None => {
627
6
                return Err(ErrResp::Custom(
628
6
                    ErrReq::APPLICATION_NOT_EXIST.0,
629
6
                    ErrReq::APPLICATION_NOT_EXIST.1,
630
6
                    None,
631
6
                ))
632
            }
633
36
            Some(application) => application,
634
36
        };
635
36
    let network_id = body.data.network_id.as_str();
636
36
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
637
        None => {
638
6
            return Err(ErrResp::Custom(
639
6
                ErrReq::NETWORK_NOT_EXIST.0,
640
6
                ErrReq::NETWORK_NOT_EXIST.1,
641
6
                None,
642
6
            ));
643
        }
644
30
        Some(network) => network,
645
30
    };
646
30

            
647
30
    let mut network_addrs = vec![];
648
30
    end_addr += 1;
649
30
    let addr_len = body.data.start_addr.len();
650
18444
    for addr in start_addr..end_addr {
651
18444
        network_addrs.push(u128_to_addr(addr, addr_len));
652
18444
    }
653
18444
    let addrs: Vec<&str> = network_addrs.iter().map(|x| x.as_str()).collect();
654
30
    let cond = DeviceListQueryCond {
655
30
        unit_id: match network.unit_id.as_ref() {
656
15
            None => None,
657
15
            Some(unit_id) => Some(unit_id.as_str()),
658
        },
659
30
        network_id: Some(network_id),
660
30
        network_addrs: Some(&addrs),
661
30
        ..Default::default()
662
30
    };
663
30
    let opts = DeviceListOptions {
664
30
        cond: &cond,
665
30
        offset: None,
666
30
        limit: None,
667
30
        sort: None,
668
30
        cursor_max: None,
669
30
    };
670
30
    let devices = match state.model.device().list(&opts, None).await {
671
        Err(e) => {
672
            error!("[{}] list device error: {}", FN_NAME, e);
673
            return Err(ErrResp::ErrDb(Some(e.to_string())));
674
        }
675
30
        Ok((list, _)) => list,
676
30
    };
677
30
    if devices.len() == 0 {
678
6
        return Err(ErrResp::Custom(
679
6
            ErrReq::DEVICE_NOT_EXIST.0,
680
6
            ErrReq::DEVICE_NOT_EXIST.1,
681
6
            None,
682
6
        ));
683
24
    }
684
24

            
685
24
    let mut routes = vec![];
686
18438
    for device in devices.iter() {
687
18438
        let now = Utc::now();
688
18438
        let route = DeviceRoute {
689
18438
            route_id: strings::random_id(&now, ID_RAND_LEN),
690
18438
            unit_id: application.unit_id.clone(),
691
18438
            unit_code: application.unit_code.clone(),
692
18438
            application_id: application.application_id.clone(),
693
18438
            application_code: application.code.clone(),
694
18438
            network_id: network.network_id.clone(),
695
18438
            network_code: network.code.clone(),
696
18438
            network_addr: device.network_addr.clone(),
697
18438
            device_id: device.device_id.clone(),
698
18438
            profile: device.profile.clone(),
699
18438
            created_at: now,
700
18438
            modified_at: now,
701
18438
        };
702
18438
        routes.push(route);
703
18438
    }
704
24
    if let Err(e) = state.model.device_route().add_bulk(&routes).await {
705
        error!("[{}] add error: {}", FN_NAME, e);
706
        return Err(ErrResp::ErrDb(Some(e.to_string())));
707
24
    }
708
24

            
709
24
    if state.cache.is_some() {
710
8
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
711
8
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
712
8
            new: CtrlDelDeviceRouteBulk {
713
6146
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
714
8
            },
715
8
        };
716
8
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
717
16
    }
718

            
719
24
    Ok(StatusCode::NO_CONTENT)
720
72
}
721

            
722
/// `POST /{base}/api/v1/device-route/range-delete`
723
48
pub async fn post_device_route_range_del(
724
48
    State(state): State<AppState>,
725
48
    Extension(token_info): Extension<GetTokenInfoData>,
726
48
    Json(body): Json<request::PostDeviceRouteRangeBody>,
727
48
) -> impl IntoResponse {
728
    const FN_NAME: &'static str = "post_device_route_range_del";
729

            
730
48
    let user_id = token_info.user_id.as_str();
731
48
    let roles = &token_info.roles;
732
48

            
733
48
    if body.data.application_id.len() == 0 {
734
3
        return Err(ErrResp::ErrParam(Some(
735
3
            "`applicationId` must with at least one character".to_string(),
736
3
        )));
737
45
    } else if body.data.network_id.len() == 0 {
738
3
        return Err(ErrResp::ErrParam(Some(
739
3
            "`networkId` must with at least one character".to_string(),
740
3
        )));
741
42
    } else if body.data.start_addr.len() != body.data.end_addr.len() {
742
9
        return Err(ErrResp::ErrParam(Some(
743
9
            "`startAddr` and `endAddr` must have the same length".to_string(),
744
9
        )));
745
33
    }
746
33
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
747
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
748
27
        Ok(addr) => addr,
749
    };
750
27
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
751
3
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
752
24
        Ok(addr) => addr,
753
24
    };
754
24
    if start_addr > end_addr {
755
3
        return Err(ErrResp::ErrParam(Some(
756
3
            "`startAddr` cannot larger than `endAddr`".to_string(),
757
3
        )));
758
21
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
759
3
        return Err(ErrResp::ErrParam(Some(format!(
760
3
            "numbers between `startAddr` and `endAddr` cannot more than {}",
761
3
            BULK_MAX
762
3
        ))));
763
18
    }
764
18

            
765
18
    let application_id = body.data.application_id.as_str();
766
18
    if check_application(FN_NAME, application_id, user_id, true, roles, &state)
767
18
        .await?
768
18
        .is_none()
769
    {
770
6
        return Err(ErrResp::Custom(
771
6
            ErrReq::APPLICATION_NOT_EXIST.0,
772
6
            ErrReq::APPLICATION_NOT_EXIST.1,
773
6
            None,
774
6
        ));
775
12
    }
776
12
    let network_id = body.data.network_id.as_str();
777
12
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
778
        None => {
779
6
            return Err(ErrResp::Custom(
780
6
                ErrReq::NETWORK_NOT_EXIST.0,
781
6
                ErrReq::NETWORK_NOT_EXIST.1,
782
6
                None,
783
6
            ));
784
        }
785
6
        Some(network) => network,
786
6
    };
787
6

            
788
6
    let mut network_addrs = vec![];
789
6
    end_addr += 1;
790
6
    let addr_len = body.data.start_addr.len();
791
6144
    for addr in start_addr..end_addr {
792
6144
        network_addrs.push(u128_to_addr(addr, addr_len));
793
6144
    }
794

            
795
6144
    let addrs: Vec<&str> = network_addrs.iter().map(|x| x.as_str()).collect();
796
6
    let cond = QueryCond {
797
6
        application_id: Some(body.data.application_id.as_str()),
798
6
        network_id: Some(body.data.network_id.as_str()),
799
6
        network_addrs: Some(&addrs),
800
6
        ..Default::default()
801
6
    };
802
6
    if let Err(e) = state.model.device_route().del(&cond).await {
803
        error!("[{}] del error: {}", FN_NAME, e);
804
        return Err(ErrResp::ErrDb(Some(e.to_string())));
805
6
    }
806
6

            
807
6
    if state.cache.is_some() {
808
2
        let cond = DeviceListQueryCond {
809
2
            unit_id: match network.unit_id.as_ref() {
810
1
                None => None,
811
1
                Some(unit_id) => Some(unit_id.as_str()),
812
            },
813
2
            network_id: Some(network_id),
814
2
            network_addrs: Some(&addrs),
815
2
            ..Default::default()
816
2
        };
817
2
        let opts = DeviceListOptions {
818
2
            cond: &cond,
819
2
            offset: None,
820
2
            limit: None,
821
2
            sort: None,
822
2
            cursor_max: None,
823
2
        };
824
2
        let devices = match state.model.device().list(&opts, None).await {
825
            Err(e) => {
826
                error!("[{}] list device error: {}", FN_NAME, e);
827
                return Err(ErrResp::ErrDb(Some(e.to_string())));
828
            }
829
2
            Ok((list, _)) => list,
830
2
        };
831
2
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
832
2
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
833
2
            new: CtrlDelDeviceRouteBulk {
834
2048
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
835
2
            },
836
2
        };
837
2
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
838
4
    }
839

            
840
6
    Ok(StatusCode::NO_CONTENT)
841
48
}
842

            
843
/// `GET /{base}/api/v1/device-route/count`
844
135
pub async fn get_device_route_count(
845
135
    State(state): State<AppState>,
846
135
    Extension(token_info): Extension<GetTokenInfoData>,
847
135
    Query(query): Query<request::GetDeviceRouteCountQuery>,
848
135
) -> impl IntoResponse {
849
    const FN_NAME: &'static str = "get_device_route_count";
850

            
851
135
    let user_id = token_info.user_id.as_str();
852
135
    let roles = &token_info.roles;
853
135

            
854
135
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
855
69
        match query.unit.as_ref() {
856
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
857
66
            Some(unit_id) => {
858
66
                if unit_id.len() == 0 {
859
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
860
63
                }
861
            }
862
        }
863
66
    }
864
129
    let unit_cond = match query.unit.as_ref() {
865
27
        None => None,
866
102
        Some(unit_id) => match unit_id.len() {
867
6
            0 => None,
868
            _ => {
869
96
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
870
                    None => {
871
9
                        return Err(ErrResp::Custom(
872
9
                            ErrReq::UNIT_NOT_EXIST.0,
873
9
                            ErrReq::UNIT_NOT_EXIST.1,
874
9
                            None,
875
9
                        ))
876
                    }
877
87
                    Some(_) => Some(unit_id.as_str()),
878
                }
879
            }
880
        },
881
    };
882
120
    let cond = ListQueryCond {
883
120
        unit_id: unit_cond,
884
120
        application_id: match query.application.as_ref() {
885
75
            None => None,
886
45
            Some(application) => match application.len() {
887
3
                0 => None,
888
42
                _ => Some(application.as_ref()),
889
            },
890
        },
891
120
        network_id: match query.network.as_ref() {
892
75
            None => None,
893
45
            Some(network_id) => match network_id.len() {
894
3
                0 => None,
895
42
                _ => Some(network_id.as_ref()),
896
            },
897
        },
898
120
        device_id: match query.device.as_ref() {
899
99
            None => None,
900
21
            Some(device_id) => match device_id.len() {
901
3
                0 => None,
902
18
                _ => Some(device_id.as_ref()),
903
            },
904
        },
905
120
        ..Default::default()
906
120
    };
907
120
    match state.model.device_route().count(&cond).await {
908
        Err(e) => {
909
            error!("[{}] count error: {}", FN_NAME, e);
910
            Err(ErrResp::ErrDb(Some(e.to_string())))
911
        }
912
120
        Ok(count) => Ok(Json(response::GetDeviceRouteCount {
913
120
            data: response::GetCountData { count },
914
120
        })),
915
    }
916
135
}
917

            
918
/// `GET /{base}/api/v1/device-route/list`
919
240
pub async fn get_device_route_list(
920
240
    State(state): State<AppState>,
921
240
    Extension(token_info): Extension<GetTokenInfoData>,
922
240
    Query(query): Query<request::GetDeviceRouteListQuery>,
923
240
) -> impl IntoResponse {
924
    const FN_NAME: &'static str = "get_device_route_list";
925

            
926
240
    let user_id = token_info.user_id.as_str();
927
240
    let roles = &token_info.roles;
928
240

            
929
240
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
930
69
        match query.unit.as_ref() {
931
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
932
66
            Some(unit_id) => {
933
66
                if unit_id.len() == 0 {
934
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
935
63
                }
936
            }
937
        }
938
171
    }
939
234
    let unit_cond = match query.unit.as_ref() {
940
120
        None => None,
941
114
        Some(unit_id) => match unit_id.len() {
942
9
            0 => None,
943
            _ => {
944
105
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
945
                    None => {
946
9
                        return Err(ErrResp::Custom(
947
9
                            ErrReq::UNIT_NOT_EXIST.0,
948
9
                            ErrReq::UNIT_NOT_EXIST.1,
949
9
                            None,
950
9
                        ))
951
                    }
952
96
                    Some(_) => Some(unit_id.as_str()),
953
                }
954
            }
955
        },
956
    };
957
225
    let cond = ListQueryCond {
958
225
        unit_id: unit_cond,
959
225
        application_id: match query.application.as_ref() {
960
168
            None => None,
961
57
            Some(application) => match application.len() {
962
6
                0 => None,
963
51
                _ => Some(application.as_ref()),
964
            },
965
        },
966
225
        network_id: match query.network.as_ref() {
967
168
            None => None,
968
57
            Some(network_id) => match network_id.len() {
969
6
                0 => None,
970
51
                _ => Some(network_id.as_ref()),
971
            },
972
        },
973
225
        device_id: match query.device.as_ref() {
974
201
            None => None,
975
24
            Some(device_id) => match device_id.len() {
976
6
                0 => None,
977
18
                _ => Some(device_id.as_ref()),
978
            },
979
        },
980
225
        ..Default::default()
981
    };
982
225
    let sort_cond = get_sort_cond(&query.sort)?;
983
210
    let opts = ListOptions {
984
210
        cond: &cond,
985
210
        offset: query.offset,
986
210
        limit: match query.limit {
987
186
            None => Some(LIST_LIMIT_DEFAULT),
988
24
            Some(limit) => match limit {
989
9
                0 => None,
990
15
                _ => Some(limit),
991
            },
992
        },
993
210
        sort: Some(sort_cond.as_slice()),
994
210
        cursor_max: Some(LIST_CURSOR_MAX),
995
    };
996

            
997
210
    let (list, cursor) = match state.model.device_route().list(&opts, None).await {
998
        Err(e) => {
999
            error!("[{}] list error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
210
        Ok((list, cursor)) => match cursor {
3
            None => match query.format {
                Some(request::ListFormat::Array) => {
3
                    return Ok(Json(route_list_transform(&list)).into_response())
                }
                _ => {
168
                    return Ok(Json(response::GetDeviceRouteList {
168
                        data: route_list_transform(&list),
168
                    })
168
                    .into_response())
                }
            },
39
            Some(_) => (list, cursor),
39
        },
39
    };
39

            
39
    let body = Body::from_stream(async_stream::stream! {
39
        let unit_cond = match query.unit.as_ref() {
39
            None => None,
39
            Some(unit_id) => match unit_id.len() {
39
                0 => None,
39
                _ => Some(unit_id.as_str()),
39
            },
39
        };
39
        let cond = ListQueryCond {
39
            unit_id: unit_cond,
39
            application_id: match query.application.as_ref() {
39
                None => None,
39
                Some(application) => match application.len() {
39
                    0 => None,
39
                    _ => Some(application.as_ref())
39
                },
39
            },
39
            network_id: match query.network.as_ref() {
39
                None => None,
39
                Some(network_id) => match network_id.len() {
39
                    0 => None,
39
                    _ => Some(network_id.as_ref())
39
                },
39
            },
39
            device_id: match query.device.as_ref() {
39
                None => None,
39
                Some(device_id) => match device_id.len() {
39
                    0 => None,
39
                    _ => Some(device_id.as_ref())
39
                },
39
            },
39
            ..Default::default()
39
        };
39
        let opts = ListOptions {
39
            cond: &cond,
39
            offset: query.offset,
39
            limit: match query.limit {
39
                None => Some(LIST_LIMIT_DEFAULT),
39
                Some(limit) => match limit {
39
                    0 => None,
39
                    _ => Some(limit),
39
                },
39
            },
39
            sort: Some(sort_cond.as_slice()),
39
            cursor_max: Some(LIST_CURSOR_MAX),
39
        };
39

            
39
        let mut list = list;
39
        let mut cursor = cursor;
39
        let mut is_first = true;
39
        loop {
39
            yield route_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
39
            is_first = false;
39
            if cursor.is_none() {
39
                break;
39
            }
39
            let (_list, _cursor) = match state.model.device_route().list(&opts, cursor).await {
39
                Err(_) => break,
39
                Ok((list, cursor)) => (list, cursor),
39
            };
39
            list = _list;
39
            cursor = _cursor;
39
        }
39
    });
39
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
240
}
/// `DELETE /{base}/api/v1/device-route/{routeId}`
12
pub async fn delete_device_route(
12
    State(state): State<AppState>,
12
    Extension(token_info): Extension<GetTokenInfoData>,
12
    Path(param): Path<request::RouteIdPath>,
12
) -> impl IntoResponse {
    const FN_NAME: &'static str = "delete_device_route";
12
    let user_id = token_info.user_id.as_str();
12
    let roles = &token_info.roles;
12
    let route_id = param.route_id.as_str();
    // To check if the device route is for the user.
12
    let route = match check_route(FN_NAME, route_id, user_id, true, roles, &state).await {
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
12
        Ok(route) => match route {
6
            None => return Ok(StatusCode::NO_CONTENT),
6
            Some(route) => route,
6
        },
6
    };
6

            
6
    let cond = QueryCond {
6
        route_id: Some(route_id),
6
        ..Default::default()
6
    };
6
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del error: {}", FN_NAME, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
6
    }
6

            
6
    if state.cache.is_some() {
2
        let msg = SendCtrlMsg::DelDeviceRoute {
2
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE.to_string(),
2
            new: CtrlDelDeviceRoute {
2
                device_id: route.device_id,
2
            },
2
        };
2
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
4
    }
6
    Ok(StatusCode::NO_CONTENT)
12
}
225
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
225
    match sort_args.as_ref() {
174
        None => Ok(vec![
174
            SortCond {
174
                key: SortKey::NetworkCode,
174
                asc: true,
174
            },
174
            SortCond {
174
                key: SortKey::NetworkAddr,
174
                asc: true,
174
            },
174
            SortCond {
174
                key: SortKey::CreatedAt,
174
                asc: false,
174
            },
174
        ]),
51
        Some(args) => {
51
            let mut args = args.split(",");
51
            let mut sort_cond = vec![];
126
            while let Some(arg) = args.next() {
90
                let mut cond = arg.split(":");
90
                let key = match cond.next() {
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
90
                    Some(field) => match field {
90
                        "application" => SortKey::ApplicationCode,
84
                        "network" => SortKey::NetworkCode,
69
                        "addr" => SortKey::NetworkAddr,
51
                        "created" => SortKey::CreatedAt,
12
                        "modified" => SortKey::ModifiedAt,
                        _ => {
6
                            return Err(ErrResp::ErrParam(Some(format!(
6
                                "invalid sort key {}",
6
                                field
6
                            ))))
                        }
                    },
                };
84
                let asc = match cond.next() {
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
81
                    Some(asc) => match asc {
81
                        "asc" => true,
27
                        "desc" => false,
                        _ => {
3
                            return Err(ErrResp::ErrParam(Some(format!(
3
                                "invalid sort asc {}",
3
                                asc
3
                            ))))
                        }
                    },
                };
78
                if cond.next().is_some() {
3
                    return Err(ErrResp::ErrParam(Some(
3
                        "invalid sort condition".to_string(),
3
                    )));
75
                }
75
                sort_cond.push(SortCond { key, asc });
            }
36
            Ok(sort_cond)
        }
    }
225
}
/// To check if the user ID can access the device route. Choose `only_owner` to check if the user
/// is the unit owner or one of unit members.
///
/// # Errors
///
/// Returns OK if the device route is found or not. Otherwise errors will be returned.
12
async fn check_route(
12
    fn_name: &str,
12
    route_id: &str,
12
    user_id: &str,
12
    only_owner: bool, // to check if this `user_id` is the owner.
12
    roles: &HashMap<String, bool>,
12
    state: &AppState,
12
) -> Result<Option<DeviceRoute>, ErrResp> {
12
    let route = match state.model.device_route().get(route_id).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
12
        Ok(route) => match route {
3
            None => return Ok(None),
9
            Some(route) => route,
9
        },
9
    };
9
    let unit_id = route.unit_id.as_str();
9
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
3
        None => Ok(None),
6
        Some(_) => Ok(Some(route)),
    }
12
}
171
fn route_list_transform(list: &Vec<DeviceRoute>) -> Vec<response::GetDeviceRouteData> {
171
    let mut ret = vec![];
501
    for route in list.iter() {
501
        ret.push(route_transform(&route));
501
    }
171
    ret
171
}
84
fn route_list_transform_bytes(
84
    list: &Vec<DeviceRoute>,
84
    with_start: bool,
84
    with_end: bool,
84
    format: Option<&request::ListFormat>,
84
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
84
    let mut build_str = match with_start {
45
        false => "".to_string(),
3
        true => match format {
3
            Some(request::ListFormat::Array) => "[".to_string(),
36
            _ => "{\"data\":[".to_string(),
        },
    };
84
    let mut is_first = with_start;
4917
    for item in list {
4833
        if is_first {
39
            is_first = false;
4794
        } else {
4794
            build_str.push(',');
4794
        }
4833
        let json_str = match serde_json::to_string(&route_transform(item)) {
            Err(e) => return Err(Box::new(e)),
4833
            Ok(str) => str,
4833
        };
4833
        build_str += json_str.as_str();
    }
84
    if with_end {
39
        build_str += match format {
3
            Some(request::ListFormat::Array) => "]",
36
            _ => "]}",
        }
45
    }
84
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
84
}
5334
fn route_transform(route: &DeviceRoute) -> response::GetDeviceRouteData {
5334
    response::GetDeviceRouteData {
5334
        route_id: route.route_id.clone(),
5334
        unit_id: route.unit_id.clone(),
5334
        application_id: route.application_id.clone(),
5334
        application_code: route.application_code.clone(),
5334
        device_id: route.device_id.clone(),
5334
        network_id: route.network_id.clone(),
5334
        network_code: route.network_code.clone(),
5334
        network_addr: route.network_addr.clone(),
5334
        profile: route.profile.clone(),
5334
        created_at: time_str(&route.created_at),
5334
        modified_at: time_str(&route.modified_at),
5334
    }
5334
}
/// Send delete control message.
38
async fn send_del_ctrl_message(
38
    fn_name: &str,
38
    msg: &SendCtrlMsg,
38
    state: &AppState,
38
) -> Result<(), ErrResp> {
38
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!(
                "[{}] marshal JSON for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_DEVICE_ROUTE,
                e
            );
            return Err(ErrResp::ErrRsc(Some(format!(
                "marshal control message error: {}",
                e
            ))));
        }
38
        Ok(payload) => payload,
38
    };
38
    let ctrl_sender = { state.ctrl_senders.device_route.lock().unwrap().clone() };
38
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name,
            CtrlMsgOp::DEL_DEVICE_ROUTE,
            e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "send control message error: {}",
            e
        ))));
38
    }
38

            
38
    Ok(())
38
}
/// Clear the device route cache.
11
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
11
    if let Err(e) = cache.device_route().clear().await {
        error!(
            "[{}] {} clear device route cache error: {}",
            fn_name, queue_name, e
        );
11
    }
11
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
34
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
34
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
34
        if let Some(cache) = self.cache.as_ref() {
5
            clear_cache(FN_NAME, queue_name, cache).await;
29
        }
34
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
19
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
68
    }
}
/// Message handling for the control message sender queue.
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    /// Does nothing: the body is intentionally empty and both arguments are ignored.
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
30
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
30
        if let Some(cache) = self.cache.as_ref() {
6
            clear_cache(FN_NAME, queue_name, cache).await;
24
        }
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
38
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
38
        let queue_name = queue.name();
38
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
38
            Ok(msg) => msg,
38
        };
38
        match ctrl_msg {
18
            RecvCtrlMsg::DelDeviceRoute { new } => {
18
                if let Some(cache) = self.cache.as_ref() {
18
                    if let Err(e) = cache
18
                        .device_route()
18
                        .del_uldata(new.device_id.as_str())
18
                        .await
                    {
                        error!(
                            "[{}] {} delete device route cache {} error: {}",
                            FN_NAME, queue_name, new.device_id, e
                        );
                    } else {
18
                        debug!(
                            "[{}] {} delete device route cache {}",
                            FN_NAME, queue_name, new.device_id
                        );
                    }
                }
            }
20
            RecvCtrlMsg::DelDeviceRouteBulk { new } => {
20
                if let Some(cache) = self.cache.as_ref() {
18434
                    for device_id in new.device_ids.iter() {
18434
                        if let Err(e) = cache.device_route().del_uldata(device_id.as_str()).await {
                            error!(
                                "[{}] {} delete device route cache {} error: {}",
                                FN_NAME, queue_name, device_id, e
                            );
                        } else {
18434
                            debug!(
                                "[{}] {} delete device route cache {}",
                                FN_NAME, queue_name, device_id
                            );
                        }
                    }
                }
            }
        }
38
        if let Err(e) = msg.ack().await {
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
38
        }
76
    }
}