1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    body::{Body, Bytes},
13
    extract::State,
14
    http::{header, StatusCode},
15
    response::IntoResponse,
16
    Extension,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json;
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27
    Queue,
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, hex_addr_to_u128, time_str, u128_to_addr},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
40
        lib::{check_application, check_device, check_network, check_unit},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        device::{ListOptions as DeviceListOptions, ListQueryCond as DeviceListQueryCond},
51
        device_route::{DeviceRoute, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey},
52
        Cache,
53
    },
54
};
55

            
56
#[derive(Deserialize, Serialize)]
57
#[serde(tag = "operation")]
58
enum RecvCtrlMsg {
59
    #[serde(rename = "del-device-route")]
60
    DelDeviceRoute { new: CtrlDelDeviceRoute },
61
    #[serde(rename = "del-device-route-bulk")]
62
    DelDeviceRouteBulk { new: CtrlDelDeviceRouteBulk },
63
}
64

            
65
#[derive(Serialize)]
66
#[serde(untagged)]
67
enum SendCtrlMsg {
68
    DelDeviceRoute {
69
        operation: String,
70
        new: CtrlDelDeviceRoute,
71
    },
72
    DelDeviceRouteBulk {
73
        operation: String,
74
        new: CtrlDelDeviceRouteBulk,
75
    },
76
}
77

            
78
struct CtrlMsgOp;
79

            
80
#[derive(Deserialize, Serialize)]
81
struct CtrlDelDeviceRoute {
82
    #[serde(rename = "deviceId")]
83
    device_id: String,
84
}
85

            
86
#[derive(Deserialize, Serialize)]
87
struct CtrlDelDeviceRouteBulk {
88
    #[serde(rename = "deviceIds")]
89
    device_ids: Vec<String>,
90
}
91

            
92
struct CtrlSenderHandler {
93
    cache: Option<Arc<dyn Cache>>,
94
}
95

            
96
struct CtrlReceiverHandler {
97
    cache: Option<Arc<dyn Cache>>,
98
}
99

            
100
impl CtrlMsgOp {
101
    const DEL_DEVICE_ROUTE: &'static str = "del-device-route";
102
    const DEL_DEVICE_ROUTE_BULK: &'static str = "del-device-route-bulk";
103
}
104

            
105
const BULK_MAX: usize = 1024;
106
const LIST_LIMIT_DEFAULT: u64 = 100;
107
const LIST_CURSOR_MAX: u64 = 100;
108
const ID_RAND_LEN: usize = 12;
109
const CTRL_QUEUE_NAME: &'static str = "device-route";
110

            
111
/// Initialize channels.
112
30
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
113
    const FN_NAME: &'static str = "init";
114

            
115
30
    let q = new_ctrl_receiver(state, ctrl_conf)?;
116
30
    {
117
30
        state
118
30
            .ctrl_receivers
119
30
            .lock()
120
30
            .unwrap()
121
30
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
122
30
    }
123
30

            
124
30
    let ctrl_sender = { state.ctrl_senders.device_route.lock().unwrap().clone() };
125
    // Wait for connected.
126
2764
    for _ in 0..500 {
127
2764
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
128
30
            break;
129
2734
        }
130
2734
        time::sleep(Duration::from_millis(10)).await;
131
    }
132
30
    if ctrl_sender.status() != Status::Connected {
133
        error!(
134
            "[{}] {} control sender not connected",
135
            FN_NAME, CTRL_QUEUE_NAME
136
        );
137
        return Err(Box::new(IoError::new(
138
            ErrorKind::NotConnected,
139
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
140
        )));
141
30
    }
142
30
    if q.status() != Status::Connected {
143
        error!(
144
            "[{}] {} control receiver not connected",
145
            FN_NAME, CTRL_QUEUE_NAME
146
        );
147
        return Err(Box::new(IoError::new(
148
            ErrorKind::NotConnected,
149
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
150
        )));
151
30
    }
152
30

            
153
30
    Ok(())
154
30
}
155

            
156
/// Create control channel sender queue.
157
30
pub fn new_ctrl_sender(
158
30
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
159
30
    config: &CfgCtrl,
160
30
    cache: Option<Arc<dyn Cache>>,
161
30
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
162
30
    let url = match config.url.as_ref() {
163
        None => {
164
            return Err(Box::new(IoError::new(
165
                ErrorKind::InvalidInput,
166
                "empty control url",
167
            )))
168
        }
169
30
        Some(url) => match Url::parse(url.as_str()) {
170
            Err(e) => return Err(Box::new(e)),
171
30
            Ok(url) => url,
172
30
        },
173
30
    };
174
30

            
175
30
    match mq::control::new(
176
30
        conn_pool.clone(),
177
30
        &url,
178
30
        config.prefetch,
179
30
        CTRL_QUEUE_NAME,
180
30
        false,
181
30
        Arc::new(CtrlSenderHandler {
182
30
            cache: cache.clone(),
183
30
        }),
184
30
        Arc::new(CtrlSenderHandler { cache }),
185
30
    ) {
186
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
187
30
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
188
    }
189
30
}
190

            
191
/// Create control channel receiver queue.
192
30
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
193
30
    let url = match config.url.as_ref() {
194
        None => {
195
            return Err(Box::new(IoError::new(
196
                ErrorKind::InvalidInput,
197
                "empty control url",
198
            )))
199
        }
200
30
        Some(url) => match Url::parse(url.as_str()) {
201
            Err(e) => return Err(Box::new(e)),
202
30
            Ok(url) => url,
203
30
        },
204
30
    };
205
30
    let handler = Arc::new(CtrlReceiverHandler {
206
30
        cache: state.cache.clone(),
207
30
    });
208
30
    match mq::control::new(
209
30
        state.mq_conns.clone(),
210
30
        &url,
211
30
        config.prefetch,
212
30
        CTRL_QUEUE_NAME,
213
30
        true,
214
30
        handler.clone(),
215
30
        handler,
216
30
    ) {
217
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
218
30
        Ok(q) => Ok(q),
219
    }
