1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Request, State},
7
    http::{HeaderMap, HeaderValue, StatusCode, header},
8
    response::{IntoResponse, Response},
9
    routing,
10
};
11
use base64::{Engine, engine::general_purpose};
12
use bytes::{Bytes, BytesMut};
13
use chrono::Utc;
14
use csv::WriterBuilder;
15
use futures_util::StreamExt;
16
use hex;
17
use log::error;
18
use reqwest;
19
use serde::{Deserialize, Serialize};
20
use serde_json::{Deserializer, Map, Value};
21
use url::Url;
22

            
23
use sylvia_iot_corelib::{
24
    err::ErrResp,
25
    http::{Json, Path},
26
    role::Role,
27
    strings,
28
};
29

            
30
use super::{
31
    super::{AmqpState, ErrReq, MqttState, State as AppState},
32
    ClearQueueResource, CreateQueueResource, ListResp, PatchHost, api_bridge, clear_patch_host,
33
    clear_queue_rsc, cmp_host_uri, create_queue_rsc, get_device_inner, get_stream_resp,
34
    get_tokeninfo_inner, get_unit_inner, list_api_bridge, request, response, transfer_host_uri,
35
    trunc_host_uri,
36
};
37
use crate::libs::mq::{self, QueueType, emqx, rabbitmq};
38

            
39
enum ListFormat {
40
    Array,
41
    Csv,
42
    Data,
43
}
44

            
45
#[derive(Deserialize)]
46
struct NetworkIdPath {
47
    network_id: String,
48
}
49

            
50
#[derive(Deserialize, Serialize)]
51
struct Network {
52
    #[serde(rename = "networkId")]
53
    network_id: String,
54
    code: String,
55
    #[serde(rename = "unitId")]
56
    unit_id: Option<String>,
57
    #[serde(rename = "unitCode")]
58
    unit_code: Option<String>,
59
    #[serde(rename = "createdAt")]
60
    created_at: String,
61
    #[serde(rename = "modifiedAt")]
62
    modified_at: String,
63
    #[serde(rename = "hostUri")]
64
    host_uri: String,
65
    name: String,
66
    info: Map<String, Value>,
67
}
68

            
69
#[derive(Deserialize, Serialize)]
70
struct CsvItem {
71
    #[serde(rename = "networkId")]
72
    network_id: String,
73
    code: String,
74
    #[serde(rename = "unitId")]
75
    unit_id: Option<String>,
76
    #[serde(rename = "unitCode")]
77
    unit_code: Option<String>,
78
    #[serde(rename = "createdAt")]
79
    created_at: String,
80
    #[serde(rename = "modifiedAt")]
81
    modified_at: String,
82
    #[serde(rename = "hostUri")]
83
    host_uri: String,
84
    name: String,
85
    info: Option<String>,
86
}
87

            
88
/// Downlink data from application to broker.
89
#[derive(Default, Serialize)]
90
struct UlData {
91
    time: String,
92
    #[serde(rename = "networkAddr")]
93
    network_addr: String,
94
    data: String,
95
    extension: Option<Map<String, Value>>,
96
}
97

            
98
const CSV_FIELDS: &'static [u8] =
99
    b"\xEF\xBB\xBFnetworkId,code,unitId,createdAt,modifiedAt,hostUri,name,info\n";
100

            
101
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
102
506
    Router::new().nest(
103
506
        scope_path,
104
506
        Router::new()
105
506
            .route("/", routing::post(post_network))
106
506
            .route("/count", routing::get(get_network_count))
107
506
            .route("/list", routing::get(get_network_list))
108
506
            .route(
109
506
                "/{network_id}",
110
506
                routing::get(get_network)
111
506
                    .patch(patch_network)
112
506
                    .delete(delete_network),
113
506
            )
114
506
            .route("/{network_id}/stats", routing::get(get_network_stats))
115
506
            .route("/{network_id}/uldata", routing::post(post_network_uldata))
116
506
            .with_state(state.clone()),
117
506
    )
