1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    body::{Body, Bytes},
13
    extract::State,
14
    http::{header, StatusCode},
15
    response::IntoResponse,
16
    Extension,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json;
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27
    Queue,
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, time_str},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
40
        lib::{check_application, check_network, check_unit},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        network_route::{ListOptions, ListQueryCond, NetworkRoute, QueryCond, SortCond, SortKey},
51
        Cache,
52
    },
53
};
54

            
55
#[derive(Deserialize, Serialize)]
56
#[serde(tag = "operation")]
57
enum RecvCtrlMsg {
58
    #[serde(rename = "del-network-route")]
59
    DelNetworkRoute { new: CtrlDelNetworkRoute },
60
}
61

            
62
#[derive(Serialize)]
63
#[serde(untagged)]
64
enum SendCtrlMsg {
65
    DelNetworkRoute {
66
        operation: String,
67
        new: CtrlDelNetworkRoute,
68
    },
69
}
70

            
71
struct CtrlMsgOp;
72

            
73
#[derive(Deserialize, Serialize)]
74
struct CtrlDelNetworkRoute {
75
    #[serde(rename = "unitId")]
76
    unit_id: String,
77
    #[serde(rename = "unitCode")]
78
    unit_code: Option<String>,
79
    #[serde(rename = "networkId")]
80
    network_id: String,
81
    #[serde(rename = "networkCode")]
82
    network_code: String,
83
}
84

            
85
struct CtrlSenderHandler {
86
    cache: Option<Arc<dyn Cache>>,
87
}
88

            
89
struct CtrlReceiverHandler {
90
    cache: Option<Arc<dyn Cache>>,
91
}
92

            
93
impl CtrlMsgOp {
94
    const DEL_NETWORK_ROUTE: &'static str = "del-network-route";
95
}
96

            
97
const LIST_LIMIT_DEFAULT: u64 = 100;
98
const LIST_CURSOR_MAX: u64 = 100;
99
const ID_RAND_LEN: usize = 12;
100
const CTRL_QUEUE_NAME: &'static str = "network-route";
101

            
102
/// Initialize channels.
103
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
104
    const FN_NAME: &'static str = "init";
105

            
106
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
107
15
    {
108
15
        state
109
15
            .ctrl_receivers
110
15
            .lock()
111
15
            .unwrap()
112
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
113
15
    }
114
15

            
115
15
    let ctrl_sender = { state.ctrl_senders.network_route.lock().unwrap().clone() };
116
    // Wait for connected.
117
1382
    for _ in 0..500 {
118
1382
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
119
15
            break;
120
1367
        }
121
1367
        time::sleep(Duration::from_millis(10)).await;
122
    }
123
15
    if ctrl_sender.status() != Status::Connected {
124
        error!(
125
            "[{}] {} control sender not connected",
126
            FN_NAME, CTRL_QUEUE_NAME
127
        );
128
        return Err(Box::new(IoError::new(
129
            ErrorKind::NotConnected,
130
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
131
        )));
132
15
    }
133
15
    if q.status() != Status::Connected {
134
        error!(
135
            "[{}] {} control receiver not connected",
136
            FN_NAME, CTRL_QUEUE_NAME
137
        );
138
        return Err(Box::new(IoError::new(
139
            ErrorKind::NotConnected,
140
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
141
        )));
142
15
    }
143
15

            
144
15
    Ok(())
145
15
}
146

            
147
/// Create control channel sender queue.
148
15
pub fn new_ctrl_sender(
149
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
150
15
    config: &CfgCtrl,
151
15
    cache: Option<Arc<dyn Cache>>,
152
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
153
15
    let url = match config.url.as_ref() {
154
        None => {
155
            return Err(Box::new(IoError::new(
156
                ErrorKind::InvalidInput,
157
                "empty control url",
158
            )))
159
        }
160
15
        Some(url) => match Url::parse(url.as_str()) {
161
            Err(e) => return Err(Box::new(e)),
162
15
            Ok(url) => url,
163
15
        },
164
15
    };
165
15

            
166
15
    match mq::control::new(
167
15
        conn_pool.clone(),
168
15
        &url,
169
15
        config.prefetch,
170
15
        CTRL_QUEUE_NAME,
171
15
        false,
172
15
        Arc::new(CtrlSenderHandler {
173
15
            cache: cache.clone(),
174
15
        }),
175
15
        Arc::new(CtrlSenderHandler { cache }),
176
15
    ) {
177
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
178
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
179
    }
180
15
}
181

            
182
/// Create control channel receiver queue.
183
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
184
15
    let url = match config.url.as_ref() {
185
        None => {
186
            return Err(Box::new(IoError::new(
187
                ErrorKind::InvalidInput,
188
                "empty control url",
189
            )))
190
        }
191
15
        Some(url) => match Url::parse(url.as_str()) {
192
            Err(e) => return Err(Box::new(e)),
193
15
            Ok(url) => url,
194
15
        },
195
15
    };