220
30
}
221

            
222
/// `POST /{base}/api/v1/device-route`
223
162
pub async fn post_device_route(
224
162
    State(state): State<AppState>,
225
162
    Extension(token_info): Extension<GetTokenInfoData>,
226
162
    Json(body): Json<request::PostDeviceRouteBody>,
227
162
) -> impl IntoResponse {
228
    const FN_NAME: &'static str = "post_device_route";
229

            
230
162
    let user_id = token_info.user_id.as_str();
231
162
    let roles = &token_info.roles;
232
162

            
233
162
    if body.data.device_id.len() == 0 {
234
6
        return Err(ErrResp::ErrParam(Some(
235
6
            "`deviceId` must with at least one character".to_string(),
236
6
        )));
237
156
    } else if body.data.application_id.len() == 0 {
238
6
        return Err(ErrResp::ErrParam(Some(
239
6
            "`applicationId` must with at least one character".to_string(),
240
6
        )));
241
150
    }
242
150
    let device_id = body.data.device_id.as_str();
243
150
    let application_id = body.data.application_id.as_str();
244
150
    let device = match check_device(FN_NAME, device_id, user_id, true, roles, &state).await? {
245
        None => {
246
18
            return Err(ErrResp::Custom(
247
18
                ErrReq::DEVICE_NOT_EXIST.0,
248
18
                ErrReq::DEVICE_NOT_EXIST.1,
249
18
                None,
250
18
            ))
251
        }
252
132
        Some(device) => device,
253
    };
254
120
    let application =
255
132
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
256
            None => {
257
12
                return Err(ErrResp::Custom(
258
12
                    ErrReq::APPLICATION_NOT_EXIST.0,
259
12
                    ErrReq::APPLICATION_NOT_EXIST.1,
260
12
                    None,
261
12
                ))
262
            }
263
120
            Some(application) => application,
264
120
        };
265
120
    if device.unit_id.as_str().cmp(application.unit_id.as_str()) != Ordering::Equal {
266
12
        return Err(ErrResp::Custom(
267
12
            ErrReq::UNIT_NOT_MATCH.0,
268
12
            ErrReq::UNIT_NOT_MATCH.1,
269
12
            None,
270
12
        ));
271
108
    }
272
108
    let cond = ListQueryCond {
273
108
        application_id: Some(application_id),
274
108
        device_id: Some(device_id),
275
108
        ..Default::default()
276
108
    };
277
108
    let opts = ListOptions {
278
108
        cond: &cond,
279
108
        offset: None,
280
108
        limit: Some(1),
281
108
        sort: None,
282
108
        cursor_max: None,
283
108
    };
284
108
    match state.model.device_route().list(&opts, None).await {
285
        Err(e) => {
286
            error!("[{}] get error: {}", FN_NAME, e);
287
            return Err(ErrResp::ErrDb(Some(e.to_string())));
288
        }
289
108
        Ok((list, _)) => match list.len() {
290
96
            0 => (),
291
            _ => {
292
12
                return Err(ErrResp::Custom(
293
12
                    ErrReq::ROUTE_EXIST.0,
294
12
                    ErrReq::ROUTE_EXIST.1,
295
12
                    None,
296
12
                ))
297
            }
298
        },
299
    }
300

            
301
96
    let now = Utc::now();
302
96
    let route_id = strings::random_id(&now, ID_RAND_LEN);
303
96
    let route = DeviceRoute {
304
96
        route_id: route_id.clone(),
305
96
        unit_id: application.unit_id,
306
96
        unit_code: application.unit_code,
307
96
        application_id: application.application_id,
308
96
        application_code: application.code,
309
96
        device_id: device.device_id,
310
96
        network_id: device.network_id,
311
96
        network_code: device.network_code,
312
96
        network_addr: device.network_addr,
313
96
        profile: device.profile,
314
96
        created_at: now,
315
96
        modified_at: now,
316
96
    };
317
96
    if let Err(e) = state.model.device_route().add(&route).await {
318
        error!("[{}] add error: {}", FN_NAME, e);
319
        return Err(ErrResp::ErrDb(Some(e.to_string())));
320
96
    }
321
96

            
322
96
    // Clear the denied route in cache.
323
96
    if state.cache.is_some() {
324
32
        let msg = SendCtrlMsg::DelDeviceRoute {
325
32
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE.to_string(),
326
32
            new: CtrlDelDeviceRoute {
327
32
                device_id: route.device_id,
328
32
            },
329
32
        };
330
32
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
331
64
    }
332

            
333
96
    Ok(Json(response::PostDeviceRoute {
334
96
        data: response::PostDeviceRouteData { route_id },
335
96
    }))
336
162
}
337

            
338
/// `POST /{base}/api/v1/device-route/bulk`
339
114
pub async fn post_device_route_bulk(
340
114
    State(state): State<AppState>,
341
114
    Extension(token_info): Extension<GetTokenInfoData>,
342
114
    Json(mut body): Json<request::PostDeviceRouteBulkBody>,
343
114
) -> impl IntoResponse {
344
    const FN_NAME: &'static str = "post_device_route_bulk";
345

            
346
114
    let user_id = token_info.user_id.as_str();
347
114
    let roles = &token_info.roles;
348
114

            
349
114
    if body.data.application_id.len() == 0 {
350
6
        return Err(ErrResp::ErrParam(Some(
351
6
            "`applicationId` must with at least one character".to_string(),
352
6
        )));
353
108
    } else if body.data.network_id.len() == 0 {
354
6
        return Err(ErrResp::ErrParam(Some(
355
6
            "`networkId` must with at least one character".to_string(),
356
6
        )));
357
102
    } else if body.data.network_addrs.len() == 0 {
358
6
        return Err(ErrResp::ErrParam(Some(
359
6
            "`networkAddrs` must with at least one address".to_string(),
360
6
        )));
361
96
    } else if body.data.network_addrs.len() > BULK_MAX {
362
6
        return Err(ErrResp::ErrParam(Some(format!(
363
6
            "`networkAddrs` cannot more than {}",
364
6
            BULK_MAX
365
6
        ))));
366
90
    }
367
90
    let mut addrs = vec![];
368
49194
    for addr in body.data.network_addrs.iter() {
369
49194
        if addr.len() == 0 {
370
6
            return Err(ErrResp::ErrParam(Some(
371
6
                "`networkAddrs` must be non-empty address array".to_string(),
372
6
            )));
373
49188
        }
374
49188
        addrs.push(addr.to_lowercase());
375
    }
376
84
    body.data.network_addrs = addrs;
377
84
    let application_id = body.data.application_id.as_str();
378
72
    let application =
379
84
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
380
            None => {
381
12
                return Err(ErrResp::Custom(
382
12
                    ErrReq::APPLICATION_NOT_EXIST.0,
383
12
                    ErrReq::APPLICATION_NOT_EXIST.1,
384
12
                    None,
385
12
                ))
386
            }
387
72
            Some(application) => application,
388
72
        };