118
506
}
119

            
120
/// `POST /{base}/api/v1/network`
121
92
async fn post_network(
122
92
    State(state): State<AppState>,
123
92
    mut headers: HeaderMap,
124
92
    Json(mut body): Json<request::PostNetworkBody>,
125
92
) -> impl IntoResponse {
126
    const FN_NAME: &'static str = "post_network";
127
92
    let broker_base = state.broker_base.as_str();
128
92
    let api_path = format!("{}/api/v1/network", broker_base);
129
92
    let client = state.client.clone();
130
92
    let token = match headers.get(header::AUTHORIZATION) {
131
        None => {
132
4
            let e = "missing Authorization".to_string();
133
4
            return ErrResp::ErrParam(Some(e)).into_response();
134
        }
135
88
        Some(value) => value.clone(),
136
88
    };
137
88

            
138
88
    if body.data.unit_id.is_none() {
139
16
        let auth_base = state.auth_base.as_str();
140
16
        let token_info = match get_tokeninfo_inner(FN_NAME, &client, auth_base, &token).await {
141
            Err(e) => return e,
142
16
            Ok(info) => info,
143
16
        };
144
16
        if !Role::is_role(&token_info.roles, Role::ADMIN)
145
4
            && !Role::is_role(&token_info.roles, Role::MANAGER)
146
        {
147
4
            let e = "missing `unitId`".to_string();
148
4
            return ErrResp::ErrParam(Some(e)).into_response();
149
12
        }
150
72
    }
151

            
152
    // Get unit information to create queue information.
153
84
    let unit_code = match body.data.unit_id.as_ref() {
154
12
        None => "".to_string(),
155
72
        Some(unit_id) => {
156
72
            if unit_id.len() == 0 {
157
4
                return ErrResp::ErrParam(Some(
158
4
                    "`unitId` must with at least one character".to_string(),
159
4
                ))
160
4
                .into_response();
161
68
            }
162
68
            let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
163
                Err(e) => return e,
164
68
                Ok(unit) => match unit {
165
                    None => {
166
4
                        return ErrResp::Custom(
167
4
                            ErrReq::UNIT_NOT_EXIST.0,
168
4
                            ErrReq::UNIT_NOT_EXIST.1,
169
4
                            None,
170
4
                        )
171
4
                        .into_response();
172
                    }
173
64
                    Some(unit) => unit,
174
64
                },
175
64
            };
176
64
            unit.code
177
        }
178
    };
179
76
    let code = body.data.code.as_str();
180
76
    if !strings::is_code(code) {
181
4
        return ErrResp::ErrParam(Some(
182
4
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
183
4
        ))
184
4
        .into_response();
185
72
    }
186
72
    let unit_id = match body.data.unit_id.as_ref() {
187
12
        None => "",
188
60
        Some(unit_id) => unit_id.as_str(),
189
    };
190
72
    match check_network_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
191
        Err(e) => return e,
192
72
        Ok(count) => match count {
193
68
            0 => (),
194
            _ => {
195
4
                return ErrResp::Custom(ErrReq::NETWORK_EXIST.0, ErrReq::NETWORK_EXIST.1, None)
196
4
                    .into_response();
197
            }
198
        },
199
    }
200
68
    let q_type = QueueType::Network;
201
68
    let username = mq::to_username(q_type, unit_code.as_str(), code);
202
68
    let password = strings::randomstring(8);
203
68
    let uri = match Url::parse(body.data.host_uri.as_str()) {
204
4
        Err(e) => {
205
4
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
206
        }
207
64
        Ok(uri) => uri,
208
    };
209
64
    let host = match uri.host() {
210
        None => {
211
4
            let e = "invalid `hostUri`".to_string();
212
4
            return ErrResp::ErrParam(Some(e)).into_response();
213
        }
214
60
        Some(host) => host.to_string(),
215
60
    };
216
60
    let scheme = uri.scheme();
217
60
    let host = host.as_str();
218
60
    let username = username.as_str();
219
60
    let password = password.as_str();
220
60

            
221
60
    // Create message broker resources.
222
60
    let create_rsc = CreateQueueResource {
223
60
        scheme,
224
60
        host,
225
60
        username,
226
60
        password,
227
60
        ttl: body.data.ttl,
228
60
        length: body.data.length,
229
60
        q_type: QueueType::Network,
230
60
    };
231
60
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
232
        return e;
233
60
    }
234
60
    let clear_rsc = ClearQueueResource {
235
60
        scheme,
236
60
        host,
237
60
        username,
238
60
    };
239
60

            
240
60
    // Create network instance.
241
60
    let mut body_uri = uri.clone();
242
60
    transfer_host_uri(&state, &mut body_uri, username);
243
60
    body.data.host_uri = body_uri.to_string();
244
60
    headers.remove(header::CONTENT_LENGTH);
245
60
    let builder = client
246
60
        .request(reqwest::Method::POST, api_path)
247
60
        .headers(headers)
248
60
        .json(&body);
249
60
    let api_req = match builder.build() {
250
        Err(e) => {
251
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
252
            let e = format!("generate request error: {}", e);
253
            error!("[{}] {}", FN_NAME, e);
254
            return ErrResp::ErrRsc(Some(e)).into_response();
255
        }
256
60
        Ok(req) => req,
257
    };
258
60
    let api_resp = match client.execute(api_req).await {
259
        Err(e) => {
260
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
261
            let e = format!("execute request error: {}", e);
262
            error!("[{}] {}", FN_NAME, e);
263
            return ErrResp::ErrIntMsg(Some(e)).into_response();
264
        }
265
60
        Ok(resp) => match resp.status() {
266
60
            StatusCode::OK => resp,
267
            _ => {
268
                let mut resp_builder = Response::builder().status(resp.status());
269
                for (k, v) in resp.headers() {
270
                    resp_builder = resp_builder.header(k, v);
271
                }
272
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
273
                    Err(e) => {
274
                        let e = format!("wrap response body error: {}", e);
275
                        error!("[{}] {}", FN_NAME, e);
276
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
277
                    }
278
                    Ok(resp) => return resp,
279
                }
280
            }
281
        },
