1
use std::{
2
    cmp::Ordering,
3
    collections::HashMap,
4
    error::Error as StdError,
5
    io::{Error as IoError, ErrorKind},
6
    sync::{Arc, Mutex},
7
    time::Duration,
8
};
9

            
10
use async_trait::async_trait;
11
use axum::{
12
    body::{Body, Bytes},
13
    extract::State,
14
    http::{header, StatusCode},
15
    response::IntoResponse,
16
    Extension,
17
};
18
use chrono::Utc;
19
use log::{debug, error, info, warn};
20
use serde::{Deserialize, Serialize};
21
use serde_json;
22
use tokio::time;
23
use url::Url;
24

            
25
use general_mq::{
26
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
27
    Queue,
28
};
29
use sylvia_iot_corelib::{
30
    constants::ContentType,
31
    err::ErrResp,
32
    http::{Json, Path, Query},
33
    role::Role,
34
    strings::{self, time_str},
35
};
36

            
37
use super::{
38
    super::{
39
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
40
        lib::{check_application, check_network, check_unit},
41
    },
42
    request, response,
43
};
44
use crate::{
45
    libs::{
46
        config::BrokerCtrl as CfgCtrl,
47
        mq::{self, Connection},
48
    },
49
    models::{
50
        network_route::{ListOptions, ListQueryCond, NetworkRoute, QueryCond, SortCond, SortKey},
51
        Cache,
52
    },
53
};
54

            
55
#[derive(Deserialize, Serialize)]
56
#[serde(tag = "operation")]
57
enum RecvCtrlMsg {
58
    #[serde(rename = "del-network-route")]
59
    DelNetworkRoute { new: CtrlDelNetworkRoute },
60
}
61

            
62
#[derive(Serialize)]
63
#[serde(untagged)]
64
enum SendCtrlMsg {
65
    DelNetworkRoute {
66
        operation: String,
67
        new: CtrlDelNetworkRoute,
68
    },
69
}
70

            
71
/// Namespace type for control message operation names (see its associated constants).
struct CtrlMsgOp;
72

            
73
#[derive(Deserialize, Serialize)]
74
struct CtrlDelNetworkRoute {
75
    #[serde(rename = "unitId")]
76
    unit_id: String,
77
    #[serde(rename = "unitCode")]
78
    unit_code: Option<String>,
79
    #[serde(rename = "networkId")]
80
    network_id: String,
81
    #[serde(rename = "networkCode")]
82
    network_code: String,
83
}
84

            
85
struct CtrlSenderHandler {
86
    cache: Option<Arc<dyn Cache>>,
87
}
88

            
89
struct CtrlReceiverHandler {
90
    cache: Option<Arc<dyn Cache>>,
91
}
92

            
93
impl CtrlMsgOp {
94
    const DEL_NETWORK_ROUTE: &'static str = "del-network-route";
95
}
96

            
97
/// List limit applied when the list request carries no `limit`.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as `cursor_max` to the model for list requests.
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument passed to `strings::random_id` when generating a route ID.
const ID_RAND_LEN: usize = 12;
/// Name of the control queue used by this module.
const CTRL_QUEUE_NAME: &str = "network-route";
101

            
102
/// Initialize channels.
103
30
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
104
    const FN_NAME: &'static str = "init";
105

            
106
30
    let q = new_ctrl_receiver(state, ctrl_conf)?;
107
30
    {
108
30
        state
109
30
            .ctrl_receivers
110
30
            .lock()
111
30
            .unwrap()
112
30
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
113
30
    }
114
30

            
115
30
    let ctrl_sender = { state.ctrl_senders.network_route.lock().unwrap().clone() };
116
    // Wait for connected.
117
2762
    for _ in 0..500 {
118
2762
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
119
30
            break;
120
2732
        }
121
2732
        time::sleep(Duration::from_millis(10)).await;
122
    }
123
30
    if ctrl_sender.status() != Status::Connected {
124
        error!(
125
            "[{}] {} control sender not connected",
126
            FN_NAME, CTRL_QUEUE_NAME
127
        );
128
        return Err(Box::new(IoError::new(
129
            ErrorKind::NotConnected,
130
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
131
        )));
132
30
    }
133
30
    if q.status() != Status::Connected {
134
        error!(
135
            "[{}] {} control receiver not connected",
136
            FN_NAME, CTRL_QUEUE_NAME
137
        );
138
        return Err(Box::new(IoError::new(
139
            ErrorKind::NotConnected,
140
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
141
        )));
142
30
    }