389
72
    let network_id = body.data.network_id.as_str();
390
72
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
391
        None => {
392
12
            return Err(ErrResp::Custom(
393
12
                ErrReq::NETWORK_NOT_EXIST.0,
394
12
                ErrReq::NETWORK_NOT_EXIST.1,
395
12
                None,
396
12
            ));
397
        }
398
60
        Some(network) => network,
399
60
    };
400
60

            
401
49164
    let addrs: Vec<&str> = body.data.network_addrs.iter().map(|x| x.as_str()).collect();
402
60
    let cond = DeviceListQueryCond {
403
60
        unit_id: match network.unit_id.as_ref() {
404
30
            None => None,
405
30
            Some(unit_id) => Some(unit_id.as_str()),
406
        },
407
60
        network_id: Some(network_id),
408
60
        network_addrs: Some(&addrs),
409
60
        ..Default::default()
410
60
    };
411
60
    let opts = DeviceListOptions {
412
60
        cond: &cond,
413
60
        offset: None,
414
60
        limit: None,
415
60
        sort: None,
416
60
        cursor_max: None,
417
60
    };
418
60
    let devices = match state.model.device().list(&opts, None).await {
419
        Err(e) => {
420
            error!("[{}] list device error: {}", FN_NAME, e);
421
            return Err(ErrResp::ErrDb(Some(e.to_string())));
422
        }
423
60
        Ok((list, _)) => list,
424
60
    };
425
60
    if devices.len() == 0 {
426
12
        return Err(ErrResp::Custom(
427
12
            ErrReq::DEVICE_NOT_EXIST.0,
428
12
            ErrReq::DEVICE_NOT_EXIST.1,
429
12
            None,
430
12
        ));
431
48
    }
432
48

            
433
48
    let mut routes = vec![];
434
49152
    for device in devices.iter() {
435
49152
        let now = Utc::now();
436
49152
        let route = DeviceRoute {
437
49152
            route_id: strings::random_id(&now, ID_RAND_LEN),
438
49152
            unit_id: application.unit_id.clone(),
439
49152
            unit_code: application.unit_code.clone(),
440
49152
            application_id: application.application_id.clone(),
441
49152
            application_code: application.code.clone(),
442
49152
            network_id: network.network_id.clone(),
443
49152
            network_code: network.code.clone(),
444
49152
            network_addr: device.network_addr.clone(),
445
49152
            device_id: device.device_id.clone(),
446
49152
            profile: device.profile.clone(),
447
49152
            created_at: now,
448
49152
            modified_at: now,
449
49152
        };
450
49152
        routes.push(route);
451
49152
    }
452
48
    if let Err(e) = state.model.device_route().add_bulk(&routes).await {
453
        error!("[{}] add error: {}", FN_NAME, e);
454
        return Err(ErrResp::ErrDb(Some(e.to_string())));
455
48
    }
456
48

            
457
48
    if state.cache.is_some() {
458
16
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
459
16
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
460
16
            new: CtrlDelDeviceRouteBulk {
461
16384
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
462
16
            },
463
16
        };
464
16
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
465
32
    }
466

            
467
48
    Ok(StatusCode::NO_CONTENT)
468
114
}
469

            
470
/// `POST /{base}/api/v1/device-route/bulk-delete`
471
66
pub async fn post_device_route_bulk_del(
472
66
    State(state): State<AppState>,
473
66
    Extension(token_info): Extension<GetTokenInfoData>,
474
66
    Json(mut body): Json<request::PostDeviceRouteBulkBody>,
475
66
) -> impl IntoResponse {
476
    const FN_NAME: &'static str = "post_device_route_bulk_del";
477

            
478
66
    let user_id = token_info.user_id.as_str();
479
66
    let roles = &token_info.roles;
480
66

            
481
66
    if body.data.application_id.len() == 0 {
482
6
        return Err(ErrResp::ErrParam(Some(
483
6
            "`applicationId` must with at least one character".to_string(),
484
6
        )));
485
60
    } else if body.data.network_id.len() == 0 {
486
6
        return Err(ErrResp::ErrParam(Some(
487
6
            "`networkId` must with at least one character".to_string(),
488
6
        )));
489
54
    } else if body.data.network_addrs.len() == 0 {
490
6
        return Err(ErrResp::ErrParam(Some(
491
6
            "`networkAddrs` must with at least one address".to_string(),
492
6
        )));
493
48
    } else if body.data.network_addrs.len() > BULK_MAX {
494
6
        return Err(ErrResp::ErrParam(Some(format!(
495
6
            "`networkAddrs` cannot more than {}",
496
6
            BULK_MAX
497
6
        ))));
498
42
    }
499
42
    let mut addrs = vec![];
500
12318
    for addr in body.data.network_addrs.iter() {
501
12318
        if addr.len() == 0 {
502
6
            return Err(ErrResp::ErrParam(Some(
503
6
                "`networkAddrs` must be non-empty address array".to_string(),
504
6
            )));
505
12312
        }
506
12312
        addrs.push(addr.to_lowercase());
507
    }
508
36
    body.data.network_addrs = addrs;
509
36
    let application_id = body.data.application_id.as_str();
510
36
    if check_application(FN_NAME, application_id, user_id, true, roles, &state)
511
36
        .await?
512
36
        .is_none()
513
    {
514
12
        return Err(ErrResp::Custom(
515
12
            ErrReq::APPLICATION_NOT_EXIST.0,
516
12
            ErrReq::APPLICATION_NOT_EXIST.1,
517
12
            None,
518
12
        ));
519
24
    }
520
24
    let network_id = body.data.network_id.as_str();
521
24
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
522
        None => {
523
12
            return Err(ErrResp::Custom(
524
12
                ErrReq::NETWORK_NOT_EXIST.0,
525
12
                ErrReq::NETWORK_NOT_EXIST.1,
526
12
                None,
527
12
            ));
528
        }
529
12
        Some(network) => network,
530
12
    };
