1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Request, State},
6
    http::{header, HeaderMap, HeaderValue, StatusCode},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use base64::{engine::general_purpose, Engine};
11
use bytes::{Bytes, BytesMut};
12
use chrono::Utc;
13
use csv::WriterBuilder;
14
use futures_util::StreamExt;
15
use hex;
16
use log::error;
17
use reqwest;
18
use serde::{Deserialize, Serialize};
19
use serde_json::{Deserializer, Map, Value};
20
use url::Url;
21

            
22
use sylvia_iot_corelib::{
23
    err::ErrResp,
24
    http::{Json, Path},
25
    role::Role,
26
    strings,
27
};
28

            
29
use super::{
30
    super::{AmqpState, ErrReq, MqttState, State as AppState},
31
    api_bridge, clear_patch_host, clear_queue_rsc, cmp_host_uri, create_queue_rsc,
32
    get_device_inner, get_stream_resp, get_tokeninfo_inner, get_unit_inner, list_api_bridge,
33
    request, response, transfer_host_uri, trunc_host_uri, ClearQueueResource, CreateQueueResource,
34
    ListResp, PatchHost,
35
};
36
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
37

            
38
/// Output layout selected by the `format` query parameter of the list API.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ListFormat {
    /// A bare JSON array: `[...]`.
    Array,
    /// CSV rows preceded by the `CSV_FIELDS` header line.
    Csv,
    /// A JSON object wrapping the array: `{"data":[...]}`. This is the default.
    Data,
}
43

            
44
#[derive(Deserialize)]
45
struct NetworkIdPath {
46
    network_id: String,
47
}
48

            
49
#[derive(Deserialize, Serialize)]
50
struct Network {
51
    #[serde(rename = "networkId")]
52
    network_id: String,
53
    code: String,
54
    #[serde(rename = "unitId")]
55
    unit_id: Option<String>,
56
    #[serde(rename = "unitCode")]
57
    unit_code: Option<String>,
58
    #[serde(rename = "createdAt")]
59
    created_at: String,
60
    #[serde(rename = "modifiedAt")]
61
    modified_at: String,
62
    #[serde(rename = "hostUri")]
63
    host_uri: String,
64
    name: String,
65
    info: Map<String, Value>,
66
}
67

            
68
#[derive(Deserialize, Serialize)]
69
struct CsvItem {
70
    #[serde(rename = "networkId")]
71
    network_id: String,
72
    code: String,
73
    #[serde(rename = "unitId")]
74
    unit_id: Option<String>,
75
    #[serde(rename = "unitCode")]
76
    unit_code: Option<String>,
77
    #[serde(rename = "createdAt")]
78
    created_at: String,
79
    #[serde(rename = "modifiedAt")]
80
    modified_at: String,
81
    #[serde(rename = "hostUri")]
82
    host_uri: String,
83
    name: String,
84
    info: Option<String>,
85
}
86

            
87
/// Downlink data from application to broker.
88
#[derive(Default, Serialize)]
89
struct UlData {
90
    time: String,
91
    #[serde(rename = "networkAddr")]
92
    network_addr: String,
93
    data: String,
94
    extension: Option<Map<String, Value>>,
95
}
96

            
97
/// Header line of the CSV list output: a UTF-8 byte order mark followed by the column names.
///
/// The columns must stay in the same order as the fields of `CsvItem`, because each row is
/// produced by serializing a `CsvItem` without headers. `unitCode` is therefore listed between
/// `unitId` and `createdAt`.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFnetworkId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
99

            
100
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
101
506
    Router::new().nest(
102
506
        scope_path,
103
506
        Router::new()
104
506
            .route("/", routing::post(post_network))
105
506
            .route("/count", routing::get(get_network_count))
106
506
            .route("/list", routing::get(get_network_list))
107
506
            .route(
108
506
                "/{network_id}",
109
506
                routing::get(get_network)
110
506
                    .patch(patch_network)
111
506
                    .delete(delete_network),
112
506
            )
113
506
            .route("/{network_id}/stats", routing::get(get_network_stats))
114
506
            .route("/{network_id}/uldata", routing::post(post_network_uldata))
115
506
            .with_state(state.clone()),
116
506
    )