196
15
    let handler = Arc::new(CtrlReceiverHandler {
197
15
        cache: state.cache.clone(),
198
15
    });
199
15
    match mq::control::new(
200
15
        state.mq_conns.clone(),
201
15
        &url,
202
15
        config.prefetch,
203
15
        CTRL_QUEUE_NAME,
204
15
        true,
205
15
        handler.clone(),
206
15
        handler,
207
15
    ) {
208
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
209
15
        Ok(q) => Ok(q),
210
    }
211
15
}
212

            
213
/// `POST /{base}/api/v1/network-route`
214
57
pub async fn post_network_route(
215
57
    State(state): State<AppState>,
216
57
    Extension(token_info): Extension<GetTokenInfoData>,
217
57
    Json(body): Json<request::PostNetworkRouteBody>,
218
57
) -> impl IntoResponse {
219
    const FN_NAME: &'static str = "post_network_route";
220

            
221
57
    let user_id = token_info.user_id.as_str();
222
57
    let roles = &token_info.roles;
223
57

            
224
57
    if body.data.network_id.len() == 0 {
225
3
        return Err(ErrResp::ErrParam(Some(
226
3
            "`networkId` must with at least one character".to_string(),
227
3
        )));
228
54
    } else if body.data.application_id.len() == 0 {
229
3
        return Err(ErrResp::ErrParam(Some(
230
3
            "`applicationId` must with at least one character".to_string(),
231
3
        )));
232
51
    }
233
51
    let network_id = body.data.network_id.as_str();
234
51
    let application_id = body.data.application_id.as_str();
235
51
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
236
        None => {
237
9
            return Err(ErrResp::Custom(
238
9
                ErrReq::NETWORK_NOT_EXIST.0,
239
9
                ErrReq::NETWORK_NOT_EXIST.1,
240
9
                None,
241
9
            ))
242
        }
243
42
        Some(network) => network,
244
    };
245
36
    let application =
246
42
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
247
            None => {
248
6
                return Err(ErrResp::Custom(
249
6
                    ErrReq::APPLICATION_NOT_EXIST.0,
250
6
                    ErrReq::APPLICATION_NOT_EXIST.1,
251
6
                    None,
252
6
                ))
253
            }
254
36
            Some(application) => application,
255
        };
256
36
    if let Some(unit_id) = network.unit_id.as_ref() {
257
27
        if unit_id.as_str().cmp(application.unit_id.as_str()) != Ordering::Equal {
258
6
            return Err(ErrResp::Custom(
259
6
                ErrReq::UNIT_NOT_MATCH.0,
260
6
                ErrReq::UNIT_NOT_MATCH.1,
261
6
                None,
262
6
            ));
263
21
        }
264
9
    }
265
30
    let cond = ListQueryCond {
266
30
        application_id: Some(application_id),
267
30
        network_id: Some(network_id),
268
30
        ..Default::default()
269
30
    };
270
30
    let opts = ListOptions {
271
30
        cond: &cond,
272
30
        offset: None,
273
30
        limit: Some(1),
274
30
        sort: None,
275
30
        cursor_max: None,
276
30
    };
277
30
    match state.model.network_route().list(&opts, None).await {
278
        Err(e) => {
279
            error!("[{}] get error: {}", FN_NAME, e);
280
            return Err(ErrResp::ErrDb(Some(e.to_string())));
281
        }
282
30
        Ok((list, _)) => match list.len() {
283
24
            0 => (),
284
            _ => {
285
6
                return Err(ErrResp::Custom(
286
6
                    ErrReq::ROUTE_EXIST.0,
287
6
                    ErrReq::ROUTE_EXIST.1,
288
6
                    None,
289
6
                ))
290
            }
291
        },
292
    }