531
12

            
532
12288
    let addrs: Vec<&str> = body.data.network_addrs.iter().map(|x| x.as_str()).collect();
533
12
    let cond = QueryCond {
534
12
        application_id: Some(body.data.application_id.as_str()),
535
12
        network_id: Some(body.data.network_id.as_str()),
536
12
        network_addrs: Some(&addrs),
537
12
        ..Default::default()
538
12
    };
539
12
    if let Err(e) = state.model.device_route().del(&cond).await {
540
        error!("[{}] del error: {}", FN_NAME, e);
541
        return Err(ErrResp::ErrDb(Some(e.to_string())));
542
12
    }
543
12

            
544
12
    if state.cache.is_some() {
545
4
        let cond = DeviceListQueryCond {
546
4
            unit_id: match network.unit_id.as_ref() {
547
2
                None => None,
548
2
                Some(unit_id) => Some(unit_id.as_str()),
549
            },
550
4
            network_id: Some(network_id),
551
4
            network_addrs: Some(&addrs),
552
4
            ..Default::default()
553
4
        };
554
4
        let opts = DeviceListOptions {
555
4
            cond: &cond,
556
4
            offset: None,
557
4
            limit: None,
558
4
            sort: None,
559
4
            cursor_max: None,
560
4
        };
561
4
        let devices = match state.model.device().list(&opts, None).await {
562
            Err(e) => {
563
                error!("[{}] list device error: {}", FN_NAME, e);
564
                return Err(ErrResp::ErrDb(Some(e.to_string())));
565
            }
566
4
            Ok((list, _)) => list,
567
4
        };
568
4
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
569
4
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
570
4
            new: CtrlDelDeviceRouteBulk {
571
4096
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
572
4
            },
573
4
        };
574
4
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
575
8
    }
576

            
577
12
    Ok(StatusCode::NO_CONTENT)
578
66
}
579

            
580
/// `POST /{base}/api/v1/device-route/range`
581
144
pub async fn post_device_route_range(
582
144
    State(state): State<AppState>,
583
144
    Extension(token_info): Extension<GetTokenInfoData>,
584
144
    Json(body): Json<request::PostDeviceRouteRangeBody>,
585
144
) -> impl IntoResponse {
586
    const FN_NAME: &'static str = "post_device_route_range";
587

            
588
144
    let user_id = token_info.user_id.as_str();
589
144
    let roles = &token_info.roles;
590
144

            
591
144
    if body.data.application_id.len() == 0 {
592
6
        return Err(ErrResp::ErrParam(Some(
593
6
            "`applicationId` must with at least one character".to_string(),
594
6
        )));
595
138
    } else if body.data.network_id.len() == 0 {
596
6
        return Err(ErrResp::ErrParam(Some(
597
6
            "`networkId` must with at least one character".to_string(),
598
6
        )));
599
132
    } else if body.data.start_addr.len() != body.data.end_addr.len() {
600
18
        return Err(ErrResp::ErrParam(Some(
601
18
            "`startAddr` and `endAddr` must have the same length".to_string(),
602
18
        )));
603
114
    }
604
114
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
605
12
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
606
102
        Ok(addr) => addr,
607
    };
608
102
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
609
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
610
96
        Ok(addr) => addr,
611
96
    };
612
96
    if start_addr > end_addr {
613
6
        return Err(ErrResp::ErrParam(Some(
614
6
            "`startAddr` cannot larger than `endAddr`".to_string(),
615
6
        )));
616
90
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
617
6
        return Err(ErrResp::ErrParam(Some(format!(
618
6
            "numbers between `startAddr` and `endAddr` cannot more than {}",
619
6
            BULK_MAX
620
6
        ))));
621
84
    }
622
84

            
623
84
    let application_id = body.data.application_id.as_str();
624
72
    let application =
625
84
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
626
            None => {
627
12
                return Err(ErrResp::Custom(
628
12
                    ErrReq::APPLICATION_NOT_EXIST.0,
629
12
                    ErrReq::APPLICATION_NOT_EXIST.1,
630
12
                    None,
631
12
                ))
632
            }
633
72
            Some(application) => application,
634
72
        };
635
72
    let network_id = body.data.network_id.as_str();
636
72
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
637
        None => {
638
12
            return Err(ErrResp::Custom(
639
12
                ErrReq::NETWORK_NOT_EXIST.0,
640
12
                ErrReq::NETWORK_NOT_EXIST.1,
641
12
                None,
642
12
            ));
643
        }
644
60
        Some(network) => network,
645
60
    };
646
60

            
647
60
    let mut network_addrs = vec![];
648
60
    end_addr += 1;
649
60
    let addr_len = body.data.start_addr.len();
650
36888
    for addr in start_addr..end_addr {
651
36888
        network_addrs.push(u128_to_addr(addr, addr_len));
652
36888
    }
653
36888
    let addrs: Vec<&str> = network_addrs.iter().map(|x| x.as_str()).collect();
654
60
    let cond = DeviceListQueryCond {
655
60
        unit_id: match network.unit_id.as_ref() {
656
30
            None => None,
657
30
            Some(unit_id) => Some(unit_id.as_str()),
658
        },
659
60
        network_id: Some(network_id),
660
60
        network_addrs: Some(&addrs),
661
60
        ..Default::default()
662
60
    };
663
60
    let opts = DeviceListOptions {
664
60
        cond: &cond,
665
60
        offset: None,
666
60
        limit: None,
667
60
        sort: None,
668
60
        cursor_max: None,
669
60
    };