117
506
}
118

            
119
/// `POST /{base}/api/v1/network`
120
92
async fn post_network(
121
92
    State(state): State<AppState>,
122
92
    mut headers: HeaderMap,
123
92
    Json(mut body): Json<request::PostNetworkBody>,
124
92
) -> impl IntoResponse {
125
    const FN_NAME: &'static str = "post_network";
126
92
    let broker_base = state.broker_base.as_str();
127
92
    let api_path = format!("{}/api/v1/network", broker_base);
128
92
    let client = state.client.clone();
129
92
    let token = match headers.get(header::AUTHORIZATION) {
130
        None => {
131
4
            let e = "missing Authorization".to_string();
132
4
            return ErrResp::ErrParam(Some(e)).into_response();
133
        }
134
88
        Some(value) => value.clone(),
135
88
    };
136
88

            
137
88
    if body.data.unit_id.is_none() {
138
16
        let auth_base = state.auth_base.as_str();
139
16
        let token_info = match get_tokeninfo_inner(FN_NAME, &client, auth_base, &token).await {
140
            Err(e) => return e,
141
16
            Ok(info) => info,
142
16
        };
143
16
        if !Role::is_role(&token_info.roles, Role::ADMIN)
144
4
            && !Role::is_role(&token_info.roles, Role::MANAGER)
145
        {
146
4
            let e = "missing `unitId`".to_string();
147
4
            return ErrResp::ErrParam(Some(e)).into_response();
148
12
        }
149
72
    }
150

            
151
    // Get unit information to create queue information.
152
84
    let unit_code = match body.data.unit_id.as_ref() {
153
12
        None => "".to_string(),
154
72
        Some(unit_id) => {
155
72
            if unit_id.len() == 0 {
156
4
                return ErrResp::ErrParam(Some(
157
4
                    "`unitId` must with at least one character".to_string(),
158
4
                ))
159
4
                .into_response();
160
68
            }
161
68
            let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
162
                Err(e) => return e,
163
68
                Ok(unit) => match unit {
164
                    None => {
165
4
                        return ErrResp::Custom(
166
4
                            ErrReq::UNIT_NOT_EXIST.0,
167
4
                            ErrReq::UNIT_NOT_EXIST.1,
168
4
                            None,
169
4
                        )
170
4
                        .into_response()
171
                    }
172
64
                    Some(unit) => unit,
173
64
                },
174
64
            };
175
64
            unit.code
176
        }
177
    };
178
76
    let code = body.data.code.as_str();
179
76
    if !strings::is_code(code) {
180
4
        return ErrResp::ErrParam(Some(
181
4
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
182
4
        ))
183
4
        .into_response();
184
72
    }
185
72
    let unit_id = match body.data.unit_id.as_ref() {
186
12
        None => "",
187
60
        Some(unit_id) => unit_id.as_str(),
188
    };
189
72
    match check_network_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
190
        Err(e) => return e,
191
72
        Ok(count) => match count {
192
68
            0 => (),
193
            _ => {
194
4
                return ErrResp::Custom(ErrReq::NETWORK_EXIST.0, ErrReq::NETWORK_EXIST.1, None)
195
4
                    .into_response()
196
            }
197
        },
198
    }
199
68
    let q_type = QueueType::Network;
200
68
    let username = mq::to_username(q_type, unit_code.as_str(), code);
201
68
    let password = strings::randomstring(8);
202
68
    let uri = match Url::parse(body.data.host_uri.as_str()) {
203
4
        Err(e) => {
204
4
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
205
        }
206
64
        Ok(uri) => uri,
207
    };
208
64
    let host = match uri.host() {
209
        None => {
210
4
            let e = "invalid `hostUri`".to_string();
211
4
            return ErrResp::ErrParam(Some(e)).into_response();
212
        }
213
60
        Some(host) => host.to_string(),
214
60
    };
215
60
    let scheme = uri.scheme();
216
60
    let host = host.as_str();
217
60
    let username = username.as_str();
218
60
    let password = password.as_str();
219
60

            
220
60
    // Create message broker resources.
221
60
    let create_rsc = CreateQueueResource {
222
60
        scheme,
223
60
        host,
224
60
        username,
225
60
        password,
226
60
        ttl: body.data.ttl,
227
60
        length: body.data.length,
228
60
        q_type: QueueType::Network,
229
60
    };
230
60
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
231
        return e;
232
60
    }
233
60
    let clear_rsc = ClearQueueResource {
234
60
        scheme,
235
60
        host,
236
60
        username,
237
60
    };
238
60

            
239
60
    // Create network instance.
240
60
    let mut body_uri = uri.clone();
241
60
    transfer_host_uri(&state, &mut body_uri, username);
242
60
    body.data.host_uri = body_uri.to_string();
243
60
    headers.remove(header::CONTENT_LENGTH);
244
60
    let builder = client
245
60
        .request(reqwest::Method::POST, api_path)
246
60
        .headers(headers)
247
60
        .json(&body);
248
60
    let api_req = match builder.build() {
249
        Err(e) => {
250
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
251
            let e = format!("generate request error: {}", e);
252
            error!("[{}] {}", FN_NAME, e);
253
            return ErrResp::ErrRsc(Some(e)).into_response();
254
        }
255
60
        Ok(req) => req,
256
    };
257
60
    let api_resp = match client.execute(api_req).await {
258
        Err(e) => {
259
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
260
            let e = format!("execute request error: {}", e);
261
            error!("[{}] {}", FN_NAME, e);
262
            return ErrResp::ErrIntMsg(Some(e)).into_response();
263
        }
264
60
        Ok(resp) => match resp.status() {
265
60
            StatusCode::OK => resp,
266
            _ => {
267
                let mut resp_builder = Response::builder().status(resp.status());
268
                for (k, v) in resp.headers() {
269
                    resp_builder = resp_builder.header(k, v);
270
                }
271
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
272
                    Err(e) => {
273
                        let e = format!("wrap response body error: {}", e);
274
                        error!("[{}] {}", FN_NAME, e);
275
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
276
                    }
277
                    Ok(resp) => return resp,
278
                }
279
            }
280
        },
