1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Request, State},
6
    http::{header, HeaderMap, HeaderValue, StatusCode},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use base64::{engine::general_purpose, Engine};
11
use bytes::{Bytes, BytesMut};
12
use csv::WriterBuilder;
13
use futures_util::StreamExt;
14
use hex;
15
use log::error;
16
use reqwest;
17
use serde::{Deserialize, Serialize};
18
use serde_json::{Deserializer, Map, Value};
19
use url::Url;
20

            
21
use sylvia_iot_corelib::{
22
    err::ErrResp,
23
    http::{Json, Path},
24
    strings,
25
};
26

            
27
use super::{
28
    super::{AmqpState, ErrReq, MqttState, State as AppState},
29
    api_bridge, clear_patch_host, clear_queue_rsc, cmp_host_uri, create_queue_rsc,
30
    get_device_inner, get_stream_resp, get_unit_inner, list_api_bridge, request, response,
31
    transfer_host_uri, trunc_host_uri, ClearQueueResource, CreateQueueResource, ListResp,
32
    PatchHost,
33
};
34
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
35

            
36
enum ListFormat {
37
    Array,
38
    Csv,
39
    Data,
40
}
41

            
42
#[derive(Deserialize)]
43
struct ApplicationIdPath {
44
    application_id: String,
45
}
46

            
47
#[derive(Deserialize, Serialize)]
48
struct Application {
49
    #[serde(rename = "applicationId")]
50
    application_id: String,
51
    code: String,
52
    #[serde(rename = "unitId")]
53
    unit_id: String,
54
    #[serde(rename = "unitCode")]
55
    unit_code: String,
56
    #[serde(rename = "createdAt")]
57
    created_at: String,
58
    #[serde(rename = "modifiedAt")]
59
    modified_at: String,
60
    #[serde(rename = "hostUri")]
61
    host_uri: String,
62
    name: String,
63
    info: Map<String, Value>,
64
}
65

            
66
#[derive(Deserialize, Serialize)]
67
struct CsvItem {
68
    #[serde(rename = "applicationId")]
69
    application_id: String,
70
    code: String,
71
    #[serde(rename = "unitId")]
72
    unit_id: String,
73
    #[serde(rename = "unitCode")]
74
    unit_code: String,
75
    #[serde(rename = "createdAt")]
76
    created_at: String,
77
    #[serde(rename = "modifiedAt")]
78
    modified_at: String,
79
    #[serde(rename = "hostUri")]
80
    host_uri: String,
81
    name: String,
82
    info: Option<String>,
83
}
84

            
85
/// Downlink data from application to broker.
86
#[derive(Default, Serialize)]
87
struct DlData {
88
    #[serde(rename = "correlationId")]
89
    correlation_id: String,
90
    #[serde(rename = "deviceId")]
91
    device_id: Option<String>,
92
    #[serde(rename = "networkCode")]
93
    network_code: Option<String>,
94
    #[serde(rename = "networkAddr")]
95
    network_addr: Option<String>,
96
    data: String,
97
    extension: Option<Map<String, Value>>,
98
}
99

            
100
const CSV_FIELDS: &'static [u8] =
101
    b"\xEF\xBB\xBFapplicationId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
102

            
103
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
104
253
    Router::new().nest(
105
253
        scope_path,
106
253
        Router::new()
107
253
            .route("/", routing::post(post_application))
108
253
            .route("/count", routing::get(get_application_count))
109
253
            .route("/list", routing::get(get_application_list))
110
253
            .route(
111
253
                "/:application_id",
112
253
                routing::get(get_application)
113
253
                    .patch(patch_application)
114
253
                    .delete(delete_application),
115
253
            )
116
253
            .route(
117
253
                "/:application_id/stats",
118
253
                routing::get(get_application_stats),
119
253
            )
120
253
            .route(
121
253
                "/:application_id/dldata",
122
253
                routing::post(post_application_dldata),
123
253
            )
124
253
            .with_state(state.clone()),
125
253
    )
126
253
}
127

            
128
/// `POST /{base}/api/v1/application`
129
40
async fn post_application(
130
40
    State(state): State<AppState>,
131
40
    mut headers: HeaderMap,
132
40
    Json(mut body): Json<request::PostApplicationBody>,
133
40
) -> impl IntoResponse {
134
    const FN_NAME: &'static str = "post_application";
135
40
    let broker_base = state.broker_base.as_str();
136
40
    let api_path = format!("{}/api/v1/application", broker_base);
137
40
    let client = state.client.clone();
138
40
    let token = match headers.get(header::AUTHORIZATION) {
139
        None => {
140
2
            let e = "missing Authorization".to_string();
141
2
            return ErrResp::ErrParam(Some(e)).into_response();
142
        }
143
38
        Some(value) => value.clone(),
144
38
    };
145
38

            
146
38
    // Get unit information to create queue information.
147
38
    let unit_id = body.data.unit_id.as_str();
148
38
    if unit_id.len() == 0 {
149
2
        return ErrResp::ErrParam(Some(
150
2
            "`unitId` must with at least one character".to_string(),
151
2
        ))
152
2
        .into_response();
153
36
    }
154
36
    let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
155
        Err(e) => return e,
156
36
        Ok(unit) => match unit {
157
            None => {
158
2
                return ErrResp::Custom(ErrReq::UNIT_NOT_EXIST.0, ErrReq::UNIT_NOT_EXIST.1, None)
159
2
                    .into_response()
160
            }
161
34
            Some(unit) => unit,
162
34
        },
163
34
    };
