1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Request, State},
6
    http::{header, HeaderMap, HeaderValue, StatusCode},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use base64::{engine::general_purpose, Engine};
11
use bytes::{Bytes, BytesMut};
12
use chrono::Utc;
13
use csv::WriterBuilder;
14
use futures_util::StreamExt;
15
use hex;
16
use log::error;
17
use reqwest;
18
use serde::{Deserialize, Serialize};
19
use serde_json::{Deserializer, Map, Value};
20
use url::Url;
21

            
22
use sylvia_iot_corelib::{
23
    err::ErrResp,
24
    http::{Json, Path},
25
    role::Role,
26
    strings,
27
};
28

            
29
use super::{
30
    super::{AmqpState, ErrReq, MqttState, State as AppState},
31
    api_bridge, clear_patch_host, clear_queue_rsc, cmp_host_uri, create_queue_rsc,
32
    get_device_inner, get_stream_resp, get_tokeninfo_inner, get_unit_inner, list_api_bridge,
33
    request, response, transfer_host_uri, trunc_host_uri, ClearQueueResource, CreateQueueResource,
34
    ListResp, PatchHost,
35
};
36
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
37

            
38
/// Output format of the list API, selected by the `format` query parameter.
enum ListFormat {
    /// `format=array`: a bare JSON array.
    Array,
    /// `format=csv`: CSV rows preceded by the `CSV_FIELDS` header.
    Csv,
    /// Default: a JSON object of the form `{"data":[...]}`.
    Data,
}
43

            
44
#[derive(Deserialize)]
45
struct NetworkIdPath {
46
    network_id: String,
47
}
48

            
49
#[derive(Deserialize, Serialize)]
50
struct Network {
51
    #[serde(rename = "networkId")]
52
    network_id: String,
53
    code: String,
54
    #[serde(rename = "unitId")]
55
    unit_id: Option<String>,
56
    #[serde(rename = "unitCode")]
57
    unit_code: Option<String>,
58
    #[serde(rename = "createdAt")]
59
    created_at: String,
60
    #[serde(rename = "modifiedAt")]
61
    modified_at: String,
62
    #[serde(rename = "hostUri")]
63
    host_uri: String,
64
    name: String,
65
    info: Map<String, Value>,
66
}
67

            
68
#[derive(Deserialize, Serialize)]
69
struct CsvItem {
70
    #[serde(rename = "networkId")]
71
    network_id: String,
72
    code: String,
73
    #[serde(rename = "unitId")]
74
    unit_id: Option<String>,
75
    #[serde(rename = "unitCode")]
76
    unit_code: Option<String>,
77
    #[serde(rename = "createdAt")]
78
    created_at: String,
79
    #[serde(rename = "modifiedAt")]
80
    modified_at: String,
81
    #[serde(rename = "hostUri")]
82
    host_uri: String,
83
    name: String,
84
    info: Option<String>,
85
}
86

            
87
/// Downlink data from application to broker.
88
#[derive(Default, Serialize)]
89
struct UlData {
90
    time: String,
91
    #[serde(rename = "networkAddr")]
92
    network_addr: String,
93
    data: String,
94
    extension: Option<Map<String, Value>>,
95
}
96

            
97
/// Header line of the CSV list output: a UTF-8 byte order mark followed by the column names.
///
/// The columns must match the serialized fields of `CsvItem` one to one and in the same order.
/// `unitCode` used to be missing here, which shifted every column after `unitId` by one.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFnetworkId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
99

            
100
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
101
253
    Router::new().nest(
102
253
        scope_path,
103
253
        Router::new()
104
253
            .route("/", routing::post(post_network))
105
253
            .route("/count", routing::get(get_network_count))
106
253
            .route("/list", routing::get(get_network_list))
107
253
            .route(
108
253
                "/:network_id",
109
253
                routing::get(get_network)
110
253
                    .patch(patch_network)
111
253
                    .delete(delete_network),
112
253
            )
113
253
            .route("/:network_id/stats", routing::get(get_network_stats))
114
253
            .route("/:network_id/uldata", routing::post(post_network_uldata))
115
253
            .with_state(state.clone()),
116
253
    )
