1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Request, State},
7
    http::{HeaderMap, HeaderValue, StatusCode, header},
8
    response::{IntoResponse, Response},
9
    routing,
10
};
11
use base64::{Engine, engine::general_purpose};
12
use bytes::{Bytes, BytesMut};
13
use chrono::Utc;
14
use csv::WriterBuilder;
15
use futures_util::StreamExt;
16
use hex;
17
use log::error;
18
use reqwest;
19
use serde::{Deserialize, Serialize};
20
use serde_json::{Deserializer, Map, Value};
21
use url::Url;
22

            
23
use sylvia_iot_corelib::{
24
    err::ErrResp,
25
    http::{Json, Path},
26
    role::Role,
27
    strings,
28
};
29

            
30
use super::{
31
    super::{AmqpState, ErrReq, MqttState, State as AppState},
32
    ClearQueueResource, CreateQueueResource, ListResp, PatchHost, api_bridge, clear_patch_host,
33
    clear_queue_rsc, cmp_host_uri, create_queue_rsc, get_device_inner, get_stream_resp,
34
    get_tokeninfo_inner, get_unit_inner, list_api_bridge, request, response, transfer_host_uri,
35
    trunc_host_uri,
36
};
37
use crate::libs::mq::{self, QueueType, emqx, rabbitmq};
38

            
39
enum ListFormat {
40
    Array,
41
    Csv,
42
    Data,
43
}
44

            
45
#[derive(Deserialize)]
46
struct NetworkIdPath {
47
    network_id: String,
48
}
49

            
50
#[derive(Deserialize, Serialize)]
51
struct Network {
52
    #[serde(rename = "networkId")]
53
    network_id: String,
54
    code: String,
55
    #[serde(rename = "unitId")]
56
    unit_id: Option<String>,
57
    #[serde(rename = "unitCode")]
58
    unit_code: Option<String>,
59
    #[serde(rename = "createdAt")]
60
    created_at: String,
61
    #[serde(rename = "modifiedAt")]
62
    modified_at: String,
63
    #[serde(rename = "hostUri")]
64
    host_uri: String,
65
    name: String,
66
    info: Map<String, Value>,
67
}
68

            
69
#[derive(Deserialize, Serialize)]
70
struct CsvItem {
71
    #[serde(rename = "networkId")]
72
    network_id: String,
73
    code: String,
74
    #[serde(rename = "unitId")]
75
    unit_id: Option<String>,
76
    #[serde(rename = "unitCode")]
77
    unit_code: Option<String>,
78
    #[serde(rename = "createdAt")]
79
    created_at: String,
80
    #[serde(rename = "modifiedAt")]
81
    modified_at: String,
82
    #[serde(rename = "hostUri")]
83
    host_uri: String,
84
    name: String,
85
    info: Option<String>,
86
}
87

            
88
/// Downlink data from application to broker.
89
#[derive(Default, Serialize)]
90
struct UlData {
91
    time: String,
92
    #[serde(rename = "networkAddr")]
93
    network_addr: String,
94
    data: String,
95
    extension: Option<Map<String, Value>>,
96
}
97

            
98
const CSV_FIELDS: &'static [u8] =
99
    b"\xEF\xBB\xBFnetworkId,code,unitId,createdAt,modifiedAt,hostUri,name,info\n";
100

            
101
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
102
506
    Router::new().nest(
103
506
        scope_path,
104
506
        Router::new()
105
506
            .route("/", routing::post(post_network))
106
506
            .route("/count", routing::get(get_network_count))
107
506
            .route("/list", routing::get(get_network_list))
108
506
            .route(
109
506
                "/{network_id}",
110
506
                routing::get(get_network)
111
506
                    .patch(patch_network)
112
506
                    .delete(delete_network),
113
            )
114
506
            .route("/{network_id}/stats", routing::get(get_network_stats))
115
506
            .route("/{network_id}/uldata", routing::post(post_network_uldata))
116
506
            .with_state(state.clone()),
117
    )