164
34
    let unit_code = unit.code.as_str();
165
34
    let code = body.data.code.as_str();
166
34
    if !strings::is_code(code) {
167
2
        return ErrResp::ErrParam(Some(
168
2
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
169
2
        ))
170
2
        .into_response();
171
32
    }
172
32
    match check_application_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
173
        Err(e) => return e,
174
32
        Ok(count) => match count {
175
30
            0 => (),
176
            _ => {
177
2
                return ErrResp::Custom(
178
2
                    ErrReq::APPLICATION_EXIST.0,
179
2
                    ErrReq::APPLICATION_EXIST.1,
180
2
                    None,
181
2
                )
182
2
                .into_response()
183
            }
184
        },
185
    }
186
30
    let q_type = QueueType::Application;
187
30
    let username = mq::to_username(q_type, unit_code, code);
188
30
    let password = strings::randomstring(8);
189
30
    let uri = match Url::parse(body.data.host_uri.as_str()) {
190
2
        Err(e) => {
191
2
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
192
        }
193
28
        Ok(uri) => uri,
194
    };
195
28
    let host = match uri.host() {
196
        None => {
197
2
            let e = "invalid `hostUri`".to_string();
198
2
            return ErrResp::ErrParam(Some(e)).into_response();
199
        }
200
26
        Some(host) => host.to_string(),
201
26
    };
202
26
    let scheme = uri.scheme();
203
26
    let host = host.as_str();
204
26
    let username = username.as_str();
205
26
    let password = password.as_str();
206
26

            
207
26
    // Create message broker resources.
208
26
    let create_rsc = CreateQueueResource {
209
26
        scheme,
210
26
        host,
211
26
        username,
212
26
        password,
213
26
        ttl: body.data.ttl,
214
26
        length: body.data.length,
215
26
        q_type: QueueType::Application,
216
26
    };
217
26
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
218
        return e;
219
26
    }
220
26
    let clear_rsc = ClearQueueResource {
221
26
        scheme,
222
26
        host,
223
26
        username,
224
26
    };
225
26

            
226
26
    // Create application instance.
227
26
    let mut body_uri = uri.clone();
228
26
    transfer_host_uri(&state, &mut body_uri, username);
229
26
    body.data.host_uri = body_uri.to_string();
230
26
    headers.remove(header::CONTENT_LENGTH);
231
26
    let builder = client
232
26
        .request(reqwest::Method::POST, api_path)
233
26
        .headers(headers)
234
26
        .json(&body);
235
26
    let api_req = match builder.build() {
236
        Err(e) => {
237
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
238
            let e = format!("generate request error: {}", e);
239
            error!("[{}] {}", FN_NAME, e);
240
            return ErrResp::ErrRsc(Some(e)).into_response();
241
        }
242
26
        Ok(req) => req,
243
    };
244
26
    let api_resp = match client.execute(api_req).await {
245
        Err(e) => {
246
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
247
            let e = format!("execute request error: {}", e);
248
            error!("[{}] {}", FN_NAME, e);
249
            return ErrResp::ErrIntMsg(Some(e)).into_response();
250
        }
251
26
        Ok(resp) => match resp.status() {
252
26
            reqwest::StatusCode::OK => resp,
253
            _ => {
254
                let mut resp_builder = Response::builder().status(resp.status());
255
                for (k, v) in resp.headers() {
256
                    resp_builder = resp_builder.header(k, v);
257
                }
258
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
259
                    Err(e) => {
260
                        let e = format!("wrap response body error: {}", e);
261
                        error!("[{}] {}", FN_NAME, e);
262
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
263
                    }
264
                    Ok(resp) => return resp,
265
                }
266
            }
267
        },
268
    };
269
26
    let mut body = match api_resp.json::<response::PostApplication>().await {
270
        Err(e) => {
271
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
272
            let e = format!("unexpected response: {}", e);
273
            return ErrResp::ErrUnknown(Some(e)).into_response();
274
        }
275
26
        Ok(body) => body,
276
26
    };
277
26
    body.data.password = Some(password.to_string());
278
26

            
279
26
    Json(&body).into_response()