282
    };
283
60
    let mut body = match api_resp.json::<response::PostNetwork>().await {
284
        Err(e) => {
285
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
286
            let e = format!("unexpected response: {}", e);
287
            return ErrResp::ErrUnknown(Some(e)).into_response();
288
        }
289
60
        Ok(body) => body,
290
60
    };
291
60
    body.data.password = Some(password.to_string());
292
60

            
293
60
    Json(&body).into_response()
294
92
}
295

            
296
/// `GET /{base}/api/v1/network/count`
297
4
async fn get_network_count(state: State<AppState>, req: Request) -> impl IntoResponse {
298
    const FN_NAME: &'static str = "get_network_count";
299
4
    let api_path = format!("{}/api/v1/network/count", state.broker_base.as_str());
300
4
    let client = state.client.clone();
301
4

            
302
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
303
4
}
304

            
305
/// `GET /{base}/api/v1/network/list`
306
20
async fn get_network_list(state: State<AppState>, req: Request) -> impl IntoResponse {
307
    const FN_NAME: &'static str = "get_network_list";
308
20
    let api_path = format!("{}/api/v1/network/list", state.broker_base.as_str());
309
20
    let api_path = api_path.as_str();
310
20
    let client = state.client.clone();
311
20

            
312
20
    let mut list_format = ListFormat::Data;
313
20
    if let Some(query_str) = req.uri().query() {
314
16
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
315
            Err(e) => {
316
                let e = format!("parse query error: {}", e);
317
                return ErrResp::ErrParam(Some(e)).into_response();
318
            }
319
16
            Ok(query) => query,
320
        };
321
24
        for (k, v) in query.iter() {
322
24
            if k.as_str().eq("format") {
323
12
                if v.as_str().eq("array") {
324
4
                    list_format = ListFormat::Array;
325
8
                } else if v.as_str().eq("csv") {
326
4
                    list_format = ListFormat::Csv;
327
4
                }
328
12
            }
329
        }
330
4
    }
331

            
332
16
    let (api_resp, resp_builder) =
333
20
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "network").await {
334
4
            ListResp::Axum(resp) => return resp,
335
16
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
336
16
        };
337
16

            
338
16
    let mut resp_stream = api_resp.bytes_stream();
339
16
    let body = Body::from_stream(async_stream::stream! {
340
16
        match list_format {
341
16
            ListFormat::Array => yield Ok(Bytes::from("[")),
342
16
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
343
16
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
344
16
        }
345
16
        let mut first_sent = false;
346
16

            
347
16
        let mut buffer = BytesMut::new();
348
16
        while let Some(body) = resp_stream.next().await {
349
16
            match body {
350
16
                Err(e) => {
351
16
                    error!("[{}] get body error: {}", FN_NAME, e);
352
16
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
353
16
                    yield Err(err);
354
16
                    break;
355
16
                }
356
16
                Ok(body) => buffer.extend_from_slice(&body[..]),
357
16
            }
358
16

            
359
16
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
360
16
            let mut index = 0;
361
16
            let mut finish = false;
362
16
            loop {
363
16
                if let Some(Ok(mut v)) = json_stream.next() {
364
16
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
365
16
                        Err(e) => {
366
16
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
367
16
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
368
16
                            yield Err(err);
369
16
                            finish = true;
370
16
                            break;
371
16
                        }
372
16
                        Ok(uri) => trunc_host_uri(&uri),
373
16
                    };
374
16
                    match list_format {
375
16
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
376
16
                            Err(e) =>{
377
16
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
378
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
379
16
                                yield Err(err);
380
16
                                finish = true;
381
16
                                break;
382
16
                            }
383
16
                            Ok(v) => {
384
16
                                match first_sent {
385
16
                                    false => first_sent = true,
386
16
                                    true => yield Ok(Bytes::from(",")),
387
16
                                }
388
16
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
389
16
                            }
390
16
                        }
391
16
                        ListFormat::Csv => {
392
16
                            let mut item = CsvItem{
393
16
                                network_id: v.network_id,
394
16
                                code: v.code,
395
16
                                unit_id: v.unit_id,
396
16
                                unit_code: v.unit_code,
397
16
                                created_at: v.created_at,
398
16
                                modified_at: v.modified_at,
399
16
                                host_uri: v.host_uri,
400
16
                                name: v.name,
401
16
                                info: None,
402
16
                            };
403
16
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
404
16
                                item.info = Some(info_str);
405
16
                            }
406
16
                            let mut writer =
407
16
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
408
16
                            if let Err(e) = writer.serialize(item) {
409
16
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
410
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
411
16
                                yield Err(err);
412
16
                                finish = true;
413
16
                                break;
414
16
                            }
415
16
                            match writer.into_inner() {
416
16
                                Err(e) => {
417
16
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
418
16
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
419
16
                                    yield Err(err);
420
16
                                    finish = true;
421
16
                                    break;
422
16
                                }
423
16
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
424
16
                            }
425
16
                        }
426
16
                    }
427
16
                    continue;
428
16
                }
429
16
                let offset = json_stream.byte_offset();
430
16
                if buffer.len() <= index + offset {
431
16
                    index = buffer.len();
432
16
                    break;
433
16
                }
434
16
                match buffer[index+offset] {
435
16
                    b'[' | b',' => {
436
16
                        index += offset + 1;
437
16
                        if buffer.len() <= index {
438
16
                            break;
439
16
                        }
440
16
                        json_stream =
441
16
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
442
16
                    }
443
16
                    b']' => {
444
16
                        finish = true;
445
16
                        break;
446
16
                    }
447
16
                    _ => break,
448
16
                }
449
16
            }