281
    };
282
60
    let mut body = match api_resp.json::<response::PostNetwork>().await {
283
        Err(e) => {
284
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
285
            let e = format!("unexpected response: {}", e);
286
            return ErrResp::ErrUnknown(Some(e)).into_response();
287
        }
288
60
        Ok(body) => body,
289
60
    };
290
60
    body.data.password = Some(password.to_string());
291
60

            
292
60
    Json(&body).into_response()
293
92
}
294

            
295
/// `GET /{base}/api/v1/network/count`
296
4
async fn get_network_count(state: State<AppState>, req: Request) -> impl IntoResponse {
297
    const FN_NAME: &'static str = "get_network_count";
298
4
    let api_path = format!("{}/api/v1/network/count", state.broker_base.as_str());
299
4
    let client = state.client.clone();
300
4

            
301
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
302
4
}
303

            
304
/// `GET /{base}/api/v1/network/list`
305
20
async fn get_network_list(state: State<AppState>, req: Request) -> impl IntoResponse {
306
    const FN_NAME: &'static str = "get_network_list";
307
20
    let api_path = format!("{}/api/v1/network/list", state.broker_base.as_str());
308
20
    let api_path = api_path.as_str();
309
20
    let client = state.client.clone();
310
20

            
311
20
    let mut list_format = ListFormat::Data;
312
20
    if let Some(query_str) = req.uri().query() {
313
16
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
314
            Err(e) => {
315
                let e = format!("parse query error: {}", e);
316
                return ErrResp::ErrParam(Some(e)).into_response();
317
            }
318
16
            Ok(query) => query,
319
        };
320
24
        for (k, v) in query.iter() {
321
24
            if k.as_str().eq("format") {
322
12
                if v.as_str().eq("array") {
323
4
                    list_format = ListFormat::Array;
324
8
                } else if v.as_str().eq("csv") {
325
4
                    list_format = ListFormat::Csv;
326
4
                }
327
12
            }
328
        }
329
4
    }
330

            
331
16
    let (api_resp, resp_builder) =
332
20
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "network").await {
333
4
            ListResp::Axum(resp) => return resp,
334
16
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
335
16
        };
336
16

            
337
16
    let mut resp_stream = api_resp.bytes_stream();
338
16
    let body = Body::from_stream(async_stream::stream! {
339
16
        match list_format {
340
16
            ListFormat::Array => yield Ok(Bytes::from("[")),
341
16
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
342
16
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
343
16
        }
344
16
        let mut first_sent = false;
345
16

            
346
16
        let mut buffer = BytesMut::new();
347
16
        while let Some(body) = resp_stream.next().await {
348
16
            match body {
349
16
                Err(e) => {
350
16
                    error!("[{}] get body error: {}", FN_NAME, e);
351
16
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
352
16
                    yield Err(err);
353
16
                    break;
354
16
                }
355
16
                Ok(body) => buffer.extend_from_slice(&body[..]),
356
16
            }
357
16

            
358
16
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
359
16
            let mut index = 0;
360
16
            let mut finish = false;
361
16
            loop {
362
16
                if let Some(Ok(mut v)) = json_stream.next() {
363
16
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
364
16
                        Err(e) => {
365
16
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
366
16
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
367
16
                            yield Err(err);
368
16
                            finish = true;
369
16
                            break;
370
16
                        }
371
16
                        Ok(uri) => trunc_host_uri(&uri),
372
16
                    };
373
16
                    match list_format {
374
16
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
375
16
                            Err(e) =>{
376
16
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
377
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
378
16
                                yield Err(err);
379
16
                                finish = true;
380
16
                                break;
381
16
                            }
382
16
                            Ok(v) => {
383
16
                                match first_sent {
384
16
                                    false => first_sent = true,
385
16
                                    true => yield Ok(Bytes::from(",")),
386
16
                                }
387
16
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
388
16
                            }
389
16
                        }
390
16
                        ListFormat::Csv => {
391
16
                            let mut item = CsvItem{
392
16
                                network_id: v.network_id,
393
16
                                code: v.code,
394
16
                                unit_id: v.unit_id,
395
16
                                unit_code: v.unit_code,
396
16
                                created_at: v.created_at,
397
16
                                modified_at: v.modified_at,
398
16
                                host_uri: v.host_uri,
399
16
                                name: v.name,
400
16
                                info: None,
401
16
                            };
402
16
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
403
16
                                item.info = Some(info_str);
404
16
                            }
405
16
                            let mut writer =
406
16
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
407
16
                            if let Err(e) = writer.serialize(item) {
408
16
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
409
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
410
16
                                yield Err(err);
411
16
                                finish = true;
412
16
                                break;
413
16
                            }
414
16
                            match writer.into_inner() {
415
16
                                Err(e) => {
416
16
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
417
16
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
418
16
                                    yield Err(err);
419
16
                                    finish = true;
420
16
                                    break;
421
16
                                }
422
16
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
423
16
                            }
424
16
                        }
425
16
                    }
426
16
                    continue;
427
16
                }
428
16
                let offset = json_stream.byte_offset();
429
16
                if buffer.len() <= index + offset {
430
16
                    index = buffer.len();
431
16
                    break;
432
16
                }
433
16
                match buffer[index+offset] {
434
16
                    b'[' | b',' => {
435
16
                        index += offset + 1;
436
16
                        if buffer.len() <= index {
437
16
                            break;
438
16
                        }
439
16
                        json_stream =
440
16
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
441
16
                    }
442
16
                    b']' => {
443
16
                        finish = true;
444
16
                        break;
445
16
                    }
446
16
                    _ => break,
447
16
                }
448
16
            }