280
40
}
281

            
282
/// `GET /{base}/api/v1/application/count`
283
2
async fn get_application_count(state: State<AppState>, req: Request) -> impl IntoResponse {
284
    const FN_NAME: &'static str = "get_application_count";
285
2
    let api_path = format!("{}/api/v1/application/count", state.broker_base.as_str());
286
2
    let client = state.client.clone();
287
2

            
288
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
289
2
}
290

            
291
/// `GET /{base}/api/v1/application/list`
292
10
async fn get_application_list(state: State<AppState>, req: Request) -> impl IntoResponse {
293
    const FN_NAME: &'static str = "get_application_list";
294
10
    let api_path = format!("{}/api/v1/application/list", state.broker_base.as_str());
295
10
    let api_path = api_path.as_str();
296
10
    let client = state.client.clone();
297
10

            
298
10
    let mut list_format = ListFormat::Data;
299
10
    if let Some(query_str) = req.uri().query() {
300
8
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
301
            Err(e) => {
302
                let e = format!("parse query error: {}", e);
303
                return ErrResp::ErrParam(Some(e)).into_response();
304
            }
305
8
            Ok(query) => query,
306
        };
307
12
        for (k, v) in query.iter() {
308
12
            if k.as_str().eq("format") {
309
6
                if v.as_str().eq("array") {
310
2
                    list_format = ListFormat::Array;
311
4
                } else if v.as_str().eq("csv") {
312
2
                    list_format = ListFormat::Csv;
313
2
                }
314
6
            }
315
        }
316
2
    }
317

            
318
8
    let (api_resp, resp_builder) =
319
10
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "application").await {
320
2
            ListResp::Axum(resp) => return resp,
321
8
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
322
8
        };
323
8

            
324
8
    let mut resp_stream = api_resp.bytes_stream();
325
8
    let body = Body::from_stream(async_stream::stream! {
326
8
        match list_format {
327
8
            ListFormat::Array => yield Ok(Bytes::from("[")),
328
8
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
329
8
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
330
8
        }
331
8
        let mut first_sent = false;
332
8

            
333
8
        let mut buffer = BytesMut::new();
334
8
        while let Some(body) = resp_stream.next().await {
335
8
            match body {
336
8
                Err(e) => {
337
8
                    error!("[{}] get body error: {}", FN_NAME, e);
338
8
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
339
8
                    yield Err(err);
340
8
                    break;
341
8
                }
342
8
                Ok(body) => buffer.extend_from_slice(&body[..]),
343
8
            }
344
8

            
345
8
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
346
8
            let mut index = 0;
347
8
            let mut finish = false;
348
8
            loop {
349
8
                if let Some(Ok(mut v)) = json_stream.next() {
350
8
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
351
8
                        Err(e) => {
352
8
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
353
8
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
354
8
                            yield Err(err);
355
8
                            finish = true;
356
8
                            break;
357
8
                        }
358
8
                        Ok(uri) => trunc_host_uri(&uri),
359
8
                    };
360
8
                    match list_format {
361
8
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
362
8
                            Err(e) =>{
363
8
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
364
8
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
365
8
                                yield Err(err);
366
8
                                finish = true;
367
8
                                break;
368
8
                            }
369
8
                            Ok(v) => {
370
8
                                match first_sent {
371
8
                                    false => first_sent = true,
372
8
                                    true => yield Ok(Bytes::from(",")),
373
8
                                }
374
8
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
375
8
                            }
376
8
                        }
377
8
                        ListFormat::Csv => {
378
8
                            let mut item = CsvItem{
379
8
                                application_id: v.application_id,
380
8
                                code: v.code,
381
8
                                unit_id: v.unit_id,
382
8
                                unit_code: v.unit_code,
383
8
                                created_at: v.created_at,
384
8
                                modified_at: v.modified_at,
385
8
                                host_uri: v.host_uri,
386
8
                                name: v.name,
387
8
                                info: None,
388
8
                            };
389
8
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
390
8
                                item.info = Some(info_str);
391
8
                            }
392
8
                            let mut writer =
393
8
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
394
8
                            if let Err(e) = writer.serialize(item) {
395
8
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
396
8
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
397
8
                                yield Err(err);
398
8
                                finish = true;
399
8
                                break;
400
8
                            }
401
8
                            match writer.into_inner() {
402
8
                                Err(e) => {
403
8
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
404
8
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
405
8
                                    yield Err(err);
406
8
                                    finish = true;
407
8
                                    break;
408
8
                                }
409
8
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
410
8
                            }
411
8
                        }
412
8
                    }
413
8
                    continue;
414
8
                }
415
8
                let offset = json_stream.byte_offset();
416
8
                if buffer.len() <= index + offset {
417
8
                    index = buffer.len();
418
8
                    break;
419
8
                }
420
8
                match buffer[index+offset] {
421
8
                    b'[' | b',' => {
422
8
                        index += offset + 1;
423
8
                        if buffer.len() <= index {
424
8
                            break;
425
8
                        }
426
8
                        json_stream =
427
8
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
428
8
                    }
429
8
                    b']' => {
430
8
                        finish = true;
431
8
                        break;
432
8
                    }
433
8
                    _ => break,
434
8
                }
435
8
            }