450
16
            if finish {
451
16
                match list_format {
452
16
                    ListFormat::Array => yield Ok(Bytes::from("]")),
453
16
                    ListFormat::Csv => (),
454
16
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
455
16
                }
456
16
                break;
457
16
            }
458
16
            buffer = buffer.split_off(index);
459
16
        }
460
16
    });
461
16
    match resp_builder.body(body) {
462
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
463
16
        Ok(resp) => resp,
464
    }
465
20
}
466

            
467
/// `GET /{base}/api/v1/network/{networkId}`
468
36
async fn get_network(
469
36
    state: State<AppState>,
470
36
    Path(param): Path<NetworkIdPath>,
471
36
    req: Request,
472
36
) -> impl IntoResponse {
473
    const FN_NAME: &'static str = "get_network";
474
36
    let broker_base = state.broker_base.as_str();
475
36
    let client = state.client.clone();
476
36
    let token = match req.headers().get(header::AUTHORIZATION) {
477
        None => {
478
4
            let e = "missing Authorization".to_string();
479
4
            return ErrResp::ErrParam(Some(e)).into_response();
480
        }
481
32
        Some(value) => value.clone(),
482
    };
483

            
484
32
    let (mut network, uri, host) = match get_network_inner(
485
32
        FN_NAME,
486
32
        &client,
487
32
        broker_base,
488
32
        param.network_id.as_str(),
489
32
        &token,
490
32
    )
491
32
    .await
492
    {
493
4
        Err(e) => return e,
494
28
        Ok((network, uri, host)) => (network, uri, host),
495
28
    };
496
28

            
497
28
    let host = host.as_str();
498
28
    let scheme = uri.scheme();
499
28
    if scheme.eq("amqp") || scheme.eq("amqps") {
500
16
        let AmqpState::RabbitMq(opts) = &state.amqp;
501
16
        let unit_code = match network.unit_code.as_ref() {
502
            None => "",
503
16
            Some(unit_code) => unit_code.as_str(),
504
        };
505
16
        let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
506
16
        let username = username.as_str();
507
16
        match rabbitmq::get_policies(&client, opts, host, username).await {
508
            Err(e) => {
509
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
510
                return e.into_response();
511
            }
512
16
            Ok(policies) => {
513
16
                network.ttl = policies.ttl;
514
16
                network.length = policies.length;
515
16
            }
516
        }
517
12
    }
518
28
    network.host_uri = trunc_host_uri(&uri);
519
28
    Json(&response::GetNetwork { data: network }).into_response()
520
36
}
521

            
522
/// `PATCH /{base}/api/v1/network/{networkId}`
523
52
async fn patch_network(
524
52
    state: State<AppState>,
525
52
    headers: HeaderMap,
526
52
    Path(param): Path<NetworkIdPath>,
527
52
    Json(body): Json<request::PatchNetworkBody>,
528
52
) -> impl IntoResponse {
529
    const FN_NAME: &'static str = "patch_network";
530
52
    let broker_base = state.broker_base.as_str();
531
52
    let client = state.client.clone();
532
52
    let token = match headers.get(header::AUTHORIZATION) {
533
        None => {
534
4
            let e = "missing Authorization".to_string();
535
4
            return ErrResp::ErrParam(Some(e)).into_response();
536
        }
537
48
        Some(value) => value.clone(),
538
48
    };
539
48

            
540
48
    let data = &body.data;
541
48
    if data.host_uri.is_none()
542
24
        && data.name.is_none()
543
16
        && data.info.is_none()
544
16
        && data.ttl.is_none()
545
12
        && data.length.is_none()
546
12
        && data.password.is_none()
547
    {
548
4
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
549
44
    }
550

            
551
44
    let (network, uri, hostname) = match get_network_inner(
552
44
        FN_NAME,
553
44
        &client,
554
44
        broker_base,
555
44
        param.network_id.as_str(),
556
44
        &token,
557
44
    )
558
44
    .await
559
    {
560
4
        Err(e) => return e,
561
40
        Ok((network, uri, hostname)) => (network, uri, hostname),
562
40
    };
563
40

            
564
40
    let mut patch_data = request::PatchNetworkData {
565
40
        name: data.name.clone(),
566
40
        info: data.info.clone(),
567
40
        ..Default::default()
568
40
    };
569
40
    let mut patch_host: Option<PatchHost> = None;
570
40
    if let Some(host) = data.host_uri.as_ref() {
571
24
        if !strings::is_uri(host) {
572
4
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
573
20
        }
574
20
        // Change to the new broker host.
575
20
        if !cmp_host_uri(network.host_uri.as_str(), host.as_str()) {
576
20
            let password = match data.password.as_ref() {
577
                None => {
578
4
                    let e = "missing `password`".to_string();
579
4
                    return ErrResp::ErrParam(Some(e)).into_response();
580
                }
581
16
                Some(password) => match password.len() {
582
                    0 => {
583
4
                        let e = "missing `password`".to_string();
584
4
                        return ErrResp::ErrParam(Some(e)).into_response();
585
                    }
586
12
                    _ => password,
587
                },
588
            };
589
12
            let mut new_host_uri = match Url::parse(host.as_str()) {
590
                Err(e) => {
591
                    let e = format!("invalid `hostUri`: {}", e);
592
                    return ErrResp::ErrParam(Some(e)).into_response();
593
                }
594
12
                Ok(uri) => match uri.host_str() {
595
                    None => {
596
4
                        let e = "invalid `hostUri`".to_string();
597
4
                        return ErrResp::ErrParam(Some(e)).into_response();
598
                    }
599
8
                    Some(_) => uri,
600
                },
601
            };
602

            
603
8
            let unit_code = match network.unit_code.as_ref() {
604
                None => "",
605
8
                Some(unit_code) => unit_code.as_str(),
606
            };
607
8
            let code = network.code.as_str();
608
8
            let username = mq::to_username(QueueType::Network, unit_code, code);
609
8
            let resource = CreateQueueResource {
610
8
                scheme: new_host_uri.scheme(),
611
8
                host: new_host_uri.host_str().unwrap(),
612
8
                username: username.as_str(),
613
8
                password: password.as_str(),
614
8
                ttl: data.ttl,
615
8
                length: data.length,
616
8
                q_type: QueueType::Network,
617
8
            };
618
8
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
619
                return e;
620
8
            }
621
8
            let resource = ClearQueueResource {
622
8
                scheme: uri.scheme(),
623
8
                host: uri.host_str().unwrap(),
624
8
                username: username.as_str(),
625
8
            };
626
8
            let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
627

            
628
8
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
629
8
            patch_data.host_uri = Some(new_host_uri.to_string());
630
8
            patch_host = Some(PatchHost {
631
8
                host_uri: new_host_uri,
632
8
                username,
633
8
            });
634
        }
635
16
    }