117
253
}
118

            
119
/// `POST /{base}/api/v1/network`
120
46
async fn post_network(
121
46
    State(state): State<AppState>,
122
46
    mut headers: HeaderMap,
123
46
    Json(mut body): Json<request::PostNetworkBody>,
124
46
) -> impl IntoResponse {
125
    const FN_NAME: &'static str = "post_network";
126
46
    let broker_base = state.broker_base.as_str();
127
46
    let api_path = format!("{}/api/v1/network", broker_base);
128
46
    let client = state.client.clone();
129
46
    let token = match headers.get(header::AUTHORIZATION) {
130
        None => {
131
2
            let e = "missing Authorization".to_string();
132
2
            return ErrResp::ErrParam(Some(e)).into_response();
133
        }
134
44
        Some(value) => value.clone(),
135
44
    };
136
44

            
137
44
    if body.data.unit_id.is_none() {
138
8
        let auth_base = state.auth_base.as_str();
139
8
        let token_info = match get_tokeninfo_inner(FN_NAME, &client, auth_base, &token).await {
140
            Err(e) => return e,
141
8
            Ok(info) => info,
142
8
        };
143
8
        if !Role::is_role(&token_info.roles, Role::ADMIN)
144
2
            && !Role::is_role(&token_info.roles, Role::MANAGER)
145
        {
146
2
            let e = "missing `unitId`".to_string();
147
2
            return ErrResp::ErrParam(Some(e)).into_response();
148
6
        }
149
36
    }
150

            
151
    // Get unit information to create queue information.
152
42
    let unit_code = match body.data.unit_id.as_ref() {
153
6
        None => "".to_string(),
154
36
        Some(unit_id) => {
155
36
            if unit_id.len() == 0 {
156
2
                return ErrResp::ErrParam(Some(
157
2
                    "`unitId` must with at least one character".to_string(),
158
2
                ))
159
2
                .into_response();
160
34
            }
161
34
            let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
162
                Err(e) => return e,
163
34
                Ok(unit) => match unit {
164
                    None => {
165
2
                        return ErrResp::Custom(
166
2
                            ErrReq::UNIT_NOT_EXIST.0,
167
2
                            ErrReq::UNIT_NOT_EXIST.1,
168
2
                            None,
169
2
                        )
170
2
                        .into_response()
171
                    }
172
32
                    Some(unit) => unit,
173
32
                },
174
32
            };
175
32
            unit.code
176
        }
177
    };
178
38
    let code = body.data.code.as_str();
179
38
    if !strings::is_code(code) {
180
2
        return ErrResp::ErrParam(Some(
181
2
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
182
2
        ))
183
2
        .into_response();
184
36
    }
185
36
    let unit_id = match body.data.unit_id.as_ref() {
186
6
        None => "",
187
30
        Some(unit_id) => unit_id.as_str(),
188
    };
189
36
    match check_network_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
190
        Err(e) => return e,
191
36
        Ok(count) => match count {
192
34
            0 => (),
193
            _ => {
194
2
                return ErrResp::Custom(ErrReq::NETWORK_EXIST.0, ErrReq::NETWORK_EXIST.1, None)
195
2
                    .into_response()
196
            }
197
        },
198
    }
199
34
    let q_type = QueueType::Network;
200
34
    let username = mq::to_username(q_type, unit_code.as_str(), code);
201
34
    let password = strings::randomstring(8);
202
34
    let uri = match Url::parse(body.data.host_uri.as_str()) {
203
2
        Err(e) => {
204
2
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
205
        }
206
32
        Ok(uri) => uri,
207
    };
208
32
    let host = match uri.host() {
209
        None => {
210
2
            let e = "invalid `hostUri`".to_string();
211
2
            return ErrResp::ErrParam(Some(e)).into_response();
212
        }
213
30
        Some(host) => host.to_string(),
214
30
    };
215
30
    let scheme = uri.scheme();
216
30
    let host = host.as_str();
217
30
    let username = username.as_str();
218
30
    let password = password.as_str();
219
30

            
220
30
    // Create message broker resources.
221
30
    let create_rsc = CreateQueueResource {
222
30
        scheme,
223
30
        host,
224
30
        username,
225
30
        password,
226
30
        ttl: body.data.ttl,
227
30
        length: body.data.length,
228
30
        q_type: QueueType::Network,
229
30
    };
230
30
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
231
        return e;
232
30
    }
233
30
    let clear_rsc = ClearQueueResource {
234
30
        scheme,
235
30
        host,
236
30
        username,
237
30
    };
238
30

            
239
30
    // Create network instance.
240
30
    let mut body_uri = uri.clone();
241
30
    transfer_host_uri(&state, &mut body_uri, username);
242
30
    body.data.host_uri = body_uri.to_string();
243
30
    headers.remove(header::CONTENT_LENGTH);
244
30
    let builder = client
245
30
        .request(reqwest::Method::POST, api_path)
246
30
        .headers(headers)
247
30
        .json(&body);
248
30
    let api_req = match builder.build() {
249
        Err(e) => {
250
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
251
            let e = format!("generate request error: {}", e);
252
            error!("[{}] {}", FN_NAME, e);
253
            return ErrResp::ErrRsc(Some(e)).into_response();
254
        }
255
30
        Ok(req) => req,
256
    };
257
30
    let api_resp = match client.execute(api_req).await {
258
        Err(e) => {
259
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
260
            let e = format!("execute request error: {}", e);
261
            error!("[{}] {}", FN_NAME, e);
262
            return ErrResp::ErrIntMsg(Some(e)).into_response();
263
        }
264
30
        Ok(resp) => match resp.status() {
265
30
            StatusCode::OK => resp,
266
            _ => {
267
                let mut resp_builder = Response::builder().status(resp.status());
268
                for (k, v) in resp.headers() {
269
                    resp_builder = resp_builder.header(k, v);
270
                }
271
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
272
                    Err(e) => {
273
                        let e = format!("wrap response body error: {}", e);
274
                        error!("[{}] {}", FN_NAME, e);
275
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
276
                    }
277
                    Ok(resp) => return resp,
278
                }
279
            }
280
        },