436
8
            if finish {
437
8
                match list_format {
438
8
                    ListFormat::Array => yield Ok(Bytes::from("]")),
439
8
                    ListFormat::Csv => (),
440
8
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
441
8
                }
442
8
                break;
443
8
            }
444
8
            buffer = buffer.split_off(index);
445
8
        }
446
8
    });
447
8
    match resp_builder.body(body) {
448
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
449
8
        Ok(resp) => resp,
450
    }
451
10
}
452

            
453
/// `GET /{base}/api/v1/application/{applicationId}`
454
18
async fn get_application(
455
18
    state: State<AppState>,
456
18
    Path(param): Path<ApplicationIdPath>,
457
18
    req: Request,
458
18
) -> impl IntoResponse {
459
    const FN_NAME: &'static str = "get_application";
460
18
    let broker_base = state.broker_base.as_str();
461
18
    let client = state.client.clone();
462
18
    let token = match req.headers().get(header::AUTHORIZATION) {
463
        None => {
464
2
            let e = "missing Authorization".to_string();
465
2
            return ErrResp::ErrParam(Some(e)).into_response();
466
        }
467
16
        Some(value) => value.clone(),
468
    };
469

            
470
16
    let (mut application, uri, host) = match get_application_inner(
471
16
        FN_NAME,
472
16
        &client,
473
16
        broker_base,
474
16
        param.application_id.as_str(),
475
16
        &token,
476
16
    )
477
16
    .await
478
    {
479
2
        Err(e) => return e,
480
14
        Ok((application, uri, host)) => (application, uri, host),
481
14
    };
482
14

            
483
14
    let host = host.as_str();
484
14
    let scheme = uri.scheme();
485
14
    if scheme.eq("amqp") || scheme.eq("amqps") {
486
8
        let AmqpState::RabbitMq(opts) = &state.amqp;
487
8
        let username = mq::to_username(
488
8
            QueueType::Application,
489
8
            application.unit_code.as_str(),
490
8
            application.code.as_str(),
491
8
        );
492
8
        let username = username.as_str();
493
8
        match rabbitmq::get_policies(&client, opts, host, username).await {
494
            Err(e) => {
495
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
496
                return e.into_response();
497
            }
498
8
            Ok(policies) => {
499
8
                application.ttl = policies.ttl;
500
8
                application.length = policies.length;
501
8
            }
502
        }
503
6
    }
504
14
    application.host_uri = trunc_host_uri(&uri);
505
14
    Json(&response::GetApplication { data: application }).into_response()
506
18
}
507

            
508
/// `PATCH /{base}/api/v1/application/{applicationId}`
509
26
async fn patch_application(
510
26
    state: State<AppState>,
511
26
    headers: HeaderMap,
512
26
    Path(param): Path<ApplicationIdPath>,
513
26
    Json(body): Json<request::PatchApplicationBody>,
514
26
) -> impl IntoResponse {
515
    const FN_NAME: &'static str = "patch_application";
516
26
    let broker_base = state.broker_base.as_str();
517
26
    let client = state.client.clone();
518
26
    let token = match headers.get(header::AUTHORIZATION) {
519
        None => {
520
2
            let e = "missing Authorization".to_string();
521
2
            return ErrResp::ErrParam(Some(e)).into_response();
522
        }
523
24
        Some(value) => value.clone(),
524
24
    };
525
24

            
526
24
    let data = &body.data;
527
24
    if data.host_uri.is_none()
528
12
        && data.name.is_none()
529
8
        && data.info.is_none()
530
8
        && data.ttl.is_none()
531
6
        && data.length.is_none()
532
6
        && data.password.is_none()
533
    {
534
2
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
535
22
    }
536

            
537
22
    let (application, uri, hostname) = match get_application_inner(
538
22
        FN_NAME,
539
22
        &client,
540
22
        broker_base,
541
22
        param.application_id.as_str(),
542
22
        &token,
543
22
    )
544
22
    .await
545
    {
546
2
        Err(e) => return e,
547
20
        Ok((application, uri, hostname)) => (application, uri, hostname),
548
20
    };
549
20

            
550
20
    let mut patch_data = request::PatchApplicationData {
551
20
        name: data.name.clone(),
552
20
        info: data.info.clone(),
553
20
        ..Default::default()
554
20
    };
555
20
    let mut patch_host: Option<PatchHost> = None;
556
20
    if let Some(host) = data.host_uri.as_ref() {
557
12
        if !strings::is_uri(host) {
558
2
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
559
10
        }
560
10
        // Change to the new broker host.
561
10
        if !cmp_host_uri(application.host_uri.as_str(), host.as_str()) {
562
10
            let password = match data.password.as_ref() {
563
                None => {
564
2
                    let e = "missing `password`".to_string();
565
2
                    return ErrResp::ErrParam(Some(e)).into_response();
566
                }
567
8
                Some(password) => match password.len() {
568
                    0 => {
569
2
                        let e = "missing `password`".to_string();
570
2
                        return ErrResp::ErrParam(Some(e)).into_response();
571
                    }
572
6
                    _ => password,
573
                },
574
            };
575
6
            let mut new_host_uri = match Url::parse(host.as_str()) {
576
                Err(e) => {
577
                    let e = format!("invalid `hostUri`: {}", e);
578
                    return ErrResp::ErrParam(Some(e)).into_response();
579
                }
580
6
                Ok(uri) => match uri.host_str() {
581
                    None => {
582
2
                        let e = "invalid `hostUri`".to_string();
583
2
                        return ErrResp::ErrParam(Some(e)).into_response();
584
                    }
585
4
                    Some(_) => uri,
586
4
                },
587
4
            };
588
4

            
589
4
            let unit_code = application.unit_code.as_str();
590
4
            let code = application.code.as_str();
591
4
            let username = mq::to_username(QueueType::Application, unit_code, code);
592
4
            let resource = CreateQueueResource {
593
4
                scheme: new_host_uri.scheme(),
594
4
                host: new_host_uri.host_str().unwrap(),
595
4
                username: username.as_str(),
596
4
                password: password.as_str(),
597
4
                ttl: data.ttl,
598
4
                length: data.length,
599
4
                q_type: QueueType::Application,
600
4
            };
601
4
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
602
                return e;
603
4
            }
604
4

            
605
4
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
606
4
            patch_data.host_uri = Some(new_host_uri.to_string());
607
4
            patch_host = Some(PatchHost {
608
4
                host_uri: new_host_uri,
609
4
                username,
610
4
            });
611
        }
612
8
    }
