1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use axum::{
11
    body::{Body, Bytes},
12
    extract::State,
13
    http::{header, StatusCode},
14
    response::IntoResponse,
15
    Extension,
16
};
17
use chrono::{DateTime, TimeZone, Utc};
18
use log::{debug, error, info, warn};
19
use serde::{Deserialize, Serialize};
20
use serde_json::{self, Map, Value};
21
use tokio::time;
22
use url::Url;
23

            
24
use general_mq::{
25
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
26
    Queue,
27
};
28
use sylvia_iot_corelib::{
29
    constants::ContentType,
30
    err::{self, ErrResp},
31
    http::{Json, Path, Query},
32
    role::Role,
33
    strings::{self, time_str},
34
};
35

            
36
use super::{
37
    super::{
38
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
39
        lib::{check_application, check_unit, gen_mgr_key},
40
    },
41
    request, response,
42
};
43
use crate::{
44
    libs::{
45
        config::BrokerCtrl as CfgCtrl,
46
        mq::{
47
            self,
48
            application::{ApplicationMgr, DlData, DlDataResp, EventHandler},
49
            network::{DlData as NetworkDlData, NetworkMgr},
50
            Connection, MgrStatus, Options as MgrOptions,
51
        },
52
    },
53
    models::{
54
        application::{
55
            Application, ListOptions, ListQueryCond, QueryCond, SortCond, SortKey, UpdateQueryCond,
56
            Updates,
57
        },
58
        device, device_route, dldata_buffer, network_route, Cache, Model,
59
    },
60
};
61

            
62
struct MgrHandler {
63
    model: Arc<dyn Model>,
64
    cache: Option<Arc<dyn Cache>>,
65
    network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
66
    data_sender: Option<Queue>,
67
}
68

            
69
#[derive(Deserialize)]
70
#[serde(tag = "operation")]
71
enum RecvCtrlMsg {
72
    #[serde(rename = "del-application")]
73
    DelApplication { new: CtrlDelApplication },
74
    #[serde(rename = "add-manager")]
75
    AddManager { new: CtrlAddManager },
76
    #[serde(rename = "del-manager")]
77
    DelManager { new: String },
78
}
79

            
80
/// Control channel.
81
#[derive(Serialize)]
82
#[serde(untagged)]
83
enum SendCtrlMsg {
84
    DelApplication {
85
        operation: String,
86
        new: CtrlDelApplication,
87
    },
88
    AddManager {
89
        operation: String,
90
        new: CtrlAddManager,
91
    },
92
    DelManager {
93
        operation: String,
94
        new: String,
95
    },
96
}
97

            
98
/// Data channel.
99
#[derive(Serialize)]
100
struct SendDataMsg {
101
    kind: String,
102
    data: SendDataKind,
103
}
104

            
105
#[derive(Serialize)]
106
#[serde(untagged)]
107
enum SendDataKind {
108
    AppDlData {
109
        #[serde(rename = "dataId")]
110
        data_id: String,
111
        proc: String,
112
        status: i32,
113
        #[serde(rename = "unitId")]
114
        unit_id: String,
115
        #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
116
        device_id: Option<String>,
117
        #[serde(rename = "networkCode", skip_serializing_if = "Option::is_none")]
118
        network_code: Option<String>,
119
        #[serde(rename = "networkAddr", skip_serializing_if = "Option::is_none")]
120
        network_addr: Option<String>,
121
        profile: String,
122
        data: String,
123
        #[serde(skip_serializing_if = "Option::is_none")]
124
        extension: Option<Map<String, Value>>,
125
    },
126
    NetDlData {
127
        #[serde(rename = "dataId")]
128
        data_id: String,
129
        proc: String,
130
        #[serde(rename = "pub")]
131
        publish: String,
132
        status: i32,
133
        #[serde(rename = "unitId")]
134
        unit_id: String,
135
        #[serde(rename = "deviceId")]
136
        device_id: String,
137
        #[serde(rename = "networkCode")]
138
        network_code: String,
139
        #[serde(rename = "networkAddr")]
140
        network_addr: String,
141
        profile: String,
142
        data: String,
143
        #[serde(skip_serializing_if = "Option::is_none")]
144
        extension: Option<Map<String, Value>>,
145
    },
146
}
147

            
148
struct CtrlMsgOp;
149
struct DataMsgKind;
150

            
151
#[derive(Deserialize, Serialize)]
152
struct CtrlDelApplication {
153
    #[serde(rename = "unitId")]
154
    unit_id: String,
155
    #[serde(rename = "unitCode")]
156
    unit_code: String,
157
    #[serde(rename = "applicationId")]
158
    application_id: String,
159
    #[serde(rename = "applicationCode")]
160
    application_code: String,
161
}
162

            
163
#[derive(Deserialize, Serialize)]
164
struct CtrlAddManager {
165
    #[serde(rename = "hostUri")]
166
    host_uri: String,
167
    #[serde(rename = "mgrOptions")]
168
    mgr_options: MgrOptions,
169
}
170

            
171
struct CtrlSenderHandler;
172

            
173
struct CtrlReceiverHandler {
174
    model: Arc<dyn Model>,
175
    cache: Option<Arc<dyn Cache>>,
176
    mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
177
    application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
178
    network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
179
    data_sender: Option<Queue>,
180
}
181

            
182
impl CtrlMsgOp {
183
    const DEL_APPLICATION: &'static str = "del-application";
184
    const ADD_MANAGER: &'static str = "add-manager";
185
    const DEL_MANAGER: &'static str = "del-manager";
186
}
187

            
188
impl DataMsgKind {
189
    const APP_DLDATA: &'static str = "application-dldata";
190
    const NET_DLDATA: &'static str = "network-dldata";
191
}
192

            
193
const LIST_LIMIT_DEFAULT: u64 = 100;
194
const LIST_CURSOR_MAX: u64 = 100;
195
const ID_RAND_LEN: usize = 8;
196
const DATA_ID_RAND_LEN: usize = 12;
197
const DATA_EXPIRES_IN: i64 = 86400; // in seconds
198
const CTRL_QUEUE_NAME: &'static str = "application";
199
const DEF_DLDATA_STATUS: i32 = -2;
200

            
201
/// Initialize application managers and channels.
202
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
203
    const FN_NAME: &'static str = "init";
204

            
205
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
206
15
    {
207
15
        state
208
15
            .ctrl_receivers
209
15
            .lock()
210
15
            .unwrap()
211
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
212
15
    }
213
15

            
214
15
    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
215
    // Wait for connected.
216
1381
    for _ in 0..500 {
217
1381
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
218
15
            break;
219
1366
        }
220
1366
        time::sleep(Duration::from_millis(10)).await;
221
    }
222
15
    if ctrl_sender.status() != Status::Connected {
223
        error!(
224
            "[{}] {} control sender not connected",
225
            FN_NAME, CTRL_QUEUE_NAME
226
        );
227
        return Err(Box::new(IoError::new(
228
            ErrorKind::NotConnected,
229
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
230
        )));
231
15
    }