449
16
            if finish {
450
16
                match list_format {
451
16
                    ListFormat::Array => yield Ok(Bytes::from("]")),
452
16
                    ListFormat::Csv => (),
453
16
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
454
16
                }
455
16
                break;
456
16
            }
457
16
            buffer = buffer.split_off(index);
458
16
        }
459
16
    });
460
16
    match resp_builder.body(body) {
461
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
462
16
        Ok(resp) => resp,
463
    }
464
20
}
465

            
466
/// `GET /{base}/api/v1/network/{networkId}`
467
36
async fn get_network(
468
36
    state: State<AppState>,
469
36
    Path(param): Path<NetworkIdPath>,
470
36
    req: Request,
471
36
) -> impl IntoResponse {
472
    const FN_NAME: &'static str = "get_network";
473
36
    let broker_base = state.broker_base.as_str();
474
36
    let client = state.client.clone();
475
36
    let token = match req.headers().get(header::AUTHORIZATION) {
476
        None => {
477
4
            let e = "missing Authorization".to_string();
478
4
            return ErrResp::ErrParam(Some(e)).into_response();
479
        }
480
32
        Some(value) => value.clone(),
481
    };
482

            
483
32
    let (mut network, uri, host) = match get_network_inner(
484
32
        FN_NAME,
485
32
        &client,
486
32
        broker_base,
487
32
        param.network_id.as_str(),
488
32
        &token,
489
32
    )
490
32
    .await
491
    {
492
4
        Err(e) => return e,
493
28
        Ok((network, uri, host)) => (network, uri, host),
494
28
    };
495
28

            
496
28
    let host = host.as_str();
497
28
    let scheme = uri.scheme();
498
28
    if scheme.eq("amqp") || scheme.eq("amqps") {
499
16
        let AmqpState::RabbitMq(opts) = &state.amqp;
500
16
        let unit_code = match network.unit_code.as_ref() {
501
            None => "",
502
16
            Some(unit_code) => unit_code.as_str(),
503
        };
504
16
        let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
505
16
        let username = username.as_str();
506
16
        match rabbitmq::get_policies(&client, opts, host, username).await {
507
            Err(e) => {
508
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
509
                return e.into_response();
510
            }
511
16
            Ok(policies) => {
512
16
                network.ttl = policies.ttl;
513
16
                network.length = policies.length;
514
16
            }
515
        }
516
12
    }
517
28
    network.host_uri = trunc_host_uri(&uri);
518
28
    Json(&response::GetNetwork { data: network }).into_response()
519
36
}
520

            
521
/// `PATCH /{base}/api/v1/network/{networkId}`
522
52
async fn patch_network(
523
52
    state: State<AppState>,
524
52
    headers: HeaderMap,
525
52
    Path(param): Path<NetworkIdPath>,
526
52
    Json(body): Json<request::PatchNetworkBody>,
527
52
) -> impl IntoResponse {
528
    const FN_NAME: &'static str = "patch_network";
529
52
    let broker_base = state.broker_base.as_str();
530
52
    let client = state.client.clone();
531
52
    let token = match headers.get(header::AUTHORIZATION) {
532
        None => {
533
4
            let e = "missing Authorization".to_string();
534
4
            return ErrResp::ErrParam(Some(e)).into_response();
535
        }
536
48
        Some(value) => value.clone(),
537
48
    };
538
48

            
539
48
    let data = &body.data;
540
48
    if data.host_uri.is_none()
541
24
        && data.name.is_none()
542
16
        && data.info.is_none()
543
16
        && data.ttl.is_none()
544
12
        && data.length.is_none()
545
12
        && data.password.is_none()
546
    {
547
4
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
548
44
    }
549

            
550
44
    let (network, uri, hostname) = match get_network_inner(
551
44
        FN_NAME,
552
44
        &client,
553
44
        broker_base,
554
44
        param.network_id.as_str(),
555
44
        &token,
556
44
    )
557
44
    .await
558
    {
559
4
        Err(e) => return e,
560
40
        Ok((network, uri, hostname)) => (network, uri, hostname),
561
40
    };
562
40

            
563
40
    let mut patch_data = request::PatchNetworkData {
564
40
        name: data.name.clone(),
565
40
        info: data.info.clone(),
566
40
        ..Default::default()
567
40
    };
568
40
    let mut patch_host: Option<PatchHost> = None;
569
40
    if let Some(host) = data.host_uri.as_ref() {
570
24
        if !strings::is_uri(host) {
571
4
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
572
20
        }
573
20
        // Change to the new broker host.
574
20
        if !cmp_host_uri(network.host_uri.as_str(), host.as_str()) {
575
20
            let password = match data.password.as_ref() {
576
                None => {
577
4
                    let e = "missing `password`".to_string();
578
4
                    return ErrResp::ErrParam(Some(e)).into_response();
579
                }
580
16
                Some(password) => match password.len() {
581
                    0 => {
582
4
                        let e = "missing `password`".to_string();
583
4
                        return ErrResp::ErrParam(Some(e)).into_response();
584
                    }
585
12
                    _ => password,
586
                },
587
            };
588
12
            let mut new_host_uri = match Url::parse(host.as_str()) {
589
                Err(e) => {
590
                    let e = format!("invalid `hostUri`: {}", e);
591
                    return ErrResp::ErrParam(Some(e)).into_response();
592
                }
593
12
                Ok(uri) => match uri.host_str() {
594
                    None => {
595
4
                        let e = "invalid `hostUri`".to_string();
596
4
                        return ErrResp::ErrParam(Some(e)).into_response();
597
                    }
598
8
                    Some(_) => uri,
599
                },
600
            };
601

            
602
8
            let unit_code = match network.unit_code.as_ref() {
603
                None => "",
604
8
                Some(unit_code) => unit_code.as_str(),
605
            };
606
8
            let code = network.code.as_str();
607
8
            let username = mq::to_username(QueueType::Network, unit_code, code);
608
8
            let resource = CreateQueueResource {
609
8
                scheme: new_host_uri.scheme(),
610
8
                host: new_host_uri.host_str().unwrap(),
611
8
                username: username.as_str(),
612
8
                password: password.as_str(),
613
8
                ttl: data.ttl,
614
8
                length: data.length,
615
8
                q_type: QueueType::Network,
616
8
            };
617
8
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
618
                return e;
619
8
            }
620
8
            let resource = ClearQueueResource {
621
8
                scheme: uri.scheme(),
622
8
                host: uri.host_str().unwrap(),
623
8
                username: username.as_str(),
624
8
            };
625
8
            let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
626

            
627
8
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
628
8
            patch_data.host_uri = Some(new_host_uri.to_string());
629
8
            patch_host = Some(PatchHost {
630
8
                host_uri: new_host_uri,
631
8
                username,
632
8
            });
633
        }
634
16
    }