281
    };
282
30
    let mut body = match api_resp.json::<response::PostNetwork>().await {
283
        Err(e) => {
284
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
285
            let e = format!("unexpected response: {}", e);
286
            return ErrResp::ErrUnknown(Some(e)).into_response();
287
        }
288
30
        Ok(body) => body,
289
30
    };
290
30
    body.data.password = Some(password.to_string());
291
30

            
292
30
    Json(&body).into_response()
293
46
}
294

            
295
/// `GET /{base}/api/v1/network/count`
296
2
async fn get_network_count(state: State<AppState>, req: Request) -> impl IntoResponse {
297
    const FN_NAME: &'static str = "get_network_count";
298
2
    let api_path = format!("{}/api/v1/network/count", state.broker_base.as_str());
299
2
    let client = state.client.clone();
300
2

            
301
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
302
2
}
303

            
304
/// `GET /{base}/api/v1/network/list`
305
10
async fn get_network_list(state: State<AppState>, req: Request) -> impl IntoResponse {
306
    const FN_NAME: &'static str = "get_network_list";
307
10
    let api_path = format!("{}/api/v1/network/list", state.broker_base.as_str());
308
10
    let api_path = api_path.as_str();
309
10
    let client = state.client.clone();
310
10

            
311
10
    let mut list_format = ListFormat::Data;
312
10
    if let Some(query_str) = req.uri().query() {
313
8
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
314
            Err(e) => {
315
                let e = format!("parse query error: {}", e);
316
                return ErrResp::ErrParam(Some(e)).into_response();
317
            }
318
8
            Ok(query) => query,
319
        };
320
12
        for (k, v) in query.iter() {
321
12
            if k.as_str().eq("format") {
322
6
                if v.as_str().eq("array") {
323
2
                    list_format = ListFormat::Array;
324
4
                } else if v.as_str().eq("csv") {
325
2
                    list_format = ListFormat::Csv;
326
2
                }
327
6
            }
328
        }
329
2
    }
330

            
331
8
    let (api_resp, resp_builder) =
332
10
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "network").await {
333
2
            ListResp::Axum(resp) => return resp,
334
8
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
335
8
        };
336
8

            
337
8
    let mut resp_stream = api_resp.bytes_stream();
338
8
    let body = Body::from_stream(async_stream::stream! {
339
8
        match list_format {
340
8
            ListFormat::Array => yield Ok(Bytes::from("[")),
341
8
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
342
8
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
343
8
        }
344
8
        let mut first_sent = false;
345
8

            
346
8
        let mut buffer = BytesMut::new();
347
8
        while let Some(body) = resp_stream.next().await {
348
8
            match body {
349
8
                Err(e) => {
350
8
                    error!("[{}] get body error: {}", FN_NAME, e);
351
8
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
352
8
                    yield Err(err);
353
8
                    break;
354
8
                }
355
8
                Ok(body) => buffer.extend_from_slice(&body[..]),
356
8
            }
357
8

            
358
8
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
359
8
            let mut index = 0;
360
8
            let mut finish = false;
361
8
            loop {
362
8
                if let Some(Ok(mut v)) = json_stream.next() {
363
8
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
364
8
                        Err(e) => {
365
8
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
366
8
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
367
8
                            yield Err(err);
368
8
                            finish = true;
369
8
                            break;
370
8
                        }
371
8
                        Ok(uri) => trunc_host_uri(&uri),
372
8
                    };
373
8
                    match list_format {
374
8
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
375
8
                            Err(e) =>{
376
8
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
377
8
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
378
8
                                yield Err(err);
379
8
                                finish = true;
380
8
                                break;
381
8
                            }
382
8
                            Ok(v) => {
383
8
                                match first_sent {
384
8
                                    false => first_sent = true,
385
8
                                    true => yield Ok(Bytes::from(",")),
386
8
                                }
387
8
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
388
8
                            }
389
8
                        }
390
8
                        ListFormat::Csv => {
391
8
                            let mut item = CsvItem{
392
8
                                network_id: v.network_id,
393
8
                                code: v.code,
394
8
                                unit_id: v.unit_id,
395
8
                                unit_code: v.unit_code,
396
8
                                created_at: v.created_at,
397
8
                                modified_at: v.modified_at,
398
8
                                host_uri: v.host_uri,
399
8
                                name: v.name,
400
8
                                info: None,
401
8
                            };
402
8
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
403
8
                                item.info = Some(info_str);
404
8
                            }
405
8
                            let mut writer =
406
8
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
407
8
                            if let Err(e) = writer.serialize(item) {
408
8
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
409
8
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
410
8
                                yield Err(err);
411
8
                                finish = true;
412
8
                                break;
413
8
                            }
414
8
                            match writer.into_inner() {
415
8
                                Err(e) => {
416
8
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
417
8
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
418
8
                                    yield Err(err);
419
8
                                    finish = true;
420
8
                                    break;
421
8
                                }
422
8
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
423
8
                            }
424
8
                        }
425
8
                    }
426
8
                    continue;
427
8
                }
428
8
                let offset = json_stream.byte_offset();
429
8
                if buffer.len() <= index + offset {
430
8
                    index = buffer.len();
431
8
                    break;
432
8
                }
433
8
                match buffer[index+offset] {
434
8
                    b'[' | b',' => {
435
8
                        index += offset + 1;
436
8
                        if buffer.len() <= index {
437
8
                            break;
438
8
                        }
439
8
                        json_stream =
440
8
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
441
8
                    }
442
8
                    b']' => {
443
8
                        finish = true;
444
8
                        break;
445
8
                    }
446
8
                    _ => break,
447
8
                }
448
8
            }