118
506
}
119

            
120
/// `POST /{base}/api/v1/network`
121
92
async fn post_network(
122
92
    State(state): State<AppState>,
123
92
    mut headers: HeaderMap,
124
92
    Json(mut body): Json<request::PostNetworkBody>,
125
92
) -> impl IntoResponse {
126
    const FN_NAME: &'static str = "post_network";
127
92
    let broker_base = state.broker_base.as_str();
128
92
    let api_path = format!("{}/api/v1/network", broker_base);
129
92
    let client = state.client.clone();
130
92
    let token = match headers.get(header::AUTHORIZATION) {
131
        None => {
132
4
            let e = "missing Authorization".to_string();
133
4
            return ErrResp::ErrParam(Some(e)).into_response();
134
        }
135
88
        Some(value) => value.clone(),
136
    };
137

            
138
88
    if body.data.unit_id.is_none() {
139
16
        let auth_base = state.auth_base.as_str();
140
16
        let token_info = match get_tokeninfo_inner(FN_NAME, &client, auth_base, &token).await {
141
            Err(e) => return e,
142
16
            Ok(info) => info,
143
        };
144
16
        if !Role::is_role(&token_info.roles, Role::ADMIN)
145
4
            && !Role::is_role(&token_info.roles, Role::MANAGER)
146
        {
147
4
            let e = "missing `unitId`".to_string();
148
4
            return ErrResp::ErrParam(Some(e)).into_response();
149
12
        }
150
72
    }
151

            
152
    // Get unit information to create queue information.
153
84
    let unit_code = match body.data.unit_id.as_ref() {
154
12
        None => "".to_string(),
155
72
        Some(unit_id) => {
156
72
            if unit_id.len() == 0 {
157
4
                return ErrResp::ErrParam(Some(
158
4
                    "`unitId` must with at least one character".to_string(),
159
4
                ))
160
4
                .into_response();
161
68
            }
162
68
            let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
163
                Err(e) => return e,
164
68
                Ok(unit) => match unit {
165
                    None => {
166
4
                        return ErrResp::Custom(
167
4
                            ErrReq::UNIT_NOT_EXIST.0,
168
4
                            ErrReq::UNIT_NOT_EXIST.1,
169
4
                            None,
170
4
                        )
171
4
                        .into_response();
172
                    }
173
64
                    Some(unit) => unit,
174
                },
175
            };
176
64
            unit.code
177
        }
178
    };
179
76
    let code = body.data.code.as_str();
180
76
    if !strings::is_code(code) {
181
4
        return ErrResp::ErrParam(Some(
182
4
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
183
4
        ))
184
4
        .into_response();
185
72
    }
186
72
    let unit_id = match body.data.unit_id.as_ref() {
187
12
        None => "",
188
60
        Some(unit_id) => unit_id.as_str(),
189
    };
190
72
    match check_network_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
191
        Err(e) => return e,
192
72
        Ok(count) => match count {
193
68
            0 => (),
194
            _ => {
195
4
                return ErrResp::Custom(ErrReq::NETWORK_EXIST.0, ErrReq::NETWORK_EXIST.1, None)
196
4
                    .into_response();
197
            }
198
        },
199
    }
200
68
    let q_type = QueueType::Network;
201
68
    let username = mq::to_username(q_type, unit_code.as_str(), code);
202
68
    let password = strings::randomstring(8);
203
68
    let uri = match Url::parse(body.data.host_uri.as_str()) {
204
4
        Err(e) => {
205
4
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
206
        }
207
64
        Ok(uri) => uri,
208
    };
209
64
    let host = match uri.host() {
210
        None => {
211
4
            let e = "invalid `hostUri`".to_string();
212
4
            return ErrResp::ErrParam(Some(e)).into_response();
213
        }
214
60
        Some(host) => host.to_string(),
215
    };
216
60
    let scheme = uri.scheme();
217
60
    let host = host.as_str();
218
60
    let username = username.as_str();
219
60
    let password = password.as_str();
220

            
221
    // Create message broker resources.
222
60
    let create_rsc = CreateQueueResource {
223
60
        scheme,
224
60
        host,
225
60
        username,
226
60
        password,
227
60
        ttl: body.data.ttl,
228
60
        length: body.data.length,
229
60
        q_type: QueueType::Network,
230
60
    };
231
60
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
232
        return e;
233
60
    }
234
60
    let clear_rsc = ClearQueueResource {
235
60
        scheme,
236
60
        host,
237
60
        username,
238
60
        q_type: QueueType::Network,
239
60
    };
240

            
241
    // Create network instance.
242
60
    let mut body_uri = uri.clone();
243
60
    transfer_host_uri(&state, &mut body_uri, username);
244
60
    body.data.host_uri = body_uri.to_string();
245
60
    headers.remove(header::CONTENT_LENGTH);
246
60
    let builder = client
247
60
        .request(reqwest::Method::POST, api_path)
248
60
        .headers(headers)
249
60
        .json(&body);
250
60
    let api_req = match builder.build() {
251
        Err(e) => {
252
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
253
            let e = format!("generate request error: {}", e);
254
            error!("[{}] {}", FN_NAME, e);
255
            return ErrResp::ErrRsc(Some(e)).into_response();
256
        }
257
60
        Ok(req) => req,
258
    };
