1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
    time::Duration,
7
};
8

            
9
use async_trait::async_trait;
10
use axum::{
11
    body::{Body, Bytes},
12
    extract::State,
13
    http::{header, StatusCode},
14
    response::IntoResponse,
15
    Extension,
16
};
17
use chrono::{DateTime, Utc};
18
use log::{debug, error, info, warn};
19
use serde::{Deserialize, Serialize};
20
use serde_json::{self, Map, Value};
21
use tokio::time;
22
use url::Url;
23

            
24
use general_mq::{
25
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
26
    Queue,
27
};
28
use sylvia_iot_corelib::{
29
    constants::ContentType,
30
    err::ErrResp,
31
    http::{Json, Path, Query},
32
    role::Role,
33
    strings::{self, time_str},
34
};
35

            
36
use super::{
37
    super::{
38
        super::{middleware::GetTokenInfoData, ErrReq, State as AppState},
39
        lib::{check_network, check_unit, gen_mgr_key},
40
    },
41
    request, response,
42
};
43
use crate::{
44
    libs::{
45
        config::BrokerCtrl as CfgCtrl,
46
        mq::{
47
            self,
48
            application::{
49
                ApplicationMgr, DlDataResult as ApplicationDlDataResult,
50
                UlData as ApplicationUlData,
51
            },
52
            network::{DlDataResult, EventHandler, NetworkMgr, UlData},
53
            Connection, MgrStatus, Options as MgrOptions,
54
        },
55
    },
56
    models::{
57
        device::{self, DeviceCacheItem},
58
        device_route, dldata_buffer,
59
        network::{
60
            ListOptions, ListQueryCond, Network, QueryCond, SortCond, SortKey, UpdateQueryCond,
61
            Updates,
62
        },
63
        network_route, Cache, Model,
64
    },
65
};
66

            
67
struct MgrHandler {
68
    model: Arc<dyn Model>,
69
    cache: Option<Arc<dyn Cache>>,
70
    application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
71
    data_sender: Option<Queue>,
72
}
73

            
74
276
#[derive(Deserialize, Serialize)]
75
#[serde(tag = "operation")]
76
enum RecvCtrlMsg {
77
    #[serde(rename = "del-network")]
78
    DelNetwork { new: CtrlDelNetwork },
79
    #[serde(rename = "add-manager")]
80
    AddManager { new: CtrlAddManager },
81
    #[serde(rename = "del-manager")]
82
    DelManager { new: String },
83
}
84

            
85
/// Control channel.
86
#[derive(Serialize)]
87
#[serde(untagged)]
88
enum SendCtrlMsg {
89
    DelNetwork {
90
        operation: String,
91
        new: CtrlDelNetwork,
92
    },
93
    AddManager {
94
        operation: String,
95
        new: CtrlAddManager,
96
    },
97
    DelManager {
98
        operation: String,
99
        new: String,
100
    },
101
}
102

            
103
/// Data channel.
104
#[derive(Serialize)]
105
struct SendDataMsg {
106
    kind: String,
107
    data: SendDataKind,
108
}
109

            
110
#[derive(Serialize)]
111
#[serde(untagged)]
112
enum SendDataKind {
113
    AppUlData {
114
        #[serde(rename = "dataId")]
115
        data_id: String,
116
        proc: String,
117
        #[serde(rename = "pub")]
118
        publish: String,
119
        #[serde(rename = "unitCode")]
120
        unit_code: Option<String>,
121
        #[serde(rename = "networkCode")]
122
        network_code: String,
123
        #[serde(rename = "networkAddr")]
124
        network_addr: String,
125
        #[serde(rename = "unitId")]
126
        unit_id: String,
127
        #[serde(rename = "deviceId")]
128
        device_id: String,
129
        time: String,
130
        profile: String,
131
        data: String,
132
        #[serde(skip_serializing_if = "Option::is_none")]
133
        extension: Option<Map<String, Value>>,
134
    },
135
    AppDlDataResult {
136
        #[serde(rename = "dataId")]
137
        data_id: String,
138
        resp: String,
139
        status: i32,
140
    },
141
    NetUlData {
142
        #[serde(rename = "dataId")]
143
        data_id: String,
144
        proc: String,
145
        #[serde(rename = "unitCode")]
146
        unit_code: Option<String>,
147
        #[serde(rename = "networkCode")]
148
        network_code: String,
149
        #[serde(rename = "networkAddr")]
150
        network_addr: String,
151
        #[serde(rename = "unitId", skip_serializing_if = "Option::is_none")]
152
        unit_id: Option<String>,
153
        #[serde(rename = "deviceId", skip_serializing_if = "Option::is_none")]
154
        device_id: Option<String>,
155
        time: String,
156
        profile: String,
157
        data: String,
158
        #[serde(skip_serializing_if = "Option::is_none")]
159
        extension: Option<Map<String, Value>>,
160
    },
161
    NetDlDataResult {
162
        #[serde(rename = "dataId")]
163
        data_id: String,
164
        resp: String,
165
        status: i32,
166
    },
167
}
168

            
169
/// Namespace type for the control channel operation names (see its `impl` for the constants).
struct CtrlMsgOp;
170
/// Namespace type for the data channel message kinds (see its `impl` for the constants).
struct DataMsgKind;
171

            
172
30
#[derive(Deserialize, Serialize)]
173
struct CtrlDelNetwork {
174
    #[serde(rename = "unitId")]
175
    unit_id: Option<String>,
176
    #[serde(rename = "unitCode")]
177
    unit_code: Option<String>,
178
    #[serde(rename = "networkId")]
179
    network_id: String,
180
    #[serde(rename = "networkCode")]
181
    network_code: String,
182
}
183

            
184
153
#[derive(Deserialize, Serialize)]
185
struct CtrlAddManager {
186
    #[serde(rename = "hostUri")]
187
    host_uri: String,
188
    #[serde(rename = "mgrOptions")]
189
    mgr_options: MgrOptions,
190
}
191

            
192
struct CtrlSenderHandler {
193
    cache: Option<Arc<dyn Cache>>,
194
}
195

            
196
struct CtrlReceiverHandler {
197
    model: Arc<dyn Model>,
198
    cache: Option<Arc<dyn Cache>>,
199
    mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
200
    application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
201
    network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
202
    data_sender: Option<Queue>,
203
}
204

            
205
impl CtrlMsgOp {
206
    const DEL_NETWORK: &'static str = "del-network";
207
    const ADD_MANAGER: &'static str = "add-manager";
208
    const DEL_MANAGER: &'static str = "del-manager";
209
}
210

            
211
impl DataMsgKind {
212
    const APP_ULDATA: &'static str = "application-uldata";
213
    const APP_DLDATA_RES: &'static str = "application-dldata-result";
214
    const NET_ULDATA: &'static str = "network-uldata";
215
    const NET_DLDATA_RES: &'static str = "network-dldata-result";
216
}
217

            
218
/// Default `limit` applied by the list API when the query does not give one.
const LIST_LIMIT_DEFAULT: u64 = 100;
/// Value passed as `cursor_max` to the model's list operation.
const LIST_CURSOR_MAX: u64 = 100;
/// Length argument given to `strings::random_id` when generating a network ID.
const ID_RAND_LEN: usize = 8;
/// Length for data IDs.
// NOTE(review): presumably the random part of generated data IDs -- its use is outside this
// part of the file.
const DATA_ID_RAND_LEN: usize = 12;
/// Name of the control queue; also the key used in `state.ctrl_receivers`.
const CTRL_QUEUE_NAME: &str = "network";
223

            
224
/// Initialize network managers and channels.
225
15
pub async fn init(state: &AppState, ctrl_conf: &CfgCtrl) -> Result<(), Box<dyn StdError>> {
226
    const FN_NAME: &'static str = "init";
227

            
228
15
    let q = new_ctrl_receiver(state, ctrl_conf)?;
229
15
    {
230
15
        state
231
15
            .ctrl_receivers
232
15
            .lock()
233
15
            .unwrap()
234
15
            .insert(CTRL_QUEUE_NAME.to_string(), q.clone());
235
15
    }
236
15

            
237
15
    let ctrl_sender = { state.ctrl_senders.network.lock().unwrap().clone() };
238
    // Wait for connected.
239
1381
    for _ in 0..500 {
240
1381
        if ctrl_sender.status() == Status::Connected && q.status() == Status::Connected {
241
15
            break;
242
1366
        }
243
2710
        time::sleep(Duration::from_millis(10)).await;
244
    }
245
15
    if ctrl_sender.status() != Status::Connected {
246
        error!(
247
            "[{}] {} control sender not connected",
248
            FN_NAME, CTRL_QUEUE_NAME
249
        );
250
        return Err(Box::new(IoError::new(
251
            ErrorKind::NotConnected,
252
            format!("control sender {} not connected", CTRL_QUEUE_NAME),
253
        )));
254
15
    }
255
15
    if q.status() != Status::Connected {
256
        error!(
257
            "[{}] {} control receiver not connected",
258
            FN_NAME, CTRL_QUEUE_NAME
259
        );
260
        return Err(Box::new(IoError::new(
261
            ErrorKind::NotConnected,
262
            format!("control receiver {} not connected", CTRL_QUEUE_NAME),
263
        )));
264
15
    }
265
15

            
266
15
    let cond = ListQueryCond {
267
15
        ..Default::default()
268
15
    };
269
15
    let opts = ListOptions {
270
15
        cond: &cond,
271
15
        offset: None,
272
15
        limit: None,
273
15
        sort: None,
274
15
        cursor_max: Some(LIST_CURSOR_MAX),
275
15
    };
276
15
    let mut list;
277
15
    let mut cursor = None;
278
    loop {
279
41
        (list, cursor) = state.model.network().list(&opts, cursor).await?;
280
15
        for item in list.iter() {
281
1
            let url = Url::parse(item.host_uri.as_str())?;
282
1
            let unit_id = match item.unit_id.as_ref() {
283
                None => "",
284
1
                Some(unit_id) => unit_id.as_str(),
285
            };
286
1
            let unit_code = match item.unit_code.as_ref() {
287
                None => "",
288
1
                Some(unit_code) => unit_code.as_str(),
289
            };
290
1
            let key = gen_mgr_key(unit_code, item.code.as_str());
291
1
            let opts = MgrOptions {
292
1
                unit_id: unit_id.to_string(),
293
1
                unit_code: unit_code.to_string(),
294
1
                id: item.network_id.clone(),
295
1
                name: item.code.clone(),
296
1
                prefetch: Some(state.amqp_prefetch),
297
1
                persistent: state.amqp_persistent,
298
1
                shared_prefix: Some(state.mqtt_shared_prefix.clone()),
299
1
            };
300
1
            let handler = MgrHandler {
301
1
                model: state.model.clone(),
302
1
                cache: state.cache.clone(),
303
1
                application_mgrs: state.application_mgrs.clone(),
304
1
                data_sender: state.data_sender.clone(),
305
1
            };
306
1
            let mgr = match NetworkMgr::new(state.mq_conns.clone(), &url, opts, Arc::new(handler)) {
307
                Err(e) => {
308
                    error!("[{}] new manager for {} error: {}", FN_NAME, key, e);
309
                    return Err(Box::new(ErrResp::ErrRsc(Some(e))));
310
                }
311
1
                Ok(mgr) => mgr,
312
1
            };
313
1
            {
314
1
                state.network_mgrs.lock().unwrap().insert(key.clone(), mgr);
315
1
            }
316
        }
317
15
        if cursor.is_none() {
318
15
            break;
319
        }
320
    }
321

            
322
15
    Ok(())
323
15
}
324

            
325
/// Create control channel sender queue.
326
15
pub fn new_ctrl_sender(
327
15
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
328
15
    config: &CfgCtrl,
329
15
    cache: Option<Arc<dyn Cache>>,
330
15
) -> Result<Arc<Mutex<Queue>>, Box<dyn StdError>> {
331
15
    let url = match config.url.as_ref() {
332
        None => {
333
            return Err(Box::new(IoError::new(
334
                ErrorKind::InvalidInput,
335
                "empty control url",
336
            )))
337
        }
338
15
        Some(url) => match Url::parse(url.as_str()) {
339
            Err(e) => return Err(Box::new(e)),
340
15
            Ok(url) => url,
341
15
        },
342
15
    };
343
15

            
344
15
    match mq::control::new(
345
15
        conn_pool.clone(),
346
15
        &url,
347
15
        config.prefetch,
348
15
        CTRL_QUEUE_NAME,
349
15
        false,
350
15
        Arc::new(CtrlSenderHandler {
351
15
            cache: cache.clone(),
352
15
        }),
353
15
        Arc::new(CtrlSenderHandler { cache }),
354
15
    ) {
355
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
356
15
        Ok(q) => Ok(Arc::new(Mutex::new(q))),
357
    }
358
15
}
359

            
360
/// Create control channel receiver queue.
361
15
pub fn new_ctrl_receiver(state: &AppState, config: &CfgCtrl) -> Result<Queue, Box<dyn StdError>> {
362
15
    let url = match config.url.as_ref() {
363
        None => {
364
            return Err(Box::new(IoError::new(
365
                ErrorKind::InvalidInput,
366
                "empty control url",
367
            )))
368
        }
369
15
        Some(url) => match Url::parse(url.as_str()) {
370
            Err(e) => return Err(Box::new(e)),
371
15
            Ok(url) => url,
372
15
        },
373
15
    };
374
15
    let handler = Arc::new(CtrlReceiverHandler {
375
15
        model: state.model.clone(),
376
15
        cache: state.cache.clone(),
377
15
        mq_conns: state.mq_conns.clone(),
378
15
        application_mgrs: state.application_mgrs.clone(),
379
15
        network_mgrs: state.network_mgrs.clone(),
380
15
        data_sender: state.data_sender.clone(),
381
15
    });
382
15
    match mq::control::new(
383
15
        state.mq_conns.clone(),
384
15
        &url,
385
15
        config.prefetch,
386
15
        CTRL_QUEUE_NAME,
387
15
        true,
388
15
        handler.clone(),
389
15
        handler,
390
15
    ) {
391
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
392
15
        Ok(q) => Ok(q),
393
    }
394
15
}
395

            
396
/// `POST /{base}/api/v1/network`
397
66
pub async fn post_network(
398
66
    State(state): State<AppState>,
399
66
    Extension(token_info): Extension<GetTokenInfoData>,
400
66
    Json(body): Json<request::PostNetworkBody>,
401
66
) -> impl IntoResponse {
402
    const FN_NAME: &'static str = "post_network";
403

            
404
66
    let user_id = token_info.user_id.as_str();
405
66
    let roles = &token_info.roles;
406
66

            
407
66
    let code = body.data.code.to_lowercase();
408
66
    let host_uri = body.data.host_uri.as_str();
409
66
    if !strings::is_code(code.as_str()) {
410
3
        return Err(ErrResp::ErrParam(Some(
411
3
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
412
3
        )));
413
63
    }
414
63
    let host_uri = match Url::parse(host_uri) {
415
3
        Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
416
60
        Ok(uri) => match mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
417
            false => {
418
3
                return Err(ErrResp::ErrParam(Some(
419
3
                    "unsupport `hostUri` scheme".to_string(),
420
3
                )))
421
            }
422
57
            true => uri,
423
        },
424
    };