449
8
            if finish {
450
8
                match list_format {
451
8
                    ListFormat::Array => yield Ok(Bytes::from("]")),
452
8
                    ListFormat::Csv => (),
453
8
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
454
8
                }
455
8
                break;
456
8
            }
457
8
            buffer = buffer.split_off(index);
458
8
        }
459
8
    });
460
8
    match resp_builder.body(body) {
461
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
462
8
        Ok(resp) => resp,
463
    }
464
10
}
465

            
466
/// `GET /{base}/api/v1/network/{networkId}`
467
18
async fn get_network(
468
18
    state: State<AppState>,
469
18
    Path(param): Path<NetworkIdPath>,
470
18
    req: Request,
471
18
) -> impl IntoResponse {
472
    const FN_NAME: &'static str = "get_network";
473
18
    let broker_base = state.broker_base.as_str();
474
18
    let client = state.client.clone();
475
18
    let token = match req.headers().get(header::AUTHORIZATION) {
476
        None => {
477
2
            let e = "missing Authorization".to_string();
478
2
            return ErrResp::ErrParam(Some(e)).into_response();
479
        }
480
16
        Some(value) => value.clone(),
481
    };
482

            
483
16
    let (mut network, uri, host) = match get_network_inner(
484
16
        FN_NAME,
485
16
        &client,
486
16
        broker_base,
487
16
        param.network_id.as_str(),
488
16
        &token,
489
16
    )
490
16
    .await
491
    {
492
2
        Err(e) => return e,
493
14
        Ok((network, uri, host)) => (network, uri, host),
494
14
    };
495
14

            
496
14
    let host = host.as_str();
497
14
    let scheme = uri.scheme();
498
14
    if scheme.eq("amqp") || scheme.eq("amqps") {
499
8
        let AmqpState::RabbitMq(opts) = &state.amqp;
500
8
        let unit_code = match network.unit_code.as_ref() {
501
            None => "",
502
8
            Some(unit_code) => unit_code.as_str(),
503
        };
504
8
        let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
505
8
        let username = username.as_str();
506
8
        match rabbitmq::get_policies(&client, opts, host, username).await {
507
            Err(e) => {
508
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
509
                return e.into_response();
510
            }
511
8
            Ok(policies) => {
512
8
                network.ttl = policies.ttl;
513
8
                network.length = policies.length;
514
8
            }
515
        }
516
6
    }
517
14
    network.host_uri = trunc_host_uri(&uri);
518
14
    Json(&response::GetNetwork { data: network }).into_response()
519
18
}
520

            
521
/// `PATCH /{base}/api/v1/network/{networkId}`
522
26
async fn patch_network(
523
26
    state: State<AppState>,
524
26
    headers: HeaderMap,
525
26
    Path(param): Path<NetworkIdPath>,
526
26
    Json(body): Json<request::PatchNetworkBody>,
527
26
) -> impl IntoResponse {
528
    const FN_NAME: &'static str = "patch_network";
529
26
    let broker_base = state.broker_base.as_str();
530
26
    let client = state.client.clone();
531
26
    let token = match headers.get(header::AUTHORIZATION) {
532
        None => {
533
2
            let e = "missing Authorization".to_string();
534
2
            return ErrResp::ErrParam(Some(e)).into_response();
535
        }
536
24
        Some(value) => value.clone(),
537
24
    };
538
24

            
539
24
    let data = &body.data;
540
24
    if data.host_uri.is_none()
541
12
        && data.name.is_none()
542
8
        && data.info.is_none()
543
8
        && data.ttl.is_none()
544
6
        && data.length.is_none()
545
6
        && data.password.is_none()
546
    {
547
2
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
548
22
    }
549

            
550
22
    let (network, uri, hostname) = match get_network_inner(
551
22
        FN_NAME,
552
22
        &client,
553
22
        broker_base,
554
22
        param.network_id.as_str(),
555
22
        &token,
556
22
    )
557
22
    .await
558
    {
559
2
        Err(e) => return e,
560
20
        Ok((network, uri, hostname)) => (network, uri, hostname),
561
20
    };
562
20

            
563
20
    let mut patch_data = request::PatchNetworkData {
564
20
        name: data.name.clone(),
565
20
        info: data.info.clone(),
566
20
        ..Default::default()
567
20
    };
568
20
    let mut patch_host: Option<PatchHost> = None;
569
20
    if let Some(host) = data.host_uri.as_ref() {
570
12
        if !strings::is_uri(host) {
571
2
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
572
10
        }
573
10
        // Change to the new broker host.
574
10
        if !cmp_host_uri(network.host_uri.as_str(), host.as_str()) {
575
10
            let password = match data.password.as_ref() {
576
                None => {
577
2
                    let e = "missing `password`".to_string();
578
2
                    return ErrResp::ErrParam(Some(e)).into_response();
579
                }
580
8
                Some(password) => match password.len() {
581
                    0 => {
582
2
                        let e = "missing `password`".to_string();
583
2
                        return ErrResp::ErrParam(Some(e)).into_response();
584
                    }
585
6
                    _ => password,
586
                },
587
            };
588
6
            let mut new_host_uri = match Url::parse(host.as_str()) {
589
                Err(e) => {
590
                    let e = format!("invalid `hostUri`: {}", e);
591
                    return ErrResp::ErrParam(Some(e)).into_response();
592
                }
593
6
                Ok(uri) => match uri.host_str() {
594
                    None => {
595
2
                        let e = "invalid `hostUri`".to_string();
596
2
                        return ErrResp::ErrParam(Some(e)).into_response();
597
                    }
598
4
                    Some(_) => uri,
599
                },
600
            };
601

            
602
4
            let unit_code = match network.unit_code.as_ref() {
603
                None => "",
604
4
                Some(unit_code) => unit_code.as_str(),
605
            };
606
4
            let code = network.code.as_str();
607
4
            let username = mq::to_username(QueueType::Network, unit_code, code);
608
4
            let resource = CreateQueueResource {
609
4
                scheme: new_host_uri.scheme(),
610
4
                host: new_host_uri.host_str().unwrap(),
611
4
                username: username.as_str(),
612
4
                password: password.as_str(),
613
4
                ttl: data.ttl,
614
4
                length: data.length,
615
4
                q_type: QueueType::Network,
616
4
            };
617
4
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
618
                return e;
619
4
            }
620
4
            let resource = ClearQueueResource {
621
4
                scheme: uri.scheme(),
622
4
                host: uri.host_str().unwrap(),
623
4
                username: username.as_str(),
624
4
            };
625
4
            let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
626

            
627
4
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
628
4
            patch_data.host_uri = Some(new_host_uri.to_string());
629
4
            patch_host = Some(PatchHost {
630
4
                host_uri: new_host_uri,
631
4
                username,
632
4
            });
633
        }
634
8
    }