143
30

            
144
30
    Ok(())
145
30
}
146

            
147
/// Create control channel sender queue.
148
30
pub fn new_ctrl_sender(
149
30
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
150
30
    config: &CfgCtrl,
151
30
    cache: Option<Arc<dyn Cache>>,
152
30
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
153
30
    let url = match config.url.as_ref() {
154
        None => {
155
            return Err(Box::new(IoError::new(
156
                ErrorKind::InvalidInput,
157
                "empty control url",
158
            )))
159
        }
160
30
        Some(url) => match Url::parse(url.as_str()) {
161
            Err(e) => return Err(Box::new(e)),
162
30
            Ok(url) => url,
163
30
        },
164
30
    };
165
30

            
166
30
    match mq::control::new(
167
30
        conn_pool.clone(),
168
30
        &url,
169
30
        config.prefetch,
170
30
        CTRL_QUEUE_NAME,
171
30
        false,
172
30
        Arc::new(CtrlSenderHandler {
173
30
            cache: cache.clone(),
174
30
        }),
175
30
        Arc::new(CtrlSenderHandler { cache }),
176
30
    ) {
177
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
178
30
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
179
    }
180
30
}
181

            
182
/// Create control channel receiver queue.
183
30
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
184
30
    let url = match config.url.as_ref() {
185
        None => {
186
            return Err(Box::new(IoError::new(
187
                ErrorKind::InvalidInput,
188
                "empty control url",
189
            )))
190
        }
191
30
        Some(url) => match Url::parse(url.as_str()) {
192
            Err(e) => return Err(Box::new(e)),
193
30
            Ok(url) => url,
194
30
        },
195
30
    };
196
30
    let handler = Arc::new(CtrlReceiverHandler {
197
30
        cache: state.cache.clone(),
198
30
    });
199
30
    match mq::control::new(
200
30
        state.mq_conns.clone(),
201
30
        &url,
202
30
        config.prefetch,
203
30
        CTRL_QUEUE_NAME,
204
30
        true,
205
30
        handler.clone(),
206
30
        handler,
207
30
    ) {
208
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
209
30
        Ok(q) => Ok(q),
210
    }
211
30
}
212

            
213
/// `POST /{base}/api/v1/network-route`
214
114
pub async fn post_network_route(
215
114
    State(state): State<AppState>,
216
114
    Extension(token_info): Extension<GetTokenInfoData>,
217
114
    Json(body): Json<request::PostNetworkRouteBody>,
218
114
) -> impl IntoResponse {
219
    const FN_NAME: &'static str = "post_network_route";
220

            
221
114
    let user_id = token_info.user_id.as_str();
222
114
    let roles = &token_info.roles;
223
114

            
224
114
    if body.data.network_id.len() == 0 {
225
6
        return Err(ErrResp::ErrParam(Some(
226
6
            "`networkId` must with at least one character".to_string(),
227
6
        )));
228
108
    } else if body.data.application_id.len() == 0 {
229
6
        return Err(ErrResp::ErrParam(Some(
230
6
            "`applicationId` must with at least one character".to_string(),
231
6
        )));
232
102
    }
233
102
    let network_id = body.data.network_id.as_str();
234
102
    let application_id = body.data.application_id.as_str();
235
102
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
236
        None => {
237
18
            return Err(ErrResp::Custom(
238
18
                ErrReq::NETWORK_NOT_EXIST.0,
239
18
                ErrReq::NETWORK_NOT_EXIST.1,
240
18
                None,
241
18
            ))
242
        }
243
84
        Some(network) => network,
244
    };
245
72
    let application =
246
84
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
247
            None => {
248
12
                return Err(ErrResp::Custom(
249
12
                    ErrReq::APPLICATION_NOT_EXIST.0,
250
12
                    ErrReq::APPLICATION_NOT_EXIST.1,
251
12
                    None,
252
12
                ))
253
            }
254
72
            Some(application) => application,
255
        };
256
72
    if let Some(unit_id) = network.unit_id.as_ref() {
257
54
        if unit_id.as_str().cmp(application.unit_id.as_str()) != Ordering::Equal {
258
12
            return Err(ErrResp::Custom(
259
12
                ErrReq::UNIT_NOT_MATCH.0,
260
12
                ErrReq::UNIT_NOT_MATCH.1,
261
12
                None,
262
12
            ));
263
42
        }
264
18
    }