425
57
    if let Some(info) = body.data.info.as_ref() {
426
9
        for (k, _) in info.iter() {
427
6
            if k.len() == 0 {
428
3
                return Err(ErrResp::ErrParam(Some(
429
3
                    "`info` key must not be empty".to_string(),
430
3
                )));
431
3
            }
432
        }
433
48
    }
434
54
    let mut unit_code: Option<String> = None;
435
54
    let unit_id = match body.data.unit_id.as_ref() {
436
        None => {
437
21
            if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
438
3
                return Err(ErrResp::ErrParam(Some("missing `unitId`".to_string())));
439
18
            }
440
18
            None
441
        }
442
33
        Some(unit_id) => {
443
33
            if unit_id.len() == 0 {
444
3
                return Err(ErrResp::ErrParam(Some(
445
3
                    "`unitId` must with at least one character".to_string(),
446
3
                )));
447
30
            }
448
60
            match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), true, &state).await? {
449
                None => {
450
6
                    return Err(ErrResp::Custom(
451
6
                        ErrReq::UNIT_NOT_EXIST.0,
452
6
                        ErrReq::UNIT_NOT_EXIST.1,
453
6
                        None,
454
6
                    ))
455
                }
456
24
                Some(unit) => {
457
24
                    unit_code = Some(unit.code);
458
24
                }
459
24
            }
460
24
            Some(unit_id.clone())
461
        }
462
    };
463
83
    if check_code(FN_NAME, unit_id.as_ref(), code.as_str(), &state).await? {
464
6
        return Err(ErrResp::Custom(
465
6
            ErrReq::NETWORK_EXIST.0,
466
6
            ErrReq::NETWORK_EXIST.1,
467
6
            None,
468
6
        ));
469
36
    }
470
36

            
471
36
    let now = Utc::now();