670
60
    let devices = match state.model.device().list(&opts, None).await {
671
        Err(e) => {
672
            error!("[{}] list device error: {}", FN_NAME, e);
673
            return Err(ErrResp::ErrDb(Some(e.to_string())));
674
        }
675
60
        Ok((list, _)) => list,
676
60
    };
677
60
    if devices.len() == 0 {
678
12
        return Err(ErrResp::Custom(
679
12
            ErrReq::DEVICE_NOT_EXIST.0,
680
12
            ErrReq::DEVICE_NOT_EXIST.1,
681
12
            None,
682
12
        ));
683
48
    }
684
48

            
685
48
    let mut routes = vec![];
686
36876
    for device in devices.iter() {
687
36876
        let now = Utc::now();
688
36876
        let route = DeviceRoute {
689
36876
            route_id: strings::random_id(&now, ID_RAND_LEN),
690
36876
            unit_id: application.unit_id.clone(),
691
36876
            unit_code: application.unit_code.clone(),
692
36876
            application_id: application.application_id.clone(),
693
36876
            application_code: application.code.clone(),
694
36876
            network_id: network.network_id.clone(),
695
36876
            network_code: network.code.clone(),
696
36876
            network_addr: device.network_addr.clone(),
697
36876
            device_id: device.device_id.clone(),
698
36876
            profile: device.profile.clone(),
699
36876
            created_at: now,
700
36876
            modified_at: now,
701
36876
        };
702
36876
        routes.push(route);
703
36876
    }
704
48
    if let Err(e) = state.model.device_route().add_bulk(&routes).await {
705
        error!("[{}] add error: {}", FN_NAME, e);
706
        return Err(ErrResp::ErrDb(Some(e.to_string())));
707
48
    }
708
48

            
709
48
    if state.cache.is_some() {
710
16
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
711
16
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
712
16
            new: CtrlDelDeviceRouteBulk {
713
12292
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
714
16
            },
715
16
        };
716
16
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
717
32
    }
718

            
719
48
    Ok(StatusCode::NO_CONTENT)
720
144
}
721

            
722
/// `POST /{base}/api/v1/device-route/range-delete`
723
96
pub async fn post_device_route_range_del(
724
96
    State(state): State<AppState>,
725
96
    Extension(token_info): Extension<GetTokenInfoData>,
726
96
    Json(body): Json<request::PostDeviceRouteRangeBody>,
727
96
) -> impl IntoResponse {
728
    const FN_NAME: &'static str = "post_device_route_range_del";
729

            
730
96
    let user_id = token_info.user_id.as_str();
731
96
    let roles = &token_info.roles;
732
96

            
733
96
    if body.data.application_id.len() == 0 {
734
6
        return Err(ErrResp::ErrParam(Some(
735
6
            "`applicationId` must with at least one character".to_string(),
736
6
        )));
737
90
    } else if body.data.network_id.len() == 0 {
738
6
        return Err(ErrResp::ErrParam(Some(
739
6
            "`networkId` must with at least one character".to_string(),
740
6
        )));
741
84
    } else if body.data.start_addr.len() != body.data.end_addr.len() {
742
18
        return Err(ErrResp::ErrParam(Some(
743
18
            "`startAddr` and `endAddr` must have the same length".to_string(),
744
18
        )));
745
66
    }
746
66
    let start_addr = match hex_addr_to_u128(body.data.start_addr.as_str()) {
747
12
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
748
54
        Ok(addr) => addr,
749
    };
750
54
    let mut end_addr = match hex_addr_to_u128(body.data.end_addr.as_str()) {
751
6
        Err(e) => return Err(ErrResp::ErrParam(Some(e.to_string()))),
752
48
        Ok(addr) => addr,
753
48
    };
754
48
    if start_addr > end_addr {
755
6
        return Err(ErrResp::ErrParam(Some(
756
6
            "`startAddr` cannot larger than `endAddr`".to_string(),
757
6
        )));
758
42
    } else if (end_addr - start_addr) as usize >= BULK_MAX {
759
6
        return Err(ErrResp::ErrParam(Some(format!(
760
6
            "numbers between `startAddr` and `endAddr` cannot more than {}",
761
6
            BULK_MAX
762
6
        ))));
763
36
    }
764
36

            
765
36
    let application_id = body.data.application_id.as_str();
766
36
    if check_application(FN_NAME, application_id, user_id, true, roles, &state)
767
36
        .await?
768
36
        .is_none()
769
    {
770
12
        return Err(ErrResp::Custom(
771
12
            ErrReq::APPLICATION_NOT_EXIST.0,
772
12
            ErrReq::APPLICATION_NOT_EXIST.1,
773
12
            None,
774
12
        ));
775
24
    }
776
24
    let network_id = body.data.network_id.as_str();
777
24
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
778
        None => {
779
12
            return Err(ErrResp::Custom(
780
12
                ErrReq::NETWORK_NOT_EXIST.0,
781
12
                ErrReq::NETWORK_NOT_EXIST.1,
782
12
                None,
783
12
            ));
784
        }
785
12
        Some(network) => network,
786
12
    };
787
12

            
788
12
    let mut network_addrs = vec![];
789
12
    end_addr += 1;
790
12
    let addr_len = body.data.start_addr.len();
791
12288
    for addr in start_addr..end_addr {
792
12288
        network_addrs.push(u128_to_addr(addr, addr_len));
793
12288
    }
794

            
795
12288
    let addrs: Vec<&str> = network_addrs.iter().map(|x| x.as_str()).collect();
796
12
    let cond = QueryCond {
797
12
        application_id: Some(body.data.application_id.as_str()),
798
12
        network_id: Some(body.data.network_id.as_str()),
799
12
        network_addrs: Some(&addrs),
800
12
        ..Default::default()
801
12
    };
802
12
    if let Err(e) = state.model.device_route().del(&cond).await {
803
        error!("[{}] del error: {}", FN_NAME, e);
804
        return Err(ErrResp::ErrDb(Some(e.to_string())));
805
12
    }