613

            
614
    // Send request body to the sylvia-iot-broker.
615
12
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
616
6
        let application_id = param.application_id.as_str();
617
6
        let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
618
6
        let mut builder = client
619
6
            .request(reqwest::Method::PATCH, uri)
620
6
            .header(reqwest::header::AUTHORIZATION, &token)
621
6
            .json(&request::PatchApplicationBody { data: patch_data });
622
6
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
623
6
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
624
6
        }
625
6
        let api_req = match builder.build() {
626
            Err(e) => {
627
                clear_patch_host(FN_NAME, &state, &patch_host).await;
628
                let e = format!("generate request error: {}", e);
629
                error!("[{}] {}", FN_NAME, e);
630
                return ErrResp::ErrRsc(Some(e)).into_response();
631
            }
632
6
            Ok(req) => req,
633
        };
634
6
        let api_resp = match client.execute(api_req).await {
635
            Err(e) => {
636
                clear_patch_host(FN_NAME, &state, &patch_host).await;
637
                let e = format!("execute request error: {}", e);
638
                error!("[{}] {}", FN_NAME, e);
639
                return ErrResp::ErrIntMsg(Some(e)).into_response();
640
            }
641
6
            Ok(resp) => resp,
642
6
        };
643
6

            
644
6
        let status_code = api_resp.status();
645
6
        if status_code != StatusCode::NO_CONTENT {
646
            clear_patch_host(FN_NAME, &state, &patch_host).await;
647
            let mut resp_builder = Response::builder().status(status_code);
648
            for (k, v) in api_resp.headers() {
649
                resp_builder = resp_builder.header(k, v);
650
            }
651
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
652
                Err(e) => {
653
                    let e = format!("wrap response body error: {}", e);
654
                    error!("[{}] {}", FN_NAME, e);
655
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
656
                }
657
                Ok(resp) => return resp,
658
            }
659
6
        }
660
6
    }
661

            
662
12
    if let Some(host) = patch_host {
663
4
        let resource = ClearQueueResource {
664
4
            scheme: uri.scheme(),
665
4
            host: uri.host_str().unwrap(),
666
4
            username: host.username.as_str(),
667
4
        };
668
4
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
669
4
        return StatusCode::NO_CONTENT.into_response();
670
8
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
671
2
        return StatusCode::NO_CONTENT.into_response();
672
6
    }
673

            
674
    // Update broker information without changing hostUri.
675
6
    if let Some(password) = data.password.as_ref() {
676
6
        if password.len() == 0 {
677
2
            let e = "missing `password`".to_string();
678
2
            return ErrResp::ErrParam(Some(e)).into_response();
679
4
        }
680
    }
681
4
    let unit_code = application.unit_code.as_str();
682
4
    let code = application.code.as_str();
683
4
    let hostname = hostname.as_str();
684
4
    let username = mq::to_username(QueueType::Application, unit_code, code);
685
4
    let username = username.as_str();