635

            
636
    // Send request body to the sylvia-iot-broker.
637
12
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
638
6
        let network_id = param.network_id.as_str();
639
6
        let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
640
6
        let mut builder = client
641
6
            .request(reqwest::Method::PATCH, uri)
642
6
            .header(reqwest::header::AUTHORIZATION, &token)
643
6
            .json(&request::PatchNetworkBody { data: patch_data });
644
6
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
645
6
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
646
6
        }
647
6
        let api_req = match builder.build() {
648
            Err(e) => {
649
                clear_patch_host(FN_NAME, &state, &patch_host).await;
650
                let e = format!("generate request error: {}", e);
651
                error!("[{}] {}", FN_NAME, e);
652
                return ErrResp::ErrRsc(Some(e)).into_response();
653
            }
654
6
            Ok(req) => req,
655
        };
656
6
        let api_resp = match client.execute(api_req).await {
657
            Err(e) => {
658
                clear_patch_host(FN_NAME, &state, &patch_host).await;
659
                let e = format!("execute request error: {}", e);
660
                error!("[{}] {}", FN_NAME, e);
661
                return ErrResp::ErrIntMsg(Some(e)).into_response();
662
            }
663
6
            Ok(resp) => resp,
664
6
        };
665
6

            
666
6
        let status_code = api_resp.status();
667
6
        if status_code != StatusCode::NO_CONTENT {
668
            clear_patch_host(FN_NAME, &state, &patch_host).await;
669
            let mut resp_builder = Response::builder().status(status_code);
670
            for (k, v) in api_resp.headers() {
671
                resp_builder = resp_builder.header(k, v);
672
            }
673
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
674
                Err(e) => {
675
                    let e = format!("wrap response body error: {}", e);
676
                    error!("[{}] {}", FN_NAME, e);
677
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
678
                }
679
                Ok(resp) => return resp,
680
            }
681
6
        }
682
6
    }