635

            
636
    // Send request body to the sylvia-iot-broker.
637
24
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
638
12
        let network_id = param.network_id.as_str();
639
12
        let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
640
12
        let mut builder = client
641
12
            .request(reqwest::Method::PATCH, uri)
642
12
            .header(reqwest::header::AUTHORIZATION, &token)
643
12
            .json(&request::PatchNetworkBody { data: patch_data });
644
12
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
645
12
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
646
12
        }
647
12
        let api_req = match builder.build() {
648
            Err(e) => {
649
                clear_patch_host(FN_NAME, &state, &patch_host).await;
650
                let e = format!("generate request error: {}", e);
651
                error!("[{}] {}", FN_NAME, e);
652
                return ErrResp::ErrRsc(Some(e)).into_response();
653
            }
654
12
            Ok(req) => req,
655
        };
656
12
        let api_resp = match client.execute(api_req).await {
657
            Err(e) => {
658
                clear_patch_host(FN_NAME, &state, &patch_host).await;
659
                let e = format!("execute request error: {}", e);
660
                error!("[{}] {}", FN_NAME, e);
661
                return ErrResp::ErrIntMsg(Some(e)).into_response();
662
            }
663
12
            Ok(resp) => resp,
664
12
        };
665
12

            
666
12
        let status_code = api_resp.status();
667
12
        if status_code != StatusCode::NO_CONTENT {
668
            clear_patch_host(FN_NAME, &state, &patch_host).await;
669
            let mut resp_builder = Response::builder().status(status_code);
670
            for (k, v) in api_resp.headers() {
671
                resp_builder = resp_builder.header(k, v);
672
            }
673
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
674
                Err(e) => {
675
                    let e = format!("wrap response body error: {}", e);
676
                    error!("[{}] {}", FN_NAME, e);
677
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
678
                }
679
                Ok(resp) => return resp,
680
            }
681
12
        }
682
12
    }
683

            
684
24
    if let Some(host) = patch_host {
685
8
        let resource = ClearQueueResource {
686
8
            scheme: uri.scheme(),
687
8
            host: uri.host_str().unwrap(),
688
8
            username: host.username.as_str(),
689
8
        };
690
8
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
691
8
        return StatusCode::NO_CONTENT.into_response();
692
16
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
693
4
        return StatusCode::NO_CONTENT.into_response();
694
12
    }