265
60
    let cond = ListQueryCond {
266
60
        application_id: Some(application_id),
267
60
        network_id: Some(network_id),
268
60
        ..Default::default()
269
60
    };
270
60
    let opts = ListOptions {
271
60
        cond: &cond,
272
60
        offset: None,
273
60
        limit: Some(1),
274
60
        sort: None,
275
60
        cursor_max: None,
276
60
    };
277
60
    match state.model.network_route().list(&opts, None).await {
278
        Err(e) => {
279
            error!("[{}] get error: {}", FN_NAME, e);
280
            return Err(ErrResp::ErrDb(Some(e.to_string())));
281
        }
282
60
        Ok((list, _)) => match list.len() {
283
48
            0 => (),
284
            _ => {
285
12
                return Err(ErrResp::Custom(
286
12
                    ErrReq::ROUTE_EXIST.0,
287
12
                    ErrReq::ROUTE_EXIST.1,
288
12
                    None,
289
12
                ))
290
            }
291
        },
292
    }
293

            
294
48
    let now = Utc::now();
295
48
    let route_id = strings::random_id(&now, ID_RAND_LEN);
296
48
    let route = NetworkRoute {
297
48
        route_id: route_id.clone(),
298
48
        unit_id: application.unit_id,
299
48
        unit_code: application.unit_code,
300
48
        application_id: application.application_id,
301
48
        application_code: application.code,
302
48
        network_id: network.network_id,
303
48
        network_code: network.code,
304
48
        created_at: now,
305
48
    };
306
48
    if let Err(e) = state.model.network_route().add(&route).await {
307
        error!("[{}] add error: {}", FN_NAME, e);
308
        return Err(ErrResp::ErrDb(Some(e.to_string())));
309
48
    }
310
48
    send_del_ctrl_message(FN_NAME, route, &state).await?;
311

            
312
48
    Ok(Json(response::PostNetworkRoute {
313
48
        data: response::PostNetworkRouteData { route_id },
314
48
    }))
315
114
}
316

            
317
/// `GET /{base}/api/v1/network-route/count`
318
234
pub async fn get_network_route_count(
319
234
    State(state): State<AppState>,
320
234
    Extension(token_info): Extension<GetTokenInfoData>,
321
234
    Query(query): Query<request::GetNetworkRouteCountQuery>,
322
234
) -> impl IntoResponse {
323
    const FN_NAME: &'static str = "get_network_route_count";
324

            
325
234
    let user_id = token_info.user_id.as_str();
326
234
    let roles = &token_info.roles;
327
234

            
328
234
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
329
126
        match query.unit.as_ref() {
330
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
331
120
            Some(unit_id) => {
332
120
                if unit_id.len() == 0 {
333
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
334
114
                }
335
            }
336
        }
337
108
    }
338
222
    let unit_cond = match query.unit.as_ref() {
339
42
        None => None,
340
180
        Some(unit_id) => match unit_id.len() {
341
12
            0 => None,
342
            _ => {
343
168
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
344
                    None => {
345
18
                        return Err(ErrResp::Custom(
346
18
                            ErrReq::UNIT_NOT_EXIST.0,
347
18
                            ErrReq::UNIT_NOT_EXIST.1,
348
18
                            None,
349
18
                        ))
350
                    }
351
150
                    Some(_) => Some(unit_id.as_str()),
352
                }
353
            }
354
        },
355
    };
356
204
    let cond = ListQueryCond {
357
204
        unit_id: unit_cond,
358
204
        application_id: match query.application.as_ref() {
359
114
            None => None,
360
90
            Some(application) => match application.len() {
361
6
                0 => None,
362
84
                _ => Some(application.as_ref()),
363
            },
364
        },
365
204
        network_id: match query.network.as_ref() {
366
114
            None => None,
367
90
            Some(network_id) => match network_id.len() {
368
6
                0 => None,
369
84
                _ => Some(network_id.as_ref()),
370
            },
371
        },
372
204
        ..Default::default()
373
204
    };
374
204
    match state.model.network_route().count(&cond).await {
375
        Err(e) => {
376
            error!("[{}] count error: {}", FN_NAME, e);
377
            Err(ErrResp::ErrDb(Some(e.to_string())))
378
        }
379
204
        Ok(count) => Ok(Json(response::GetNetworkRouteCount {
380
204
            data: response::GetCountData { count },
381
204
        })),
382
    }