259
60
    let api_resp = match client.execute(api_req).await {
260
        Err(e) => {
261
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
262
            let e = format!("execute request error: {}", e);
263
            error!("[{}] {}", FN_NAME, e);
264
            return ErrResp::ErrIntMsg(Some(e)).into_response();
265
        }
266
60
        Ok(resp) => match resp.status() {
267
60
            StatusCode::OK => resp,
268
            _ => {
269
                let mut resp_builder = Response::builder().status(resp.status());
270
                for (k, v) in resp.headers() {
271
                    resp_builder = resp_builder.header(k, v);
272
                }
273
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
274
                    Err(e) => {
275
                        let e = format!("wrap response body error: {}", e);
276
                        error!("[{}] {}", FN_NAME, e);
277
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
278
                    }
279
                    Ok(resp) => return resp,
280
                }
281
            }
282
        },
283
    };
284
60
    let mut body = match api_resp.json::<response::PostNetwork>().await {
285
        Err(e) => {
286
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
287
            let e = format!("unexpected response: {}", e);
288
            return ErrResp::ErrUnknown(Some(e)).into_response();
289
        }
290
60
        Ok(body) => body,
291
    };
292
60
    body.data.password = Some(password.to_string());
293

            
294
60
    Json(&body).into_response()
295
92
}
296

            
297
/// `GET /{base}/api/v1/network/count`
298
4
async fn get_network_count(state: State<AppState>, req: Request) -> impl IntoResponse {
299
    const FN_NAME: &'static str = "get_network_count";
300
4
    let api_path = format!("{}/api/v1/network/count", state.broker_base.as_str());
301
4
    let client = state.client.clone();
302

            
303
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
304
4
}
305

            
306
/// `GET /{base}/api/v1/network/list`
307
20
async fn get_network_list(state: State<AppState>, req: Request) -> impl IntoResponse {
308
    const FN_NAME: &'static str = "get_network_list";
309
20
    let api_path = format!("{}/api/v1/network/list", state.broker_base.as_str());
310
20
    let api_path = api_path.as_str();
311
20
    let client = state.client.clone();
312

            
313
20
    let mut list_format = ListFormat::Data;
314
20
    if let Some(query_str) = req.uri().query() {
315
16
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
316
            Err(e) => {
317
                let e = format!("parse query error: {}", e);
318
                return ErrResp::ErrParam(Some(e)).into_response();
319
            }
320
16
            Ok(query) => query,
321
        };
322
24
        for (k, v) in query.iter() {
323
24
            if k.as_str().eq("format") {
324
12
                if v.as_str().eq("array") {
325
4
                    list_format = ListFormat::Array;
326
8
                } else if v.as_str().eq("csv") {
327
4
                    list_format = ListFormat::Csv;
328
4
                }
329
12
            }
330
        }
331
4
    }
332

            
333
16
    let (api_resp, resp_builder) =
334
20
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "network").await {
335
4
            ListResp::Axum(resp) => return resp,
336
16
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
337
        };
338

            
339
16
    let mut resp_stream = api_resp.bytes_stream();
340
16
    let body = Body::from_stream(async_stream::stream! {
341
16
        match list_format {
342
            ListFormat::Array => yield Ok(Bytes::from("[")),
343
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
344
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
345
        }
346
        let mut first_sent = false;
347

            
348
        let mut buffer = BytesMut::new();
349
        while let Some(body) = resp_stream.next().await {
350
            match body {
351
                Err(e) => {
352
                    error!("[{}] get body error: {}", FN_NAME, e);
353
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
354
                    yield Err(err);
355
                    break;
356
                }
357
                Ok(body) => buffer.extend_from_slice(&body[..]),
358
            }
359

            
360
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
361
            let mut index = 0;
362
            let mut finish = false;
363
            loop {
364
                if let Some(Ok(mut v)) = json_stream.next() {
365
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
366
                        Err(e) => {
367
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
368
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
369
                            yield Err(err);
370
                            finish = true;
371
                            break;
372
                        }
373
                        Ok(uri) => trunc_host_uri(&uri),
374
                    };
375
16
                    match list_format {
376
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
377
                            Err(e) =>{
378
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
379
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
380
                                yield Err(err);
381
                                finish = true;
382
                                break;
383
                            }
384
                            Ok(v) => {
385
                                match first_sent {
386
                                    false => first_sent = true,
387
                                    true => yield Ok(Bytes::from(",")),
388
                                }
389
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
390
                            }
391
                        }
392
                        ListFormat::Csv => {
393
                            let mut item = CsvItem{
394
                                network_id: v.network_id,
395
                                code: v.code,
396
                                unit_id: v.unit_id,
397
                                unit_code: v.unit_code,
398
                                created_at: v.created_at,
399
                                modified_at: v.modified_at,
400
                                host_uri: v.host_uri,
401
                                name: v.name,
402
                                info: None,
403
                            };
404
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
405
                                item.info = Some(info_str);
406
                            }
407
                            let mut writer =
408
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
409
                            if let Err(e) = writer.serialize(item) {
410
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
411
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
412
                                yield Err(err);
413
                                finish = true;
414
                                break;
415
                            }
416
                            match writer.into_inner() {
417
                                Err(e) => {
418
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
419
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
420
                                    yield Err(err);
421
                                    finish = true;
422
                                    break;
423
                                }
424
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
425
                            }
426
                        }
427
                    }
428
                    continue;
429
                }
430
                let offset = json_stream.byte_offset();
431
                if buffer.len() <= index + offset {
432
                    index = buffer.len();
433
                    break;
434
                }
435
                match buffer[index+offset] {
436
                    b'[' | b',' => {
437
                        index += offset + 1;
438
                        if buffer.len() <= index {
439
                            break;
440
                        }
441
                        json_stream =
442
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
443
                    }
444
                    b']' => {
445
                        finish = true;
446
                        break;
447
                    }
448
                    _ => break,
449
                }
450
            }