683

            
684
12
    if let Some(host) = patch_host {
685
4
        let resource = ClearQueueResource {
686
4
            scheme: uri.scheme(),
687
4
            host: uri.host_str().unwrap(),
688
4
            username: host.username.as_str(),
689
4
        };
690
4
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
691
4
        return StatusCode::NO_CONTENT.into_response();
692
8
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
693
2
        return StatusCode::NO_CONTENT.into_response();
694
6
    }
695

            
696
    // Update broker information without changing hostUri.
697
6
    if let Some(password) = data.password.as_ref() {
698
6
        if password.len() == 0 {
699
2
            let e = "missing `password`".to_string();
700
2
            return ErrResp::ErrParam(Some(e)).into_response();
701
4
        }
702
    }
703
4
    let unit_code = match network.unit_code.as_ref() {
704
        None => "",
705
4
        Some(unit_code) => unit_code.as_str(),
706
    };
707
4
    let code = network.code.as_str();
708
4
    let hostname = hostname.as_str();
709
4
    let username = mq::to_username(QueueType::Network, unit_code, code);
710
4
    let username = username.as_str();
711
4
    match uri.scheme() {
712
4
        "amqp" | "amqps" => match &state.amqp {
713
2
            AmqpState::RabbitMq(opts) => {
714
2
                if data.ttl.is_some() || data.length.is_some() {
715
2
                    let policies = rabbitmq::BrokerPolicies {
716
2
                        ttl: data.ttl,
717
2
                        length: data.length,
718
2
                    };
719
                    if let Err(e) =
720
2
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
721
                    {
722
                        let e = format!("patch RabbitMQ error: {}", e);
723
                        error!("[{}] {}", FN_NAME, e);
724
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
725
2
                    }
726
                }
727
2
                if let Some(password) = data.password.as_ref() {
728
2
                    let password = password.as_str();
729
                    if let Err(e) =
730
2
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
731
                    {
732
                        let e = format!("patch RabbitMQ password error: {}", e);
733
                        error!("[{}] {}", FN_NAME, e);
734
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
735
2
                    }
736
                }
737
            }
738
        },
739
2
        "mqtt" | "mqtts" => match &state.mqtt {
740
1
            MqttState::Emqx(opts) => {
741
1
                if let Some(password) = data.password.as_ref() {
742
1
                    let password = password.as_str();
743
                    if let Err(e) =
744
1
                        emqx::put_user(&client, opts, hostname, username, password).await
745
                    {
746
                        let e = format!("patch EMQX password error: {}", e);
747
                        error!("[{}] {}", FN_NAME, e);
748
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
749
1
                    }
750
                }
751
            }
752
1
            MqttState::Rumqttd => {}
753
        },
754
        _ => {}
755
    }
756

            
757
4
    StatusCode::NO_CONTENT.into_response()
758
26
}
759

            
760
/// `DELETE /{base}/api/v1/network/{networkId}`
761
8
async fn delete_network(
762
8
    state: State<AppState>,
763
8
    Path(param): Path<NetworkIdPath>,
764
8
    req: Request,
765
8
) -> impl IntoResponse {
766
    const FN_NAME: &'static str = "delete_network";
767
8
    let broker_base = state.broker_base.as_str();
768
8
    let network_id = param.network_id.as_str();
769
8
    let api_path = format!("{}/api/v1/network/{}", broker_base, network_id);
770
8
    let client = state.client.clone();
771
8
    let token = match req.headers().get(header::AUTHORIZATION) {
772
        None => {
773
2
            let e = "missing Authorization".to_string();
774
2
            return ErrResp::ErrParam(Some(e)).into_response();
775
        }
776
6
        Some(value) => value.clone(),
777
    };
778

            
779
4
    let (network, uri, host) =
780
6
        match get_network_inner(FN_NAME, &client, broker_base, network_id, &token).await {
781
2
            Err(e) => return e,
782
4
            Ok((network, uri, host)) => (network, uri, host),
783
        };
784

            
785
4
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
786
4
    if !resp.status().is_success() {
787
        return resp;
788
4
    }
789

            
790
4
    let unit_code = match network.unit_code.as_ref() {
791
        None => "",
792
4
        Some(unit_code) => unit_code.as_str(),
793
    };
794
4
    let code = network.code.as_str();
795
4
    let username = mq::to_username(QueueType::Network, unit_code, code);
796
4
    let clear_rsc = ClearQueueResource {
797
4
        scheme: uri.scheme(),
798
4
        host: host.as_str(),
799
4
        username: username.as_str(),
800
4
    };
801
4
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
802
        return e;
803
4
    }
804
4

            
805
4
    StatusCode::NO_CONTENT.into_response()