293

            
294
24
    let now = Utc::now();
295
24
    let route_id = strings::random_id(&now, ID_RAND_LEN);
296
24
    let route = NetworkRoute {
297
24
        route_id: route_id.clone(),
298
24
        unit_id: application.unit_id,
299
24
        unit_code: application.unit_code,
300
24
        application_id: application.application_id,
301
24
        application_code: application.code,
302
24
        network_id: network.network_id,
303
24
        network_code: network.code,
304
24
        created_at: now,
305
24
    };
306
24
    if let Err(e) = state.model.network_route().add(&route).await {
307
        error!("[{}] add error: {}", FN_NAME, e);
308
        return Err(ErrResp::ErrDb(Some(e.to_string())));
309
24
    }
310
24
    send_del_ctrl_message(FN_NAME, route, &state).await?;
311

            
312
24
    Ok(Json(response::PostNetworkRoute {
313
24
        data: response::PostNetworkRouteData { route_id },
314
24
    }))
315
57
}
316

            
317
/// `GET /{base}/api/v1/network-route/count`
318
117
pub async fn get_network_route_count(
319
117
    State(state): State<AppState>,
320
117
    Extension(token_info): Extension<GetTokenInfoData>,
321
117
    Query(query): Query<request::GetNetworkRouteCountQuery>,
322
117
) -> impl IntoResponse {
323
    const FN_NAME: &'static str = "get_network_route_count";
324

            
325
117
    let user_id = token_info.user_id.as_str();
326
117
    let roles = &token_info.roles;
327
117

            
328
117
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
329
63
        match query.unit.as_ref() {
330
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
331
60
            Some(unit_id) => {
332
60
                if unit_id.len() == 0 {
333
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
334
57
                }
335
            }
336
        }
337
54
    }
338
111
    let unit_cond = match query.unit.as_ref() {
339
21
        None => None,
340
90
        Some(unit_id) => match unit_id.len() {
341
6
            0 => None,
342
            _ => {
343
84
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
344
                    None => {
345
9
                        return Err(ErrResp::Custom(
346
9
                            ErrReq::UNIT_NOT_EXIST.0,
347
9
                            ErrReq::UNIT_NOT_EXIST.1,
348
9
                            None,
349
9
                        ))
350
                    }
351
75
                    Some(_) => Some(unit_id.as_str()),
352
                }
353
            }
354
        },
355
    };
356
102
    let cond = ListQueryCond {
357
102
        unit_id: unit_cond,
358
102
        application_id: match query.application.as_ref() {
359
57
            None => None,
360
45
            Some(application) => match application.len() {
361
3
                0 => None,
362
42
                _ => Some(application.as_ref()),
363
            },
364
        },
365
102
        network_id: match query.network.as_ref() {
366
57
            None => None,
367
45
            Some(network_id) => match network_id.len() {
368
3
                0 => None,
369
42
                _ => Some(network_id.as_ref()),
370
            },
371
        },
372
102
        ..Default::default()
373
102
    };
374
102
    match state.model.network_route().count(&cond).await {
375
        Err(e) => {
376
            error!("[{}] count error: {}", FN_NAME, e);
377
            Err(ErrResp::ErrDb(Some(e.to_string())))
378
        }
379
102
        Ok(count) => Ok(Json(response::GetNetworkRouteCount {
380
102
            data: response::GetCountData { count },
381
102
        })),
382
    }
383
117
}
384

            
385
/// `GET /{base}/api/v1/network-route/list`
386
201
pub async fn get_network_route_list(
387
201
    State(state): State<AppState>,
388
201
    Extension(token_info): Extension<GetTokenInfoData>,
389
201
    Query(query): Query<request::GetNetworkRouteListQuery>,
390
201
) -> impl IntoResponse {
391
    const FN_NAME: &'static str = "get_network_route_list";
392

            
393
201
    let user_id = token_info.user_id.as_str();
394
201
    let roles = &token_info.roles;
395
201

            
396
201
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
397
63
        match query.unit.as_ref() {
398
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
399
60
            Some(unit_id) => {
400
60
                if unit_id.len() == 0 {
401
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
402
57
                }
403
            }
404
        }
405
138
    }