636

            
637
    // Send request body to the sylvia-iot-broker.
638
24
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
639
12
        let network_id = param.network_id.as_str();
640
12
        let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
641
12
        let mut builder = client
642
12
            .request(reqwest::Method::PATCH, uri)
643
12
            .header(reqwest::header::AUTHORIZATION, &token)
644
12
            .json(&request::PatchNetworkBody { data: patch_data });
645
12
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
646
12
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
647
12
        }
648
12
        let api_req = match builder.build() {
649
            Err(e) => {
650
                clear_patch_host(FN_NAME, &state, &patch_host).await;
651
                let e = format!("generate request error: {}", e);
652
                error!("[{}] {}", FN_NAME, e);
653
                return ErrResp::ErrRsc(Some(e)).into_response();
654
            }
655
12
            Ok(req) => req,
656
        };
657
12
        let api_resp = match client.execute(api_req).await {
658
            Err(e) => {
659
                clear_patch_host(FN_NAME, &state, &patch_host).await;
660
                let e = format!("execute request error: {}", e);
661
                error!("[{}] {}", FN_NAME, e);
662
                return ErrResp::ErrIntMsg(Some(e)).into_response();
663
            }
664
12
            Ok(resp) => resp,
665
12
        };
666
12

            
667
12
        let status_code = api_resp.status();
668
12
        if status_code != StatusCode::NO_CONTENT {
669
            clear_patch_host(FN_NAME, &state, &patch_host).await;
670
            let mut resp_builder = Response::builder().status(status_code);
671
            for (k, v) in api_resp.headers() {
672
                resp_builder = resp_builder.header(k, v);
673
            }
674
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
675
                Err(e) => {
676
                    let e = format!("wrap response body error: {}", e);
677
                    error!("[{}] {}", FN_NAME, e);
678
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
679
                }
680
                Ok(resp) => return resp,
681
            }
682
12
        }
683
12
    }