686
4
    match uri.scheme() {
687
4
        "amqp" | "amqps" => match &state.amqp {
688
2
            AmqpState::RabbitMq(opts) => {
689
2
                if data.ttl.is_some() || data.length.is_some() {
690
2
                    let policies = rabbitmq::BrokerPolicies {
691
2
                        ttl: data.ttl,
692
2
                        length: data.length,
693
2
                    };
694
                    if let Err(e) =
695
2
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
696
                    {
697
                        let e = format!("patch RabbitMQ error: {}", e);
698
                        error!("[{}] {}", FN_NAME, e);
699
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
700
2
                    }
701
                }
702
2
                if let Some(password) = data.password.as_ref() {
703
2
                    let password = password.as_str();
704
                    if let Err(e) =
705
2
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
706
                    {
707
                        let e = format!("patch RabbitMQ password error: {}", e);
708
                        error!("[{}] {}", FN_NAME, e);
709
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
710
2
                    }
711
                }
712
            }
713
        },
714
2
        "mqtt" | "mqtts" => match &state.mqtt {
715
1
            MqttState::Emqx(opts) => {
716
1
                if let Some(password) = data.password.as_ref() {
717
1
                    let password = password.as_str();
718
                    if let Err(e) =
719
1
                        emqx::put_user(&client, opts, hostname, username, password).await
720
                    {
721
                        let e = format!("patch EMQX password error: {}", e);
722
                        error!("[{}] {}", FN_NAME, e);
723
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
724
1
                    }
725
                }
726
            }
727
1
            MqttState::Rumqttd => {}
728
        },
729
        _ => {}
730
    }
731

            
732
4
    StatusCode::NO_CONTENT.into_response()
733
26
}
734

            
735
/// `DELETE /{base}/api/v1/application/{applicationId}`
736
8
async fn delete_application(
737
8
    state: State<AppState>,
738
8
    Path(param): Path<ApplicationIdPath>,
739
8
    req: Request,
740
8
) -> impl IntoResponse {
741
    const FN_NAME: &'static str = "delete_application";
742
8
    let broker_base = state.broker_base.as_str();
743
8
    let application_id = param.application_id.as_str();
744
8
    let api_path = format!("{}/api/v1/application/{}", broker_base, application_id);
745
8
    let client = state.client.clone();
746
8
    let token = match req.headers().get(header::AUTHORIZATION) {
747
        None => {
748
2
            let e = "missing Authorization".to_string();
749
2
            return ErrResp::ErrParam(Some(e)).into_response();
750
        }
751
6
        Some(value) => value.clone(),
752
    };
753

            
754
4
    let (application, uri, host) =
755
6
        match get_application_inner(FN_NAME, &client, broker_base, application_id, &token).await {
756
2
            Err(e) => return e,
757
4
            Ok((application, uri, host)) => (application, uri, host),
758
        };
759

            
760
4
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
761
4
    if !resp.status().is_success() {
762
        return resp;
763
4
    }
764
4

            
765
4
    let unit_code = application.unit_code.as_str();
766
4
    let code = application.code.as_str();
767
4
    let username = mq::to_username(QueueType::Application, unit_code, code);
768
4
    let clear_rsc = ClearQueueResource {
769
4
        scheme: uri.scheme(),
770
4
        host: host.as_str(),
771
4
        username: username.as_str(),
772
4
    };
773
4
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
774
        return e;
775
4
    }
776
4

            
777
4
    StatusCode::NO_CONTENT.into_response()