406
195
    let unit_cond = match query.unit.as_ref() {
407
96
        None => None,
408
99
        Some(unit_id) => match unit_id.len() {
409
9
            0 => None,
410
            _ => {
411
90
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
412
                    None => {
413
9
                        return Err(ErrResp::Custom(
414
9
                            ErrReq::UNIT_NOT_EXIST.0,
415
9
                            ErrReq::UNIT_NOT_EXIST.1,
416
9
                            None,
417
9
                        ))
418
                    }
419
81
                    Some(_) => Some(unit_id.as_str()),
420
                }
421
            }
422
        },
423
    };
424
186
    let cond = ListQueryCond {
425
186
        unit_id: unit_cond,
426
186
        application_id: match query.application.as_ref() {
427
132
            None => None,
428
54
            Some(application) => match application.len() {
429
6
                0 => None,
430
48
                _ => Some(application.as_ref()),
431
            },
432
        },
433
186
        network_id: match query.network.as_ref() {
434
138
            None => None,
435
48
            Some(network_id) => match network_id.len() {
436
6
                0 => None,
437
42
                _ => Some(network_id.as_ref()),
438
            },
439
        },
440
186
        ..Default::default()
441
    };
442
186
    let sort_cond = get_sort_cond(&query.sort)?;
443
171
    let opts = ListOptions {
444
171
        cond: &cond,
445
171
        offset: query.offset,
446
171
        limit: match query.limit {
447
147
            None => Some(LIST_LIMIT_DEFAULT),
448
24
            Some(limit) => match limit {
449
9
                0 => None,
450
15
                _ => Some(limit),
451
            },
452
        },
453
171
        sort: Some(sort_cond.as_slice()),
454
171
        cursor_max: Some(LIST_CURSOR_MAX),
455
    };
456

            
457
171
    let (list, cursor) = match state.model.network_route().list(&opts, None).await {
458
        Err(e) => {
459
            error!("[{}] list error: {}", FN_NAME, e);
460
            return Err(ErrResp::ErrDb(Some(e.to_string())));
461
        }
462
171
        Ok((list, cursor)) => match cursor {
463
3
            None => match query.format {
464
                Some(request::ListFormat::Array) => {
465
3
                    return Ok(Json(route_list_transform(&list)).into_response())
466
                }
467
                _ => {
468
135
                    return Ok(Json(response::GetNetworkRouteList {
469
135
                        data: route_list_transform(&list),
470
135
                    })
471
135
                    .into_response())
472
                }
473
            },
474
33
            Some(_) => (list, cursor),
475
33
        },
476
33
    };
477
33

            
478
33
    let body = Body::from_stream(async_stream::stream! {
479
33
        let unit_cond = match query.unit.as_ref() {
480
33
            None => None,
481
33
            Some(unit_id) => match unit_id.len() {
482
33
                0 => None,
483
33
                _ => Some(unit_id.as_str()),
484
33
            },
485
33
        };
486
33
        let cond = ListQueryCond {
487
33
            unit_id: unit_cond,
488
33
            application_id: match query.application.as_ref() {
489
33
                None => None,
490
33
                Some(application) => match application.len() {
491
33
                    0 => None,
492
33
                    _ => Some(application.as_ref())
493
33
                },
494
33
            },
495
33
            network_id: match query.network.as_ref() {
496
33
                None => None,
497
33
                Some(network_id) => match network_id.len() {
498
33
                    0 => None,
499
33
                    _ => Some(network_id.as_ref())
500
33
                },
501
33
            },
502
33
            ..Default::default()
503
33
        };
504
33
        let opts = ListOptions {
505
33
            cond: &cond,
506
33
            offset: query.offset,
507
33
            limit: match query.limit {
508
33
                None => Some(LIST_LIMIT_DEFAULT),
509
33
                Some(limit) => match limit {
510
33
                    0 => None,
511
33
                    _ => Some(limit),
512
33
                },
513
33
            },
514
33
            sort: Some(sort_cond.as_slice()),
515
33
            cursor_max: Some(LIST_CURSOR_MAX),
516
33
        };
517
33

            
518
33
        let mut list = list;
519
33
        let mut cursor = cursor;
520
33
        let mut is_first = true;
521
33
        loop {
522
33
            yield route_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
523
33
            is_first = false;
524
33
            if cursor.is_none() {
525
33
                break;
526
33
            }
527
33
            let (_list, _cursor) = match state.model.network_route().list(&opts, cursor).await {
528
33
                Err(_) => break,
529
33
                Ok((list, cursor)) => (list, cursor),
530
33
            };
531
33
            list = _list;
532
33
            cursor = _cursor;
533
33
        }
534
33
    });