451
            if finish {
452
16
                match list_format {
453
                    ListFormat::Array => yield Ok(Bytes::from("]")),
454
                    ListFormat::Csv => (),
455
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
456
                }
457
                break;
458
            }
459
            buffer = buffer.split_off(index);
460
        }
461
    });
462
16
    match resp_builder.body(body) {
463
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
464
16
        Ok(resp) => resp,
465
    }
466
20
}
467

            
468
/// `GET /{base}/api/v1/network/{networkId}`
469
36
async fn get_network(
470
36
    state: State<AppState>,
471
36
    Path(param): Path<NetworkIdPath>,
472
36
    req: Request,
473
36
) -> impl IntoResponse {
474
    const FN_NAME: &'static str = "get_network";
475
36
    let broker_base = state.broker_base.as_str();
476
36
    let client = state.client.clone();
477
36
    let token = match req.headers().get(header::AUTHORIZATION) {
478
        None => {
479
4
            let e = "missing Authorization".to_string();
480
4
            return ErrResp::ErrParam(Some(e)).into_response();
481
        }
482
32
        Some(value) => value.clone(),
483
    };
484

            
485
32
    let (mut network, uri, host) = match get_network_inner(
486
32
        FN_NAME,
487
32
        &client,
488
32
        broker_base,
489
32
        param.network_id.as_str(),
490
32
        &token,
491
    )
492
32
    .await
493
    {
494
4
        Err(e) => return e,
495
28
        Ok((network, uri, host)) => (network, uri, host),
496
    };
497

            
498
28
    let host = host.as_str();
499
28
    let scheme = uri.scheme();
500
28
    if scheme.eq("amqp") || scheme.eq("amqps") {
501
16
        let AmqpState::RabbitMq(opts) = &state.amqp;
502
16
        let unit_code = match network.unit_code.as_ref() {
503
            None => "",
504
16
            Some(unit_code) => unit_code.as_str(),
505
        };
506
16
        let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
507
16
        let username = username.as_str();
508
16
        match rabbitmq::get_policies(&client, opts, host, username).await {
509
            Err(e) => {
510
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
511
                return e.into_response();
512
            }
513
16
            Ok(policies) => {
514
16
                network.ttl = policies.ttl;
515
16
                network.length = policies.length;
516
16
            }
517
        }
518
12
    }
519
28
    network.host_uri = trunc_host_uri(&uri);
520
28
    Json(&response::GetNetwork { data: network }).into_response()
521
36
}
522

            
523
/// `PATCH /{base}/api/v1/network/{networkId}`
524
52
async fn patch_network(
525
52
    state: State<AppState>,
526
52
    headers: HeaderMap,
527
52
    Path(param): Path<NetworkIdPath>,
528
52
    Json(body): Json<request::PatchNetworkBody>,
529
52
) -> impl IntoResponse {
530
    const FN_NAME: &'static str = "patch_network";
531
52
    let broker_base = state.broker_base.as_str();
532
52
    let client = state.client.clone();
533
52
    let token = match headers.get(header::AUTHORIZATION) {
534
        None => {
535
4
            let e = "missing Authorization".to_string();
536
4
            return ErrResp::ErrParam(Some(e)).into_response();
537
        }
538
48
        Some(value) => value.clone(),
539
    };
540

            
541
48
    let data = &body.data;
542
48
    if data.host_uri.is_none()
543
24
        && data.name.is_none()
544
16
        && data.info.is_none()
545
16
        && data.ttl.is_none()
546
12
        && data.length.is_none()
547
12
        && data.password.is_none()
548
    {
549
4
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
550
44
    }
551

            
552
44
    let (network, uri, hostname) = match get_network_inner(
553
44
        FN_NAME,
554
44
        &client,
555
44
        broker_base,
556
44
        param.network_id.as_str(),
557
44
        &token,
558
    )
559
44
    .await
560
    {
561
4
        Err(e) => return e,
562
40
        Ok((network, uri, hostname)) => (network, uri, hostname),
563
    };
564

            
565
40
    let mut patch_data = request::PatchNetworkData {
566
40
        name: data.name.clone(),
567
40
        info: data.info.clone(),
568
40
        ..Default::default()
569
40
    };
570
40
    let mut patch_host: Option<PatchHost> = None;
571
40
    if let Some(host) = data.host_uri.as_ref() {
572
24
        if !strings::is_uri(host) {
573
4
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
574
20
        }
575
        // Change to the new broker host.
576
20
        if !cmp_host_uri(network.host_uri.as_str(), host.as_str()) {
577
20
            let password = match data.password.as_ref() {
578
                None => {
579
4
                    let e = "missing `password`".to_string();
580
4
                    return ErrResp::ErrParam(Some(e)).into_response();
581
                }
582
16
                Some(password) => match password.len() {
583
                    0 => {
584
4
                        let e = "missing `password`".to_string();
585
4
                        return ErrResp::ErrParam(Some(e)).into_response();
586
                    }
587
12
                    _ => password,
588
                },
589
            };
590
12
            let mut new_host_uri = match Url::parse(host.as_str()) {
591
                Err(e) => {
592
                    let e = format!("invalid `hostUri`: {}", e);
593
                    return ErrResp::ErrParam(Some(e)).into_response();
594
                }
595
12
                Ok(uri) => match uri.host_str() {
596
                    None => {
597
4
                        let e = "invalid `hostUri`".to_string();
598
4
                        return ErrResp::ErrParam(Some(e)).into_response();
599
                    }
600
8
                    Some(_) => uri,
601
                },
602
            };
603

            
604
8
            let unit_code = match network.unit_code.as_ref() {
605
                None => "",
606
8
                Some(unit_code) => unit_code.as_str(),
607
            };
608
8
            let code = network.code.as_str();
609
8
            let username = mq::to_username(QueueType::Network, unit_code, code);
610
8
            let resource = CreateQueueResource {
611
8
                scheme: new_host_uri.scheme(),
612
8
                host: new_host_uri.host_str().unwrap(),
613
8
                username: username.as_str(),
614
8
                password: password.as_str(),
615
8
                ttl: data.ttl,
616
8
                length: data.length,
617
8
                q_type: QueueType::Network,
618
8
            };
619
8
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
620
                return e;
621
8
            }
622
8
            let resource = ClearQueueResource {
623
8
                scheme: uri.scheme(),
624
8
                host: uri.host_str().unwrap(),
625
8
                username: username.as_str(),
626
8
                q_type: QueueType::Network,
627
8
            };
628
8
            let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
629

            
630
8
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
631
8
            patch_data.host_uri = Some(new_host_uri.to_string());
632
8
            patch_host = Some(PatchHost {
633
8
                host_uri: new_host_uri,
634
8
                username,
635
8
            });
636
        }
637
16
    }