806
8
}
807

            
808
/// `GET /{base}/api/v1/network/{networkId}/stats`
809
79
async fn get_network_stats(
810
79
    state: State<AppState>,
811
79
    Path(param): Path<NetworkIdPath>,
812
79
    req: Request,
813
79
) -> impl IntoResponse {
814
    const FN_NAME: &'static str = "get_network";
815
79
    let broker_base = state.broker_base.as_str();
816
79
    let client = state.client.clone();
817
79
    let token = match req.headers().get(header::AUTHORIZATION) {
818
        None => {
819
2
            let e = "missing Authorization".to_string();
820
2
            return ErrResp::ErrParam(Some(e)).into_response();
821
        }
822
77
        Some(value) => value.clone(),
823
    };
824

            
825
77
    let (network, uri, host) = match get_network_inner(
826
77
        FN_NAME,
827
77
        &client,
828
77
        broker_base,
829
77
        param.network_id.as_str(),
830
77
        &token,
831
77
    )
832
77
    .await
833
    {
834
2
        Err(e) => return e,
835
75
        Ok((network, uri, host)) => (network, uri, host),
836
75
    };
837
75

            
838
75
    let host = host.as_str();
839
75
    let scheme = uri.scheme();
840
75
    let data = match scheme {
841
75
        "amqp" | "amqps" => {
842
64
            let AmqpState::RabbitMq(opts) = &state.amqp;
843
64
            let unit_code = match network.unit_code.as_ref() {
844
                None => "",
845
64
                Some(unit_code) => unit_code.as_str(),
846
            };
847
64
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
848
64
            let username = username.as_str();
849
64
            response::GetNetworkStatsData {
850
64
                dldata: match rabbitmq::stats(&client, opts, host, username, "dldata").await {
851
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
852
                        consumers: 0,
853
                        messages: 0,
854
                        publish_rate: 0.0,
855
                        deliver_rate: 0.0,
856
                    },
857
                    Err(e) => {
858
                        error!("[{}] get dldata stats error: {}", FN_NAME, e);
859
                        return e.into_response();
860
                    }
861
64
                    Ok(stats) => response::Stats {
862
64
                        consumers: stats.consumers,
863
64
                        messages: stats.messages,
864
64
                        publish_rate: stats.publish_rate,
865
64
                        deliver_rate: stats.deliver_rate,
866
64
                    },
867
                },
868
64
                ctrl: match rabbitmq::stats(&client, opts, host, username, "ctrl").await {
869
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
870
                        consumers: 0,
871
                        messages: 0,
872
                        publish_rate: 0.0,
873
                        deliver_rate: 0.0,
874
                    },
875
                    Err(e) => {
876
                        error!("[{}] get ctrl stats error: {}", FN_NAME, e);
877
                        return e.into_response();
878
                    }
879
64
                    Ok(stats) => response::Stats {
880
64
                        consumers: stats.consumers,
881
64
                        messages: stats.messages,
882
64
                        publish_rate: stats.publish_rate,
883
64
                        deliver_rate: stats.deliver_rate,
884
64
                    },
885
                },
886
            }
887
        }
888
11
        "mqtt" | "mqtts" => match &state.mqtt {
889
10
            MqttState::Emqx(opts) => {
890
10
                let unit_code = match network.unit_code.as_ref() {
891
                    None => "",
892
10
                    Some(unit_code) => unit_code.as_str(),
893
                };
894
10
                let username =
895
10
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
896
10
                let username = username.as_str();
897
10
                response::GetNetworkStatsData {
898
10
                    dldata: match emqx::stats(&client, opts, host, username, "dldata").await {
899
                        Err(e) => {
900
                            error!("[{}] get dldata stats error: {}", FN_NAME, e);
901
                            return e.into_response();
902
                        }
903
10
                        Ok(stats) => response::Stats {
904
10
                            consumers: stats.consumers,
905
10
                            messages: stats.messages,
906
10
                            publish_rate: stats.publish_rate,
907
10
                            deliver_rate: stats.deliver_rate,
908
10
                        },
909
10
                    },
910
10
                    ctrl: match emqx::stats(&client, opts, host, username, "ctrl").await {
911
                        Err(e) => {
912
                            error!("[{}] get ctrl stats error: {}", FN_NAME, e);
913
                            return e.into_response();
914
                        }
915
10
                        Ok(stats) => response::Stats {
916
10
                            consumers: stats.consumers,
917
10
                            messages: stats.messages,
918
10
                            publish_rate: stats.publish_rate,
919
10
                            deliver_rate: stats.deliver_rate,
920
10
                        },
921
                    },
922
                }
923
            }
924
1
            MqttState::Rumqttd => response::GetNetworkStatsData {
925
1
                dldata: response::Stats {
926
1
                    ..Default::default()
927
1
                },
928
1
                ctrl: response::Stats {
929
1
                    ..Default::default()
930
1
                },
931
1
            },
932
        },
933
        _ => {
934
            let e = format!("unsupport scheme {}", scheme);
935
            error!("[{}] {}", FN_NAME, e);
936
            return ErrResp::ErrUnknown(Some(e)).into_response();
937
        }
938
    };
939
75
    Json(&response::GetNetworkStats { data }).into_response()