778
8
}
779

            
780
/// `GET /{base}/api/v1/application/{applicationId}/stats`
781
43
async fn get_application_stats(
782
43
    state: State<AppState>,
783
43
    Path(param): Path<ApplicationIdPath>,
784
43
    req: Request,
785
43
) -> impl IntoResponse {
786
    const FN_NAME: &'static str = "get_application";
787
43
    let broker_base = state.broker_base.as_str();
788
43
    let client = state.client.clone();
789
43
    let token = match req.headers().get(header::AUTHORIZATION) {
790
        None => {
791
2
            let e = "missing Authorization".to_string();
792
2
            return ErrResp::ErrParam(Some(e)).into_response();
793
        }
794
41
        Some(value) => value.clone(),
795
    };
796

            
797
41
    let (application, uri, host) = match get_application_inner(
798
41
        FN_NAME,
799
41
        &client,
800
41
        broker_base,
801
41
        param.application_id.as_str(),
802
41
        &token,
803
41
    )
804
41
    .await
805
    {
806
2
        Err(e) => return e,
807
39
        Ok((application, uri, host)) => (application, uri, host),
808
39
    };
809
39

            
810
39
    let host = host.as_str();
811
39
    let scheme = uri.scheme();
812
39
    let data = match scheme {
813
39
        "amqp" | "amqps" => {
814
31
            let AmqpState::RabbitMq(opts) = &state.amqp;
815
31
            let username = mq::to_username(
816
31
                QueueType::Application,
817
31
                application.unit_code.as_str(),
818
31
                application.code.as_str(),
819
31
            );
820
31
            let username = username.as_str();
821
31
            response::GetApplicationStatsData {
822
31
                uldata: match rabbitmq::stats(&client, opts, host, username, "uldata").await {
823
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
824
                        consumers: 0,
825
                        messages: 0,
826
                        publish_rate: 0.0,
827
                        deliver_rate: 0.0,
828
                    },
829
                    Err(e) => {
830
                        error!("[{}] get uldata stats error: {}", FN_NAME, e);
831
                        return e.into_response();
832
                    }
833
31
                    Ok(stats) => response::Stats {
834
31
                        consumers: stats.consumers,
835
31
                        messages: stats.messages,
836
31
                        publish_rate: stats.publish_rate,
837
31
                        deliver_rate: stats.deliver_rate,
838
31
                    },
839
                },
840
31
                dldata_resp: match rabbitmq::stats(&client, opts, host, username, "dldata-resp")
841
31
                    .await
842
                {
843
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
844
                        consumers: 0,
845
                        messages: 0,
846
                        publish_rate: 0.0,
847
                        deliver_rate: 0.0,
848
                    },
849
                    Err(e) => {
850
                        error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
851
                        return e.into_response();
852
                    }
853
31
                    Ok(stats) => response::Stats {
854
31
                        consumers: stats.consumers,
855
31
                        messages: stats.messages,
856
31
                        publish_rate: stats.publish_rate,
857
31
                        deliver_rate: stats.deliver_rate,
858
31
                    },
859
                },
860
31
                dldata_result: match rabbitmq::stats(&client, opts, host, username, "dldata-result")
861
31
                    .await
862
                {
863
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
864
                        consumers: 0,
865
                        messages: 0,
866
                        publish_rate: 0.0,
867
                        deliver_rate: 0.0,
868
                    },
869
                    Err(e) => {
870
                        error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
871
                        return e.into_response();
872
                    }
873
31
                    Ok(stats) => response::Stats {
874
31
                        consumers: stats.consumers,
875
31
                        messages: stats.messages,
876
31
                        publish_rate: stats.publish_rate,
877
31
                        deliver_rate: stats.deliver_rate,
878
31
                    },
879
                },
880
            }
881
        }
882
8
        "mqtt" | "mqtts" => match &state.mqtt {
883
7
            MqttState::Emqx(opts) => {
884
7
                let username = mq::to_username(
885
7
                    QueueType::Application,
886
7
                    application.unit_code.as_str(),
887
7
                    application.code.as_str(),
888
7
                );
889
7
                let username = username.as_str();
890
7
                response::GetApplicationStatsData {
891
7
                    uldata: match emqx::stats(&client, opts, host, username, "uldata").await {
892
                        Err(e) => {
893
                            error!("[{}] get uldata stats error: {}", FN_NAME, e);
894
                            return e.into_response();
895
                        }
896
7
                        Ok(stats) => response::Stats {
897
7
                            consumers: stats.consumers,
898
7
                            messages: stats.messages,
899
7
                            publish_rate: stats.publish_rate,
900
7
                            deliver_rate: stats.deliver_rate,
901
7
                        },
902
7
                    },
903
7
                    dldata_resp: match emqx::stats(&client, opts, host, username, "dldata-resp")
904
7
                        .await
905
                    {
906
                        Err(e) => {
907
                            error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
908
                            return e.into_response();
909
                        }
910
7
                        Ok(stats) => response::Stats {
911
7
                            consumers: stats.consumers,
912
7
                            messages: stats.messages,
913
7
                            publish_rate: stats.publish_rate,
914
7
                            deliver_rate: stats.deliver_rate,
915
7
                        },
916
7
                    },
917
7
                    dldata_result: match emqx::stats(&client, opts, host, username, "dldata-result")
918
7
                        .await
919
                    {
920
                        Err(e) => {
921
                            error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
922
                            return e.into_response();
923
                        }
924
7
                        Ok(stats) => response::Stats {
925
7
                            consumers: stats.consumers,
926
7
                            messages: stats.messages,
927
7
                            publish_rate: stats.publish_rate,
928
7
                            deliver_rate: stats.deliver_rate,
929
7
                        },
930
                    },
931
                }
932
            }
933
1
            MqttState::Rumqttd => response::GetApplicationStatsData {
934
1
                uldata: response::Stats {
935
1
                    ..Default::default()
936
1
                },
937
1
                dldata_resp: response::Stats {
938
1
                    ..Default::default()
939
1
                },
940
1
                dldata_result: response::Stats {
941
1
                    ..Default::default()
942
1
                },
943
1
            },
944
        },
945
        _ => {
946
            let e = format!("unsupport scheme {}", scheme);
947
            error!("[{}] {}", FN_NAME, e);
948
            return ErrResp::ErrUnknown(Some(e)).into_response();
949
        }
950
    };
951
39
    Json(&response::GetApplicationStats { data }).into_response()