472
36
    let network = Network {
473
36
        network_id: strings::random_id(&now, ID_RAND_LEN),
474
36
        code: code.clone(),
475
36
        unit_id: unit_id.clone(),
476
36
        unit_code: unit_code.clone(),
477
36
        created_at: now,
478
36
        modified_at: now,
479
36
        host_uri: host_uri.to_string(),
480
36
        name: match body.data.name.as_ref() {
481
33
            None => "".to_string(),
482
3
            Some(name) => name.clone(),
483
        },
484
36
        info: match body.data.info.as_ref() {
485
30
            None => Map::new(),
486
6
            Some(info) => info.clone(),
487
        },
488
    };
489
72
    if let Err(e) = state.model.network().add(&network).await {
490
        error!("[{}] add error: {}", FN_NAME, e);
491
        return Err(ErrResp::ErrDb(Some(e.to_string())));
492
36
    }
493
36
    let unit_id = match unit_id.as_ref() {
494
15
        None => "",
495
21
        Some(id) => id.as_str(),
496
    };
497
36
    let unit_code = match unit_code.as_ref() {
498
15
        None => "",
499
21
        Some(code) => code.as_str(),
500
    };
501
36
    add_manager(
502
36
        FN_NAME,
503
36
        &state,
504
36
        &host_uri,
505
36
        unit_id,
506
36
        unit_code,
507
36
        network.network_id.as_str(),
508
36
        code.as_str(),
509
36
    )
510
    .await?;
511
36
    Ok(Json(response::PostNetwork {
512
36
        data: response::PostNetworkData {
513
36
            network_id: network.network_id,
514
36
        },
515
36
    }))
516
66
}
517

            
518
/// `GET /{base}/api/v1/network/count`
519
63
pub async fn get_network_count(
520
63
    State(state): State<AppState>,
521
63
    Extension(token_info): Extension<GetTokenInfoData>,
522
63
    Query(query): Query<request::GetNetworkCountQuery>,
523
63
) -> impl IntoResponse {
524
    const FN_NAME: &'static str = "get_network_count";
525

            
526
63
    let user_id = token_info.user_id.as_str();
527
63
    let roles = &token_info.roles;
528
63

            
529
63
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
530
33
        match query.unit.as_ref() {
531
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
532
30
            Some(unit_id) => {
533
30
                if unit_id.len() == 0 {
534
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
535
27
                }
536
            }
537
        }
538
30
    }
539
57
    let unit_cond = match query.unit.as_ref() {
540
9
        None => None,
541
48
        Some(unit_id) => match unit_id.len() {
542
3
            0 => Some(None),
543
            _ => {
544
90
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
545
                    None => {
546
9
                        return Err(ErrResp::Custom(
547
9
                            ErrReq::UNIT_NOT_EXIST.0,
548
9
                            ErrReq::UNIT_NOT_EXIST.1,
549
9
                            None,
550
9
                        ))
551
                    }
552
36
                    Some(_) => Some(Some(unit_id.as_str())),
553
                }
554
            }
555
        },
556
    };
557
48
    let mut code_cond = None;
558
48
    let mut code_contains_cond = None;
559
48
    if let Some(code) = query.code.as_ref() {
560
12
        if code.len() > 0 {
561
12
            code_cond = Some(code.as_str());
562
12
        }
563
36
    }
564
48
    if code_cond.is_none() {
565
36
        if let Some(contains) = query.contains.as_ref() {
566
9
            if contains.len() > 0 {
567
9
                code_contains_cond = Some(contains.as_str());
568
9
            }
569
27
        }
570
12
    }
571
48
    let cond = ListQueryCond {
572
48
        unit_id: unit_cond,
573
48
        code: code_cond,
574
48
        code_contains: code_contains_cond,
575
48
        ..Default::default()
576
48
    };
577
96
    match state.model.network().count(&cond).await {
578
        Err(e) => {
579
            error!("[{}] count error: {}", FN_NAME, e);
580
            Err(ErrResp::ErrDb(Some(e.to_string())))
581
        }
582
48
        Ok(count) => Ok(Json(response::GetNetworkCount {
583
48
            data: response::GetCountData { count },
584
48
        })),
585
    }