232
15
    if q.status() != Status::Connected {
233
        error!(
234
            "[{}] {} control receiver not connected",
235
            FN_NAME, CTRL_QUEUE_NAME
236
        );
237
        return Err(Box::new(IoError::new(
238
            ErrorKind::NotConnected,
239
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
240
        )));
241
15
    }
242
15

            
243
15
    let cond = ListQueryCond {
244
15
        ..Default::default()
245
15
    };
246
15
    let opts = ListOptions {
247
15
        cond: &cond,
248
15
        offset: None,
249
15
        limit: None,
250
15
        sort: None,
251
15
        cursor_max: Some(LIST_CURSOR_MAX),
252
15
    };
253
15
    let mut list;
254
15
    let mut cursor = None;
255
    loop {
256
15
        (list, cursor) = state.model.application().list(&opts, cursor).await?;
257
15
        for item in list.iter() {
258
1
            let url = Url::parse(item.host_uri.as_str())?;
259
1
            let key = gen_mgr_key(item.unit_code.as_str(), item.code.as_str());
260
1
            let opts = MgrOptions {
261
1
                unit_id: item.unit_id.clone(),
262
1
                unit_code: item.unit_code.clone(),
263
1
                id: item.application_id.clone(),
264
1
                name: item.code.clone(),
265
1
                prefetch: Some(state.amqp_prefetch),
266
1
                persistent: state.amqp_persistent,
267
1
                shared_prefix: Some(state.mqtt_shared_prefix.clone()),
268
1
            };
269
1
            let handler = MgrHandler {
270
1
                model: state.model.clone(),
271
1
                cache: state.cache.clone(),
272
1
                network_mgrs: state.network_mgrs.clone(),
273
1
                data_sender: state.data_sender.clone(),
274
1
            };
275
1
            let mgr =
276
1
                match ApplicationMgr::new(state.mq_conns.clone(), &url, opts, Arc::new(handler)) {
277
                    Err(e) => {
278
                        error!("[{}] new manager for {} error: {}", FN_NAME, key, e);
279
                        return Err(Box::new(ErrResp::ErrRsc(Some(e))));
280
                    }
281
1
                    Ok(mgr) => mgr,
282
1
                };
283
1
            {
284
1
                state
285
1
                    .application_mgrs
286
1
                    .lock()
287
1
                    .unwrap()
288
1
                    .insert(key.clone(), mgr);
289
1
            }
290
        }
291
15
        if cursor.is_none() {
292
15
            break;
293
        }
294
    }
295

            
296
15
    Ok(())
297
15
}
298

            
299
/// Create control channel sender queue.
300
15
pub fn new_ctrl_sender(
301
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
302
15
    config: &CfgCtrl,
303
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
304
15
    let url = match config.url.as_ref() {
305
        None => {
306
            return Err(Box::new(IoError::new(
307
                ErrorKind::InvalidInput,
308
                "empty control url",
309
            )))
310
        }
311
15
        Some(url) => match Url::parse(url.as_str()) {
312
            Err(e) => return Err(Box::new(e)),
313
15
            Ok(url) => url,
314
15
        },
315
15
    };
316
15

            
317
15
    match mq::control::new(
318
15
        conn_pool.clone(),
319
15
        &url,
320
15
        config.prefetch,
321
15
        CTRL_QUEUE_NAME,
322
15
        false,
323
15
        Arc::new(CtrlSenderHandler {}),
324
15
        Arc::new(CtrlSenderHandler {}),
325
15
    ) {
326
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
327
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
328
    }
329
15
}
330

            
331
/// Create control channel receiver queue.
332
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
333
15
    let url = match config.url.as_ref() {
334
        None => {
335
            return Err(Box::new(IoError::new(
336
                ErrorKind::InvalidInput,
337
                "empty control url",
338
            )))
339
        }
340
15
        Some(url) => match Url::parse(url.as_str()) {
341
            Err(e) => return Err(Box::new(e)),
342
15
            Ok(url) => url,
343
15
        },
344
15
    };
345
15
    let handler = Arc::new(CtrlReceiverHandler {
346
15
        model: state.model.clone(),
347
15
        cache: state.cache.clone(),
348
15
        mq_conns: state.mq_conns.clone(),
349
15
        application_mgrs: state.application_mgrs.clone(),
350
15
        network_mgrs: state.network_mgrs.clone(),
351
15
        data_sender: state.data_sender.clone(),
352
15
    });
353
15
    match mq::control::new(
354
15
        state.mq_conns.clone(),
355
15
        &url,
356
15
        config.prefetch,
357
15
        CTRL_QUEUE_NAME,
358
15
        true,
359
15
        handler.clone(),
360
15
        handler,
361
15
    ) {
362
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
363
15
        Ok(q) => Ok(q),
364
    }
365
15
}
366

            
367
/// `POST /{base}/api/v1/application`
368
42
pub async fn post_application(
369
42
    State(state): State<AppState>,
370
42
    Extension(token_info): Extension<GetTokenInfoData>,
371
42
    Json(body): Json<request::PostApplicationBody>,
372
42
) -> impl IntoResponse {
373
    const FN_NAME: &'static str = "post_application";
374

            
375
42
    let user_id = token_info.user_id.as_str();
376
42
    let roles = &token_info.roles;
377
42

            
378
42
    let code = body.data.code.to_lowercase();
379
42
    let host_uri = body.data.host_uri.as_str();
380
42
    if !strings::is_code(code.as_str()) {
381
3
        return Err(ErrResp::ErrParam(Some(
382
3
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
383
3
        )));
384
39
    }
385
39
    let host_uri = match Url::parse(host_uri) {
386
3
        Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
387
36
        Ok(uri) => match mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
388
            false => {
389
3
                return Err(ErrResp::ErrParam(Some(
390
3
                    "unsupport `hostUri` scheme".to_string(),
391
3
                )))
392
            }
393
33
            true => uri,
394
        },
395
    };
396
33
    if let Some(info) = body.data.info.as_ref() {
397
6
        for (k, _) in info.iter() {
398
6
            if k.len() == 0 {
399
3
                return Err(ErrResp::ErrParam(Some(
400
3
                    "`info` key must not be empty".to_string(),
401
3
                )));
402
3
            }
403
        }
404
27
    }
405
30
    let unit_id = body.data.unit_id.as_str();
406
30
    if unit_id.len() == 0 {
407
3
        return Err(ErrResp::ErrParam(Some(
408
3
            "`unitId` must with at least one character".to_string(),
409
3
        )));
410
27
    }
411
27
    let unit_code = match check_unit(FN_NAME, user_id, roles, unit_id, true, &state).await? {
412
        None => {
413
6
            return Err(ErrResp::Custom(
414
6
                ErrReq::UNIT_NOT_EXIST.0,
415
6
                ErrReq::UNIT_NOT_EXIST.1,
416
6
                None,
417
6
            ))
418
        }
419
21
        Some(unit) => unit.code,
420
21
    };