695

            
696
    // Update broker information without changing hostUri.
697
12
    if let Some(password) = data.password.as_ref() {
698
12
        if password.len() == 0 {
699
4
            let e = "missing `password`".to_string();
700
4
            return ErrResp::ErrParam(Some(e)).into_response();
701
8
        }
702
    }
703
8
    let unit_code = match network.unit_code.as_ref() {
704
        None => "",
705
8
        Some(unit_code) => unit_code.as_str(),
706
    };
707
8
    let code = network.code.as_str();
708
8
    let hostname = hostname.as_str();
709
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
710
8
    let username = username.as_str();
711
8
    match uri.scheme() {
712
8
        "amqp" | "amqps" => match &state.amqp {
713
4
            AmqpState::RabbitMq(opts) => {
714
4
                if data.ttl.is_some() || data.length.is_some() {
715
4
                    let policies = rabbitmq::BrokerPolicies {
716
4
                        ttl: data.ttl,
717
4
                        length: data.length,
718
4
                    };
719
                    if let Err(e) =
720
4
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
721
                    {
722
                        let e = format!("patch RabbitMQ error: {}", e);
723
                        error!("[{}] {}", FN_NAME, e);
724
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
725
4
                    }
726
                }
727
4
                if let Some(password) = data.password.as_ref() {
728
4
                    let password = password.as_str();
729
                    if let Err(e) =
730
4
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
731
                    {
732
                        let e = format!("patch RabbitMQ password error: {}", e);
733
                        error!("[{}] {}", FN_NAME, e);
734
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
735
4
                    }
736
                }
737
            }
738
        },
739
4
        "mqtt" | "mqtts" => match &state.mqtt {
740
2
            MqttState::Emqx(opts) => {
741
2
                if let Some(password) = data.password.as_ref() {
742
2
                    let password = password.as_str();
743
                    if let Err(e) =
744
2
                        emqx::put_user(&client, opts, hostname, username, password).await
745
                    {
746
                        let e = format!("patch EMQX password error: {}", e);
747
                        error!("[{}] {}", FN_NAME, e);
748
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
749
2
                    }
750
                }
751
            }
752
2
            MqttState::Rumqttd => {}
753
        },
754
        _ => {}
755
    }
756

            
757
8
    StatusCode::NO_CONTENT.into_response()
758
52
}
759

            
760
/// `DELETE /{base}/api/v1/network/{networkId}`
761
16
async fn delete_network(
762
16
    state: State<AppState>,
763
16
    Path(param): Path<NetworkIdPath>,
764
16
    req: Request,
765
16
) -> impl IntoResponse {
766
    const FN_NAME: &'static str = "delete_network";
767
16
    let broker_base = state.broker_base.as_str();
768
16
    let network_id = param.network_id.as_str();
769
16
    let api_path = format!("{}/api/v1/network/{}", broker_base, network_id);
770
16
    let client = state.client.clone();
771
16
    let token = match req.headers().get(header::AUTHORIZATION) {
772
        None => {
773
4
            let e = "missing Authorization".to_string();
774
4
            return ErrResp::ErrParam(Some(e)).into_response();
775
        }
776
12
        Some(value) => value.clone(),
777
    };
778

            
779
8
    let (network, uri, host) =
780
12
        match get_network_inner(FN_NAME, &client, broker_base, network_id, &token).await {
781
4
            Err(e) => return e,
782
8
            Ok((network, uri, host)) => (network, uri, host),
783
        };
784

            
785
8
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
786
8
    if !resp.status().is_success() {
787
        return resp;
788
8
    }
789

            
790
8
    let unit_code = match network.unit_code.as_ref() {
791
        None => "",
792
8
        Some(unit_code) => unit_code.as_str(),
793
    };
794
8
    let code = network.code.as_str();
795
8
    let username = mq::to_username(QueueType::Network, unit_code, code);
796
8
    let clear_rsc = ClearQueueResource {
797
8
        scheme: uri.scheme(),
798
8
        host: host.as_str(),
799
8
        username: username.as_str(),
800
8
    };
801
8
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
802
        return e;
803
8
    }
804
8

            
805
8
    StatusCode::NO_CONTENT.into_response()