586
63
}
587

            
588
/// `GET /{base}/api/v1/network/list`
589
144
pub async fn get_network_list(
590
144
    State(state): State<AppState>,
591
144
    Extension(token_info): Extension<GetTokenInfoData>,
592
144
    Query(query): Query<request::GetNetworkListQuery>,
593
144
) -> impl IntoResponse {
594
    const FN_NAME: &'static str = "get_network_list";
595

            
596
144
    let user_id = token_info.user_id.as_str();
597
144
    let roles = &token_info.roles;
598
144

            
599
144
    if !Role::is_role(roles, Role::ADMIN) && !Role::is_role(roles, Role::MANAGER) {
600
33
        match query.unit.as_ref() {
601
3
            None => return Err(ErrResp::ErrParam(Some("missing `unit`".to_string()))),
602
30
            Some(unit_id) => {
603
30
                if unit_id.len() == 0 {
604
3
                    return Err(ErrResp::ErrParam(Some("missing `unit`".to_string())));
605
27
                }
606
            }
607
        }
608
111
    }
609
138
    let unit_cond = match query.unit.as_ref() {
610
81
        None => None,
611
57
        Some(unit_id) => match unit_id.len() {
612
9
            0 => Some(None),
613
            _ => {
614
96
                match check_unit(FN_NAME, user_id, roles, unit_id.as_str(), false, &state).await? {
615
                    None => {
616
9
                        return Err(ErrResp::Custom(
617
9
                            ErrReq::UNIT_NOT_EXIST.0,
618
9
                            ErrReq::UNIT_NOT_EXIST.1,
619
9
                            None,
620
9
                        ))
621
                    }
622
39
                    Some(_) => Some(Some(unit_id.as_str())),
623
                }
624
            }
625
        },
626
    };
627
129
    let mut code_cond = None;
628
129
    let mut code_contains_cond = None;
629
129
    if let Some(code) = query.code.as_ref() {
630
12
        if code.len() > 0 {
631
12
            code_cond = Some(code.as_str());
632
12
        }
633
117
    }
634
129
    if code_cond.is_none() {
635
117
        if let Some(contains) = query.contains.as_ref() {
636
42
            if contains.len() > 0 {
637
42
                code_contains_cond = Some(contains.as_str());
638
42
            }
639
75
        }
640
12
    }
641
129
    let cond = ListQueryCond {
642
129
        unit_id: unit_cond,
643
129
        code: code_cond,
644
129
        code_contains: code_contains_cond,
645
129
        ..Default::default()
646
129
    };
647
129
    let sort_cond = get_sort_cond(&query.sort)?;
648
114
    let opts = ListOptions {
649
114
        cond: &cond,
650
114
        offset: query.offset,
651
114
        limit: match query.limit {
652
87
            None => Some(LIST_LIMIT_DEFAULT),
653
27
            Some(limit) => match limit {
654
12
                0 => None,
655
15
                _ => Some(limit),
656
            },
657
        },
658
114
        sort: Some(sort_cond.as_slice()),
659
114
        cursor_max: Some(LIST_CURSOR_MAX),
660
    };
661

            
662
228
    let (list, cursor) = match state.model.network().list(&opts, None).await {
663
        Err(e) => {
664
            error!("[{}] list error: {}", FN_NAME, e);
665
            return Err(ErrResp::ErrDb(Some(e.to_string())));
666
        }
667
114
        Ok((list, cursor)) => match cursor {
668
3
            None => match query.format {
669
                Some(request::ListFormat::Array) => {
670
3
                    return Ok(Json(network_list_transform(&list)).into_response())
671
                }
672
                _ => {
673
81
                    return Ok(Json(response::GetNetworkList {
674
81
                        data: network_list_transform(&list),
675
81
                    })
676
81
                    .into_response())
677
                }
678
            },
679
30
            Some(_) => (list, cursor),
680
30
        },
681
30
    };
682
30

            
683
30
    let body = Body::from_stream(async_stream::stream! {
684
30
        let unit_cond = match query.unit.as_ref() {
685
30
            None => None,
686
30
            Some(unit_id) => match unit_id.len() {
687
30
                0 => Some(None),
688
30
                _ => Some(Some(unit_id.as_str())),
689
30
            },
690
30
        };
691
30
        let mut code_contains_cond = None;
692
30
        if let Some(contains) = query.contains.as_ref() {
693
30
            if contains.len() > 0 {
694
30
                code_contains_cond = Some(contains.as_str());
695
30
            }
696
30
        }
697
30
        let cond = ListQueryCond {
698
30
            unit_id: unit_cond,
699
30
            code_contains: code_contains_cond,
700
30
            ..Default::default()
701
30
        };
702
30
        let opts = ListOptions {
703
30
            cond: &cond,
704
30
            offset: query.offset,
705
30
            limit: match query.limit {
706
30
                None => Some(LIST_LIMIT_DEFAULT),
707
30
                Some(limit) => match limit {
708
30
                    0 => None,
709
30
                    _ => Some(limit),
710
30
                },
711
30
            },
712
30
            sort: Some(sort_cond.as_slice()),
713
30
            cursor_max: Some(LIST_CURSOR_MAX),
714
30
        };
715
30

            
716
30
        let mut list = list;
717
30
        let mut cursor = cursor;
718
30
        let mut is_first = true;
719
30
        loop {
720
30
            yield network_list_transform_bytes(&list, is_first, cursor.is_none(), query.format.as_ref());
721
30
            is_first = false;
722
30
            if cursor.is_none() {
723
30
                break;
724
30
            }
725
30
            let (_list, _cursor) = match state.model.network().list(&opts, cursor).await {
726
30
                Err(_) => break,
727
30
                Ok((list, cursor)) => (list, cursor),
728
30
            };
729
30
            list = _list;
730
30
            cursor = _cursor;
731
30
        }
732
30
    });
733
30
    Ok(([(header::CONTENT_TYPE, ContentType::JSON)], body).into_response())
734
144
}
735

            
736
/// `GET /{base}/api/v1/network/{networkId}`
737
36
pub async fn get_network(
738
36
    State(state): State<AppState>,
739
36
    Extension(token_info): Extension<GetTokenInfoData>,
740
36
    Path(param): Path<request::NetworkIdPath>,
741
36
) -> impl IntoResponse {
742
    const FN_NAME: &'static str = "get_network";
743

            
744
36
    let user_id = token_info.user_id.as_str();
745
36
    let roles = &token_info.roles;
746
36
    let network_id = param.network_id.as_str();
747
36

            
748
108
    match check_network(FN_NAME, network_id, user_id, false, roles, &state).await? {
749
15
        None => Err(ErrResp::ErrNotFound(None)),
750
21
        Some(network) => Ok(Json(response::GetNetwork {
751
21
            data: network_transform(&network),
752
21
        })),
753
    }
754
36
}
755

            
756
/// `PATCH /{base}/api/v1/network/{networkId}`
757
51
pub async fn patch_network(
758
51
    State(state): State<AppState>,
759
51
    Extension(token_info): Extension<GetTokenInfoData>,
760
51
    Path(param): Path<request::NetworkIdPath>,
761
51
    Json(mut body): Json<request::PatchNetworkBody>,
762
51
) -> impl IntoResponse {
763
    const FN_NAME: &'static str = "patch_network";
764

            
765
51
    let user_id = token_info.user_id.as_str();
766
51
    let roles = &token_info.roles;
767
51
    let network_id = param.network_id.as_str();
768

            
769
    // To check if the network is for the user.
770
114
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await? {
771
15
        None => return Err(ErrResp::ErrNotFound(None)),
772
36
        Some(network) => network,
773
    };
774

            
775
36
    let updates = get_updates(&mut body.data).await?;
776
18
    let mut should_add_mgr = false;
777

            
778
    // Remove old manager.
779
18
    if let Some(host_uri) = updates.host_uri {
780
9
        let uri = Url::parse(host_uri).unwrap();
781
9
        if !uri.as_str().eq(network.host_uri.as_str()) {
782
9
            delete_manager(FN_NAME, &state, &network).await?;
783
9
            should_add_mgr = true;
784
        }
785
9
    }
786

            
787
    // Update database.
788
18
    let cond = UpdateQueryCond { network_id };
789
36
    if let Err(e) = state.model.network().update(&cond, &updates).await {
790
        error!("[{}] update error: {}", FN_NAME, e);
791
        return Err(ErrResp::ErrDb(Some(e.to_string())));
792
18
    }
793
18

            
794
18
    // Add new manager.
795
18
    if should_add_mgr {
796
9
        if let Some(host_uri) = updates.host_uri {
797
9
            let uri = Url::parse(host_uri).unwrap();
798
9
            let unit_id = match network.unit_id.as_ref() {
799
3
                None => "",
800
6
                Some(id) => id.as_str(),
801
            };
802
9
            let unit_code = match network.unit_code.as_ref() {
803
3
                None => "",
804
6
                Some(code) => code.as_str(),
805
            };
806
9
            add_manager(
807
9
                FN_NAME,
808
9
                &state,
809
9
                &uri,
810
9
                unit_id,
811
9
                unit_code,
812
9
                network.network_id.as_str(),
813
9
                network.code.as_str(),
814
9
            )
815
            .await?;
816
        }
817
9
    }
818
18
    Ok(StatusCode::NO_CONTENT)
819
51
}
820

            
821
/// `DELETE /{base}/api/v1/network/{networkId}`
822
33
pub async fn delete_network(
823
33
    State(state): State<AppState>,
824
33
    Extension(token_info): Extension<GetTokenInfoData>,
825
33
    Path(param): Path<request::NetworkIdPath>,
826
33
) -> impl IntoResponse {
827
    const FN_NAME: &'static str = "delete_network";
828

            
829
33
    let user_id = token_info.user_id.as_str();
830
33
    let roles = &token_info.roles;
831
33
    let network_id = param.network_id.as_str();
832

            
833
    // To check if the network is for the user.
834
96
    let network = match check_network(FN_NAME, network_id, user_id, true, roles, &state).await {
835
        Err(e) => return Err(e), // XXX: not use "?" to solve E0282 error.
836
33
        Ok(network) => match network {
837
15
            None => return Ok(StatusCode::NO_CONTENT),
838
18
            Some(network) => network,
839
18
        },
840
18
    };
841
18

            
842
18
    delete_manager(FN_NAME, &state, &network).await?;
843
177
    del_network_rsc(FN_NAME, network_id, &state).await?;
844
18
    send_del_ctrl_message(FN_NAME, network, &state).await?;
845

            
846
18
    Ok(StatusCode::NO_CONTENT)
847
33
}
848

            
849
129
fn get_sort_cond(sort_args: &Option<String>) -> Result<Vec<SortCond>, ErrResp> {
850
129
    match sort_args.as_ref() {
851
90
        None => Ok(vec![SortCond {
852
90
            key: SortKey::Code,
853
90
            asc: true,
854
90
        }]),
855
39
        Some(args) => {
856
39
            let mut args = args.split(",");
857
39
            let mut sort_cond = vec![];
858
66
            while let Some(arg) = args.next() {
859
42
                let mut cond = arg.split(":");
860
42
                let key = match cond.next() {
861
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
862
42
                    Some(field) => match field {
863
42
                        "code" => SortKey::Code,
864
36
                        "created" => SortKey::CreatedAt,
865
21
                        "modified" => SortKey::ModifiedAt,
866
15
                        "name" => SortKey::Name,
867
                        _ => {
868
6
                            return Err(ErrResp::ErrParam(Some(format!(
869
6
                                "invalid sort key {}",
870
6
                                field
871
6
                            ))))
872
                        }
873
                    },
874
                };
875
36
                let asc = match cond.next() {
876
3
                    None => return Err(ErrResp::ErrParam(Some("wrong sort argument".to_string()))),
877
33
                    Some(asc) => match asc {
878
33
                        "asc" => true,
879
15
                        "desc" => false,
880
                        _ => {
881
3
                            return Err(ErrResp::ErrParam(Some(format!(
882
3
                                "invalid sort asc {}",
883
3
                                asc
884
3
                            ))))
885
                        }
886
                    },
887
                };
888
30
                if cond.next().is_some() {
889
3
                    return Err(ErrResp::ErrParam(Some(
890
3
                        "invalid sort condition".to_string(),
891
3
                    )));
892
27
                }
893
27
                sort_cond.push(SortCond { key, asc });
894
            }
895
24
            Ok(sort_cond)
896
        }
897
    }
898
129
}
899

            
900
36
async fn get_updates<'a>(body: &'a mut request::PatchNetworkData) -> Result<Updates<'a>, ErrResp> {
901
36
    let mut updates = Updates {
902
36
        ..Default::default()
903
36
    };
904
36
    let mut count = 0;
905
36
    if let Some(host_uri) = body.host_uri.as_ref() {
906
18
        match Url::parse(host_uri) {
907
6
            Err(_) => return Err(ErrResp::ErrParam(Some("invalid `hostUri`".to_string()))),
908
12
            Ok(uri) => {
909
12
                if !mq::SUPPORT_SCHEMES.contains(&uri.scheme()) {
910
3
                    return Err(ErrResp::ErrParam(Some(
911
3
                        "unsupport `hostUri` scheme".to_string(),
912
3
                    )));
913
9
                }
914
9
                body.host_uri = Some(uri.to_string()); // change host name case.
915
            }
916
        }
917
18
    }