421
21
    if check_code(FN_NAME, unit_id, code.as_str(), &state).await? {
422
3
        return Err(ErrResp::Custom(
423
3
            ErrReq::APPLICATION_EXIST.0,
424
3
            ErrReq::APPLICATION_EXIST.1,
425
3
            None,
426
3
        ));
427
18
    }
428
18

            
429
18
    let now = Utc::now();
430
18
    let application = Application {
431
18
        application_id: strings::random_id(&now, ID_RAND_LEN),
432
18
        code,
433
18
        unit_id: unit_id.to_string(),
434
18
        unit_code: unit_code.clone(),
435
18
        created_at: now,
436
18
        modified_at: now,
437
18
        host_uri: host_uri.to_string(),
438
18
        name: match body.data.name.as_ref() {
439
15
            None => "".to_string(),
440
3
            Some(name) => name.clone(),
441
        },
442
18
        info: match body.data.info.as_ref() {
443
15
            None => Map::new(),
444
3
            Some(info) => info.clone(),
445
        },
446
    };
447
18
    if let Err(e) = state.model.application().add(&application).await {
448
        error!("[{}] add error: {}", FN_NAME, e);
449
        return Err(ErrResp::ErrDb(Some(e.to_string())));
450
18
    }
451
18
    add_manager(
452
18
        FN_NAME,
453
18
        &state,
454
18
        &host_uri,
455
18
        unit_id,
456
18
        unit_code.as_str(),
457
18
        application.application_id.as_str(),
458
18
        application.code.as_str(),
459
18
    )
460
18
    .await?;
461
18
    Ok(Json(response::PostApplication {
462
18
        data: response::PostApplicationData {
463
18
            application_id: application.application_id,
464
18
        },
465
18
    }))
466
42
}
467

            
468
/// `GET /{base}/api/v1/application/count`
469
63
pub async fn get_application_count(
470
63
    State(state): State<AppState>,
471
63
    Extension(token_info): Extension<GetTokenInfoData>,
472
63
    Query(query): Query<request::GetApplicationCountQuery>,
473
63
) -> impl IntoResponse {
474
    const FN_NAME: &'static str = "get_application_count";
475

            
476
63
    let user_id = token_info.user_id.as_str();
477
63
    let roles = &token_info.roles;
478
63

            
479
63
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
480
33
        match query.unit.as_ref() {
481
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
482
30
            Some(unit_id) => {
483
30
                if unit_id.len() == 0 {
484
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
485
27
                }
486
            }
487
        }
488
30
    }
489
57
    let unit_cond = match query.unit.as_ref() {
490
9
        None => None,
491
48
        Some(unit_id) => match unit_id.len() {
492
3
            0 => None,
493
            _ => {
494
45
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
495
                    None => {
496
9
                        return Err(ErrResp::Custom(
497
9
                            ErrReq::UNIT_NOT_EXIST.0,
498
9
                            ErrReq::UNIT_NOT_EXIST.1,
499
9
                            None,
500
9
                        ))
501
                    }
502
36
                    Some(_) => Some(unit_id.as_str()),
503
                }
504
            }
505
        },
506
    };
507
48
    let mut code_cond = None;
508
48
    let mut code_contains_cond = None;
509
48
    if let Some(code) = query.code.as_ref() {
510
12
        if code.len() > 0 {
511
12
            code_cond = Some(code.as_str());
512
12
        }
513
36
    }
514
48
    if code_cond.is_none() {
515
36
        if let Some(contains) = query.contains.as_ref() {
516
9
            if contains.len() > 0 {
517
9
                code_contains_cond = Some(contains.as_str());
518
9
            }
519
27
        }
520
12
    }
521
48
    let cond = ListQueryCond {
522
48
        unit_id: unit_cond,
523
48
        code: code_cond,
524
48
        code_contains: code_contains_cond,
525
48
        ..Default::default()
526
48
    };
527
48
    match state.model.application().count(&cond).await {
528
        Err(e) => {
529
            error!("[{}] count error: {}", FN_NAME, e);
530
            Err(ErrResp::ErrDb(Some(e.to_string())))
531
        }
532
48
        Ok(count) => Ok(Json(response::GetApplicationCount {
533
48
            data: response::GetCountData { count },
534
48
        })),
535
    }
536
63
}
537

            
538
/// `GET /{base}/api/v1/application/list`
539
135
pub async fn get_application_list(
540
135
    State(state): State<AppState>,
541
135
    Extension(token_info): Extension<GetTokenInfoData>,
542
135
    Query(query): Query<request::GetApplicationListQuery>,
543
135
) -> impl IntoResponse {
544
    const FN_NAME: &'static str = "get_application_list";
545

            
546
135
    let user_id = token_info.user_id.as_str();
547
135
    let roles = &token_info.roles;
548
135

            
549
135
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
550
33
        match query.unit.as_ref() {
551
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
552
30
            Some(unit_id) => {
553
30
                if unit_id.len() == 0 {
554
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
555
27
                }
556
            }
557
        }
558
102
    }
559
129
    let unit_cond = match query.unit.as_ref() {
560
57
        None => None,
561
72
        Some(unit_id) => match unit_id.len() {
562
24
            0 => None,
563
            _ => {
564
48
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
565
                    None => {
566
9
                        return Err(ErrResp::Custom(
567
9
                            ErrReq::UNIT_NOT_EXIST.0,
568
9
                            ErrReq::UNIT_NOT_EXIST.1,
569
9
                            None,
570
9
                        ))
571
                    }
572
39
                    Some(_) => Some(unit_id.as_str()),
573
                }
574
            }
575
        },
576
    };
577
120
    let mut code_cond = None;
578
120
    let mut code_contains_cond = None;
579
120
    if let Some(code) = query.code.as_ref() {
580
12
        if code.len() > 0 {
581
12
            code_cond = Some(code.as_str());
582
12
        }
583
108
    }
584
120
    if code_cond.is_none() {
585
108
        if let Some(contains) = query.contains.as_ref() {
586
33
            if contains.len() > 0 {
587
33
                code_contains_cond = Some(contains.as_str());
588
33
            }
589
75
        }
590
12
    }
591
120
    let cond = ListQueryCond {
592
120
        unit_id: unit_cond,
593
120
        code: code_cond,
594
120
        code_contains: code_contains_cond,
595
120
        ..Default::default()
596
120
    };
597
120
    let sort_cond = get_sort_cond(&query.sort)?;
598
105
    let opts = ListOptions {
599
105
        cond: &cond,
600
105
        offset: query.offset,
601
105
        limit: match query.limit {
602
84
            None => Some(LIST_LIMIT_DEFAULT),
603
21
            Some(limit) => match limit {
604
6
                0 => None,
605
15
                _ => Some(limit),
606
            },
607
        },
608
105
        sort: Some(sort_cond.as_slice()),
609
105
        cursor_max: Some(LIST_CURSOR_MAX),
610
    };