383
234
}
384

            
385
/// `GET /{base}/api/v1/network-route/list`
386
402
pub async fn get_network_route_list(
387
402
    State(state): State<AppState>,
388
402
    Extension(token_info): Extension<GetTokenInfoData>,
389
402
    Query(query): Query<request::GetNetworkRouteListQuery>,
390
402
) -> impl IntoResponse {
391
    const FN_NAME: &'static str = "get_network_route_list";
392

            
393
402
    let user_id = token_info.user_id.as_str();
394
402
    let roles = &token_info.roles;
395
402

            
396
402
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
397
126
        match query.unit.as_ref() {
398
6
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
399
120
            Some(unit_id) => {
400
120
                if unit_id.len() == 0 {
401
6
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
402
114
                }
403
            }
404
        }
405
276
    }
406
390
    let unit_cond = match query.unit.as_ref() {
407
192
        None => None,
408
198
        Some(unit_id) => match unit_id.len() {
409
18
            0 => None,
410
            _ => {
411
180
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
412
                    None => {
413
18
                        return Err(ErrResp::Custom(
414
18
                            ErrReq::UNIT_NOT_EXIST.0,
415
18
                            ErrReq::UNIT_NOT_EXIST.1,
416
18
                            None,
417
18
                        ))
418
                    }
419
162
                    Some(_) => Some(unit_id.as_str()),
420
                }
421
            }
422
        },
423
    };
424
372
    let cond = ListQueryCond {
425
372
        unit_id: unit_cond,
426
372
        application_id: match query.application.as_ref() {
427
264
            None => None,
428
108
            Some(application) => match application.len() {
429
12
                0 => None,
430
96
                _ => Some(application.as_ref()),
431
            },
432
        },
433
372
        network_id: match query.network.as_ref() {
434
276
            None => None,
435
96
            Some(network_id) => match network_id.len() {
436
12
                0 => None,
437
84
                _ => Some(network_id.as_ref()),
438
            },
439
        },
440
372
        ..Default::default()
441
    };
442
372
    let sort_cond = get_sort_cond(&query.sort)?;
443
342
    let opts = ListOptions {
444
342
        cond: &cond,
445
342
        offset: query.offset,
446
342
        limit: match query.limit {
447
294
            None => Some(LIST_LIMIT_DEFAULT),
448
48
            Some(limit) => match limit {
449
18
                0 => None,
450
30
                _ => Some(limit),
451
            },
452
        },
453
342
        sort: Some(sort_cond.as_slice()),
454
342
        cursor_max: Some(LIST_CURSOR_MAX),
455
    };
456

            
457
342
    let (list, cursor) = match state.model.network_route().list(&opts, None).await {
458
        Err(e) => {
459
            error!("[{}] list error: {}", FN_NAME, e);
460
            return Err(ErrResp::ErrDb(Some(e.to_string())));
461
        }
462
342
        Ok((list, cursor)) => match cursor {
463
6
            None => match query.format {
464
                Some(request::ListFormat::Array) => {
465
6
                    return Ok(Json(route_list_transform(&list)).into_response())
466
                }
467
                _ => {
468
270
                    return Ok(Json(response::GetNetworkRouteList {
469
270
                        data: route_list_transform(&list),
470
270
                    })
471
270
                    .into_response())
472
                }
473
            },
474
66
            Some(_) => (list, cursor),
475
66
        },
476
66
    };