638

            
639
    // Send request body to the sylvia-iot-broker.
640
24
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
641
12
        let network_id = param.network_id.as_str();
642
12
        let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
643
12
        let mut builder = client
644
12
            .request(reqwest::Method::PATCH, uri)
645
12
            .header(reqwest::header::AUTHORIZATION, &token)
646
12
            .json(&request::PatchNetworkBody { data: patch_data });
647
12
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
648
12
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
649
12
        }
650
12
        let api_req = match builder.build() {
651
            Err(e) => {
652
                clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Network).await;
653
                let e = format!("generate request error: {}", e);
654
                error!("[{}] {}", FN_NAME, e);
655
                return ErrResp::ErrRsc(Some(e)).into_response();
656
            }
657
12
            Ok(req) => req,
658
        };
659
12
        let api_resp = match client.execute(api_req).await {
660
            Err(e) => {
661
                clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Network).await;
662
                let e = format!("execute request error: {}", e);
663
                error!("[{}] {}", FN_NAME, e);
664
                return ErrResp::ErrIntMsg(Some(e)).into_response();
665
            }
666
12
            Ok(resp) => resp,
667
        };
668

            
669
12
        let status_code = api_resp.status();
670
12
        if status_code != StatusCode::NO_CONTENT {
671
            clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Network).await;
672
            let mut resp_builder = Response::builder().status(status_code);
673
            for (k, v) in api_resp.headers() {
674
                resp_builder = resp_builder.header(k, v);
675
            }
676
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
677
                Err(e) => {
678
                    let e = format!("wrap response body error: {}", e);
679
                    error!("[{}] {}", FN_NAME, e);
680
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
681
                }
682
                Ok(resp) => return resp,
683
            }
684
12
        }
685
12
    }
686

            
687
24
    if let Some(host) = patch_host {
688
8
        let resource = ClearQueueResource {
689
8
            scheme: uri.scheme(),
690
8
            host: uri.host_str().unwrap(),
691
8
            username: host.username.as_str(),
692
8
            q_type: QueueType::Network,
693
8
        };
694
8
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
695
8
        return StatusCode::NO_CONTENT.into_response();
696
16
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
697
4
        return StatusCode::NO_CONTENT.into_response();
698
12
    }