684

            
685
24
    if let Some(host) = patch_host {
686
8
        let resource = ClearQueueResource {
687
8
            scheme: uri.scheme(),
688
8
            host: uri.host_str().unwrap(),
689
8
            username: host.username.as_str(),
690
8
        };
691
8
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
692
8
        return StatusCode::NO_CONTENT.into_response();
693
16
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
694
4
        return StatusCode::NO_CONTENT.into_response();
695
12
    }
696

            
697
    // Update broker information without changing hostUri.
698
12
    if let Some(password) = data.password.as_ref() {
699
12
        if password.len() == 0 {
700
4
            let e = "missing `password`".to_string();
701
4
            return ErrResp::ErrParam(Some(e)).into_response();
702
8
        }
703
    }
704
8
    let unit_code = match network.unit_code.as_ref() {
705
        None => "",
706
8
        Some(unit_code) => unit_code.as_str(),
707
    };
708
8
    let code = network.code.as_str();
709
8
    let hostname = hostname.as_str();
710
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
711
8
    let username = username.as_str();
712
8
    match uri.scheme() {
713
8
        "amqp" | "amqps" => match &state.amqp {
714
4
            AmqpState::RabbitMq(opts) => {
715
4
                if data.ttl.is_some() || data.length.is_some() {
716
4
                    let policies = rabbitmq::BrokerPolicies {
717
4
                        ttl: data.ttl,
718
4
                        length: data.length,
719
4
                    };
720
                    if let Err(e) =
721
4
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
722
                    {
723
                        let e = format!("patch RabbitMQ error: {}", e);
724
                        error!("[{}] {}", FN_NAME, e);
725
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
726
4
                    }
727
                }
728
4
                if let Some(password) = data.password.as_ref() {
729
4
                    let password = password.as_str();
730
                    if let Err(e) =
731
4
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
732
                    {
733
                        let e = format!("patch RabbitMQ password error: {}", e);
734
                        error!("[{}] {}", FN_NAME, e);
735
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
736
4
                    }
737
                }
738
            }
739
        },
740
4
        "mqtt" | "mqtts" => match &state.mqtt {
741
2
            MqttState::Emqx(opts) => {
742
2
                if let Some(password) = data.password.as_ref() {
743
2
                    let password = password.as_str();
744
                    if let Err(e) =
745
2
                        emqx::put_user(&client, opts, hostname, username, password).await
746
                    {
747
                        let e = format!("patch EMQX password error: {}", e);
748
                        error!("[{}] {}", FN_NAME, e);
749
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
750
2
                    }
751
                }
752
            }
753
2
            MqttState::Rumqttd => {}
754
        },
755
        _ => {}
756
    }
757

            
758
8
    StatusCode::NO_CONTENT.into_response()
759
52
}
760

            
761
/// `DELETE /{base}/api/v1/network/{networkId}`
762
16
async fn delete_network(
763
16
    state: State<AppState>,
764
16
    Path(param): Path<NetworkIdPath>,
765
16
    req: Request,
766
16
) -> impl IntoResponse {
767
    const FN_NAME: &'static str = "delete_network";
768
16
    let broker_base = state.broker_base.as_str();
769
16
    let network_id = param.network_id.as_str();
770
16
    let api_path = format!("{}/api/v1/network/{}", broker_base, network_id);
771
16
    let client = state.client.clone();
772
16
    let token = match req.headers().get(header::AUTHORIZATION) {
773
        None => {
774
4
            let e = "missing Authorization".to_string();
775
4
            return ErrResp::ErrParam(Some(e)).into_response();
776
        }
777
12
        Some(value) => value.clone(),
778
    };
779

            
780
8
    let (network, uri, host) =
781
12
        match get_network_inner(FN_NAME, &client, broker_base, network_id, &token).await {
782
4
            Err(e) => return e,
783
8
            Ok((network, uri, host)) => (network, uri, host),
784
        };
785

            
786
8
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
787
8
    if !resp.status().is_success() {
788
        return resp;
789
8
    }
790

            
791
8
    let unit_code = match network.unit_code.as_ref() {
792
        None => "",
793
8
        Some(unit_code) => unit_code.as_str(),
794
    };
795
8
    let code = network.code.as_str();
796
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
797
8
    let clear_rsc = ClearQueueResource {
798
8
        scheme: uri.scheme(),
799
8
        host: host.as_str(),
800
8
        username: username.as_str(),
801
8
    };
802
8
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
803
        return e;
804
8
    }
805
8

            
806
8
    StatusCode::NO_CONTENT.into_response()