806
12

            
807
12
    if state.cache.is_some() {
808
4
        let cond = DeviceListQueryCond {
809
4
            unit_id: match network.unit_id.as_ref() {
810
2
                None => None,
811
2
                Some(unit_id) => Some(unit_id.as_str()),
812
            },
813
4
            network_id: Some(network_id),
814
4
            network_addrs: Some(&addrs),
815
4
            ..Default::default()
816
4
        };
817
4
        let opts = DeviceListOptions {
818
4
            cond: &cond,
819
4
            offset: None,
820
4
            limit: None,
821
4
            sort: None,
822
4
            cursor_max: None,
823
4
        };
824
4
        let devices = match state.model.device().list(&opts, None).await {
825
            Err(e) => {
826
                error!("[{}] list device error: {}", FN_NAME, e);
827
                return Err(ErrResp::ErrDb(Some(e.to_string())));
828
            }
829
4
            Ok((list, _)) => list,
830
4
        };
831
4
        let msg = SendCtrlMsg::DelDeviceRouteBulk {
832
4
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE_BULK.to_string(),
833
4
            new: CtrlDelDeviceRouteBulk {
834
4096
                device_ids: devices.iter().map(|x| x.device_id.clone()).collect(),
835
4
            },
836
4
        };
837
4
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
838
8
    }
839

            
840
12
    Ok(StatusCode::NO_CONTENT)
841
96
}
842

            
843
/// `GET /{base}/api/v1/device-route/count`
844
270
pub async fn get_device_route_count(
845
270
    State(state): State<AppState>,
846
270
    Extension(token_info): Extension<GetTokenInfoData>,
847
270
    Query(query): Query<request::GetDeviceRouteCountQuery>,
848
270
) -> impl IntoResponse {
849
    const FN_NAME: &'static str = "get_device_route_count";
850

            
851
270
    let user_id = token_info.user_id.as_str();
852
270
    let roles = &token_info.roles;
853
270

            
854
270
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
855
138
        match query.unit.as_ref() {
856
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
857
132
            Some(unit_id) => {
858
132
                if unit_id.len() == 0 {
859
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
860
126
                }
861
            }
862
        }
863
132
    }
864
258
    let unit_cond = match query.unit.as_ref() {
865
54
        None => None,
866
204
        Some(unit_id) => match unit_id.len() {
867
12
            0 => None,
868
            _ => {
869
192
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
870
                    None => {
871
18
                        return Err(ErrResp::Custom(
872
18
                            ErrReq::UNIT_NOT_EXIST.0,
873
18
                            ErrReq::UNIT_NOT_EXIST.1,
874
18
                            None,
875
18
                        ))
876
                    }
877
174
                    Some(_) => Some(unit_id.as_str()),
878
                }
879
            }
880
        },
881
    };
882
240
    let cond = ListQueryCond {
883
240
        unit_id: unit_cond,
884
240
        application_id: match query.application.as_ref() {
885
150
            None => None,
886
90
            Some(application) => match application.len() {
887
6
                0 => None,
888
84
                _ => Some(application.as_ref()),
889
            },
890
        },
891
240
        network_id: match query.network.as_ref() {
892
150
            None => None,
893
90
            Some(network_id) => match network_id.len() {
894
6
                0 => None,
895
84
                _ => Some(network_id.as_ref()),
896
            },
897
        },
898
240
        device_id: match query.device.as_ref() {
899
198
            None => None,
900
42
            Some(device_id) => match device_id.len() {
901
6
                0 => None,
902
36
                _ => Some(device_id.as_ref()),
903
            },
904
        },
905
240
        ..Default::default()
906
240
    };
907
240
    match state.model.device_route().count(&cond).await {
908
        Err(e) => {
909
            error!("[{}] count error: {}", FN_NAME, e);
910
            Err(ErrResp::ErrDb(Some(e.to_string())))
911
        }
912
240
        Ok(count) => Ok(Json(response::GetDeviceRouteCount {
913
240
            data: response::GetCountData { count },
914
240
        })),
915
    }
916
270
}
917

            
918
/// `GET /{base}/api/v1/device-route/list`
919
480
pub async fn get_device_route_list(
920
480
    State(state): State<AppState>,
921
480
    Extension(token_info): Extension<GetTokenInfoData>,
922
480
    Query(query): Query<request::GetDeviceRouteListQuery>,
923
480
) -> impl IntoResponse {
924
    const FN_NAME: &'static str = "get_device_route_list";
925

            
926
480
    let user_id = token_info.user_id.as_str();
927
480
    let roles = &token_info.roles;
928
480

            
929
480
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
930
138
        match query.unit.as_ref() {
931
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
932
132
            Some(unit_id) => {
933
132
                if unit_id.len() == 0 {
934
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
935
126
                }
936
            }
937
        }
938
342
    }
939
468
    let unit_cond = match query.unit.as_ref() {
940
240
        None => None,
941
228
        Some(unit_id) => match unit_id.len() {
942
18
            0 => None,
943
            _ => {
944
210
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
945
                    None => {
946
18
                        return Err(ErrResp::Custom(
947
18
                            ErrReq::UNIT_NOT_EXIST.0,
948
18
                            ErrReq::UNIT_NOT_EXIST.1,
949
18
                            None,
950
18
                        ))
951
                    }
952
192
                    Some(_) => Some(unit_id.as_str()),
953
                }
954
            }
955
        },
956
    };
957
450
    let cond = ListQueryCond {
958
450
        unit_id: unit_cond,
959
450
        application_id: match query.application.as_ref() {
960
336
            None => None,
961
114
            Some(application) => match application.len() {
962
12
                0 => None,
963
102
                _ => Some(application.as_ref()),
964
            },
965
        },
966
450
        network_id: match query.network.as_ref() {
967
336
            None => None,
968
114
            Some(network_id) => match network_id.len() {
969
12
                0 => None,
970
102
                _ => Some(network_id.as_ref()),
971
            },
972
        },
973
450
        device_id: match query.device.as_ref() {
974
402
            None => None,
975
48
            Some(device_id) => match device_id.len() {
976
12
                0 => None,
977
36
                _ => Some(device_id.as_ref()),
978
            },
979
        },
980
450
        ..Default::default()
981
    };
982
450
    let sort_cond = get_sort_cond(&query.sort)?;