535
33
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
536
201
}
537

            
538
/// `DELETE /{base}/api/v1/network-route/{routeId}`
539
9
pub async fn delete_network_route(
540
9
    State(state): State<AppState>,
541
9
    Extension(token_info): Extension<GetTokenInfoData>,
542
9
    Path(param): Path<request::RouteIdPath>,
543
9
) -> impl IntoResponse {
544
    const FN_NAME: &'static str = "delete_network_route";
545

            
546
9
    let user_id = token_info.user_id.as_str();
547
9
    let roles = &token_info.roles;
548
9
    let route_id = param.route_id.as_str();
549

            
550
    // To check if the network route is for the user.
551
9
    let route = match check_route(FN_NAME, route_id, user_id, true, roles, &state).await {
552
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
553
9
        Ok(route) => match route {
554
6
            None => return Ok(StatusCode::NO_CONTENT),
555
3
            Some(route) => route,
556
3
        },
557
3
    };
558
3

            
559
3
    let cond = QueryCond {
560
3
        route_id: Some(route_id),
561
3
        ..Default::default()
562
3
    };
563
3
    if let Err(e) = state.model.network_route().del(&cond).await {
564
        error!("[{}] del error: {}", FN_NAME, e);
565
        return Err(ErrResp::ErrDb(Some(e.to_string())));
566
3
    }
567
3
    send_del_ctrl_message(FN_NAME, route, &state).await?;
568

            
569
3
    Ok(StatusCode::NO_CONTENT)
570
9
}
571

            
572
186
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
573
186
    match sort_args.as_ref() {
574
147
        None => Ok(vec![
575
147
            SortCond {
576
147
                key: SortKey::NetworkCode,
577
147
                asc: true,
578
147
            },
579
147
            SortCond {
580
147
                key: SortKey::CreatedAt,
581
147
                asc: false,
582
147
            },
583
147
        ]),
584
39
        Some(args) => {
585
39
            let mut args = args.split(",");
586
39
            let mut sort_cond = vec![];
587
84
            while let Some(arg) = args.next() {
588
60
                let mut cond = arg.split(":");
589
60
                let key = match cond.next() {
590
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
591
60
                    Some(field) => match field {
592
60
                        "application" => SortKey::ApplicationCode,
593
54
                        "network" => SortKey::NetworkCode,
594
39
                        "created" => SortKey::CreatedAt,
595
                        _ => {
596
6
                            return Err(ErrResp::ErrParam(Some(format!(
597
6
                                "invalid sort key {}",
598
6
                                field
599
6
                            ))))
600
                        }
601
                    },
602
                };
603
54
                let asc = match cond.next() {
604
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
605
51
                    Some(asc) => match asc {
606
51
                        "asc" => true,
607
27
                        "desc" => false,
608
                        _ => {
609
3
                            return Err(ErrResp::ErrParam(Some(format!(
610
3
                                "invalid sort asc {}",
611
3
                                asc
612
3
                            ))))
613
                        }
614
                    },
615
                };
616
48
                if cond.next().is_some() {
617
3
                    return Err(ErrResp::ErrParam(Some(
618
3
                        "invalid sort condition".to_string(),
619
3
                    )));
620
45
                }
621
45
                sort_cond.push(SortCond { key, asc });
622
            }
623
24
            Ok(sort_cond)
624
        }
625
    }