918
27
    if let Some(host_uri) = body.host_uri.as_ref() {
919
9
        updates.host_uri = Some(host_uri.as_str());
920
9
        count += 1;
921
18
    }
922
27
    if let Some(name) = body.name.as_ref() {
923
18
        updates.name = Some(name.as_str());
924
18
        count += 1;
925
18
    }
926
27
    if let Some(info) = body.info.as_ref() {
927
21
        for (k, _) in info.iter() {
928
12
            if k.len() == 0 {
929
3
                return Err(ErrResp::ErrParam(Some(
930
3
                    "`info` key must not be empty".to_string(),
931
3
                )));
932
9
            }
933
        }
934
18
        updates.info = Some(info);
935
18
        count += 1;
936
6
    }
937

            
938
24
    if count == 0 {
939
6
        return Err(ErrResp::ErrParam(Some(
940
6
            "at least one parameter".to_string(),
941
6
        )));
942
18
    }
943
18
    updates.modified_at = Some(Utc::now());
944
18
    Ok(updates)
945
36
}
946

            
947
/// To check if the network code is used by the unit.
948
///
949
/// # Errors
950
///
951
/// Returns OK if the code is found or not. Otherwise errors will be returned.
952
42
async fn check_code(
953
42
    fn_name: &str,
954
42
    unit_id: Option<&String>,
955
42
    code: &str,
956
42
    state: &AppState,
957
42
) -> Result<bool, ErrResp> {
958
42
    let cond = QueryCond {
959
42
        unit_id: match unit_id {
960
18
            None => Some(None),
961
24
            Some(unit_id) => Some(Some(unit_id.as_str())),
962
        },
963
42
        code: Some(code),
964
42
        ..Default::default()
965
42
    };
966
83
    match state.model.network().get(&cond).await {
967
        Err(e) => {
968
            error!("[{}] check code error: {}", fn_name, e);
969
            return Err(ErrResp::ErrDb(Some(format!("check code error: {}", e))));
970
        }
971
42
        Ok(network) => match network {
972
36
            None => Ok(false),
973
6
            Some(_) => Ok(true),
974
        },
975
    }
976
42
}
977

            
978
84
fn network_list_transform(list: &Vec<Network>) -> Vec<response::GetNetworkData> {
979
84
    let mut ret = vec![];
980
318
    for network in list.iter() {
981
318
        ret.push(network_transform(&network));
982
318
    }
983
84
    ret
984
84
}
985

            
986
75
fn network_list_transform_bytes(
987
75
    list: &Vec<Network>,
988
75
    with_start: bool,
989
75
    with_end: bool,
990
75
    format: Option<&request::ListFormat>,
991
75
) -> Result<Bytes, Box<dyn StdError + Send + Sync>> {
992
75
    let mut build_str = match with_start {
993
45
        false => "".to_string(),
994
3
        true => match format {
995
3
            Some(request::ListFormat::Array) => "[".to_string(),
996
27
            _ => "{\"data\":[".to_string(),
997
        },
998
    };
999
75
    let mut is_first = with_start;
4920
    for item in list {
4845
        if is_first {
30
            is_first = false;
4815
        } else {
4815
            build_str.push(',');
4815
        }
4845
        let json_str = match serde_json::to_string(&network_transform(item)) {
            Err(e) => return Err(Box::new(e)),
4845
            Ok(str) => str,
4845
        };
4845
        build_str += json_str.as_str();
    }
75
    if with_end {
30
        build_str += match format {
3
            Some(request::ListFormat::Array) => "]",
27
            _ => "]}",
        }
45
    }
75
    Ok(Bytes::copy_from_slice(build_str.as_str().as_bytes()))
75
}
5184
fn network_transform(network: &Network) -> response::GetNetworkData {
5184
    response::GetNetworkData {
5184
        network_id: network.network_id.clone(),
5184
        code: network.code.clone(),
5184
        unit_id: network.unit_id.clone(),
5184
        unit_code: network.unit_code.clone(),
5184
        created_at: time_str(&network.created_at),
5184
        modified_at: time_str(&network.modified_at),
5184
        host_uri: network.host_uri.clone(),
5184
        name: network.name.clone(),
5184
        info: network.info.clone(),
5184
    }
5184
}
18
async fn del_network_rsc(fn_name: &str, network_id: &str, state: &AppState) -> Result<(), ErrResp> {
18
    let cond = network_route::QueryCond {
18
        network_id: Some(network_id),
18
        ..Default::default()
18
    };
36
    if let Err(e) = state.model.network_route().del(&cond).await {
        error!("[{}] del network_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = device_route::QueryCond {
18
        network_id: Some(network_id),
18
        ..Default::default()
18
    };
35
    if let Err(e) = state.model.device_route().del(&cond).await {
        error!("[{}] del device_route error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = dldata_buffer::QueryCond {
18
        network_id: Some(network_id),
18
        ..Default::default()
18
    };
35
    if let Err(e) = state.model.dldata_buffer().del(&cond).await {
        error!("[{}] del dldata_buffer error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = device::QueryCond {
18
        network_id: Some(network_id),
18
        ..Default::default()
18
    };
35
    if let Err(e) = state.model.device().del(&cond).await {
        error!("[{}] del device error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    let cond = QueryCond {
18
        network_id: Some(network_id),
18
        ..Default::default()
18
    };
36
    if let Err(e) = state.model.network().del(&cond).await {
        error!("[{}] del network error: {}", fn_name, e);
        return Err(ErrResp::ErrDb(Some(e.to_string())));
18
    }
18

            
18
    Ok(())
18
}
/// Send delete control message.
18
async fn send_del_ctrl_message(
18
    fn_name: &str,
18
    network: Network,
18
    state: &AppState,
18
) -> Result<(), ErrResp> {
18
    if state.cache.is_some() {
6
        let msg = SendCtrlMsg::DelNetwork {
6
            operation: CtrlMsgOp::DEL_NETWORK.to_string(),
6
            new: CtrlDelNetwork {
6
                unit_id: network.unit_id,
6
                unit_code: network.unit_code,
6
                network_id: network.network_id,
6
                network_code: network.code,
6
            },
6
        };
6
        let payload = match serde_json::to_vec(&msg) {
            Err(e) => {
                error!(
                    "[{}] marshal JSON for {} error: {}",
                    fn_name,
                    CtrlMsgOp::DEL_NETWORK,
                    e
                );
                return Err(ErrResp::ErrRsc(Some(format!(
                    "marshal control message error: {}",
                    e
                ))));
            }
6
            Ok(payload) => payload,
6
        };
6
        let ctrl_sender = { state.ctrl_senders.network.lock().unwrap().clone() };
6
        if let Err(e) = ctrl_sender.send_msg(payload).await {
            error!(
                "[{}] send control message for {} error: {}",
                fn_name,
                CtrlMsgOp::DEL_NETWORK,
                e
            );
            return Err(ErrResp::ErrIntMsg(Some(format!(
                "send control message error: {}",
                e
            ))));
6
        }
12
    }
18
    Ok(())
18
}
/// To create a manager by:
/// - get a connection from the pool.
/// - register manager handlers.
45
async fn add_manager(
45
    fn_name: &str,
45
    state: &AppState,
45
    host_uri: &Url,
45
    unit_id: &str,
45
    unit_code: &str,
45
    id: &str,
45
    name: &str,
45
) -> Result<(), ErrResp> {
45
    let opts = MgrOptions {
45
        unit_id: unit_id.to_string(),
45
        unit_code: unit_code.to_string(),
45
        id: id.to_string(),
45
        name: name.to_string(),
45
        prefetch: Some(state.amqp_prefetch),
45
        persistent: state.amqp_persistent,
45
        shared_prefix: Some(state.mqtt_shared_prefix.clone()),
45
    };
45
    let msg = SendCtrlMsg::AddManager {
45
        operation: CtrlMsgOp::ADD_MANAGER.to_string(),
45
        new: CtrlAddManager {
45
            host_uri: host_uri.to_string(),
45
            mgr_options: opts,
45
        },
45
    };
45
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!("[{}] marshal JSON for {} error: {}", fn_name, name, e);
            return Err(ErrResp::ErrRsc(Some(format!("new manager error: {}", e))));
        }
45
        Ok(payload) => payload,
45
    };
45
    let ctrl_sender = { state.ctrl_senders.network.lock().unwrap().clone() };
45
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name, name, e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "new manager error: {}",
            e
        ))));
45
    }
45
    Ok(())