806
16
}
807

            
808
/// `GET /{base}/api/v1/network/{networkId}/stats`
809
148
async fn get_network_stats(
810
148
    state: State<AppState>,
811
148
    Path(param): Path<NetworkIdPath>,
812
148
    req: Request,
813
148
) -> impl IntoResponse {
814
    const FN_NAME: &'static str = "get_network";
815
148
    let broker_base = state.broker_base.as_str();
816
148
    let client = state.client.clone();
817
148
    let token = match req.headers().get(header::AUTHORIZATION) {
818
        None => {
819
4
            let e = "missing Authorization".to_string();
820
4
            return ErrResp::ErrParam(Some(e)).into_response();
821
        }
822
144
        Some(value) => value.clone(),
823
    };
824

            
825
144
    let (network, uri, host) = match get_network_inner(
826
144
        FN_NAME,
827
144
        &client,
828
144
        broker_base,
829
144
        param.network_id.as_str(),
830
144
        &token,
831
144
    )
832
144
    .await
833
    {
834
4
        Err(e) => return e,
835
140
        Ok((network, uri, host)) => (network, uri, host),
836
140
    };
837
140

            
838
140
    let host = host.as_str();
839
140
    let scheme = uri.scheme();
840
140
    let data = match scheme {
841
140
        "amqp" | "amqps" => {
842
122
            let AmqpState::RabbitMq(opts) = &state.amqp;
843
122
            let unit_code = match network.unit_code.as_ref() {
844
                None => "",
845
122
                Some(unit_code) => unit_code.as_str(),
846
            };
847
122
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
848
122
            let username = username.as_str();
849
122
            response::GetNetworkStatsData {
850
122
                dldata: match rabbitmq::stats(&client, opts, host, username, "dldata").await {
851
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
852
                        consumers: 0,
853
                        messages: 0,
854
                        publish_rate: 0.0,
855
                        deliver_rate: 0.0,
856
                    },
857
                    Err(e) => {
858
                        error!("[{}] get dldata stats error: {}", FN_NAME, e);
859
                        return e.into_response();
860
                    }
861
122
                    Ok(stats) => response::Stats {
862
122
                        consumers: stats.consumers,
863
122
                        messages: stats.messages,
864
122
                        publish_rate: stats.publish_rate,
865
122
                        deliver_rate: stats.deliver_rate,
866
122
                    },
867
                },
868
122
                ctrl: match rabbitmq::stats(&client, opts, host, username, "ctrl").await {
869
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
870
                        consumers: 0,
871
                        messages: 0,
872
                        publish_rate: 0.0,
873
                        deliver_rate: 0.0,
874
                    },
875
                    Err(e) => {
876
                        error!("[{}] get ctrl stats error: {}", FN_NAME, e);
877
                        return e.into_response();
878
                    }
879
122
                    Ok(stats) => response::Stats {
880
122
                        consumers: stats.consumers,
881
122
                        messages: stats.messages,
882
122
                        publish_rate: stats.publish_rate,
883
122
                        deliver_rate: stats.deliver_rate,
884
122
                    },
885
                },
886
            }
887
        }
888
18
        "mqtt" | "mqtts" => match &state.mqtt {
889
16
            MqttState::Emqx(opts) => {
890
16
                let unit_code = match network.unit_code.as_ref() {
891
                    None => "",
892
16
                    Some(unit_code) => unit_code.as_str(),
893
                };
894
16
                let username =
895
16
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
896
16
                let username = username.as_str();
897
16
                response::GetNetworkStatsData {
898
16
                    dldata: match emqx::stats(&client, opts, host, username, "dldata").await {
899
                        Err(e) => {
900
                            error!("[{}] get dldata stats error: {}", FN_NAME, e);
901
                            return e.into_response();
902
                        }
903
16
                        Ok(stats) => response::Stats {
904
16
                            consumers: stats.consumers,
905
16
                            messages: stats.messages,
906
16
                            publish_rate: stats.publish_rate,
907
16
                            deliver_rate: stats.deliver_rate,
908
16
                        },
909
16
                    },
910
16
                    ctrl: match emqx::stats(&client, opts, host, username, "ctrl").await {
911
                        Err(e) => {
912
                            error!("[{}] get ctrl stats error: {}", FN_NAME, e);
913
                            return e.into_response();
914
                        }
915
16
                        Ok(stats) => response::Stats {
916
16
                            consumers: stats.consumers,
917
16
                            messages: stats.messages,
918
16
                            publish_rate: stats.publish_rate,
919
16
                            deliver_rate: stats.deliver_rate,
920
16
                        },
921
                    },
922
                }
923
            }
924
2
            MqttState::Rumqttd => response::GetNetworkStatsData {
925
2
                dldata: response::Stats {
926
2
                    ..Default::default()
927
2
                },
928
2
                ctrl: response::Stats {
929
2
                    ..Default::default()
930
2
                },
931
2
            },
932
        },
933
        _ => {
934
            let e = format!("unsupport scheme {}", scheme);
935
            error!("[{}] {}", FN_NAME, e);
936
            return ErrResp::ErrUnknown(Some(e)).into_response();
937
        }
938
    };
939
140
    Json(&response::GetNetworkStats { data }).into_response()