626
186
}
627

            
628
/// To check if the user ID can access the network route. Choose `only_owner` to check if the user
629
/// is the unit owner or one of unit members.
630
///
631
/// # Errors
632
///
633
/// Returns OK if the network route is found or not. Otherwise errors will be returned.
634
9
async fn check_route(
635
9
    fn_name: &str,
636
9
    route_id: &str,
637
9
    user_id: &str,
638
9
    only_owner: bool, // to check if this `user_id` is the owner.
639
9
    roles: &HashMap<String, bool>,
640
9
    state: &AppState,
641
9
) -> Result<Option<NetworkRoute>, ErrResp> {
642
9
    let route = match state.model.network_route().get(route_id).await {
643
        Err(e) => {
644
            error!("[{}] get error: {}", fn_name, e);
645
            return Err(ErrResp::ErrDb(Some(e.to_string())));
646
        }
647
9
        Ok(route) => match route {
648
3
            None => return Ok(None),
649
6
            Some(route) => route,
650
6
        },
651
6
    };
652
6
    let unit_id = route.unit_id.as_str();
653
6
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
654
3
        None => Ok(None),
655
3
        Some(_) => Ok(Some(route)),
656
    }
657
9
}
658

            
659
138
fn route_list_transform(list: &Vec<NetworkRoute>) -> Vec<response::GetNetworkRouteData> {
660
138
    let mut ret = vec![];
661
366
    for route in list.iter() {
662
366
        ret.push(route_transform(&route));
663
366
    }
664
138
    ret
665
138
}
666

            
667
72
fn route_list_transform_bytes(
668
72
    list: &Vec<NetworkRoute>,
669
72
    with_start: bool,
670
72
    with_end: bool,
671
72
    format: Option<&request::ListFormat>,
672
72
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
673
72
    let mut build_str = match with_start {
674
39
        false => "".to_string(),
675
3
        true => match format {
676
3
            Some(request::ListFormat::Array) => "[".to_string(),
677
30
            _ => "{\"data\":[".to_string(),
678
        },
679
    };
680
72
    let mut is_first = with_start;
681

            
682
4305
    for item in list {
683
4233
        if is_first {
684
33
            is_first = false;
685
4200
        } else {
686
4200
            build_str.push(',');
687
4200
        }
688
4233
        let json_str = match serde_json::to_string(&route_transform(item)) {
689
            Err(e) => return Err(Box::new(e)),
690
4233
            Ok(str) => str,
691
4233
        };
692
4233
        build_str += json_str.as_str();
693
    }
694

            
695
72
    if with_end {
696
33
        build_str += match format {
697
3
            Some(request::ListFormat::Array) => "]",
698
30
            _ => "]}",
699
        }
700
39
    }
701
72
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
702
72
}
703

            
704
4599
fn route_transform(route: &NetworkRoute) -> response::GetNetworkRouteData {
705
4599
    response::GetNetworkRouteData {
706
4599
        route_id: route.route_id.clone(),
707
4599
        unit_id: route.unit_id.clone(),
708
4599
        application_id: route.application_id.clone(),
709
4599
        application_code: route.application_code.clone(),
710
4599
        network_id: route.network_id.clone(),
711
4599
        network_code: route.network_code.clone(),
712
4599
        created_at: time_str(&route.created_at),
713
4599
    }
714
4599
}
715

            
716
/// Send delete control message.
717
27
async fn send_del_ctrl_message(
718
27
    fn_name: &str,
719
27
    route: NetworkRoute,
720
27
    state: &AppState,
721
27
) -> Result<(), ErrResp> {
722
27
    if state.cache.is_some() {
723
9
        let msg = SendCtrlMsg::DelNetworkRoute {
724
9
            operation: CtrlMsgOp::DEL_NETWORK_ROUTE.to_string(),
725
9
            new: CtrlDelNetworkRoute {
726
9
                unit_id: route.unit_id,
727
9
                unit_code: Some(route.unit_code),
728
9
                network_id: route.network_id,
729
9
                network_code: route.network_code,
730
9
            },
731
9
        };
732
9
        let payload = match serde_json::to_vec(&msg) {
733
            Err(e) => {
734
                error!(
735
                    "[{}] marshal JSON for {} error: {}",
736
                    fn_name,
737
                    CtrlMsgOp::DEL_NETWORK_ROUTE,
738
                    e
739
                );
740
                return Err(ErrResp::ErrRsc(Some(format!(
741
                    "marshal control message error: {}",
742
                    e
743
                ))));
744
            }
745
9
            Ok(payload) => payload,
746
9
        };
747
9
        let ctrl_sender = { state.ctrl_senders.network_route.lock().unwrap().clone() };
748
9
        if let Err(e) = ctrl_sender.send_msg(payload).await {
749
            error!(
750
                "[{}] send control message for {} error: {}",
751
                fn_name,
752
                CtrlMsgOp::DEL_NETWORK_ROUTE,
753
                e
754
            );
755
            return Err(ErrResp::ErrIntMsg(Some(format!(
756
                "send control message error: {}",
757
                e
758
            ))));
759
9
        }
760
18
    }
761

            
762
27
    Ok(())
763
27
}
764

            
765
/// Clear the network relative cache.
766
11
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
767
11
    if let Err(e) = cache.network_route().clear().await {
768
        error!(
769
            "[{}] {} clear network route cache error: {}",
770
            fn_name, queue_name, e
771
        );
772
11
    }