45
}
/// To delete a network manager.
27
async fn delete_manager(fn_name: &str, state: &AppState, network: &Network) -> Result<(), ErrResp> {
27
    let key = match network.unit_code.as_ref() {
6
        None => gen_mgr_key("", network.code.as_str()),
21
        Some(unit_code) => gen_mgr_key(unit_code.as_str(), network.code.as_str()),
    };
27
    let msg = SendCtrlMsg::DelManager {
27
        operation: CtrlMsgOp::DEL_MANAGER.to_string(),
27
        new: key.clone(),
27
    };
27
    let payload = match serde_json::to_vec(&msg) {
        Err(e) => {
            error!("[{}] marshal JSON for {} error: {}", fn_name, key, e);
            return Err(ErrResp::ErrRsc(Some(format!(
                "delete manager error: {}",
                e
            ))));
        }
27
        Ok(payload) => payload,
27
    };
27
    let ctrl_sender = { state.ctrl_senders.network.lock().unwrap().clone() };
27
    if let Err(e) = ctrl_sender.send_msg(payload).await {
        error!(
            "[{}] send control message for {} error: {}",
            fn_name, key, e
        );
        return Err(ErrResp::ErrIntMsg(Some(format!(
            "delete manager error: {}",
            e
        ))));
27
    }
27
    Ok(())
27
}
impl MgrHandler {
    /// To validate the device and return device ID and profile.
63
    async fn validate_device(
63
        &self,
63
        mgr: &NetworkMgr,
63
        data: &Box<UlData>,
63
    ) -> Result<Option<DeviceCacheItem>, Box<dyn StdError>> {
        const FN_NAME: &'static str = "validate_device";
63
        let dev_cond = device::QueryOneCond {
63
            unit_code: match mgr.unit_code().len() {
12
                0 => None,
51
                _ => Some(mgr.unit_code()),
            },
63
            network_code: mgr.name(),
63
            network_addr: data.network_addr.as_str(),
63
        };
63
        match self.cache.as_ref() {
            None => {
42
                let cond = device::QueryCond {
42
                    device: Some(dev_cond.clone()),
42
                    ..Default::default()
42
                };
84
                match self.model.device().get(&cond).await {
                    Err(e) => {
                        error!("[{}] get device with error: {}", FN_NAME, e);
                        return Err(e);
                    }
42
                    Ok(device) => match device {
                        None => {
4
                            warn!(
                                "[{}] no device for {:?}.{}.{}",
                                FN_NAME,
                                dev_cond.unit_code,
                                dev_cond.network_code,
                                dev_cond.network_addr
                            );
4
                            Ok(None)
                        }
38
                        Some(device) => Ok(Some(DeviceCacheItem {
38
                            device_id: device.device_id,
38
                            profile: device.profile,
38
                        })),
                    },
                }
            }
21
            Some(cache) => {
21
                let cond = device::GetCacheQueryCond::CodeAddr(dev_cond.clone());
30
                match cache.device().get(&cond).await {
                    Err(e) => {
                        error!("[{}] get device cache with error: {}", FN_NAME, e);
                        Err(e)
                    }
21
                    Ok(device) => match device {
                        None => {
2
                            warn!(
                                "[{}] no device cache for {:?}.{}.{}",
                                FN_NAME,
                                dev_cond.unit_code,
                                dev_cond.network_code,
                                dev_cond.network_addr
                            );
2
                            Ok(None)
                        }
19
                        Some(device) => Ok(Some(device)),
                    },
                }
            }
        }
63
    }
57
    async fn send_by_device_route(
57
        &self,
57
        netmgr_unit_code: Option<String>,
57
        app_data: &mut ApplicationUlData,
57
        sent_mgrs: &mut Vec<String>,
57
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_by_device_route";
57
        if let Some(cache) = self.cache.as_ref() {
19
            let route = match cache
19
                .device_route()
19
                .get_uldata(&app_data.device_id.as_str())
30
                .await
            {
                Err(e) => {
                    error!("[{}] get device route error: {}", FN_NAME, e);
                    return Err(());
                }
19
                Ok(route) => match route {
8
                    None => return Ok(()),
11
                    Some(route) => route,
                },
            };
11
            for key in route.app_mgr_keys.iter() {
11
                if sent_mgrs.contains(key) {
                    continue;
11
                }
11
                let mgr = {
11
                    match self.application_mgrs.lock().unwrap().get(key) {
                        None => continue,
11
                        Some(mgr) => mgr.clone(),
11
                    }
11
                };
11
                let now = Utc::now();
11
                app_data.data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
11
                if let Err(e) = mgr.send_uldata(&app_data) {
                    // TODO: retry internally because one or more routes may sent successfully.
                    error!("[{}] send data to {} error: {}", FN_NAME, key, e);
                    continue;
11
                }
11
                self.send_application_uldata_msg(
11
                    &now,
11
                    netmgr_unit_code.clone(),
11
                    mgr.unit_id().to_string(),
11
                    &app_data,
11
                )
                .await?;
11
                sent_mgrs.push(key.clone());
            }
11
            return Ok(());
38
        }
38

            
38
        let cond = device_route::ListQueryCond {
38
            device_id: Some(app_data.device_id.as_str()),
38
            ..Default::default()
38
        };
38
        let opts = device_route::ListOptions {
38
            cond: &cond,
38
            offset: None,
38
            limit: None,
38
            sort: None,
38
            cursor_max: Some(LIST_CURSOR_MAX),
38
        };
38
        let mut cursor: Option<Box<dyn device_route::Cursor>> = None;
        loop {
76
            let (list, _cursor) = match self.model.device_route().list(&opts, cursor).await {
                Err(e) => {
                    error!("[{}] get device route error: {}", FN_NAME, e);
                    return Err(());
                }
38
                Ok((list, cursor)) => (list, cursor),
            };
38
            for route in list.iter() {
22
                let key = gen_mgr_key(route.unit_code.as_str(), route.application_code.as_str());
22
                if sent_mgrs.contains(&key) {
                    continue;
22
                }
22
                let mgr = {
22
                    match self.application_mgrs.lock().unwrap().get(&key) {
                        None => continue,
22
                        Some(mgr) => mgr.clone(),
22
                    }
22
                };
22
                let now = Utc::now();
22
                app_data.data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
22
                if let Err(e) = mgr.send_uldata(&app_data) {
                    // TODO: retry internally because one or more routes may sent successfully.
                    error!("[{}] send data to {} error: {}", FN_NAME, key, e);
                    continue;
22
                }
22
                self.send_application_uldata_msg(
22
                    &now,
22
                    netmgr_unit_code.clone(),
22
                    mgr.unit_id().to_string(),
22
                    &app_data,
22
                )
                .await?;
22
                sent_mgrs.push(key);
            }
38
            if _cursor.is_none() {
38
                break;
            }
            cursor = _cursor;
        }
38
        Ok(())
57
    }
57
    async fn send_by_network_route(
57
        &self,
57
        netmgr_unit_id: String,
57
        netmgr_unit_code: Option<String>,
57
        app_data: &mut ApplicationUlData,
57
        sent_mgrs: &mut Vec<String>,
57
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_by_network_route";
57
        if let Some(cache) = self.cache.as_ref() {
19
            let route = match cache
19
                .network_route()
19
                .get_uldata(&app_data.network_id.as_str())
9
                .await
            {
                Err(e) => {
                    error!("[{}] get network route error: {}", FN_NAME, e);
                    return Err(());
                }
19
                Ok(route) => match route {
18
                    None => return Ok(()),
1
                    Some(route) => route,
                },
            };
1
            for key in route.app_mgr_keys.iter() {
1
                if sent_mgrs.contains(key) {
                    continue;
1
                }
1
                let mgr = {
1
                    match self.application_mgrs.lock().unwrap().get(key) {
                        None => continue,
1
                        Some(mgr) => mgr.clone(),
1
                    }
1
                };
1
                let now = Utc::now();
1
                app_data.data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
1
                if let Err(e) = mgr.send_uldata(&app_data) {
                    // TODO: retry internally because one or more routes may sent successfully.
                    error!("[{}] send data to {} error: {}", FN_NAME, key, e);
                    continue;
1
                }
1
                self.send_application_uldata_msg(
1
                    &now,
1
                    netmgr_unit_code.clone(),
1
                    mgr.unit_id().to_string(),
1
                    &app_data,
1
                )
                .await?;
1
                sent_mgrs.push(key.clone());
            }
1
            return Ok(());
38
        }
38

            
38
        let cond = network_route::ListQueryCond {
38
            network_id: Some(netmgr_unit_id.as_str()),
38
            ..Default::default()
38
        };
38
        let opts = network_route::ListOptions {
38
            cond: &cond,
38
            offset: None,
38
            limit: None,
38
            sort: None,
38
            cursor_max: Some(LIST_CURSOR_MAX),
38
        };
38
        let mut cursor: Option<Box<dyn network_route::Cursor>> = None;
        loop {
76
            let (list, _cursor) = match self.model.network_route().list(&opts, cursor).await {
                Err(e) => {
                    error!("[{}] get network route error: {}", FN_NAME, e);
                    return Err(());
                }
38
                Ok((list, cursor)) => (list, cursor),
            };
38
            for route in list.iter() {
2
                let key = gen_mgr_key(route.unit_code.as_str(), route.application_code.as_str());
2
                if sent_mgrs.contains(&key) {
                    continue;
2
                }
2
                let mgr = {
2
                    match self.application_mgrs.lock().unwrap().get(&key) {
                        None => continue,
2
                        Some(mgr) => mgr.clone(),
2
                    }
2
                };
2
                let now = Utc::now();
2
                app_data.data_id = strings::random_id(&now, DATA_ID_RAND_LEN);
2
                if let Err(e) = mgr.send_uldata(&app_data) {
                    // TODO: retry internally because one or more routes may sent successfully.
                    error!("[{}] send data to {} error: {}", FN_NAME, key, e);
                    continue;
2
                }
2
                self.send_application_uldata_msg(
2
                    &now,
2
                    netmgr_unit_code.clone(),
2
                    mgr.unit_id().to_string(),
2
                    &app_data,
2
                )
                .await?;
2
                sent_mgrs.push(key);
            }
38
            if _cursor.is_none() {
38
                break;
            }
            cursor = _cursor;
        }
38
        Ok(())
57
    }
36
    async fn send_application_uldata_msg(
36
        &self,
36
        proc: &DateTime<Utc>,
36
        netmgr_unit_code: Option<String>,
36
        app_unit_id: String,
36
        app_data: &ApplicationUlData,
36
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_application_uldata_msg";
36
        if let Some(sender) = self.data_sender.as_ref() {
6
            let msg = SendDataMsg {
6
                kind: DataMsgKind::APP_ULDATA.to_string(),
6
                data: SendDataKind::AppUlData {
6
                    data_id: app_data.data_id.clone(),
6
                    proc: time_str(proc),
6
                    publish: app_data.publish.clone(),
6
                    unit_code: netmgr_unit_code,
6
                    network_code: app_data.network_code.clone(),
6
                    network_addr: app_data.network_addr.clone(),
6
                    unit_id: app_unit_id,
6
                    device_id: app_data.device_id.clone(),
6
                    time: app_data.time.clone(),
6
                    profile: app_data.profile.clone(),
6
                    data: app_data.data.clone(),
6
                    extension: app_data.extension.clone(),
6
                },
6
            };
6
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
6
                Ok(payload) => payload,
            };
6
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
6
            }
30
        }