940
148
}
941

            
942
/// `POST /{base}/api/v1/network/{networkId}/uldata`
943
28
async fn post_network_uldata(
944
28
    state: State<AppState>,
945
28
    headers: HeaderMap,
946
28
    Path(param): Path<NetworkIdPath>,
947
28
    Json(body): Json<request::PostNetworkUlDataBody>,
948
28
) -> impl IntoResponse {
949
    const FN_NAME: &'static str = "post_network_uldata";
950
28
    let broker_base = state.broker_base.as_str();
951
28
    let client = state.client.clone();
952
28
    let token = match headers.get(header::AUTHORIZATION) {
953
        None => {
954
4
            let e = "missing Authorization".to_string();
955
4
            return ErrResp::ErrParam(Some(e)).into_response();
956
        }
957
24
        Some(value) => value.clone(),
958
24
    };
959
24

            
960
24
    if body.data.device_id.len() == 0 {
961
4
        let e = "empty `deviceId` is invalid".to_string();
962
4
        return ErrResp::ErrParam(Some(e)).into_response();
963
20
    }
964
20
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
965
4
        let e = format!("`payload` is not hexadecimal string: {}", e);
966
4
        return ErrResp::ErrParam(Some(e)).into_response();
967
16
    }
968

            
969
16
    let (network, uri, hostname) = match get_network_inner(
970
16
        FN_NAME,
971
16
        &client,
972
16
        broker_base,
973
16
        param.network_id.as_str(),
974
16
        &token,
975
16
    )
976
16
    .await
977
    {
978
4
        Err(e) => return e,
979
12
        Ok((network, uri, hostname)) => (network, uri, hostname),
980
    };
981
12
    let device = match get_device_inner(
982
12
        FN_NAME,
983
12
        &client,
984
12
        broker_base,
985
12
        body.data.device_id.as_str(),
986
12
        &token,
987
12
    )
988
12
    .await
989
    {
990
        Err(e) => return e,
991
12
        Ok(device) => match device {
992
            None => {
993
4
                return ErrResp::Custom(
994
4
                    ErrReq::DEVICE_NOT_EXIST.0,
995
4
                    ErrReq::DEVICE_NOT_EXIST.1,
996
4
                    None,
997
4
                )
998
4
                .into_response()
999
            }
8
            Some(device) => device,
8
        },
8
    };
8

            
8
    let hostname = hostname.as_str();
8
    let scheme = uri.scheme();
8
    let payload = match serde_json::to_string(&UlData {
8
        time: strings::time_str(&Utc::now()),
8
        network_addr: device.network_addr,
8
        data: body.data.payload.clone(),
8
        ..Default::default()
8
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
8
        Ok(payload) => general_purpose::STANDARD.encode(payload),
8
    };
8
    match scheme {
8
        "amqp" | "amqps" => {
4
            let AmqpState::RabbitMq(opts) = &state.amqp;
4
            let unit_code = match network.unit_code.as_ref() {
                None => "",
4
                Some(unit_code) => unit_code.as_str(),
            };
4
            let username = mq::to_username(QueueType::Network, unit_code, network.code.as_str());
4
            let username = username.as_str();
            if let Err(e) =
4
                rabbitmq::publish_message(&client, opts, hostname, username, "uldata", payload)
4
                    .await
            {
                return e.into_response();
4
            }
        }
4
        "mqtt" | "mqtts" => match &state.mqtt {
2
            MqttState::Emqx(opts) => {
2
                let unit_code = match network.unit_code.as_ref() {
                    None => "",
2
                    Some(unit_code) => unit_code.as_str(),
                };
2
                let username =
2
                    mq::to_username(QueueType::Network, unit_code, network.code.as_str());
2
                let username = username.as_str();
                if let Err(e) =
2
                    emqx::publish_message(&client, opts, hostname, username, "uldata", payload)
2
                        .await
                {
                    return e.into_response();
2
                }
            }
            MqttState::Rumqttd => {
2
                let e = "not support now".to_string();
2
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
6
    StatusCode::NO_CONTENT.into_response()
28
}
248
async fn get_network_inner(
248
    fn_name: &str,
248
    client: &reqwest::Client,
248
    broker_base: &str,
248
    network_id: &str,
248
    token: &HeaderValue,
248
) -> Result<(response::GetNetworkData, Url, String), Response> {
248
    let uri = format!("{}/api/v1/network/{}", broker_base, network_id);
248
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
228
    let network = match resp.json::<response::GetNetwork>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
228
        Ok(network) => network.data,
    };
228
    let uri = match Url::parse(network.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
228
        Ok(uri) => uri,
    };
228
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
228
        Some(host) => host.to_string(),
228
    };
228
    Ok((network, uri, host))
248
}
72
async fn check_network_code_inner(
72
    fn_name: &str,
72
    client: &reqwest::Client,
72
    broker_base: &str,
72
    unit_id: &str,
72
    code: &str,
72
    token: &HeaderValue,
72
) -> Result<u64, Response> {
72
    let uri = format!("{}/api/v1/network/count", broker_base);
72
    let req = match client
72
        .request(reqwest::Method::GET, uri)
72
        .header(reqwest::header::AUTHORIZATION, token)
72
        .query(&[("unit", unit_id), ("code", code)])
72
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
72
        Ok(req) => req,
    };
72
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
72
        Ok(resp) => resp,
72
    };
72

            
72
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of network: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
72
        Ok(data) => Ok(data.data.count),
    }
72
}