983
420
    let opts = ListOptions {
984
420
        cond: &cond,
985
420
        offset: query.offset,
986
420
        limit: match query.limit {
987
372
            None => Some(LIST_LIMIT_DEFAULT),
988
48
            Some(limit) => match limit {
989
18
                0 => None,
990
30
                _ => Some(limit),
991
            },
992
        },
993
420
        sort: Some(sort_cond.as_slice()),
994
420
        cursor_max: Some(LIST_CURSOR_MAX),
995
    };
996

            
997
420
    let (list, cursor) = match state.model.device_route().list(&opts, None).await {
998
        Err(e) => {
999
            error!("[{}] list error: {}", FN_NAME, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
420
        Ok((list, cursor)) => match cursor {
6
            None => match query.format {
                Some(request::ListFormat::Array) => {
6
                    return Ok(Json(route_list_transform(&list)).into_response())
                }
                _ => {
336
                    return Ok(Json(response::GetDeviceRouteList {
336
                        data: route_list_transform(&list),
336
                    })
336
                    .into_response())
                }
            },
78
            Some(_) => (list, cursor),
78
        },
78
    };
78

            
78
    let body = Body::from_stream(async_stream::stream! {
78
        let unit_cond = match query.unit.as_ref() {
78
            None => None,
78
            Some(unit_id) => match unit_id.len() {
78
                0 => None,
78
                _ => Some(unit_id.as_str()),
78
            },
78
        };
78
        let cond = ListQueryCond {
78
            unit_id: unit_cond,
78
            application_id: match query.application.as_ref() {
78
                None => None,
78
                Some(application) => match application.len() {
78
                    0 => None,
78
                    _ => Some(application.as_ref())
78
                },
78
            },
78
            network_id: match query.network.as_ref() {
78
                None => None,
78
                Some(network_id) => match network_id.len() {
78
                    0 => None,
78
                    _ => Some(network_id.as_ref())
78
                },
78
            },
78
            device_id: match query.device.as_ref() {
78
                None => None,
78
                Some(device_id) => match device_id.len() {
78
                    0 => None,
78
                    _ => Some(device_id.as_ref())
78
                },
78
            },
78
            ..Default::default()
78
        };
78
        let opts = ListOptions {
78
            cond: &cond,
78
            offset: query.offset,
78
            limit: match query.limit {
78
                None => Some(LIST_LIMIT_DEFAULT),
78
                Some(limit) => match limit {
78
                    0 => None,
78
                    _ => Some(limit),
78
                },
78
            },
78
            sort: Some(sort_cond.as_slice()),
78
            cursor_max: Some(LIST_CURSOR_MAX),
78
        };
78

            
78
        let mut list = list;
78
        let mut cursor = cursor;
78
        let mut is_first = true;
78
        loop {
78
            yield route_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
78
            is_first = false;
78
            if cursor.is_none() {
78
                break;
78
            }
78
            let (_list, _cursor) = match state.model.device_route().list(&opts, cursor).await {
78
                Err(_) => break,
78
                Ok((list, cursor)) => (list, cursor),
78
            };
78
            list = _list;
78
            cursor = _cursor;
78
        }
78
    });
78
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
480
}
/// `DELETE /{base}/api/v1/device-route/{routeId}`
24
pub async fn delete_device_route(
24
    State(state): State<AppState>,
24
    Extension(token_info): Extension<GetTokenInfoData>,
24
    Path(param): Path<request::RouteIdPath>,
24
) -> impl IntoResponse {
    const FN_NAME: &'static str = "delete_device_route";
24
    let user_id = token_info.user_id.as_str();
24
    let roles = &token_info.roles;
24
    let route_id = param.route_id.as_str();
    // To check if the device route is for the user.
24
    let route = match check_route(FN_NAME, route_id, user_id, true, roles, &state).await {
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
24
        Ok(route) => match route {
12
            None => return Ok(StatusCode::NO_CONTENT),
12
            Some(route) => route,
12
        },
12
    };
12

            
12
    let cond = QueryCond {
12
        route_id: Some(route_id),
12
        ..Default::default()
12
    };
12
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del error: {}", FN_NAME, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
12
    }
12

            
12
    if state.cache.is_some() {
4
        let msg = SendCtrlMsg::DelDeviceRoute {
4
            operation: CtrlMsgOp::DEL_DEVICE_ROUTE.to_string(),
4
            new: CtrlDelDeviceRoute {
4
                device_id: route.device_id,
4
            },
4
        };
4
        send_del_ctrl_message(FN_NAME, &msg, &state).await?;
8
    }
12
    Ok(StatusCode::NO_CONTENT)
24
}
450
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
450
    match sort_args.as_ref() {
348
        None => Ok(vec![
348
            SortCond {
348
                key: SortKey::NetworkCode,
348
                asc: true,
348
            },
348
            SortCond {
348
                key: SortKey::NetworkAddr,
348
                asc: true,
348
            },
348
            SortCond {
348
                key: SortKey::CreatedAt,
348
                asc: false,
348
            },
348
        ]),
102
        Some(args) => {
102
            let mut args = args.split(",");
102
            let mut sort_cond = vec![];
252
            while let Some(arg) = args.next() {
180
                let mut cond = arg.split(":");
180
                let key = match cond.next() {
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
180
                    Some(field) => match field {
180
                        "application" => SortKey::ApplicationCode,
168
                        "network" => SortKey::NetworkCode,
138
                        "addr" => SortKey::NetworkAddr,
102
                        "created" => SortKey::CreatedAt,
24
                        "modified" => SortKey::ModifiedAt,
                        _ => {
12
                            return Err(ErrResp::ErrParam(Some(format!(
12
                                "invalid sort key {}",
12
                                field
12
                            ))))
                        }
                    },
                };
168
                let asc = match cond.next() {
6
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
162
                    Some(asc) => match asc {
162
                        "asc" => true,
54
                        "desc" => false,
                        _ => {
6
                            return Err(ErrResp::ErrParam(Some(format!(
6
                                "invalid sort asc {}",
6
                                asc
6
                            ))))
                        }
                    },
                };
156
                if cond.next().is_some() {
6
                    return Err(ErrResp::ErrParam(Some(
6
                        "invalid sort condition".to_string(),
6
                    )));
150
                }
150
                sort_cond.push(SortCond { key, asc });
            }
72
            Ok(sort_cond)
        }
    }