36
        Ok(())
36
    }
18
    async fn send_application_dldata_result_msg(
18
        &self,
18
        proc: &DateTime<Utc>,
18
        data: &Box<DlDataResult>,
18
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_application_dldata_result_msg";
18
        if let Some(sender) = self.data_sender.as_ref() {
6
            let msg = SendDataMsg {
6
                kind: DataMsgKind::APP_DLDATA_RES.to_string(),
6
                data: SendDataKind::AppDlDataResult {
6
                    data_id: data.data_id.clone(),
6
                    resp: time_str(proc),
6
                    status: data.status,
6
                },
6
            };
6
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
6
                Ok(payload) => payload,
            };
6
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
6
            }
12
        }
18
        Ok(())
18
    }
63
    async fn send_network_uldata_msg(
63
        &self,
63
        mgr: &NetworkMgr,
63
        proc: &DateTime<Utc>,
63
        data: &Box<UlData>,
63
        device: Option<&DeviceCacheItem>,
63
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_network_uldata_msg";
63
        if let Some(sender) = self.data_sender.as_ref() {
12
            let msg = SendDataMsg {
12
                kind: DataMsgKind::NET_ULDATA.to_string(),
12
                data: SendDataKind::NetUlData {
12
                    data_id: strings::random_id(&proc, DATA_ID_RAND_LEN),
12
                    proc: time_str(proc),
12
                    unit_code: match mgr.unit_code().len() {
6
                        0 => None,
6
                        _ => Some(mgr.unit_code().to_string()),
                    },
12
                    network_code: mgr.name().to_string(),
12
                    network_addr: data.network_addr.clone(),
12
                    unit_id: match mgr.unit_id().len() {
6
                        0 => None,
6
                        _ => Some(mgr.unit_id().to_string()),
                    },
12
                    device_id: match device {
                        None => None,
12
                        Some(device) => Some(device.device_id.clone()),
                    },
12
                    profile: match device {
                        None => "".to_string(),
12
                        Some(device) => device.profile.clone(),
                    },
12
                    time: data.time.clone(),
12
                    data: data.data.clone(),
12
                    extension: data.extension.clone(),
                },
            };
12
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
12
                Ok(payload) => payload,
            };
12
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
12
            }
51
        }
63
        Ok(())
63
    }