611

            
612
105
    let (list, cursor) = match state.model.application().list(&opts, None).await {
613
        Err(e) => {
614
            error!("[{}] list error: {}", FN_NAME, e);
615
            return Err(ErrResp::ErrDb(Some(e.to_string())));
616
        }
617
105
        Ok((list, cursor)) => match cursor {
618
3
            None => match query.format {
619
                Some(request::ListFormat::Array) => {
620
3
                    return Ok(Json(application_list_transform(&list)).into_response())
621
                }
622
                _ => {
623
81
                    return Ok(Json(response::GetApplicationList {
624
81
                        data: application_list_transform(&list),
625
81
                    })
626
81
                    .into_response())
627
                }
628
            },
629
21
            Some(_) => (list, cursor),
630
21
        },
631
21
    };
632
21

            
633
21
    let body = Body::from_stream(async_stream::stream! {
634
21
        let unit_cond = match query.unit.as_ref() {
635
21
            None => None,
636
21
            Some(unit_id) => match unit_id.len() {
637
21
                0 => None,
638
21
                _ => Some(unit_id.as_str()),
639
21
            },
640
21
        };
641
21
        let mut code_contains_cond = None;
642
21
        if let Some(contains) = query.contains.as_ref() {
643
21
            if contains.len() > 0 {
644
21
                code_contains_cond = Some(contains.as_str());
645
21
            }
646
21
        }
647
21
        let cond = ListQueryCond {
648
21
            unit_id: unit_cond,
649
21
            code_contains: code_contains_cond,
650
21
            ..Default::default()
651
21
        };
652
21
        let opts = ListOptions {
653
21
            cond: &cond,
654
21
            offset: query.offset,
655
21
            limit: match query.limit {
656
21
                None => Some(LIST_LIMIT_DEFAULT),
657
21
                Some(limit) => match limit {
658
21
                    0 => None,
659
21
                    _ => Some(limit),
660
21
                },
661
21
            },
662
21
            sort: Some(sort_cond.as_slice()),
663
21
            cursor_max: Some(LIST_CURSOR_MAX),
664
21
        };
665
21

            
666
21
        let mut list = list;
667
21
        let mut cursor = cursor;
668
21
        let mut is_first = true;
669
21
        loop {
670
21
            yield application_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
671
21
            is_first = false;
672
21
            if cursor.is_none() {
673
21
                break;
674
21
            }
675
21
            let (_list, _cursor) = match state.model.application().list(&opts, cursor).await {
676
21
                Err(_) => break,
677
21
                Ok((list, cursor)) => (list, cursor),
678
21
            };
679
21
            list = _list;
680
21
            cursor = _cursor;
681
21
        }
682
21
    });
683
21
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
684
135
}
685

            
686
/// `GET /{base}/api/v1/application/{applicationId}`
687
27
pub async fn get_application(
688
27
    State(state): State<AppState>,
689
27
    Extension(token_info): Extension<GetTokenInfoData>,
690
27
    Path(param): Path<request::ApplicationIdPath>,
691
27
) -> impl IntoResponse {
692
    const FN_NAME: &'static str = "get_application";
693

            
694
27
    let user_id = token_info.user_id.as_str();
695
27
    let roles = &token_info.roles;
696
27
    let application_id = param.application_id.as_str();
697
27

            
698
27
    match check_application(FN_NAME, application_id, user_id, false, roles, &state).await? {
699
9
        None => Err(ErrResp::ErrNotFound(None)),
700
18
        Some(application) => Ok(Json(response::GetApplication {
701
18
            data: application_transform(&application),
702
18
        })),
703
    }
704
27
}
705

            
706
/// `PATCH /{base}/api/v1/application/{applicationId}`
707
39
pub async fn patch_application(
708
39
    State(state): State<AppState>,
709
39
    Extension(token_info): Extension<GetTokenInfoData>,
710
39
    Path(param): Path<request::ApplicationIdPath>,
711
39
    Json(mut body): Json<request::PatchApplicationBody>,
712
39
) -> impl IntoResponse {
713
    const FN_NAME: &'static str = "patch_application";
714

            
715
39
    let user_id = token_info.user_id.as_str();
716
39
    let roles = &token_info.roles;
717
39
    let application_id = param.application_id.as_str();
718

            
719
    // To check if the application is for the user.
720
30
    let application =
721
39
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await? {
722
9
            None => return Err(ErrResp::ErrNotFound(None)),
723
30
            Some(application) => application,
724
        };
725

            
726
30
    let updates = get_updates(&mut body.data).await?;
727
12
    let mut should_add_mgr = false;
728

            
729
    // Remove old manager.
730
12
    if let Some(host_uri) = updates.host_uri {
731
6
        let uri = Url::parse(host_uri).unwrap();
732
6
        if !uri.as_str().eq(application.host_uri.as_str()) {
733
6
            delete_manager(FN_NAME, &state, &application).await?;
734
6
            should_add_mgr = true;
735
        }
736
6
    }
737

            
738
    // Update database.
739
12
    let cond = UpdateQueryCond { application_id };
740
12
    if let Err(e) = state.model.application().update(&cond, &updates).await {
741
        error!("[{}] update error: {}", FN_NAME, e);
742
        return Err(ErrResp::ErrDb(Some(e.to_string())));
743
12
    }
744
12

            
745
12
    // Add new manager.
746
12
    if should_add_mgr {
747
6
        if let Some(host_uri) = updates.host_uri {
748
6
            let uri = Url::parse(host_uri).unwrap();
749
6
            add_manager(
750
6
                FN_NAME,
751
6
                &state,
752
6
                &uri,
753
6
                application.unit_id.as_str(),
754
6
                application.unit_code.as_str(),
755
6
                application.application_id.as_str(),
756
6
                application.code.as_str(),
757
6
            )
758
6
            .await?;
759
        }
760
6
    }
761
12
    Ok(StatusCode::NO_CONTENT)
762
39
}
763

            
764
/// `DELETE /{base}/api/v1/application/{applicationId}`
765
27
pub async fn delete_application(
766
27
    State(state): State<AppState>,
767
27
    Extension(token_info): Extension<GetTokenInfoData>,
768
27
    Path(param): Path<request::ApplicationIdPath>,
769
27
) -> impl IntoResponse {
770
    const FN_NAME: &'static str = "delete_application";
771

            
772
27
    let user_id = token_info.user_id.as_str();
773
27
    let roles = &token_info.roles;
774
27
    let application_id = param.application_id.as_str();
775

            
776
    // To check if the application is for the user.
777
15
    let application =
778
27
        match check_application(FN_NAME, application_id, user_id, true, roles, &state).await {
779
            Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
780
27
            Ok(application) => match application {
781
12
                None => return Ok(StatusCode::NO_CONTENT),
782
15
                Some(application) => application,
783
15
            },
784
15
        };
785
15

            
786
15
    delete_manager(FN_NAME, &state, &application).await?;
787
15
    del_application_rsc(FN_NAME, application_id, &state).await?;
788
15
    send_del_ctrl_message(FN_NAME, application, &state).await?;
789

            
790
15
    Ok(StatusCode::NO_CONTENT)
791
27
}
792

            
793
120
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
794
120
    match sort_args.as_ref() {
795
81
        None => Ok(vec![SortCond {
796
81
            key: SortKey::Code,
797
81
            asc: true,
798
81
        }]),
799
39
        Some(args) => {
800
39
            let mut args = args.split(",");
801
39
            let mut sort_cond = vec![];
802
66
            while let Some(arg) = args.next() {
803
42
                let mut cond = arg.split(":");
804
42
                let key = match cond.next() {
805
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
806
42
                    Some(field) => match field {
807
42
                        "code" => SortKey::Code,
808
36
                        "created" => SortKey::CreatedAt,
809
21
                        "modified" => SortKey::ModifiedAt,
810
15
                        "name" => SortKey::Name,
811
                        _ => {
812
6
                            return Err(ErrResp::ErrParam(Some(format!(
813
6
                                "invalid sort key {}",
814
6
                                field
815
6
                            ))))
816
                        }
817
                    },
818
                };
819
36
                let asc = match cond.next() {
820
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
821
33
                    Some(asc) => match asc {
822
33
                        "asc" => true,
823
15
                        "desc" => false,
824
                        _ => {
825
3
                            return Err(ErrResp::ErrParam(Some(format!(
826
3
                                "invalid sort asc {}",
827
3
                                asc
828
3
                            ))))
829
                        }
830
                    },
831
                };
832
30
                if cond.next().is_some() {
833
3
                    return Err(ErrResp::ErrParam(Some(
834
3
                        "invalid sort condition".to_string(),
835
3
                    )));
836
27
                }
837
27
                sort_cond.push(SortCond { key, asc });
838
            }
839
24
            Ok(sort_cond)
840
        }
841
    }