807
16
}
808

            
809
/// `GET /{base}/api/v1/network/{networkId}/stats`
810
154
async fn get_network_stats(
811
154
    state: State<AppState>,
812
154
    Path(param): Path<NetworkIdPath>,
813
154
    req: Request,
814
154
) -> impl IntoResponse {
815
    const FN_NAME: &'static str = "get_network";
816
154
    let broker_base = state.broker_base.as_str();
817
154
    let client = state.client.clone();
818
154
    let token = match req.headers().get(header::AUTHORIZATION) {
819
        None => {
820
4
            let e = "missing Authorization".to_string();
821
4
            return ErrResp::ErrParam(Some(e)).into_response();
822
        }
823
150
        Some(value) => value.clone(),
824
    };
825

            
826
150
    let (network, uri, host) = match get_network_inner(
827
150
        FN_NAME,
828
150
        &client,
829
150
        broker_base,
830
150
        param.network_id.as_str(),
831
150
        &token,
832
150
    )
833
150
    .await
834
    {
835
4
        Err(e) => return e,
836
146
        Ok((network, uri, host)) => (network, uri, host),
837
146
    };
838
146

            
839
146
    let host = host.as_str();
840
146
    let scheme = uri.scheme();
841
146
    let data = match scheme {
842
146
        "amqp" | "amqps" => {
843
126
            let AmqpState::RabbitMq(opts) = &state.amqp;
844
126
            let unit_code = match network.unit_code.as_ref() {
845
                None => "",
846
126
                Some(unit_code) => unit_code.as_str(),
847
            };
848
126
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
849
126
            let username = username.as_str();
850
126
            response::GetNetworkStatsData {
851
126
                dldata: match rabbitmq::stats(&client, opts, host, username, "dldata").await {
852
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
853
                        consumers: 0,
854
                        messages: 0,
855
                        publish_rate: 0.0,
856
                        deliver_rate: 0.0,
857
                    },
858
                    Err(e) => {
859
                        error!("[{}] get dldata stats error: {}", FN_NAME, e);
860
                        return e.into_response();
861
                    }
862
126
                    Ok(stats) => response::Stats {
863
126
                        consumers: stats.consumers,
864
126
                        messages: stats.messages,
865
126
                        publish_rate: stats.publish_rate,
866
126
                        deliver_rate: stats.deliver_rate,
867
126
                    },
868
                },
869
126
                ctrl: match rabbitmq::stats(&client, opts, host, username, "ctrl").await {
870
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
871
                        consumers: 0,
872
                        messages: 0,
873
                        publish_rate: 0.0,
874
                        deliver_rate: 0.0,
875
                    },
876
                    Err(e) => {
877
                        error!("[{}] get ctrl stats error: {}", FN_NAME, e);
878
                        return e.into_response();
879
                    }
880
126
                    Ok(stats) => response::Stats {
881
126
                        consumers: stats.consumers,
882
126
                        messages: stats.messages,
883
126
                        publish_rate: stats.publish_rate,
884
126
                        deliver_rate: stats.deliver_rate,
885
126
                    },
886
                },
887
            }
888
        }
889
20
        "mqtt" | "mqtts" => match &state.mqtt {
890
18
            MqttState::Emqx(opts) => {
891
18
                let unit_code = match network.unit_code.as_ref() {
892
                    None => "",
893
18
                    Some(unit_code) => unit_code.as_str(),
894
                };
895
18
                let username =
896
18
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
897
18
                let username = username.as_str();
898
18
                response::GetNetworkStatsData {
899
18
                    dldata: match emqx::stats(&client, opts, host, username, "dldata").await {
900
                        Err(e) => {
901
                            error!("[{}] get dldata stats error: {}", FN_NAME, e);
902
                            return e.into_response();
903
                        }
904
18
                        Ok(stats) => response::Stats {
905
18
                            consumers: stats.consumers,
906
18
                            messages: stats.messages,
907
18
                            publish_rate: stats.publish_rate,
908
18
                            deliver_rate: stats.deliver_rate,
909
18
                        },
910
18
                    },
911
18
                    ctrl: match emqx::stats(&client, opts, host, username, "ctrl").await {
912
                        Err(e) => {
913
                            error!("[{}] get ctrl stats error: {}", FN_NAME, e);
914
                            return e.into_response();
915
                        }
916
18
                        Ok(stats) => response::Stats {
917
18
                            consumers: stats.consumers,
918
18
                            messages: stats.messages,
919
18
                            publish_rate: stats.publish_rate,
920
18
                            deliver_rate: stats.deliver_rate,
921
18
                        },
922
                    },
923
                }
924
            }
925
2
            MqttState::Rumqttd => response::GetNetworkStatsData {
926
2
                dldata: response::Stats {
927
2
                    ..Default::default()
928
2
                },
929
2
                ctrl: response::Stats {
930
2
                    ..Default::default()
931
2
                },
932
2
            },
933
        },
934
        _ => {
935
            let e = format!("unsupport scheme {}", scheme);
936
            error!("[{}] {}", FN_NAME, e);
937
            return ErrResp::ErrUnknown(Some(e)).into_response();
938
        }
939
    };
940
146
    Json(&response::GetNetworkStats { data }).into_response()