18
    async fn send_network_dldata_result_msg(
18
        &self,
18
        proc: &DateTime<Utc>,
18
        data: &Box<DlDataResult>,
18
    ) -> Result<(), ()> {
        const FN_NAME: &'static str = "send_network_dldata_result_msg";
18
        if let Some(sender) = self.data_sender.as_ref() {
6
            let msg = SendDataMsg {
6
                kind: DataMsgKind::NET_DLDATA_RES.to_string(),
6
                data: SendDataKind::NetDlDataResult {
6
                    data_id: data.data_id.clone(),
6
                    resp: time_str(proc),
6
                    status: data.status,
6
                },
6
            };
6
            let payload = match serde_json::to_vec(&msg) {
                Err(e) => {
                    error!("[{}] marshal JSON error: {}", FN_NAME, e);
                    return Err(());
                }
6
                Ok(payload) => payload,
            };
6
            if let Err(e) = sender.send_msg(payload).await {
                error!("[{}] send data to {} error: {}", FN_NAME, sender.name(), e);
                return Err(());
6
            }
12
        }
18
        Ok(())
18
    }
}
#[async_trait]
impl EventHandler for MgrHandler {
42
    async fn on_status_change(&self, mgr: &NetworkMgr, status: MgrStatus) {
        // Clear cache when manager status changed.
42
        if let Some(cache) = self.cache.as_ref() {
14
            if let Err(e) = cache.device().clear().await {
                error!(
                    "[on_status_change] {}.{} clear device cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
14
            }
14
            if let Err(e) = cache.device_route().clear().await {
                error!(
                    "[on_status_change] {}.{} clear device_route cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
14
            }
14
            if let Err(e) = cache.network_route().clear().await {
                error!(
                    "[on_status_change] {}.{} clear network_route cache error: {}",
                    mgr.unit_code(),
                    mgr.name(),
                    e
                );
14
            }
28
        }
42
        match status {
            MgrStatus::NotReady => {
21
                error!(
                    "[on_status_change] {}.{} to NotReady",
                    mgr.unit_code(),
                    mgr.name()
                );
            }
            MgrStatus::Ready => {
21
                info!(
                    "[on_status_change] {}.{} to Ready",
                    mgr.unit_code(),
                    mgr.name()
                );
            }
        }
84
    }
    // Do the following jobs:
    // - check if the source device is valid for the unit.
    // - lookup device route to send the data.
    // - lookup network route to send the data.
    // The manager does not send duplicate data to one application if the data matches both routes.
63
    async fn on_uldata(&self, mgr: &NetworkMgr, data: Box<UlData>) -> Result<(), ()> {
63
        let proc = Utc::now();
        // To validate the device and get the device ID for generating data for applications.
114
        let device = match self.validate_device(mgr, &data).await {
            Err(_) => return Err(()),
63
            Ok(device) => device,
63
        };
63
        self.send_network_uldata_msg(mgr, &proc, &data, device.as_ref())
            .await?;
63
        let device = match device {
6
            None => return Ok(()),
57
            Some(device) => device,
57
        };
57

            
57
        let mut app_data = {
57
            let now = Utc::now();
57
            ApplicationUlData {
57
                data_id: strings::random_id(&proc, DATA_ID_RAND_LEN),
57
                time: data.time,
57
                publish: time_str(&now),
57
                device_id: device.device_id,
57
                network_id: mgr.id().to_string(),
57
                network_code: mgr.name().to_string(),
57
                network_addr: data.network_addr,
57
                is_public: mgr.unit_id().len() > 0,
57
                profile: device.profile,
57
                data: data.data,
57
                extension: data.extension,
57
            }
57
        };
57

            
57
        let mut sent_mgrs = vec![];
57
        let unit_code = match mgr.unit_code().len() {
12
            0 => None,
45
            _ => Some(mgr.unit_code().to_string()),
        };
        // Get device routes to pass data.
57
        self.send_by_device_route(unit_code.clone(), &mut app_data, &mut sent_mgrs)
106
            .await?;
        // Get network routes to pass data.
57
        self.send_by_network_route(
57
            mgr.id().to_string(),
57
            unit_code,
57
            &mut app_data,
57
            &mut sent_mgrs,
57
        )
85
        .await
126
    }
    // Do the following jobs:
    // - check if the associated dldata buffer exists.
    // - send the result to the source application.
18
    async fn on_dldata_result(&self, _mgr: &NetworkMgr, data: Box<DlDataResult>) -> Result<(), ()> {
        const FN_NAME: &'static str = "on_dldata_result";
18
        let now = Utc::now();
18
        self.send_network_dldata_result_msg(&now, &data).await?;
36
        let dldata = match self.model.dldata_buffer().get(data.data_id.as_str()).await {
            Err(e) => {
                error!(
                    "[{}] get dldata buffer for {} error: {}",
                    FN_NAME, data.data_id, e
                );
                return Err(());
            }
18
            Ok(dldata) => match dldata {
                None => {
                    warn!("[{}] no data ID {}", FN_NAME, data.data_id);
                    return Ok(());
                }
18
                Some(dldata) => dldata,
18
            },
18
        };
18

            
18
        let key = gen_mgr_key(dldata.unit_code.as_str(), dldata.application_code.as_str());
18
        let mgr = {
18
            match self.application_mgrs.lock().unwrap().get(&key) {
                None => None,
18
                Some(mgr) => Some(mgr.clone()),
            }
        };
18
        if let Some(mgr) = mgr {
18
            let result_data = ApplicationDlDataResult {
18
                data_id: dldata.data_id,
18
                status: data.status,
18
                message: data.message.clone(),
18
            };
18
            if let Err(e) = mgr.send_dldata_result(&result_data).await {
                error!("[{}] send data to {} error: {}", FN_NAME, key, e);
                return Err(());
18
            }
18
            self.send_application_dldata_result_msg(&now, &data).await?;
        }
18
        if data.status < 0 {
6
            return Ok(());
12
        }
12

            
12
        // Remove dldata buffer after completing data processing.
12
        let cond = dldata_buffer::QueryCond {
12
            data_id: Some(data.data_id.as_str()),
12
            ..Default::default()
12
        };
22
        if let Err(e) = self.model.dldata_buffer().del(&cond).await {
            // TODO: retry delete internally.
            error!("[{}] delete dldata {} error: {}", FN_NAME, data.data_id, e);
            return Err(());
12
        }
12
        Ok(())
36
    }
}
/// Clear the network relative cache.
12
async fn clear_cache(fn_name: &str, queue_name: &str, cache: &Arc<dyn Cache>) {
12
    if let Err(e) = cache.device().clear().await {
        error!(
            "[{}] {} clear device cache error: {}",
            fn_name, queue_name, e
        );
12
    }
12
    if let Err(e) = cache.network_route().clear().await {
        error!(
            "[{}] {} clear network route cache error: {}",
            fn_name, queue_name, e
        );
12
    }
12
}
#[async_trait]
impl QueueEventHandler for CtrlSenderHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlSenderHandler::on_status";
30
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
30
        if let Some(cache) = self.cache.as_ref() {
6
            clear_cache(FN_NAME, queue_name, cache).await;
24
        }
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlSenderHandler {
    /// No-op: the body is empty, so a message delivered to this handler is neither processed
    /// nor acknowledged here.
    // NOTE(review): presumably this queue is only used for sending, so nothing is expected to
    // arrive — confirm.
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
}
#[async_trait]
impl QueueEventHandler for CtrlReceiverHandler {
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_error";
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
        if let Some(cache) = self.cache.as_ref() {
            clear_cache(FN_NAME, queue_name, cache).await;
        }
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
    }
30
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_status";
30
        let queue_name = queue.name();
        // Clear cache to avoid missing update cache content during queue status changing.
30
        if let Some(cache) = self.cache.as_ref() {
6
            clear_cache(FN_NAME, queue_name, cache).await;
24
        }
30
        match status {
15
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
15
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
        }
60
    }
}
#[async_trait]
impl MessageHandler for CtrlReceiverHandler {
93
    async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
        const FN_NAME: &'static str = "CtrlReceiverHandler::on_message";
93
        let queue_name = queue.name();
93
        let ctrl_msg = match serde_json::from_slice::<RecvCtrlMsg>(msg.payload()) {
9
            Err(e) => {
9
                let src_str: String = String::from_utf8_lossy(msg.payload()).into();
9
                warn!(
                    "[{}] {} parse JSON error: {}, src: {}",
                    FN_NAME, queue_name, e, src_str
                );
9
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
9
                }
9
                return;
            }
84
            Ok(msg) => msg,
84
        };
84
        match ctrl_msg {
6
            RecvCtrlMsg::DelNetwork { new } => {
6
                if let Some(cache) = self.cache.as_ref() {
6
                    let cond = device::DelCacheQueryCond {
6
                        unit_code: match new.unit_code.as_ref() {
1
                            None => "",
5
                            Some(code) => code.as_str(),
                        },
6
                        network_code: Some(new.network_code.as_str()),
6
                        network_addr: None,
                    };
6
                    if let Err(e) = cache.device().del(&cond).await {
                        error!(
                            "[{}] {} delete device cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
6
                        debug!("[{}] {} delete device cache", FN_NAME, queue_name);
                    }
6
                    if let Err(e) = cache
6
                        .network_route()
6
                        .del_uldata(new.network_id.as_str())
                        .await
                    {
                        error!(
                            "[{}] {} delete network route cache error: {}",
                            FN_NAME, queue_name, e
                        );
                    } else {
6
                        debug!("[{}] {} delete network route cache", FN_NAME, queue_name);
                    }
                }
6
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
6
                }
            }
51
            RecvCtrlMsg::AddManager { new } => {
51
                let host_uri = match Url::parse(new.host_uri.as_str()) {
3
                    Err(e) => {
3
                        warn!("[{}] {} hostUri error: {}", FN_NAME, queue_name, e);
3
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
3
                        }
3
                        return;
                    }
48
                    Ok(uri) => uri,
48
                };
48
                let handler = MgrHandler {
48
                    model: self.model.clone(),
48
                    cache: self.cache.clone(),
48
                    application_mgrs: self.application_mgrs.clone(),
48
                    data_sender: self.data_sender.clone(),
48
                };
48
                let unit_code = new.mgr_options.unit_code.clone();
48
                let name = new.mgr_options.name.clone();
48
                let mgr = match NetworkMgr::new(
48
                    self.mq_conns.clone(),
48
                    &host_uri,
48
                    new.mgr_options,
48
                    Arc::new(handler),
48
                ) {
3
                    Err(e) => {
3
                        error!("[{}] {} new manager error: {}", FN_NAME, queue_name, e);
3
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
3
                        }
3
                        return;
                    }
45
                    Ok(mgr) => {
45
                        debug!("[{}] {} new manager", FN_NAME, queue_name);
45
                        mgr
45
                    }
45
                };
45
                let key = gen_mgr_key(unit_code.as_str(), name.as_str());
45
                let old_mgr = { self.network_mgrs.lock().unwrap().insert(key.clone(), mgr) };
45
                if let Some(mgr) = old_mgr {
3
                    if let Err(e) = mgr.close().await {
                        error!(
                            "[{}] {} close old manager {} error: {}",
                            FN_NAME, queue_name, key, e
                        );
                    } else {
3
                        debug!("[{}] {} close old manager {}", FN_NAME, queue_name, key);
                    }
42
                }
45
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
45
                }
45
                info!("[{}] {} manager {} added", FN_NAME, queue_name, key);
            }
27
            RecvCtrlMsg::DelManager { new } => {
27
                let old_mgr = { self.network_mgrs.lock().unwrap().remove(&new) };
27
                match old_mgr {
                    None => {
27
                        error!("[{}] {} get no manager {}", FN_NAME, queue_name, new);
27
                        if let Err(e) = msg.ack().await {
                            error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
27
                        }
27
                        return;
                    }
                    Some(mgr) => {
                        if let Err(e) = mgr.close().await {
                            error!(
                                "[{}] {} close old manager {} error: {}",
                                FN_NAME, queue_name, new, e
                            );
                        } else {
                            debug!("[{}] {} close old manager {}", FN_NAME, queue_name, new);
                        }
                    }
                }
                if let Err(e) = msg.ack().await {
                    error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
                }
                info!("[{}] {} manager {} deleted", FN_NAME, queue_name, new);
            }
        }
186
    }
}