842
120
}
843

            
844
30
async fn get_updates<'a>(
845
30
    body: &'a mut request::PatchApplicationData,
846
30
) -> Result<Updates<'a>, ErrResp> {
847
30
    let mut updates = Updates {
848
30
        ..Default::default()
849
30
    };
850
30
    let mut count = 0;
851
30
    if let Some(host_uri) = body.host_uri.as_ref() {
852
15
        match Url::parse(host_uri) {
853
6
            Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
854
9
            Ok(uri) => {
855
9
                if !mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
856
3
                    return Err(ErrResp::ErrParam(Some(
857
3
                        "unsupport `hostUri` scheme".to_string(),
858
3
                    )));
859
6
                }
860
6
                body.host_uri = Some(uri.to_string()); // change host name case.
861
            }
862
        }
863
15
    }
864
21
    if let Some(host_uri) = body.host_uri.as_ref() {
865
6
        updates.host_uri = Some(host_uri.as_str());
866
6
        count += 1;
867
15
    }
868
21
    if let Some(name) = body.name.as_ref() {
869
12
        updates.name = Some(name.as_str());
870
12
        count += 1;
871
12
    }
872
21
    if let Some(info) = body.info.as_ref() {
873
15
        for (k, _) in info.iter() {
874
9
            if k.len() == 0 {
875
3
                return Err(ErrResp::ErrParam(Some(
876
3
                    "`info` key must not be empty".to_string(),
877
3
                )));
878
6
            }
879
        }
880
12
        updates.info = Some(info);
881
12
        count += 1;
882
6
    }
883

            
884
18
    if count == 0 {
885
6
        return Err(ErrResp::ErrParam(Some(
886
6
            "at least one parameter".to_string(),
887
6
        )));
888
12
    }
889
12
    updates.modified_at = Some(Utc::now());
890
12
    Ok(updates)
891
30
}
892

            
893
/// To check if the application code is used by the unit.
894
///
895
/// # Errors
896
///
897
/// Returns OK if the code is found or not. Otherwise errors will be returned.
898
21
async fn check_code(
899
21
    fn_name: &str,
900
21
    unit_id: &str,
901
21
    code: &str,
902
21
    state: &AppState,
903
21
) -> Result<bool, ErrResp> {
904
21
    let cond = QueryCond {
905
21
        unit_id: Some(unit_id),
906
21
        code: Some(code),
907
21
        ..Default::default()
908
21
    };
909
21
    match state.model.application().get(&cond).await {
910
        Err(e) => {
911
            error!("[{}] check code error: {}", fn_name, e);
912
            return Err(ErrResp::ErrDb(Some(format!("check code error: {}", e))));
913
        }
914
21
        Ok(application) => match application {
915
18
            None => Ok(false),
916
3
            Some(_) => Ok(true),
917
        },
918
    }
919
21
}
920

            
921
84
fn application_list_transform(list: &Vec<Application>) -> Vec<response::GetApplicationData> {
922
84
    let mut ret = vec![];
923
303
    for application in list.iter() {
924
303
        ret.push(application_transform(&application));
925
303
    }
926
84
    ret
927
84
}
928

            
929
45
fn application_list_transform_bytes(
930
45
    list: &Vec<Application>,
931
45
    with_start: bool,
932
45
    with_end: bool,
933
45
    format: Option<&request::ListFormat>,
934
45
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
935
45
    let mut build_str = match with_start {
936
24
        false => "".to_string(),
937
3
        true => match format {
938
3
            Some(request::ListFormat::Array) => "[".to_string(),
939
18
            _ => "{\"data\":[".to_string(),
940
        },
941
    };
942
45
    let mut is_first = with_start;
943

            
944
2772
    for item in list {
945
2727
        if is_first {
946
21
            is_first = false;
947
2706
        } else {
948
2706
            build_str.push(',');
949
2706
        }
950
2727
        let json_str = match serde_json::to_string(&application_transform(item)) {
951
            Err(e) => return Err(Box::new(e)),
952
2727
            Ok(str) => str,
953
2727
        };
954
2727
        build_str += json_str.as_str();
955
    }
956

            
957
45
    if with_end {
958
21
        build_str += match format {
959
3
            Some(request::ListFormat::Array) => "]",
960
18
            _ => "]}",
961
        }
962
24
    }
963
45
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
964
45
}
965

            
966
3048
fn application_transform(application: &Application) -> response::GetApplicationData {
967
3048
    response::GetApplicationData {
968
3048
        application_id: application.application_id.clone(),
969
3048
        code: application.code.clone(),
970
3048
        unit_id: application.unit_id.clone(),
971
3048
        unit_code: application.unit_code.clone(),
972
3048
        created_at: time_str(&application.created_at),
973
3048
        modified_at: time_str(&application.modified_at),
974
3048
        host_uri: application.host_uri.clone(),
975
3048
        name: application.name.clone(),
976
3048
        info: application.info.clone(),
977
3048
    }
978
3048
}
979

            
980
15
async fn del_application_rsc(
981
15
    fn_name: &str,
982
15
    application_id: &str,
983
15
    state: &AppState,
984
15
) -> Result<(), ErrResp> {
985
15
    let cond = network_route::QueryCond {
986
15
        application_id: Some(application_id),
987
15
        ..Default::default()
988
15
    };
989
15
    if let Err(e) = state.model.network_route().del(&cond).await {
990
        error!("[{}] del network_route error: {}", fn_name, e);
991
        return Err(ErrResp::ErrDb(Some(e.to_string())));
992
15
    }
993
15

            
994
15
    let cond = device_route::QueryCond {
995
15
        application_id: Some(application_id),
996
15
        ..Default::default()
997
15
    };
998
15
    if let Err(e) = state.model.device_route().del(&cond).await {
999
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
15
    }
15

            
15
    let cond = dldata_buffer::QueryCond {
15
        application_id: Some(application_id),
15
        ..Default::default()
15
    };
15
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
15
    }