477
66

            
478
66
    let body = Body::from_stream(async_stream::stream! {
479
66
        let unit_cond = match query.unit.as_ref() {
480
66
            None => None,
481
66
            Some(unit_id) => match unit_id.len() {
482
66
                0 => None,
483
66
                _ => Some(unit_id.as_str()),
484
66
            },
485
66
        };
486
66
        let cond = ListQueryCond {
487
66
            unit_id: unit_cond,
488
66
            application_id: match query.application.as_ref() {
489
66
                None => None,
490
66
                Some(application) => match application.len() {
491
66
                    0 => None,
492
66
                    _ => Some(application.as_ref())
493
66
                },
494
66
            },
495
66
            network_id: match query.network.as_ref() {
496
66
                None => None,
497
66
                Some(network_id) => match network_id.len() {
498
66
                    0 => None,
499
66
                    _ => Some(network_id.as_ref())
500
66
                },
501
66
            },
502
66
            ..Default::default()
503
66
        };
504
66
        let opts = ListOptions {
505
66
            cond: &cond,
506
66
            offset: query.offset,
507
66
            limit: match query.limit {
508
66
                None => Some(LIST_LIMIT_DEFAULT),
509
66
                Some(limit) => match limit {
510
66
                    0 => None,
511
66
                    _ => Some(limit),
512
66
                },
513
66
            },
514
66
            sort: Some(sort_cond.as_slice()),
515
66
            cursor_max: Some(LIST_CURSOR_MAX),
516
66
        };
517
66

            
518
66
        let mut list = list;
519
66
        let mut cursor = cursor;
520
66
        let mut is_first = true;
521
66
        loop {
522
66
            yield route_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
523
66
            is_first = false;
524
66
            if cursor.is_none() {
525
66
                break;
526
66
            }
527
66
            let (_list, _cursor) = match state.model.network_route().list(&opts, cursor).await {
528
66
                Err(_) => break,
529
66
                Ok((list, cursor)) => (list, cursor),
530
66
            };
531
66
            list = _list;
532
66
            cursor = _cursor;
533
66
        }
534
66
    });
535
66
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
536
402
}
537

            
538
/// `DELETE /{base}/api/v1/network-route/{routeId}`
539
18
pub async fn delete_network_route(
540
18
    State(state): State<AppState>,
541
18
    Extension(token_info): Extension<GetTokenInfoData>,
542
18
    Path(param): Path<request::RouteIdPath>,
543
18
) -> impl IntoResponse {
544
    const FN_NAME: &'static str = "delete_network_route";
545

            
546
18
    let user_id = token_info.user_id.as_str();
547
18
    let roles = &token_info.roles;
548
18
    let route_id = param.route_id.as_str();
549

            
550
    // To check if the network route is for the user.
551
18
    let route = match check_route(FN_NAME, route_id, user_id, true, roles, &state).await {
552
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
553
18
        Ok(route) => match route {
554
12
            None => return Ok(StatusCode::NO_CONTENT),
555
6
            Some(route) => route,
556
6
        },
557
6
    };
558
6

            
559
6
    let cond = QueryCond {
560
6
        route_id: Some(route_id),
561
6
        ..Default::default()
562
6
    };
563
6
    if let Err(e) = state.model.network_route().del(&cond).await {
564
        error!("[{}] del error: {}", FN_NAME, e);
565
        return Err(ErrResp::ErrDb(Some(e.to_string())));
566
6
    }
567
6
    send_del_ctrl_message(FN_NAME, route, &state).await?;
568

            
569
6
    Ok(StatusCode::NO_CONTENT)
570
18
}
571

            
572
372
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
573
372
    match sort_args.as_ref() {
574
294
        None => Ok(vec![
575
294
            SortCond {
576
294
                key: SortKey::NetworkCode,
577
294
                asc: true,
578
294
            },
579
294
            SortCond {
580
294
                key: SortKey::CreatedAt,
581
294
                asc: false,
582
294
            },
583
294
        ]),
584
78
        Some(args) => {
585
78
            let mut args = args.split(",");
586
78
            let mut sort_cond = vec![];
587
168
            while let Some(arg) = args.next() {
588
120
                let mut cond = arg.split(":");
589
120
                let key = match cond.next() {
590
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
591
120
                    Some(field) => match field {
592
120
                        "application" => SortKey::ApplicationCode,
593
108
                        "network" => SortKey::NetworkCode,
594
78
                        "created" => SortKey::CreatedAt,
595
                        _ => {
596
12
                            return Err(ErrResp::ErrParam(Some(format!(
597
12
                                "invalid sort key {}",
598
12
                                field
599
12
                            ))))
600
                        }
601
                    },
602
                };
603
108
                let asc = match cond.next() {
604
6
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
605
102
                    Some(asc) => match asc {
606
102
                        "asc" => true,
607
54
                        "desc" => false,
608
                        _ => {
609
6
                            return Err(ErrResp::ErrParam(Some(format!(
610
6
                                "invalid sort asc {}",
611
6
                                asc
612
6
                            ))))
613
                        }
614
                    },
615
                };
616
96
                if cond.next().is_some() {
617
6
                    return Err(ErrResp::ErrParam(Some(
618
6
                        "invalid sort condition".to_string(),
619
6
                    )));
620
90
                }
621
90
                sort_cond.push(SortCond { key, asc });
622
            }
623
48
            Ok(sort_cond)
624
        }
625
    }