773
11
}
774

            
775
#[async_trait]
776
impl QueueEventHandler for CtrlSenderHandler {
777
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
778
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
779
        let queue_name = queue.name();
780

            
781
        // Clear cache to avoid missing update cache content during queue status changing.
782
        if let Some(cache) = self.cache.as_ref() {
783
            clear_cache(FN_NAME, queue_name, cache).await;
784
        }
785

            
786
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
787
    }
788

            
789
34
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
790
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
791
34
        let queue_name = queue.name();
792

            
793
        // Clear cache to avoid missing update cache content during queue status changing.
794
34
        if let Some(cache) = self.cache.as_ref() {
795
5
            clear_cache(FN_NAME, queue_name, cache).await;
796
29
        }
797

            
798
34
        match status {
799
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
800
19
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
801
        }
802
68
    }
803
}
804

            
805
#[async_trait]
806
impl MessageHandler for CtrlSenderHandler {
807
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
808
}
809

            
810
#[async_trait]
811
impl QueueEventHandler for CtrlReceiverHandler {
812
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
813
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
814
        let queue_name = queue.name();
815

            
816
        // Clear cache to avoid missing update cache content during queue status changing.
817
        if let Some(cache) = self.cache.as_ref() {
818
            clear_cache(FN_NAME, queue_name, cache).await;
819
        }
820

            
821
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
822
    }
823

            
824
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
825
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
826
30
        let queue_name = queue.name();
827

            
828
        // Clear cache to avoid missing update cache content during queue status changing.
829
30
        if let Some(cache) = self.cache.as_ref() {
830
6
            clear_cache(FN_NAME, queue_name, cache).await;
831
24
        }
832

            
833
30
        match status {
834
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
835
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
836
        }
837
60
    }
838
}
839

            
840
#[async_trait]
841
impl MessageHandler for CtrlReceiverHandler {
842
9
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
843
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
844
9
        let queue_name = queue.name();
845

            
846
9
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
847
            Err(e) => {
848
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
849
                warn!(
850
                    "[{}] {} parse JSON error: {}, src: {}",
851
                    FN_NAME, queue_name, e, src_str
852
                );
853
                if let Err(e) = msg.ack().await {
854
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
855
                }
856
                return;
857
            }
858
9
            Ok(msg) => msg,
859
9
        };
860
9
        match ctrl_msg {
861
9
            RecvCtrlMsg::DelNetworkRoute { new } => {
862
9
                if let Some(cache) = self.cache.as_ref() {
863
9
                    if let Err(e) = cache
864
9
                        .network_route()
865
9
                        .del_uldata(new.network_id.as_str())
866
9
                        .await
867
                    {
868
                        error!(
869
                            "[{}] {} delete network route cache error: {}",
870
                            FN_NAME, queue_name, e
871
                        );
872
                    } else {
873
9
                        debug!(
874
                            "[{}] {} delete network route cache error",
875
                            FN_NAME, queue_name
876
                        );
877
                    }
878
                }
879
9
                if let Err(e) = msg.ack().await {
880
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
881
9
                }
882
            }
883
        }
884
18
    }
885
}