450
}
/// To check if the user ID can access the device route. Choose `only_owner` to check if the user
/// is the unit owner or one of unit members.
///
/// # Errors
///
/// Returns OK if the device route is found or not. Otherwise errors will be returned.
24
async fn check_route(
24
    fn_name: &str,
24
    route_id: &str,
24
    user_id: &str,
24
    only_owner: bool, // to check if this `user_id` is the owner.
24
    roles: &HashMap<String, bool>,
24
    state: &AppState,
24
) -> Result<Option<DeviceRoute>, ErrResp> {
24
    let route = match state.model.device_route().get(route_id).await {
        Err(e) => {
            error!("[{}] get error: {}", fn_name, e);
            return Err(ErrResp::ErrDb(Some(e.to_string())));
        }
24
        Ok(route) => match route {
6
            None => return Ok(None),
18
            Some(route) => route,
18
        },
18
    };
18
    let unit_id = route.unit_id.as_str();
18
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
6
        None => Ok(None),
12
        Some(_) => Ok(Some(route)),
    }
24
}
342
fn route_list_transform(list: &Vec<DeviceRoute>) -> Vec<response::GetDeviceRouteData> {
342
    let mut ret = vec![];
1002
    for route in list.iter() {
1002
        ret.push(route_transform(&route));
1002
    }
342
    ret
342
}
168
fn route_list_transform_bytes(
168
    list: &Vec<DeviceRoute>,
168
    with_start: bool,
168
    with_end: bool,
168
    format: Option<&request::ListFormat>,
168
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
168
    let mut build_str = match with_start {
90
        false => "".to_string(),
6
        true => match format {
6
            Some(request::ListFormat::Array) => "[".to_string(),
72
            _ => "{\"data\":[".to_string(),
        },
    };
168
    let mut is_first = with_start;
9834
    for item in list {
9666
        if is_first {
78
            is_first = false;
9588
        } else {
9588
            build_str.push(',');
9588
        }
9666
        let json_str = match serde_json::to_string(&route_transform(item)) {
            Err(e) => return Err(Box::new(e)),
9666
            Ok(str) => str,
9666
        };
9666
        build_str += json_str.as_str();
    }
168
    if with_end {
78
        build_str += match format {
6
            Some(request::ListFormat::Array) => "]",
72
            _ => "]}",
        }
90
    }
168
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
168
}
10668
fn route_transform(route: &DeviceRoute) -> response::GetDeviceRouteData {
10668
    response::GetDeviceRouteData {
10668
        route_id: route.route_id.clone(),
10668
        unit_id: route.unit_id.clone(),
10668
        application_id: route.application_id.clone(),
10668
        application_code: route.application_code.clone(),
10668
        device_id: route.device_id.clone(),
10668
        network_id: route.network_id.clone(),
10668
        network_code: route.network_code.clone(),
10668
        network_addr: route.network_addr.clone(),
10668
        profile: route.profile.clone(),
10668
        created_at: time_str(&route.created_at),
10668
        modified_at: time_str(&route.modified_at),
10668
    }
10668
}
/// Send delete control message.
76
async fn send_del_ctrl_message(
76
    fn_name: &str,
76
    msg: &SendCtrlMsg,
76
    state: &AppState,
76
) -> Result<(), ErrResp> {
76
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!(
                "[{}] marshal JSON for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_DEVICE_ROUTE,
                e
            );
            return Err(ErrResp::ErrRsc(Some(format!(
                "marshal control message error: {}",
                e
            ))));
        }
76
        Ok(payload) => payload,
76
    };
76
    let ctrl_sender = { state.ctrl_senders.device_route.lock().unwrap().clone() };
76
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name,
            CtrlMsgOp::DEL_DEVICE_ROUTE,
            e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "send control message error: {}",
            e
        ))));
76
    }
76

            
76
    Ok(())
76
}
/// Clear the device route cache.
24
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
24
    if let Err(e) = cache.device_route().clear().await {
        error!(
            "[{}] {} clear device route cache error: {}",
            fn_name, queue_name, e
        );
24
    }
24
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
76
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
76
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
76
        if let Some(cache) = self.cache.as_ref() {
12
            clear_cache(FN_NAME, queue_name, cache).await;
64
        }
76
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
46
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
152
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
60
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
60
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
60
        if let Some(cache) = self.cache.as_ref() {
12
            clear_cache(FN_NAME, queue_name, cache).await;
48
        }
60
        match status {
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
30
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
120
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
76
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
76
        let queue_name = queue.name();
76
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
            Err(e) => {
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                return;
            }
76
            Ok(msg) => msg,
76
        };
76
        match ctrl_msg {
36
            RecvCtrlMsg::DelDeviceRoute { new } => {
36
                if let Some(cache) = self.cache.as_ref() {
36
                    if let Err(e) = cache
36
                        .device_route()
36
                        .del_uldata(new.device_id.as_str())
36
                        .await
                    {
                        error!(
                            "[{}] {} delete device route cache {} error: {}",
                            FN_NAME, queue_name, new.device_id, e
                        );
                    } else {
36
                        debug!(
                            "[{}] {} delete device route cache {}",
                            FN_NAME, queue_name, new.device_id
                        );
                    }
                }
            }
40
            RecvCtrlMsg::DelDeviceRouteBulk { new } => {
40
                if let Some(cache) = self.cache.as_ref() {
36868
                    for device_id in new.device_ids.iter() {
36868
                        if let Err(e) = cache.device_route().del_uldata(device_id.as_str()).await {
                            error!(
                                "[{}] {} delete device route cache {} error: {}",
                                FN_NAME, queue_name, device_id, e
                            );
                        } else {
36868
                            debug!(
                                "[{}] {} delete device route cache {}",
                                FN_NAME, queue_name, device_id
                            );
                        }
                    }
                }
            }
        }
76
        if let Err(e) = msg.ack().await {
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
76
        }
152
    }
}