626
372
}
627

            
628
/// To check if the user ID can access the network route. Choose `only_owner` to check if the user
629
/// is the unit owner or one of unit members.
630
///
631
/// # Errors
632
///
633
/// Returns OK if the network route is found or not. Otherwise errors will be returned.
634
18
async fn check_route(
635
18
    fn_name: &str,
636
18
    route_id: &str,
637
18
    user_id: &str,
638
18
    only_owner: bool, // to check if this `user_id` is the owner.
639
18
    roles: &HashMap<String, bool>,
640
18
    state: &AppState,
641
18
) -> Result<Option<NetworkRoute>, ErrResp> {
642
18
    let route = match state.model.network_route().get(route_id).await {
643
        Err(e) => {
644
            error!("[{}] get error: {}", fn_name, e);
645
            return Err(ErrResp::ErrDb(Some(e.to_string())));
646
        }
647
18
        Ok(route) => match route {
648
6
            None => return Ok(None),
649
12
            Some(route) => route,
650
12
        },
651
12
    };
652
12
    let unit_id = route.unit_id.as_str();
653
12
    match check_unit(fn_name, user_id, roles, unit_id, only_owner, state).await? {
654
6
        None => Ok(None),
655
6
        Some(_) => Ok(Some(route)),
656
    }
657
18
}
658

            
659
276
fn route_list_transform(list: &Vec<NetworkRoute>) -> Vec<response::GetNetworkRouteData> {
660
276
    let mut ret = vec![];
661
732
    for route in list.iter() {
662
732
        ret.push(route_transform(&route));
663
732
    }
664
276
    ret
665
276
}
666

            
667
144
fn route_list_transform_bytes(
668
144
    list: &Vec<NetworkRoute>,
669
144
    with_start: bool,
670
144
    with_end: bool,
671
144
    format: Option<&request::ListFormat>,
672
144
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
673
144
    let mut build_str = match with_start {
674
78
        false => "".to_string(),
675
6
        true => match format {
676
6
            Some(request::ListFormat::Array) => "[".to_string(),
677
60
            _ => "{\"data\":[".to_string(),
678
        },
679
    };
680
144
    let mut is_first = with_start;
681

            
682
8610
    for item in list {
683
8466
        if is_first {
684
66
            is_first = false;
685
8400
        } else {
686
8400
            build_str.push(',');
687
8400
        }
688
8466
        let json_str = match serde_json::to_string(&route_transform(item)) {
689
            Err(e) => return Err(Box::new(e)),
690
8466
            Ok(str) => str,
691
8466
        };
692
8466
        build_str += json_str.as_str();
693
    }
694

            
695
144
    if with_end {
696
66
        build_str += match format {
697
6
            Some(request::ListFormat::Array) => "]",
698
60
            _ => "]}",
699
        }
700
78
    }
701
144
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
702
144
}
703

            
704
9198
fn route_transform(route: &NetworkRoute) -> response::GetNetworkRouteData {
705
9198
    response::GetNetworkRouteData {
706
9198
        route_id: route.route_id.clone(),
707
9198
        unit_id: route.unit_id.clone(),
708
9198
        application_id: route.application_id.clone(),
709
9198
        application_code: route.application_code.clone(),
710
9198
        network_id: route.network_id.clone(),
711
9198
        network_code: route.network_code.clone(),
712
9198
        created_at: time_str(&route.created_at),
713
9198
    }
714
9198
}
715

            
716
/// Send delete control message.
717
54
async fn send_del_ctrl_message(
718
54
    fn_name: &str,
719
54
    route: NetworkRoute,
720
54
    state: &AppState,
721
54
) -> Result<(), ErrResp> {
722
54
    if state.cache.is_some() {
723
18
        let msg = SendCtrlMsg::DelNetworkRoute {
724
18
            operation: CtrlMsgOp::DEL_NETWORK_ROUTE.to_string(),
725
18
            new: CtrlDelNetworkRoute {
726
18
                unit_id: route.unit_id,
727
18
                unit_code: Some(route.unit_code),
728
18
                network_id: route.network_id,
729
18
                network_code: route.network_code,
730
18
            },
731
18
        };
732
18
        let payload = match serde_json::to_vec(&msg) {
733
            Err(e) => {
734
                error!(
735
                    "[{}] marshal JSON for {} error: {}",
736
                    fn_name,
737
                    CtrlMsgOp::DEL_NETWORK_ROUTE,
738
                    e
739
                );
740
                return Err(ErrResp::ErrRsc(Some(format!(
741
                    "marshal control message error: {}",
742
                    e
743
                ))));
744
            }
745
18
            Ok(payload) => payload,
746
18
        };
747
18
        let ctrl_sender = { state.ctrl_senders.network_route.lock().unwrap().clone() };
748
18
        if let Err(e) = ctrl_sender.send_msg(payload).await {
749
            error!(
750
                "[{}] send control message for {} error: {}",
751
                fn_name,
752
                CtrlMsgOp::DEL_NETWORK_ROUTE,
753
                e
754
            );
755
            return Err(ErrResp::ErrIntMsg(Some(format!(
756
                "send control message error: {}",
757
                e
758
            ))));
759
18
        }
760
36
    }
761

            
762
54
    Ok(())
763
54
}
764

            
765
/// Clear the network relative cache.
766
24
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
767
24
    if let Err(e) = cache.network_route().clear().await {
768
        error!(
769
            "[{}] {} clear network route cache error: {}",
770
            fn_name, queue_name, e
771
        );
772
24
    }