940
79
}
941

            
942
/// `POST /{base}/api/v1/network/{networkId}/uldata`
943
14
async fn post_network_uldata(
944
14
    state: State<AppState>,
945
14
    headers: HeaderMap,
946
14
    Path(param): Path<NetworkIdPath>,
947
14
    Json(body): Json<request::PostNetworkUlDataBody>,
948
14
) -> impl IntoResponse {
949
    const FN_NAME: &'static str = "post_network_uldata";
950
14
    let broker_base = state.broker_base.as_str();
951
14
    let client = state.client.clone();
952
14
    let token = match headers.get(header::AUTHORIZATION) {
953
        None => {
954
2
            let e = "missing Authorization".to_string();
955
2
            return ErrResp::ErrParam(Some(e)).into_response();
956
        }
957
12
        Some(value) => value.clone(),
958
12
    };
959
12

            
960
12
    if body.data.device_id.len() == 0 {
961
2
        let e = "empty `deviceId` is invalid".to_string();
962
2
        return ErrResp::ErrParam(Some(e)).into_response();
963
10
    }
964
10
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
965
2
        let e = format!("`payload` is not hexadecimal string: {}", e);
966
2
        return ErrResp::ErrParam(Some(e)).into_response();
967
8
    }
968

            
969
8
    let (network, uri, hostname) = match get_network_inner(
970
8
        FN_NAME,
971
8
        &client,
972
8
        broker_base,
973
8
        param.network_id.as_str(),
974
8
        &token,
975
8
    )
976
8
    .await
977
    {
978
2
        Err(e) => return e,
979
6
        Ok((network, uri, hostname)) => (network, uri, hostname),
980
    };
981
6
    let device = match get_device_inner(
982
6
        FN_NAME,
983
6
        &client,
984
6
        broker_base,
985
6
        body.data.device_id.as_str(),
986
6
        &token,
987
6
    )
988
6
    .await
989
    {
990
        Err(e) => return e,
991
6
        Ok(device) => match device {
992
            None => {
993
2
                return ErrResp::Custom(
994
2
                    ErrReq::DEVICE_NOT_EXIST.0,
995
2
                    ErrReq::DEVICE_NOT_EXIST.1,
996
2
                    None,
997
2
                )
998
2
                .into_response()
999
            }
4
            Some(device) => device,
4
        },
4
    };
4

            
4
    let hostname = hostname.as_str();
4
    let scheme = uri.scheme();
4
    let payload = match serde_json::to_string(&UlData {
4
        time: strings::time_str(&Utc::now()),
4
        network_addr: device.network_addr,
4
        data: body.data.payload.clone(),
4
        ..Default::default()
4
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
4
        Ok(payload) => general_purpose::STANDARD.encode(payload),
4
    };
4
    match scheme {
4
        "amqp" | "amqps" => {
2
            let AmqpState::RabbitMq(opts) = &state.amqp;
2
            let unit_code = match network.unit_code.as_ref() {
                None => "",
2
                Some(unit_code) => unit_code.as_str(),
            };
2
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
2
            let username = username.as_str();
            if let Err(e) =
2
                rabbitmq::publish_message(&client, opts, hostname, username, "uldata", payload)
2
                    .await
            {
                return e.into_response();
2
            }
        }
2
        "mqtt" | "mqtts" => match &state.mqtt {
1
            MqttState::Emqx(opts) => {
1
                let unit_code = match network.unit_code.as_ref() {
                    None => "",
1
                    Some(unit_code) => unit_code.as_str(),
                };
1
                let username =
1
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
1
                let username = username.as_str();
                if let Err(e) =
1
                    emqx::publish_message(&client, opts, hostname, username, "uldata", payload)
1
                        .await
                {
                    return e.into_response();
1
                }
            }
            MqttState::Rumqttd => {
1
                let e = "not support now".to_string();
1
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
3
    StatusCode::NO_CONTENT.into_response()
14
}
129
async fn get_network_inner(
129
    fn_name: &str,
129
    client: &reqwest::Client,
129
    broker_base: &str,
129
    network_id: &str,
129
    token: &HeaderValue,
129
) -> Result<(response::GetNetworkData, Url, String), Response> {
129
    let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
129
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
119
    let network = match resp.json::<response::GetNetwork>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
119
        Ok(network) => network.data,
    };
119
    let uri = match Url::parse(network.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
119
        Ok(uri) => uri,
    };
119
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
119
        Some(host) => host.to_string(),
119
    };
119
    Ok((network, uri, host))
129
}
36
async fn check_network_code_inner(
36
    fn_name: &str,
36
    client: &reqwest::Client,
36
    broker_base: &str,
36
    unit_id: &str,
36
    code: &str,
36
    token: &HeaderValue,
36
) -> Result<u64, Response> {
36
    let uri = format!("{}/api/v1/network/count", broker_base);
36
    let req = match client
36
        .request(reqwest::Method::GET, uri)
36
        .header(reqwest::header::AUTHORIZATION, token)
36
        .query(&[("unit", unit_id), ("code", code)])
36
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
36
        Ok(req) => req,
    };
36
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
36
        Ok(resp) => resp,
36
    };
36

            
36
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
36
        Ok(data) => Ok(data.data.count),
    }
36
}