699

            
700
    // Update broker information without changing hostUri.
701
12
    if let Some(password) = data.password.as_ref() {
702
12
        if password.len() == 0 {
703
4
            let e = "missing `password`".to_string();
704
4
            return ErrResp::ErrParam(Some(e)).into_response();
705
8
        }
706
    }
707
8
    let unit_code = match network.unit_code.as_ref() {
708
        None => "",
709
8
        Some(unit_code) => unit_code.as_str(),
710
    };
711
8
    let code = network.code.as_str();
712
8
    let hostname = hostname.as_str();
713
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
714
8
    let username = username.as_str();
715
8
    match uri.scheme() {
716
8
        "amqp" | "amqps" => match &state.amqp {
717
4
            AmqpState::RabbitMq(opts) => {
718
4
                if data.ttl.is_some() || data.length.is_some() {
719
4
                    let policies = rabbitmq::BrokerPolicies {
720
4
                        ttl: data.ttl,
721
4
                        length: data.length,
722
4
                    };
723
                    if let Err(e) =
724
4
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
725
                    {
726
                        let e = format!("patch RabbitMQ error: {}", e);
727
                        error!("[{}] {}", FN_NAME, e);
728
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
729
4
                    }
730
                }
731
4
                if let Some(password) = data.password.as_ref() {
732
4
                    let password = password.as_str();
733
                    if let Err(e) =
734
4
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
735
                    {
736
                        let e = format!("patch RabbitMQ password error: {}", e);
737
                        error!("[{}] {}", FN_NAME, e);
738
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
739
4
                    }
740
                }
741
            }
742
        },
743
4
        "mqtt" | "mqtts" => match &state.mqtt {
744
2
            MqttState::Emqx(opts) => {
745
2
                if let Some(password) = data.password.as_ref() {
746
2
                    let password = password.as_str();
747
                    if let Err(e) =
748
2
                        emqx::put_user(&client, opts, hostname, username, password).await
749
                    {
750
                        let e = format!("patch EMQX password error: {}", e);
751
                        error!("[{}] {}", FN_NAME, e);
752
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
753
2
                    }
754
                }
755
            }
756
2
            MqttState::Rumqttd => {}
757
        },
758
        _ => {}
759
    }
760

            
761
8
    StatusCode::NO_CONTENT.into_response()
762
52
}
763

            
764
/// `DELETE /{base}/api/v1/network/{networkId}`
765
16
async fn delete_network(
766
16
    state: State<AppState>,
767
16
    Path(param): Path<NetworkIdPath>,
768
16
    req: Request,
769
16
) -> impl IntoResponse {
770
    const FN_NAME: &'static str = "delete_network";
771
16
    let broker_base = state.broker_base.as_str();
772
16
    let network_id = param.network_id.as_str();
773
16
    let api_path = format!("{}/api/v1/network/{}", broker_base, network_id);
774
16
    let client = state.client.clone();
775
16
    let token = match req.headers().get(header::AUTHORIZATION) {
776
        None => {
777
4
            let e = "missing Authorization".to_string();
778
4
            return ErrResp::ErrParam(Some(e)).into_response();
779
        }
780
12
        Some(value) => value.clone(),
781
    };
782

            
783
8
    let (network, uri, host) =
784
12
        match get_network_inner(FN_NAME, &client, broker_base, network_id, &token).await {
785
4
            Err(e) => return e,
786
8
            Ok((network, uri, host)) => (network, uri, host),
787
        };
788

            
789
8
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
790
8
    if !resp.status().is_success() {
791
        return resp;
792
8
    }
793

            
794
8
    let unit_code = match network.unit_code.as_ref() {
795
        None => "",
796
8
        Some(unit_code) => unit_code.as_str(),
797
    };
798
8
    let code = network.code.as_str();
799
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
800
8
    let clear_rsc = ClearQueueResource {
801
8
        scheme: uri.scheme(),
802
8
        host: host.as_str(),
803
8
        username: username.as_str(),
804
8
        q_type: QueueType::Network,
805
8
    };
806
8
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
807
        return e;
808
8
    }
809

            
810
8
    StatusCode::NO_CONTENT.into_response()