773
24
}
774

            
775
#[async_trait]
776
impl QueueEventHandler for CtrlSenderHandler {
777
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
778
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
779
        let queue_name = queue.name();
780

            
781
        // Clear cache to avoid missing update cache content during queue status changing.
782
        if let Some(cache) = self.cache.as_ref() {
783
            clear_cache(FN_NAME, queue_name, cache).await;
784
        }
785

            
786
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
787
    }
788

            
789
76
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
790
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
791
76
        let queue_name = queue.name();
792

            
793
        // Clear cache to avoid missing update cache content during queue status changing.
794
76
        if let Some(cache) = self.cache.as_ref() {
795
12
            clear_cache(FN_NAME, queue_name, cache).await;
796
64
        }
797

            
798
76
        match status {
799
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
800
46
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
801
        }
802
152
    }
803
}
804

            
805
#[async_trait]
806
impl MessageHandler for CtrlSenderHandler {
807
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
808
}
809

            
810
#[async_trait]
811
impl QueueEventHandler for CtrlReceiverHandler {
812
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
813
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
814
        let queue_name = queue.name();
815

            
816
        // Clear cache to avoid missing update cache content during queue status changing.
817
        if let Some(cache) = self.cache.as_ref() {
818
            clear_cache(FN_NAME, queue_name, cache).await;
819
        }
820

            
821
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
822
    }
823

            
824
60
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
825
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
826
60
        let queue_name = queue.name();
827

            
828
        // Clear cache to avoid missing update cache content during queue status changing.
829
60
        if let Some(cache) = self.cache.as_ref() {
830
12
            clear_cache(FN_NAME, queue_name, cache).await;
831
48
        }
832

            
833
60
        match status {
834
30
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
835
30
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
836
        }
837
120
    }
838
}
839

            
840
#[async_trait]
841
impl MessageHandler for CtrlReceiverHandler {
842
18
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
843
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
844
18
        let queue_name = queue.name();
845

            
846
18
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
847
            Err(e) => {
848
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
849
                warn!(
850
                    "[{}] {} parse JSON error: {}, src: {}",
851
                    FN_NAME, queue_name, e, src_str
852
                );
853
                if let Err(e) = msg.ack().await {
854
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
855
                }
856
                return;
857
            }
858
18
            Ok(msg) => msg,
859
18
        };
860
18
        match ctrl_msg {
861
18
            RecvCtrlMsg::DelNetworkRoute { new } => {
862
18
                if let Some(cache) = self.cache.as_ref() {
863
18
                    if let Err(e) = cache
864
18
                        .network_route()
865
18
                        .del_uldata(new.network_id.as_str())
866
18
                        .await
867
                    {
868
                        error!(
869
                            "[{}] {} delete network route cache error: {}",
870
                            FN_NAME, queue_name, e
871
                        );
872
                    } else {
873
18
                        debug!(
874
                            "[{}] {} delete network route cache error",
875
                            FN_NAME, queue_name
876
                        );
877
                    }
878
                }
879
18
                if let Err(e) = msg.ack().await {
880
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
881
18
                }
882
            }
883
        }
884
36
    }
885
}