941
154
}
942

            
943
/// `POST /{base}/api/v1/network/{networkId}/uldata`
944
28
async fn post_network_uldata(
945
28
    state: State<AppState>,
946
28
    headers: HeaderMap,
947
28
    Path(param): Path<NetworkIdPath>,
948
28
    Json(body): Json<request::PostNetworkUlDataBody>,
949
28
) -> impl IntoResponse {
950
    const FN_NAME: &'static str = "post_network_uldata";
951
28
    let broker_base = state.broker_base.as_str();
952
28
    let client = state.client.clone();
953
28
    let token = match headers.get(header::AUTHORIZATION) {
954
        None => {
955
4
            let e = "missing Authorization".to_string();
956
4
            return ErrResp::ErrParam(Some(e)).into_response();
957
        }
958
24
        Some(value) => value.clone(),
959
24
    };
960
24

            
961
24
    if body.data.device_id.len() == 0 {
962
4
        let e = "empty `deviceId` is invalid".to_string();
963
4
        return ErrResp::ErrParam(Some(e)).into_response();
964
20
    }
965
20
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
966
4
        let e = format!("`payload` is not hexadecimal string: {}", e);
967
4
        return ErrResp::ErrParam(Some(e)).into_response();
968
16
    }
969

            
970
16
    let (network, uri, hostname) = match get_network_inner(
971
16
        FN_NAME,
972
16
        &client,
973
16
        broker_base,
974
16
        param.network_id.as_str(),
975
16
        &token,
976
16
    )
977
16
    .await
978
    {
979
4
        Err(e) => return e,
980
12
        Ok((network, uri, hostname)) => (network, uri, hostname),
981
    };
982
12
    let device = match get_device_inner(
983
12
        FN_NAME,
984
12
        &client,
985
12
        broker_base,
986
12
        body.data.device_id.as_str(),
987
12
        &token,
988
12
    )
989
12
    .await
990
    {
991
        Err(e) => return e,
992
12
        Ok(device) => match device {
993
            None => {
994
4
                return ErrResp::Custom(
995
4
                    ErrReq::DEVICE_NOT_EXIST.0,
996
4
                    ErrReq::DEVICE_NOT_EXIST.1,
997
4
                    None,
998
4
                )
999
4
                .into_response();
            }
8
            Some(device) => device,
8
        },
8
    };
8

            
8
    let hostname = hostname.as_str();
8
    let scheme = uri.scheme();
8
    let payload = match serde_json::to_string(&UlData {
8
        time: strings::time_str(&Utc::now()),
8
        network_addr: device.network_addr,
8
        data: body.data.payload.clone(),
8
        ..Default::default()
8
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
8
        Ok(payload) => general_purpose::STANDARD.encode(payload),
8
    };
8
    match scheme {
8
        "amqp" | "amqps" => {
4
            let AmqpState::RabbitMq(opts) = &state.amqp;
4
            let unit_code = match network.unit_code.as_ref() {
                None => "",
4
                Some(unit_code) => unit_code.as_str(),
            };
4
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
4
            let username = username.as_str();
            if let Err(e) =
4
                rabbitmq::publish_message(&client, opts, hostname, username, "uldata", payload)
4
                    .await
            {
                return e.into_response();
4
            }
        }
4
        "mqtt" | "mqtts" => match &state.mqtt {
2
            MqttState::Emqx(opts) => {
2
                let unit_code = match network.unit_code.as_ref() {
                    None => "",
2
                    Some(unit_code) => unit_code.as_str(),
                };
2
                let username =
2
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
2
                let username = username.as_str();
                if let Err(e) =
2
                    emqx::publish_message(&client, opts, hostname, username, "uldata", payload)
2
                        .await
                {
                    return e.into_response();
2
                }
            }
            MqttState::Rumqttd => {
2
                let e = "not support now".to_string();
2
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
6
    StatusCode::NO_CONTENT.into_response()
28
}
254
async fn get_network_inner(
254
    fn_name: &str,
254
    client: &reqwest::Client,
254
    broker_base: &str,
254
    network_id: &str,
254
    token: &HeaderValue,
254
) -> Result<(response::GetNetworkData, Url, String), Response> {
254
    let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
254
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
234
    let network = match resp.json::<response::GetNetwork>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
234
        Ok(network) => network.data,
    };
234
    let uri = match Url::parse(network.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
234
        Ok(uri) => uri,
    };
234
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
234
        Some(host) => host.to_string(),
234
    };
234
    Ok((network, uri, host))
254
}
72
async fn check_network_code_inner(
72
    fn_name: &str,
72
    client: &reqwest::Client,
72
    broker_base: &str,
72
    unit_id: &str,
72
    code: &str,
72
    token: &HeaderValue,
72
) -> Result<u64, Response> {
72
    let uri = format!("{}/api/v1/network/count", broker_base);
72
    let req = match client
72
        .request(reqwest::Method::GET, uri)
72
        .header(reqwest::header::AUTHORIZATION, token)
72
        .query(&[("unit", unit_id), ("code", code)])
72
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
72
        Ok(req) => req,
    };
72
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
72
        Ok(resp) => resp,
72
    };
72

            
72
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
72
        Ok(data) => Ok(data.data.count),
    }
72
}