811
16
}
812

            
813
/// `GET /{base}/api/v1/network/{networkId}/stats`
814
152
async fn get_network_stats(
815
152
    state: State<AppState>,
816
152
    Path(param): Path<NetworkIdPath>,
817
152
    req: Request,
818
152
) -> impl IntoResponse {
819
    const FN_NAME: &'static str = "get_network";
820
152
    let broker_base = state.broker_base.as_str();
821
152
    let client = state.client.clone();
822
152
    let token = match req.headers().get(header::AUTHORIZATION) {
823
        None => {
824
4
            let e = "missing Authorization".to_string();
825
4
            return ErrResp::ErrParam(Some(e)).into_response();
826
        }
827
148
        Some(value) => value.clone(),
828
    };
829

            
830
148
    let (network, uri, host) = match get_network_inner(
831
148
        FN_NAME,
832
148
        &client,
833
148
        broker_base,
834
148
        param.network_id.as_str(),
835
148
        &token,
836
    )
837
148
    .await
838
    {
839
4
        Err(e) => return e,
840
144
        Ok((network, uri, host)) => (network, uri, host),
841
    };
842

            
843
144
    let host = host.as_str();
844
144
    let scheme = uri.scheme();
845
144
    let data = match scheme {
846
144
        "amqp" | "amqps" => {
847
124
            let AmqpState::RabbitMq(opts) = &state.amqp;
848
124
            let unit_code = match network.unit_code.as_ref() {
849
                None => "",
850
124
                Some(unit_code) => unit_code.as_str(),
851
            };
852
124
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
853
124
            let username = username.as_str();
854
            response::GetNetworkStatsData {
855
124
                dldata: match rabbitmq::stats(&client, opts, host, username, "dldata").await {
856
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
857
                        consumers: 0,
858
                        messages: 0,
859
                        publish_rate: 0.0,
860
                        deliver_rate: 0.0,
861
                    },
862
                    Err(e) => {
863
                        error!("[{}] get dldata stats error: {}", FN_NAME, e);
864
                        return e.into_response();
865
                    }
866
124
                    Ok(stats) => response::Stats {
867
124
                        consumers: stats.consumers,
868
124
                        messages: stats.messages,
869
124
                        publish_rate: stats.publish_rate,
870
124
                        deliver_rate: stats.deliver_rate,
871
124
                    },
872
                },
873
124
                ctrl: match rabbitmq::stats(&client, opts, host, username, "ctrl").await {
874
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
875
                        consumers: 0,
876
                        messages: 0,
877
                        publish_rate: 0.0,
878
                        deliver_rate: 0.0,
879
                    },
880
                    Err(e) => {
881
                        error!("[{}] get ctrl stats error: {}", FN_NAME, e);
882
                        return e.into_response();
883
                    }
884
124
                    Ok(stats) => response::Stats {
885
124
                        consumers: stats.consumers,
886
124
                        messages: stats.messages,
887
124
                        publish_rate: stats.publish_rate,
888
124
                        deliver_rate: stats.deliver_rate,
889
124
                    },
890
                },
891
            }
892
        }
893
20
        "mqtt" | "mqtts" => match &state.mqtt {
894
18
            MqttState::Emqx(opts) => {
895
18
                let unit_code = match network.unit_code.as_ref() {
896
                    None => "",
897
18
                    Some(unit_code) => unit_code.as_str(),
898
                };
899
18
                let username =
900
18
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
901
18
                let username = username.as_str();
902
                response::GetNetworkStatsData {
903
18
                    dldata: match emqx::stats(&client, opts, host, username, "dldata").await {
904
                        Err(e) => {
905
                            error!("[{}] get dldata stats error: {}", FN_NAME, e);
906
                            return e.into_response();
907
                        }
908
18
                        Ok(stats) => response::Stats {
909
18
                            consumers: stats.consumers,
910
18
                            messages: stats.messages,
911
18
                            publish_rate: stats.publish_rate,
912
18
                            deliver_rate: stats.deliver_rate,
913
18
                        },
914
                    },
915
18
                    ctrl: match emqx::stats(&client, opts, host, username, "ctrl").await {
916
                        Err(e) => {
917
                            error!("[{}] get ctrl stats error: {}", FN_NAME, e);
918
                            return e.into_response();
919
                        }
920
18
                        Ok(stats) => response::Stats {
921
18
                            consumers: stats.consumers,
922
18
                            messages: stats.messages,
923
18
                            publish_rate: stats.publish_rate,
924
18
                            deliver_rate: stats.deliver_rate,
925
18
                        },
926
                    },
927
                }
928
            }
929
2
            MqttState::Rumqttd => response::GetNetworkStatsData {
930
2
                dldata: response::Stats {
931
2
                    ..Default::default()
932
2
                },
933
2
                ctrl: response::Stats {
934
2
                    ..Default::default()
935
2
                },
936
2
            },
937
        },
938
        _ => {
939
            let e = format!("unsupport scheme {}", scheme);
940
            error!("[{}] {}", FN_NAME, e);
941
            return ErrResp::ErrUnknown(Some(e)).into_response();
942
        }
943
    };
944
144
    Json(&response::GetNetworkStats { data }).into_response()