15

            
15
    let cond = QueryCond {
15
        application_id: Some(application_id),
15
        ..Default::default()
15
    };
15
    if let Err(e) = state.model.application().del(&cond).await {
        error!("[{}] del application error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
15
    }
15

            
15
    Ok(())
15
}
/// Send delete control message.
15
async fn send_del_ctrl_message(
15
    fn_name: &str,
15
    application: Application,
15
    state: &AppState,
15
) -> Result<(), ErrResp> {
15
    if state.cache.is_some() {
5
        let msg = SendCtrlMsg::DelApplication {
5
            operation: CtrlMsgOp::DEL_APPLICATION.to_string(),
5
            new: CtrlDelApplication {
5
                unit_id: application.unit_id,
5
                unit_code: application.unit_code,
5
                application_id: application.application_id,
5
                application_code: application.code,
5
            },
5
        };
5
        let payload = match serde_json::to_vec(&msg) {
            Err(e) => {
                error!(
                    "[{}] marshal JSON for {} error: {}",
                    fn_name,
                    CtrlMsgOp::DEL_APPLICATION,
                    e
                );
                return Err(ErrResp::ErrRsc(Some(format!(
                    "marshal control message error: {}",
                    e
                ))));
            }
5
            Ok(payload) => payload,
5
        };
5
        let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
5
        if let Err(e) = ctrl_sender.send_msg(payload).await {
            error!(
                "[{}] send control message for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_APPLICATION,
                e
            );
            return Err(ErrResp::ErrIntMsg(Some(format!(
                "send control message error: {}",
                e
            ))));
5
        }
10
    }
15
    Ok(())
15
}
/// To create a manager by:
/// - get a connection from the pool.
/// - register manager handlers.
24
async fn add_manager(
24
    fn_name: &str,
24
    state: &AppState,
24
    host_uri: &Url,
24
    unit_id: &str,
24
    unit_code: &str,
24
    id: &str,
24
    name: &str,
24
) -> Result<(), ErrResp> {
24
    let opts = MgrOptions {
24
        unit_id: unit_id.to_string(),
24
        unit_code: unit_code.to_string(),
24
        id: id.to_string(),
24
        name: name.to_string(),
24
        prefetch: Some(state.amqp_prefetch),
24
        persistent: state.amqp_persistent,
24
        shared_prefix: Some(state.mqtt_shared_prefix.clone()),
24
    };
24
    let msg = SendCtrlMsg::AddManager {
24
        operation: CtrlMsgOp::ADD_MANAGER.to_string(),
24
        new: CtrlAddManager {
24
            host_uri: host_uri.to_string(),
24
            mgr_options: opts,
24
        },
24
    };
24
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!("[{}] marshal JSON for {} error: {}", fn_name, name, e);
            return Err(ErrResp::ErrRsc(Some(format!("new manager error:{}", e))));
        }
24
        Ok(payload) => payload,
24
    };
24
    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
24
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name, name, e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!("new manager error:{}", e))));
24
    }
24
    Ok(())
24
}
/// To delete an application manager.
21
async fn delete_manager(
21
    fn_name: &str,
21
    state: &AppState,
21
    application: &Application,
21
) -> Result<(), ErrResp> {
21
    let key = gen_mgr_key(application.unit_code.as_str(), application.code.as_str());
21
    let msg = SendCtrlMsg::DelManager {
21
        operation: CtrlMsgOp::DEL_MANAGER.to_string(),
21
        new: key.clone(),
21
    };
21
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!("[{}] marshal JSON for {} error: {}", fn_name, key, e);
            return Err(ErrResp::ErrRsc(Some(format!("delete manager error:{}", e))));
        }
21
        Ok(payload) => payload,
21
    };
21
    let ctrl_sender = { state.ctrl_senders.application.lock().unwrap().clone() };
21
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name, key, e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "delete manager error:{}",
            e
        ))));
21
    }
21
    Ok(())