952
43
}
953

            
954
/// `POST /{base}/api/v1/application/{applicationId}/dldata`
955
14
async fn post_application_dldata(
956
14
    state: State<AppState>,
957
14
    headers: HeaderMap,
958
14
    Path(param): Path<ApplicationIdPath>,
959
14
    Json(body): Json<request::PostApplicationDlDataBody>,
960
14
) -> impl IntoResponse {
961
    const FN_NAME: &'static str = "post_application_dldata";
962
14
    let broker_base = state.broker_base.as_str();
963
14
    let client = state.client.clone();
964
14
    let token = match headers.get(header::AUTHORIZATION) {
965
        None => {
966
2
            let e = "missing Authorization".to_string();
967
2
            return ErrResp::ErrParam(Some(e)).into_response();
968
        }
969
12
        Some(value) => value.clone(),
970
12
    };
971
12

            
972
12
    if body.data.device_id.len() == 0 {
973
2
        let e = "empty `deviceId` is invalid".to_string();
974
2
        return ErrResp::ErrParam(Some(e)).into_response();
975
10
    }
976
10
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
977
2
        let e = format!("`payload` is not hexadecimal string: {}", e);
978
2
        return ErrResp::ErrParam(Some(e)).into_response();
979
8
    }
980

            
981
8
    let (application, uri, hostname) = match get_application_inner(
982
8
        FN_NAME,
983
8
        &client,
984
8
        broker_base,
985
8
        param.application_id.as_str(),
986
8
        &token,
987
8
    )
988
8
    .await
989
    {
990
2
        Err(e) => return e,
991
6
        Ok((application, uri, hostname)) => (application, uri, hostname),
992
6
    };
993
6
    match get_device_inner(
994
6
        FN_NAME,
995
6
        &client,
996
6
        broker_base,
997
6
        body.data.device_id.as_str(),
998
6
        &token,
999
6
    )
6
    .await
    {
        Err(e) => return e,
6
        Ok(device) => match device {
            None => {
2
                return ErrResp::Custom(
2
                    ErrReq::DEVICE_NOT_EXIST.0,
2
                    ErrReq::DEVICE_NOT_EXIST.1,
2
                    None,
2
                )
2
                .into_response()
            }
4
            Some(_) => (),
4
        },
4
    };
4

            
4
    let hostname = hostname.as_str();
4
    let scheme = uri.scheme();
4
    let payload = match serde_json::to_string(&DlData {
4
        correlation_id: "1".to_string(),
4
        device_id: Some(body.data.device_id.clone()),
4
        data: body.data.payload.clone(),
4
        ..Default::default()
4
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
4
        Ok(payload) => general_purpose::STANDARD.encode(payload),
4
    };
4
    match scheme {
4
        "amqp" | "amqps" => {
2
            let AmqpState::RabbitMq(opts) = &state.amqp;
2
            let username = mq::to_username(
2
                QueueType::Application,
2
                application.unit_code.as_str(),
2
                application.code.as_str(),
2
            );
2
            let username = username.as_str();
            if let Err(e) =
2
                rabbitmq::publish_message(&client, opts, hostname, username, "dldata", payload)
2
                    .await
            {
                return e.into_response();
2
            }
        }
2
        "mqtt" | "mqtts" => match &state.mqtt {
1
            MqttState::Emqx(opts) => {
1
                let username = mq::to_username(
1
                    QueueType::Application,
1
                    application.unit_code.as_str(),
1
                    application.code.as_str(),
1
                );
1
                let username = username.as_str();
                if let Err(e) =
1
                    emqx::publish_message(&client, opts, hostname, username, "dldata", payload)
1
                        .await
                {
                    return e.into_response();
1
                }
            }
            MqttState::Rumqttd => {
1
                let e = "not support now".to_string();
1
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
3
    StatusCode::NO_CONTENT.into_response()
14
}
93
async fn get_application_inner(
93
    fn_name: &str,
93
    client: &reqwest::Client,
93
    broker_base: &str,
93
    application_id: &str,
93
    token: &HeaderValue,
93
) -> Result<(response::GetApplicationData, Url, String), Response> {
93
    let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
93
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
83
    let application = match resp.json::<response::GetApplication>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
83
        Ok(application) => application.data,
    };
83
    let uri = match Url::parse(application.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
83
        Ok(uri) => uri,
    };
83
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
83
        Some(host) => host.to_string(),
83
    };
83
    Ok((application, uri, host))
93
}
32
async fn check_application_code_inner(
32
    fn_name: &str,
32
    client: &reqwest::Client,
32
    broker_base: &str,
32
    unit_id: &str,
32
    code: &str,
32
    token: &HeaderValue,
32
) -> Result<u64, Response> {
32
    let uri = format!("{}/api/v1/application/count", broker_base);
32
    let req = match client
32
        .request(reqwest::Method::GET, uri)
32
        .header(reqwest::header::AUTHORIZATION, token)
32
        .query(&[("unit", unit_id), ("code", code)])
32
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
32
        Ok(req) => req,
    };
32
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
32
        Ok(resp) => resp,
32
    };
32

            
32
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
32
        Ok(data) => Ok(data.data.count),
    }
32
}