945
152
}
946

            
947
/// `POST /{base}/api/v1/network/{networkId}/uldata`
948
28
async fn post_network_uldata(
949
28
    state: State<AppState>,
950
28
    headers: HeaderMap,
951
28
    Path(param): Path<NetworkIdPath>,
952
28
    Json(body): Json<request::PostNetworkUlDataBody>,
953
28
) -> impl IntoResponse {
954
    const FN_NAME: &'static str = "post_network_uldata";
955
28
    let broker_base = state.broker_base.as_str();
956
28
    let client = state.client.clone();
957
28
    let token = match headers.get(header::AUTHORIZATION) {
958
        None => {
959
4
            let e = "missing Authorization".to_string();
960
4
            return ErrResp::ErrParam(Some(e)).into_response();
961
        }
962
24
        Some(value) => value.clone(),
963
    };
964

            
965
24
    if body.data.device_id.len() == 0 {
966
4
        let e = "empty `deviceId` is invalid".to_string();
967
4
        return ErrResp::ErrParam(Some(e)).into_response();
968
20
    }
969
20
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
970
4
        let e = format!("`payload` is not hexadecimal string: {}", e);
971
4
        return ErrResp::ErrParam(Some(e)).into_response();
972
16
    }
973

            
974
16
    let (network, uri, hostname) = match get_network_inner(
975
16
        FN_NAME,
976
16
        &client,
977
16
        broker_base,
978
16
        param.network_id.as_str(),
979
16
        &token,
980
    )
981
16
    .await
982
    {
983
4
        Err(e) => return e,
984
12
        Ok((network, uri, hostname)) => (network, uri, hostname),
985
    };
986
12
    let device = match get_device_inner(
987
12
        FN_NAME,
988
12
        &client,
989
12
        broker_base,
990
12
        body.data.device_id.as_str(),
991
12
        &token,
992
    )
993
12
    .await
994
    {
995
        Err(e) => return e,
996
12
        Ok(device) => match device {
997
            None => {
998
4
                return ErrResp::Custom(
999
4
                    ErrReq::DEVICE_NOT_EXIST.0,
4
                    ErrReq::DEVICE_NOT_EXIST.1,
4
                    None,
4
                )
4
                .into_response();
            }
8
            Some(device) => device,
        },
    };
8
    let hostname = hostname.as_str();
8
    let scheme = uri.scheme();
8
    let payload = match serde_json::to_string(&UlData {
8
        time: strings::time_str(&Utc::now()),
8
        network_addr: device.network_addr,
8
        data: body.data.payload.clone(),
8
        ..Default::default()
8
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
8
        Ok(payload) => general_purpose::STANDARD.encode(payload),
    };
8
    match scheme {
8
        "amqp" | "amqps" => {
4
            let AmqpState::RabbitMq(opts) = &state.amqp;
4
            let unit_code = match network.unit_code.as_ref() {
                None => "",
4
                Some(unit_code) => unit_code.as_str(),
            };
4
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
4
            let username = username.as_str();
            if let Err(e) =
4
                rabbitmq::publish_message(&client, opts, hostname, username, "uldata", payload)
4
                    .await
            {
                return e.into_response();
4
            }
        }
4
        "mqtt" | "mqtts" => match &state.mqtt {
2
            MqttState::Emqx(opts) => {
2
                let unit_code = match network.unit_code.as_ref() {
                    None => "",
2
                    Some(unit_code) => unit_code.as_str(),
                };
2
                let username =
2
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
2
                let username = username.as_str();
                if let Err(e) =
2
                    emqx::publish_message(&client, opts, hostname, username, "uldata", payload)
2
                        .await
                {
                    return e.into_response();
2
                }
            }
            MqttState::Rumqttd => {
2
                let e = "not support now".to_string();
2
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
6
    StatusCode::NO_CONTENT.into_response()
28
}
252
async fn get_network_inner(
252
    fn_name: &str,
252
    client: &reqwest::Client,
252
    broker_base: &str,
252
    network_id: &str,
252
    token: &HeaderValue,
252
) -> Result<(response::GetNetworkData, Url, String), Response> {
252
    let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
252
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
232
    let network = match resp.json::<response::GetNetwork>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
232
        Ok(network) => network.data,
    };
232
    let uri = match Url::parse(network.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
232
        Ok(uri) => uri,
    };
232
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
232
        Some(host) => host.to_string(),
    };
232
    Ok((network, uri, host))
252
}
72
async fn check_network_code_inner(
72
    fn_name: &str,
72
    client: &reqwest::Client,
72
    broker_base: &str,
72
    unit_id: &str,
72
    code: &str,
72
    token: &HeaderValue,
72
) -> Result<u64, Response> {
72
    let uri = format!("{}/api/v1/network/count", broker_base);
72
    let req = match client
72
        .request(reqwest::Method::GET, uri)
72
        .header(reqwest::header::AUTHORIZATION, token)
72
        .query(&[("unit", unit_id), ("code", code)])
72
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
72
        Ok(req) => req,
    };
72
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
72
        Ok(resp) => resp,
    };
72
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
72
        Ok(data) => Ok(data.data.count),
    }
72
}