21
}
impl MgrHandler {
    /// Get device route information from cache or database. This function handles two cases:
    /// - with `network_code` and `network_addr` for private network devices.
    /// - with `device_id` for both private and public network devices.
15
    async fn get_device_route(
15
        &self,
15
        mgr: &ApplicationMgr,
15
        data: &Box<DlData>,
15
    ) -> Result<device_route::DeviceRouteCacheDlData, Box<DlDataResp>> {
        const FN_NAME: &'static str = "get_device_route";
15
        if let Some(cache) = self.cache.as_ref() {
5
            match data.device_id.as_ref() {
                None => {
3
                    let cond = device_route::GetCacheQueryCond {
3
                        unit_code: mgr.unit_code(),
3
                        network_code: data.network_code.as_ref().unwrap().as_str(),
3
                        network_addr: data.network_addr.as_ref().unwrap().as_str(),
3
                    };
3
                    match cache.device_route().get_dldata(&cond).await {
                        Err(e) => {
                            error!("[{}] get device with error: {}", FN_NAME, e);
                            return Err(Box::new(DlDataResp {
                                correlation_id: data.correlation_id.clone(),
                                error: Some(err::E_DB.to_string()),
                                message: Some(format!("{}", e)),
                                ..Default::default()
                            }));
                        }
3
                        Ok(route) => match route {
                            None => {
1
                                warn!(
                                    "[{}] no device for {}.{}.{:?}",
                                    FN_NAME,
                                    mgr.unit_code(),
                                    mgr.name(),
                                    data.network_addr.as_ref()
                                );
1
                                return Err(Box::new(DlDataResp {
1
                                    correlation_id: data.correlation_id.clone(),
1
                                    error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
1
                                    ..Default::default()
1
                                }));
                            }
2
                            Some(route) => return Ok(route),
                        },
                    }
                }
2
                Some(device_id) => {
2
                    let cond = device_route::GetCachePubQueryCond {
2
                        unit_id: mgr.unit_id(),
2
                        device_id: device_id.as_str(),
2
                    };
2
                    match cache.device_route().get_dldata_pub(&cond).await {
                        Err(e) => {
                            error!("[{}] get device with error: {}", FN_NAME, e);
                            return Err(Box::new(DlDataResp {
                                correlation_id: data.correlation_id.clone(),
                                error: Some(err::E_DB.to_string()),
                                message: Some(format!("{}", e)),
                                ..Default::default()
                            }));
                        }
2
                        Ok(route) => match route {
                            None => {
                                warn!(
                                    "[{}] no device for {}.{:?}",
                                    FN_NAME,
                                    mgr.unit_id(),
                                    data.device_id.as_ref(),
                                );
                                return Err(Box::new(DlDataResp {
                                    correlation_id: data.correlation_id.clone(),
                                    error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
                                    ..Default::default()
                                }));
                            }
2
                            Some(route) => return Ok(route),
                        },
                    }
                }
            }
10
        }
        // Get information from database.
10
        let cond = match data.device_id.as_ref() {
6
            None => device::QueryCond {
6
                device: Some(device::QueryOneCond {
6
                    unit_code: Some(mgr.unit_code()),
6
                    network_code: data.network_code.as_ref().unwrap().as_str(),
6
                    network_addr: data.network_addr.as_ref().unwrap().as_str(),
6
                }),
6
                ..Default::default()
6
            },
4
            Some(device_id) => device::QueryCond {
4
                unit_id: Some(mgr.unit_id()),
4
                device_id: Some(device_id.as_str()),
4
                ..Default::default()
4
            },
        };
10
        let device = match self.model.device().get(&cond).await {
            Err(e) => {
                error!("[{}] get device with error: {}", FN_NAME, e);
                return Err(Box::new(DlDataResp {
                    correlation_id: data.correlation_id.clone(),
                    error: Some(err::E_DB.to_string()),
                    message: Some(format!("{}", e)),
                    ..Default::default()
                }));
            }
10
            Ok(device) => match device {
                None => {
2
                    warn!(
                        "[{}] no device for {}.{:?} or {}.{}.{:?}",
                        FN_NAME,
                        mgr.unit_id(),
                        data.device_id.as_ref(),
                        mgr.unit_code(),
                        mgr.name(),
                        data.network_addr.as_ref()
                    );
2
                    return Err(Box::new(DlDataResp {
2
                        correlation_id: data.correlation_id.clone(),
2
                        error: Some(ErrReq::DEVICE_NOT_EXIST.1.to_string()),
2
                        ..Default::default()
2
                    }));
                }
8
                Some(device) => device,
            },
        };
8
        let unit_code = match device.unit_code.as_ref() {
4
            None => "",
4
            Some(_) => mgr.unit_code(),
        };
8
        Ok(device_route::DeviceRouteCacheDlData {
8
            net_mgr_key: gen_mgr_key(unit_code, device.network_code.as_str()),
8
            network_id: device.network_id,
8
            network_addr: device.network_addr,
8
            device_id: device.device_id,
8
            profile: device.profile,
8
        })
15
    }
12
    async fn send_application_dldata_msg(
12
        &self,
12
        proc: &DateTime<Utc>,
12
        data_id: &str,
12
        unit_id: &str,
12
        profile: &str,
12
        data: &Box<DlData>,
12
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_application_dldata_msg";
12
        if let Some(sender) = self.data_sender.as_ref() {
6
            let msg = SendDataMsg {
6
                kind: DataMsgKind::APP_DLDATA.to_string(),
6
                data: SendDataKind::AppDlData {
6
                    data_id: data_id.to_string(),
6
                    proc: time_str(proc),
6
                    status: DEF_DLDATA_STATUS,
6
                    unit_id: unit_id.to_string(),
6
                    device_id: data.device_id.clone(),
6
                    network_code: data.network_code.clone(),
6
                    network_addr: data.network_addr.clone(),
6
                    profile: profile.to_string(),
6
                    data: data.data.clone(),
6
                    extension: data.extension.clone(),
6
                },
6
            };
6
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
6
                Ok(payload) => payload,
            };
6
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
6
            }
6
        }
12
        Ok(())
12
    }
12
    async fn send_network_dldata_msg(
12
        &self,
12
        proc: &DateTime<Utc>,
12
        netmgr_code: &str,
12
        dldata_buff: &dldata_buffer::DlDataBuffer,
12
        profile: &str,
12
        net_data: &NetworkDlData,
12
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_network_dldata_msg";
12
        if let Some(sender) = self.data_sender.as_ref() {
6
            let msg = SendDataMsg {
6
                kind: DataMsgKind::NET_DLDATA.to_string(),
6
                data: SendDataKind::NetDlData {
6
                    data_id: dldata_buff.data_id.clone(),
6
                    proc: time_str(proc),
6
                    publish: net_data.publish.clone(),
6
                    status: DEF_DLDATA_STATUS,
6
                    unit_id: dldata_buff.unit_id.clone(),
6
                    device_id: dldata_buff.device_id.clone(),
6
                    network_code: netmgr_code.to_string(),
6
                    network_addr: dldata_buff.network_addr.clone(),
6
                    profile: profile.to_string(),
6
                    data: net_data.data.clone(),
6
                    extension: net_data.extension.clone(),
6
                },
6
            };
6
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
6
                Ok(payload) => payload,
            };
6
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
6
            }
6
        }
12
        Ok(())
12
    }
}
#[async_trait]
impl EventHandler for MgrHandler {
18
    async fn on_status_change(&self, mgr: &ApplicationMgr, status: MgrStatus) {
        // Clear cache when manager status changed.
18
        if let Some(cache) = self.cache.as_ref() {
6
            if let Err(e) = cache.device().clear().await {
                error!(
                    "[on_status_change] {}.{} clear device cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
6
            }
6
            if let Err(e) = cache.device_route().clear().await {
                error!(
                    "[on_status_change] {}.{} clear device_route cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
6
            }
6
            if let Err(e) = cache.network_route().clear().await {
                error!(
                    "[on_status_change] {}.{} clear network_route cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
6
            }
12
        }
18
        match status {
            MgrStatus::NotReady => {
9
                error!(
                    "[on_status_change] {}.{} to NotReady",
                    mgr.unit_code(),
                    mgr.name()
                );
            }
            MgrStatus::Ready => {
9
                info!(
                    "[on_status_change] {}.{} to Ready",
                    mgr.unit_code(),
                    mgr.name()
                );
            }
        }
36
    }
    // Do the following jobs:
    // - check if the destination device is valid for the unit.
    // - generate dldata buffer to trace data processing.
    // - send to the network manager.
    async fn on_dldata(
        &self,
        mgr: &ApplicationMgr,
        data: Box<DlData>,
15
    ) -> Result<Box<DlDataResp>, ()> {
        const FN_NAME: &'static str = "on_dldata";
        // Check if the device is valid.
15
        let dldata_route = match self.get_device_route(mgr, &data).await {
3
            Err(e) => return Ok(e),
12
            Ok(route) => route,
12
        };
12

            
12
        let now = Utc::now();
12
        let data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
12

            
12
        self.send_application_dldata_msg(
12
            &now,
12
            data_id.as_str(),
12
            mgr.unit_id(),
12
            &dldata_route.profile,
12
            &data,
12
        )
12
        .await?;
        // Check if the network exists.
12
        let network_mgr = {
12
            match self
12
                .network_mgrs
12
                .lock()
12
                .unwrap()
12
                .get(&dldata_route.net_mgr_key)
            {
                None => {
                    return Ok(Box::new(DlDataResp {
                        correlation_id: data.correlation_id,
                        error: Some(ErrReq::NETWORK_NOT_EXIST.1.to_string()),
                        ..Default::default()
                    }));
                }
12
                Some(mgr) => mgr.clone(),
            }
        };
12
        let ts_nanos = match now.timestamp_nanos_opt() {
            None => {
                error!("[{}] cannot generate valid nanoseconds", FN_NAME);
                return Ok(Box::new(DlDataResp {
                    correlation_id: data.correlation_id.clone(),
                    error: Some(err::E_RSC.to_string()),
                    message: Some(format!("cannot generate valid nanoseconds")),
                    ..Default::default()
                }));
            }
12
            Some(ts) => ts,
12
        };
12
        let expired_at = Utc.timestamp_nanos(ts_nanos + DATA_EXPIRES_IN * 1_000_000_000);
12
        let dldata = dldata_buffer::DlDataBuffer {
12
            data_id: data_id.clone(),
12
            unit_id: mgr.unit_id().to_string(),
12
            unit_code: mgr.unit_code().to_string(),
12
            application_id: mgr.id().to_string(),
12
            application_code: mgr.name().to_string(),
12
            network_id: dldata_route.network_id,
12
            network_addr: dldata_route.network_addr.clone(),
12
            device_id: dldata_route.device_id,
12
            created_at: now,
12
            expired_at,
12
        };
12
        match self.model.dldata_buffer().add(&dldata).await {
            Err(e) => {
                error!("[{}] add data buffer with error: {}", FN_NAME, e);
                return Ok(Box::new(DlDataResp {
                    correlation_id: data.correlation_id,
                    error: Some(err::E_DB.to_string()),
                    message: Some(format!("{}", e)),
                    ..Default::default()
                }));
            }
12
            Ok(_) => (),
12
        }
12

            
12
        let net_data = NetworkDlData {
12
            data_id,
12
            publish: time_str(&now),
12
            expires_in: DATA_EXPIRES_IN,
12
            network_addr: dldata_route.network_addr,
12
            data: data.data,
12
            extension: data.extension,
12
        };
12
        self.send_network_dldata_msg(
12
            &now,
12
            network_mgr.name(),
12
            &dldata,
12
            &dldata_route.profile,
12
            &net_data,
12
        )
12
        .await?;
12
        if let Err(e) = network_mgr.send_dldata(&net_data) {
            error!("[{}] send dldata to network with error: {}", FN_NAME, e);
            return Ok(Box::new(DlDataResp {
                correlation_id: data.correlation_id,
                error: Some(err::E_INT_MSG.to_string()),
                message: Some(format!("send data with error: {}", e)),
                ..Default::default()
            }));
12
        }
12

            
12
        Ok(Box::new(DlDataResp {
12
            correlation_id: data.correlation_id,
12
            data_id: Some(net_data.data_id),
12
            ..Default::default()
12
        }))
30
    }
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
30
        let queue_name = queue.name();
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
30
        let queue_name = queue.name();
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
65
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
65
        let queue_name = queue.name();
65
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
9
            Err(e) => {
9
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
9
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
9
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
9
                }
9
                return;
            }
56
            Ok(msg) => msg,
56
        };
56
        match ctrl_msg {
5
            RecvCtrlMsg::DelApplication { new: _new } => {}
30
            RecvCtrlMsg::AddManager { new } => {
30
                let host_uri = match Url::parse(new.host_uri.as_str()) {
3
                    Err(e) => {
3
                        warn!("[{}] {} hostUri error: {}", FN_NAME, queue_name, e);
3
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
3
                        }
3
                        return;
                    }
27
                    Ok(uri) => uri,
27
                };
27
                let handler = MgrHandler {
27
                    model: self.model.clone(),
27
                    cache: self.cache.clone(),
27
                    network_mgrs: self.network_mgrs.clone(),
27
                    data_sender: self.data_sender.clone(),
27
                };
27
                let unit_code = new.mgr_options.unit_code.clone();
27
                let name = new.mgr_options.name.clone();
27
                let mgr = match ApplicationMgr::new(
27
                    self.mq_conns.clone(),
27
                    &host_uri,
27
                    new.mgr_options,
27
                    Arc::new(handler),
27
                ) {
3
                    Err(e) => {
3
                        error!("[{}] {} new manager error: {}", FN_NAME, queue_name, e);
3
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
3
                        }
3
                        return;
                    }
24
                    Ok(mgr) => {
24
                        debug!("[{}] {} new manager", FN_NAME, queue_name);
24
                        mgr
24
                    }
24
                };
24
                let key = gen_mgr_key(unit_code.as_str(), name.as_str());
24
                let old_mgr = {
24
                    self.application_mgrs
24
                        .lock()
24
                        .unwrap()
24
                        .insert(key.clone(), mgr)
                };
24
                if let Some(mgr) = old_mgr {
                    if let Err(e) = mgr.close().await {
                        error!(
                            "[{}] {} close old manager {} error: {}",
                            FN_NAME, queue_name, key, e
                        );
                    } else {
                        debug!("[{}] {} close old manager {}", FN_NAME, queue_name, key);
                    }
24
                }
24
                info!("[{}] {} manager {} added", FN_NAME, queue_name, key);
            }
21
            RecvCtrlMsg::DelManager { new } => {
21
                let old_mgr = { self.application_mgrs.lock().unwrap().remove(&new) };
21
                match old_mgr {
                    None => {
21
                        error!("[{}] {} get no manager {}", FN_NAME, queue_name, new);
21
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
21
                        }
21
                        return;
                    }
                    Some(mgr) => {
                        if let Err(e) = mgr.close().await {
                            error!(
                                "[{}] {} close old manager {} error: {}",
                                FN_NAME, queue_name, new, e
                            );
                        } else {
                            debug!("[{}] {} close old manager {}", FN_NAME, queue_name, new);
                        }
                    }
                }
                info!("[{}] {} manager {} deleted", FN_NAME, queue_name, new);
            }
        }
29
        if let Err(e) = msg.ack().await {
            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
29
        }
130
    }
}