1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Request, State},
7
    http::{HeaderMap, HeaderValue, StatusCode, header},
8
    response::{IntoResponse, Response},
9
    routing,
10
};
11
use base64::{Engine, engine::general_purpose};
12
use bytes::{Bytes, BytesMut};
13
use csv::WriterBuilder;
14
use futures_util::StreamExt;
15
use hex;
16
use log::error;
17
use reqwest;
18
use serde::{Deserialize, Serialize};
19
use serde_json::{Deserializer, Map, Value};
20
use url::Url;
21

            
22
use sylvia_iot_corelib::{
23
    err::ErrResp,
24
    http::{Json, Path},
25
    strings,
26
};
27

            
28
use super::{
29
    super::{AmqpState, ErrReq, MqttState, State as AppState},
30
    ClearQueueResource, CreateQueueResource, ListResp, PatchHost, api_bridge, clear_patch_host,
31
    clear_queue_rsc, cmp_host_uri, create_queue_rsc, get_device_inner, get_stream_resp,
32
    get_unit_inner, list_api_bridge, request, response, transfer_host_uri, trunc_host_uri,
33
};
34
use crate::libs::mq::{self, QueueType, emqx, rabbitmq};
35

            
36
enum ListFormat {
37
    Array,
38
    Csv,
39
    Data,
40
}
41

            
42
#[derive(Deserialize)]
43
struct ApplicationIdPath {
44
    application_id: String,
45
}
46

            
47
#[derive(Deserialize, Serialize)]
48
struct Application {
49
    #[serde(rename = "applicationId")]
50
    application_id: String,
51
    code: String,
52
    #[serde(rename = "unitId")]
53
    unit_id: String,
54
    #[serde(rename = "unitCode")]
55
    unit_code: String,
56
    #[serde(rename = "createdAt")]
57
    created_at: String,
58
    #[serde(rename = "modifiedAt")]
59
    modified_at: String,
60
    #[serde(rename = "hostUri")]
61
    host_uri: String,
62
    name: String,
63
    info: Map<String, Value>,
64
}
65

            
66
#[derive(Deserialize, Serialize)]
67
struct CsvItem {
68
    #[serde(rename = "applicationId")]
69
    application_id: String,
70
    code: String,
71
    #[serde(rename = "unitId")]
72
    unit_id: String,
73
    #[serde(rename = "unitCode")]
74
    unit_code: String,
75
    #[serde(rename = "createdAt")]
76
    created_at: String,
77
    #[serde(rename = "modifiedAt")]
78
    modified_at: String,
79
    #[serde(rename = "hostUri")]
80
    host_uri: String,
81
    name: String,
82
    info: Option<String>,
83
}
84

            
85
/// Downlink data from application to broker.
86
#[derive(Default, Serialize)]
87
struct DlData {
88
    #[serde(rename = "correlationId")]
89
    correlation_id: String,
90
    #[serde(rename = "deviceId")]
91
    device_id: Option<String>,
92
    #[serde(rename = "networkCode")]
93
    network_code: Option<String>,
94
    #[serde(rename = "networkAddr")]
95
    network_addr: Option<String>,
96
    data: String,
97
    extension: Option<Map<String, Value>>,
98
}
99

            
100
const CSV_FIELDS: &'static [u8] =
101
    b"\xEF\xBB\xBFapplicationId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
102

            
103
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
104
506
    Router::new().nest(
105
506
        scope_path,
106
506
        Router::new()
107
506
            .route("/", routing::post(post_application))
108
506
            .route("/count", routing::get(get_application_count))
109
506
            .route("/list", routing::get(get_application_list))
110
506
            .route(
111
506
                "/{application_id}",
112
506
                routing::get(get_application)
113
506
                    .patch(patch_application)
114
506
                    .delete(delete_application),
115
            )
116
506
            .route(
117
506
                "/{application_id}/stats",
118
506
                routing::get(get_application_stats),
119
            )
120
506
            .route(
121
506
                "/{application_id}/dldata",
122
506
                routing::post(post_application_dldata),
123
            )
124
506
            .with_state(state.clone()),
125
    )
126
506
}
127

            
128
/// `POST /{base}/api/v1/application`
129
80
async fn post_application(
130
80
    State(state): State<AppState>,
131
80
    mut headers: HeaderMap,
132
80
    Json(mut body): Json<request::PostApplicationBody>,
133
80
) -> impl IntoResponse {
134
    const FN_NAME: &'static str = "post_application";
135
80
    let broker_base = state.broker_base.as_str();
136
80
    let api_path = format!("{}/api/v1/application", broker_base);
137
80
    let client = state.client.clone();
138
80
    let token = match headers.get(header::AUTHORIZATION) {
139
        None => {
140
4
            let e = "missing Authorization".to_string();
141
4
            return ErrResp::ErrParam(Some(e)).into_response();
142
        }
143
76
        Some(value) => value.clone(),
144
    };
145

            
146
    // Get unit information to create queue information.
147
76
    let unit_id = body.data.unit_id.as_str();
148
76
    if unit_id.len() == 0 {
149
4
        return ErrResp::ErrParam(Some(
150
4
            "`unitId` must with at least one character".to_string(),
151
4
        ))
152
4
        .into_response();
153
72
    }
154
72
    let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
155
        Err(e) => return e,
156
72
        Ok(unit) => match unit {
157
            None => {
158
4
                return ErrResp::Custom(ErrReq::UNIT_NOT_EXIST.0, ErrReq::UNIT_NOT_EXIST.1, None)
159
4
                    .into_response();
160
            }
161
68
            Some(unit) => unit,
162
        },
163
    };
164
68
    let unit_code = unit.code.as_str();
165
68
    let code = body.data.code.as_str();
166
68
    if !strings::is_code(code) {
167
4
        return ErrResp::ErrParam(Some(
168
4
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
169
4
        ))
170
4
        .into_response();
171
64
    }
172
64
    match check_application_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
173
        Err(e) => return e,
174
64
        Ok(count) => match count {
175
60
            0 => (),
176
            _ => {
177
4
                return ErrResp::Custom(
178
4
                    ErrReq::APPLICATION_EXIST.0,
179
4
                    ErrReq::APPLICATION_EXIST.1,
180
4
                    None,
181
4
                )
182
4
                .into_response();
183
            }
184
        },
185
    }
186
60
    let q_type = QueueType::Application;
187
60
    let username = mq::to_username(q_type, unit_code, code);
188
60
    let password = strings::randomstring(8);
189
60
    let uri = match Url::parse(body.data.host_uri.as_str()) {
190
4
        Err(e) => {
191
4
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
192
        }
193
56
        Ok(uri) => uri,
194
    };
195
56
    let host = match uri.host() {
196
        None => {
197
4
            let e = "invalid `hostUri`".to_string();
198
4
            return ErrResp::ErrParam(Some(e)).into_response();
199
        }
200
52
        Some(host) => host.to_string(),
201
    };
202
52
    let scheme = uri.scheme();
203
52
    let host = host.as_str();
204
52
    let username = username.as_str();
205
52
    let password = password.as_str();
206

            
207
    // Create message broker resources.
208
52
    let create_rsc = CreateQueueResource {
209
52
        scheme,
210
52
        host,
211
52
        username,
212
52
        password,
213
52
        ttl: body.data.ttl,
214
52
        length: body.data.length,
215
52
        q_type: QueueType::Application,
216
52
    };
217
52
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
218
        return e;
219
52
    }
220
52
    let clear_rsc = ClearQueueResource {
221
52
        scheme,
222
52
        host,
223
52
        username,
224
52
        q_type: QueueType::Application,
225
52
    };
226

            
227
    // Create application instance.
228
52
    let mut body_uri = uri.clone();
229
52
    transfer_host_uri(&state, &mut body_uri, username);
230
52
    body.data.host_uri = body_uri.to_string();
231
52
    headers.remove(header::CONTENT_LENGTH);
232
52
    let builder = client
233
52
        .request(reqwest::Method::POST, api_path)
234
52
        .headers(headers)
235
52
        .json(&body);
236
52
    let api_req = match builder.build() {
237
        Err(e) => {
238
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
239
            let e = format!("generate request error: {}", e);
240
            error!("[{}] {}", FN_NAME, e);
241
            return ErrResp::ErrRsc(Some(e)).into_response();
242
        }
243
52
        Ok(req) => req,
244
    };
245
52
    let api_resp = match client.execute(api_req).await {
246
        Err(e) => {
247
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
248
            let e = format!("execute request error: {}", e);
249
            error!("[{}] {}", FN_NAME, e);
250
            return ErrResp::ErrIntMsg(Some(e)).into_response();
251
        }
252
52
        Ok(resp) => match resp.status() {
253
52
            reqwest::StatusCode::OK => resp,
254
            _ => {
255
                let mut resp_builder = Response::builder().status(resp.status());
256
                for (k, v) in resp.headers() {
257
                    resp_builder = resp_builder.header(k, v);
258
                }
259
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
260
                    Err(e) => {
261
                        let e = format!("wrap response body error: {}", e);
262
                        error!("[{}] {}", FN_NAME, e);
263
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
264
                    }
265
                    Ok(resp) => return resp,
266
                }
267
            }
268
        },
269
    };
270
52
    let mut body = match api_resp.json::<response::PostApplication>().await {
271
        Err(e) => {
272
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
273
            let e = format!("unexpected response: {}", e);
274
            return ErrResp::ErrUnknown(Some(e)).into_response();
275
        }
276
52
        Ok(body) => body,
277
    };
278
52
    body.data.password = Some(password.to_string());
279

            
280
52
    Json(&body).into_response()
281
80
}
282

            
283
/// `GET /{base}/api/v1/application/count`
284
4
async fn get_application_count(state: State<AppState>, req: Request) -> impl IntoResponse {
285
    const FN_NAME: &'static str = "get_application_count";
286
4
    let api_path = format!("{}/api/v1/application/count", state.broker_base.as_str());
287
4
    let client = state.client.clone();
288

            
289
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
290
4
}
291

            
292
/// `GET /{base}/api/v1/application/list`
293
20
async fn get_application_list(state: State<AppState>, req: Request) -> impl IntoResponse {
294
    const FN_NAME: &'static str = "get_application_list";
295
20
    let api_path = format!("{}/api/v1/application/list", state.broker_base.as_str());
296
20
    let api_path = api_path.as_str();
297
20
    let client = state.client.clone();
298

            
299
20
    let mut list_format = ListFormat::Data;
300
20
    if let Some(query_str) = req.uri().query() {
301
16
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
302
            Err(e) => {
303
                let e = format!("parse query error: {}", e);
304
                return ErrResp::ErrParam(Some(e)).into_response();
305
            }
306
16
            Ok(query) => query,
307
        };
308
24
        for (k, v) in query.iter() {
309
24
            if k.as_str().eq("format") {
310
12
                if v.as_str().eq("array") {
311
4
                    list_format = ListFormat::Array;
312
8
                } else if v.as_str().eq("csv") {
313
4
                    list_format = ListFormat::Csv;
314
4
                }
315
12
            }
316
        }
317
4
    }
318

            
319
16
    let (api_resp, resp_builder) =
320
20
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "application").await {
321
4
            ListResp::Axum(resp) => return resp,
322
16
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
323
        };
324

            
325
16
    let mut resp_stream = api_resp.bytes_stream();
326
16
    let body = Body::from_stream(async_stream::stream! {
327
16
        match list_format {
328
            ListFormat::Array => yield Ok(Bytes::from("[")),
329
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
330
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
331
        }
332
        let mut first_sent = false;
333

            
334
        let mut buffer = BytesMut::new();
335
        while let Some(body) = resp_stream.next().await {
336
            match body {
337
                Err(e) => {
338
                    error!("[{}] get body error: {}", FN_NAME, e);
339
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
340
                    yield Err(err);
341
                    break;
342
                }
343
                Ok(body) => buffer.extend_from_slice(&body[..]),
344
            }
345

            
346
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
347
            let mut index = 0;
348
            let mut finish = false;
349
            loop {
350
                if let Some(Ok(mut v)) = json_stream.next() {
351
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
352
                        Err(e) => {
353
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
354
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
355
                            yield Err(err);
356
                            finish = true;
357
                            break;
358
                        }
359
                        Ok(uri) => trunc_host_uri(&uri),
360
                    };
361
16
                    match list_format {
362
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
363
                            Err(e) =>{
364
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
365
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
366
                                yield Err(err);
367
                                finish = true;
368
                                break;
369
                            }
370
                            Ok(v) => {
371
                                match first_sent {
372
                                    false => first_sent = true,
373
                                    true => yield Ok(Bytes::from(",")),
374
                                }
375
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
376
                            }
377
                        }
378
                        ListFormat::Csv => {
379
                            let mut item = CsvItem{
380
                                application_id: v.application_id,
381
                                code: v.code,
382
                                unit_id: v.unit_id,
383
                                unit_code: v.unit_code,
384
                                created_at: v.created_at,
385
                                modified_at: v.modified_at,
386
                                host_uri: v.host_uri,
387
                                name: v.name,
388
                                info: None,
389
                            };
390
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
391
                                item.info = Some(info_str);
392
                            }
393
                            let mut writer =
394
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
395
                            if let Err(e) = writer.serialize(item) {
396
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
397
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
398
                                yield Err(err);
399
                                finish = true;
400
                                break;
401
                            }
402
                            match writer.into_inner() {
403
                                Err(e) => {
404
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
405
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
406
                                    yield Err(err);
407
                                    finish = true;
408
                                    break;
409
                                }
410
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
411
                            }
412
                        }
413
                    }
414
                    continue;
415
                }
416
                let offset = json_stream.byte_offset();
417
                if buffer.len() <= index + offset {
418
                    index = buffer.len();
419
                    break;
420
                }
421
                match buffer[index+offset] {
422
                    b'[' | b',' => {
423
                        index += offset + 1;
424
                        if buffer.len() <= index {
425
                            break;
426
                        }
427
                        json_stream =
428
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
429
                    }
430
                    b']' => {
431
                        finish = true;
432
                        break;
433
                    }
434
                    _ => break,
435
                }
436
            }
437
            if finish {
438
16
                match list_format {
439
                    ListFormat::Array => yield Ok(Bytes::from("]")),
440
                    ListFormat::Csv => (),
441
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
442
                }
443
                break;
444
            }
445
            buffer = buffer.split_off(index);
446
        }
447
    });
448
16
    match resp_builder.body(body) {
449
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
450
16
        Ok(resp) => resp,
451
    }
452
20
}
453

            
454
/// `GET /{base}/api/v1/application/{applicationId}`
455
36
async fn get_application(
456
36
    state: State<AppState>,
457
36
    Path(param): Path<ApplicationIdPath>,
458
36
    req: Request,
459
36
) -> impl IntoResponse {
460
    const FN_NAME: &'static str = "get_application";
461
36
    let broker_base = state.broker_base.as_str();
462
36
    let client = state.client.clone();
463
36
    let token = match req.headers().get(header::AUTHORIZATION) {
464
        None => {
465
4
            let e = "missing Authorization".to_string();
466
4
            return ErrResp::ErrParam(Some(e)).into_response();
467
        }
468
32
        Some(value) => value.clone(),
469
    };
470

            
471
32
    let (mut application, uri, host) = match get_application_inner(
472
32
        FN_NAME,
473
32
        &client,
474
32
        broker_base,
475
32
        param.application_id.as_str(),
476
32
        &token,
477
    )
478
32
    .await
479
    {
480
4
        Err(e) => return e,
481
28
        Ok((application, uri, host)) => (application, uri, host),
482
    };
483

            
484
28
    let host = host.as_str();
485
28
    let scheme = uri.scheme();
486
28
    if scheme.eq("amqp") || scheme.eq("amqps") {
487
16
        let AmqpState::RabbitMq(opts) = &state.amqp;
488
16
        let username = mq::to_username(
489
16
            QueueType::Application,
490
16
            application.unit_code.as_str(),
491
16
            application.code.as_str(),
492
        );
493
16
        let username = username.as_str();
494
16
        match rabbitmq::get_policies(&client, opts, host, username).await {
495
            Err(e) => {
496
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
497
                return e.into_response();
498
            }
499
16
            Ok(policies) => {
500
16
                application.ttl = policies.ttl;
501
16
                application.length = policies.length;
502
16
            }
503
        }
504
12
    }
505
28
    application.host_uri = trunc_host_uri(&uri);
506
28
    Json(&response::GetApplication { data: application }).into_response()
507
36
}
508

            
509
/// `PATCH /{base}/api/v1/application/{applicationId}`
510
52
async fn patch_application(
511
52
    state: State<AppState>,
512
52
    headers: HeaderMap,
513
52
    Path(param): Path<ApplicationIdPath>,
514
52
    Json(body): Json<request::PatchApplicationBody>,
515
52
) -> impl IntoResponse {
516
    const FN_NAME: &'static str = "patch_application";
517
52
    let broker_base = state.broker_base.as_str();
518
52
    let client = state.client.clone();
519
52
    let token = match headers.get(header::AUTHORIZATION) {
520
        None => {
521
4
            let e = "missing Authorization".to_string();
522
4
            return ErrResp::ErrParam(Some(e)).into_response();
523
        }
524
48
        Some(value) => value.clone(),
525
    };
526

            
527
48
    let data = &body.data;
528
48
    if data.host_uri.is_none()
529
24
        && data.name.is_none()
530
16
        && data.info.is_none()
531
16
        && data.ttl.is_none()
532
12
        && data.length.is_none()
533
12
        && data.password.is_none()
534
    {
535
4
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
536
44
    }
537

            
538
44
    let (application, uri, hostname) = match get_application_inner(
539
44
        FN_NAME,
540
44
        &client,
541
44
        broker_base,
542
44
        param.application_id.as_str(),
543
44
        &token,
544
    )
545
44
    .await
546
    {
547
4
        Err(e) => return e,
548
40
        Ok((application, uri, hostname)) => (application, uri, hostname),
549
    };
550

            
551
40
    let mut patch_data = request::PatchApplicationData {
552
40
        name: data.name.clone(),
553
40
        info: data.info.clone(),
554
40
        ..Default::default()
555
40
    };
556
40
    let mut patch_host: Option<PatchHost> = None;
557
40
    if let Some(host) = data.host_uri.as_ref() {
558
24
        if !strings::is_uri(host) {
559
4
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
560
20
        }
561
        // Change to the new broker host.
562
20
        if !cmp_host_uri(application.host_uri.as_str(), host.as_str()) {
563
20
            let password = match data.password.as_ref() {
564
                None => {
565
4
                    let e = "missing `password`".to_string();
566
4
                    return ErrResp::ErrParam(Some(e)).into_response();
567
                }
568
16
                Some(password) => match password.len() {
569
                    0 => {
570
4
                        let e = "missing `password`".to_string();
571
4
                        return ErrResp::ErrParam(Some(e)).into_response();
572
                    }
573
12
                    _ => password,
574
                },
575
            };
576
12
            let mut new_host_uri = match Url::parse(host.as_str()) {
577
                Err(e) => {
578
                    let e = format!("invalid `hostUri`: {}", e);
579
                    return ErrResp::ErrParam(Some(e)).into_response();
580
                }
581
12
                Ok(uri) => match uri.host_str() {
582
                    None => {
583
4
                        let e = "invalid `hostUri`".to_string();
584
4
                        return ErrResp::ErrParam(Some(e)).into_response();
585
                    }
586
8
                    Some(_) => uri,
587
                },
588
            };
589

            
590
8
            let unit_code = application.unit_code.as_str();
591
8
            let code = application.code.as_str();
592
8
            let username = mq::to_username(QueueType::Application, unit_code, code);
593
8
            let resource = CreateQueueResource {
594
8
                scheme: new_host_uri.scheme(),
595
8
                host: new_host_uri.host_str().unwrap(),
596
8
                username: username.as_str(),
597
8
                password: password.as_str(),
598
8
                ttl: data.ttl,
599
8
                length: data.length,
600
8
                q_type: QueueType::Application,
601
8
            };
602
8
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
603
                return e;
604
8
            }
605

            
606
8
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
607
8
            patch_data.host_uri = Some(new_host_uri.to_string());
608
8
            patch_host = Some(PatchHost {
609
8
                host_uri: new_host_uri,
610
8
                username,
611
8
            });
612
        }
613
16
    }
614

            
615
    // Send request body to the sylvia-iot-broker.
616
24
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
617
12
        let application_id = param.application_id.as_str();
618
12
        let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
619
12
        let mut builder = client
620
12
            .request(reqwest::Method::PATCH, uri)
621
12
            .header(reqwest::header::AUTHORIZATION, &token)
622
12
            .json(&request::PatchApplicationBody { data: patch_data });
623
12
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
624
12
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
625
12
        }
626
12
        let api_req = match builder.build() {
627
            Err(e) => {
628
                clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Application).await;
629
                let e = format!("generate request error: {}", e);
630
                error!("[{}] {}", FN_NAME, e);
631
                return ErrResp::ErrRsc(Some(e)).into_response();
632
            }
633
12
            Ok(req) => req,
634
        };
635
12
        let api_resp = match client.execute(api_req).await {
636
            Err(e) => {
637
                clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Application).await;
638
                let e = format!("execute request error: {}", e);
639
                error!("[{}] {}", FN_NAME, e);
640
                return ErrResp::ErrIntMsg(Some(e)).into_response();
641
            }
642
12
            Ok(resp) => resp,
643
        };
644

            
645
12
        let status_code = api_resp.status();
646
12
        if status_code != StatusCode::NO_CONTENT {
647
            clear_patch_host(FN_NAME, &state, &patch_host, QueueType::Application).await;
648
            let mut resp_builder = Response::builder().status(status_code);
649
            for (k, v) in api_resp.headers() {
650
                resp_builder = resp_builder.header(k, v);
651
            }
652
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
653
                Err(e) => {
654
                    let e = format!("wrap response body error: {}", e);
655
                    error!("[{}] {}", FN_NAME, e);
656
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
657
                }
658
                Ok(resp) => return resp,
659
            }
660
12
        }
661
12
    }
662

            
663
24
    if let Some(host) = patch_host {
664
8
        let resource = ClearQueueResource {
665
8
            scheme: uri.scheme(),
666
8
            host: uri.host_str().unwrap(),
667
8
            username: host.username.as_str(),
668
8
            q_type: QueueType::Application,
669
8
        };
670
8
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
671
8
        return StatusCode::NO_CONTENT.into_response();
672
16
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
673
4
        return StatusCode::NO_CONTENT.into_response();
674
12
    }
675

            
676
    // Update broker information without changing hostUri.
677
12
    if let Some(password) = data.password.as_ref() {
678
12
        if password.len() == 0 {
679
4
            let e = "missing `password`".to_string();
680
4
            return ErrResp::ErrParam(Some(e)).into_response();
681
8
        }
682
    }
683
8
    let unit_code = application.unit_code.as_str();
684
8
    let code = application.code.as_str();
685
8
    let hostname = hostname.as_str();
686
8
    let username = mq::to_username(QueueType::Application, unit_code, code);
687
8
    let username = username.as_str();
688
8
    match uri.scheme() {
689
8
        "amqp" | "amqps" => match &state.amqp {
690
4
            AmqpState::RabbitMq(opts) => {
691
4
                if data.ttl.is_some() || data.length.is_some() {
692
4
                    let policies = rabbitmq::BrokerPolicies {
693
4
                        ttl: data.ttl,
694
4
                        length: data.length,
695
4
                    };
696
                    if let Err(e) =
697
4
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
698
                    {
699
                        let e = format!("patch RabbitMQ error: {}", e);
700
                        error!("[{}] {}", FN_NAME, e);
701
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
702
4
                    }
703
                }
704
4
                if let Some(password) = data.password.as_ref() {
705
4
                    let password = password.as_str();
706
                    if let Err(e) =
707
4
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
708
                    {
709
                        let e = format!("patch RabbitMQ password error: {}", e);
710
                        error!("[{}] {}", FN_NAME, e);
711
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
712
4
                    }
713
                }
714
            }
715
        },
716
4
        "mqtt" | "mqtts" => match &state.mqtt {
717
2
            MqttState::Emqx(opts) => {
718
2
                if let Some(password) = data.password.as_ref() {
719
2
                    let password = password.as_str();
720
                    if let Err(e) =
721
2
                        emqx::put_user(&client, opts, hostname, username, password).await
722
                    {
723
                        let e = format!("patch EMQX password error: {}", e);
724
                        error!("[{}] {}", FN_NAME, e);
725
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
726
2
                    }
727
                }
728
            }
729
2
            MqttState::Rumqttd => {}
730
        },
731
        _ => {}
732
    }
733

            
734
8
    StatusCode::NO_CONTENT.into_response()
735
52
}
736

            
737
/// `DELETE /{base}/api/v1/application/{applicationId}`
738
16
async fn delete_application(
739
16
    state: State<AppState>,
740
16
    Path(param): Path<ApplicationIdPath>,
741
16
    req: Request,
742
16
) -> impl IntoResponse {
743
    const FN_NAME: &'static str = "delete_application";
744
16
    let broker_base = state.broker_base.as_str();
745
16
    let application_id = param.application_id.as_str();
746
16
    let api_path = format!("{}/api/v1/application/{}", broker_base, application_id);
747
16
    let client = state.client.clone();
748
16
    let token = match req.headers().get(header::AUTHORIZATION) {
749
        None => {
750
4
            let e = "missing Authorization".to_string();
751
4
            return ErrResp::ErrParam(Some(e)).into_response();
752
        }
753
12
        Some(value) => value.clone(),
754
    };
755

            
756
8
    let (application, uri, host) =
757
12
        match get_application_inner(FN_NAME, &client, broker_base, application_id, &token).await {
758
4
            Err(e) => return e,
759
8
            Ok((application, uri, host)) => (application, uri, host),
760
        };
761

            
762
8
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
763
8
    if !resp.status().is_success() {
764
        return resp;
765
8
    }
766

            
767
8
    let unit_code = application.unit_code.as_str();
768
8
    let code = application.code.as_str();
769
8
    let username = mq::to_username(QueueType::Application, unit_code, code);
770
8
    let clear_rsc = ClearQueueResource {
771
8
        scheme: uri.scheme(),
772
8
        host: host.as_str(),
773
8
        username: username.as_str(),
774
8
        q_type: QueueType::Application,
775
8
    };
776
8
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
777
        return e;
778
8
    }
779

            
780
8
    StatusCode::NO_CONTENT.into_response()
781
16
}
782

            
783
/// `GET /{base}/api/v1/application/{applicationId}/stats`
784
84
async fn get_application_stats(
785
84
    state: State<AppState>,
786
84
    Path(param): Path<ApplicationIdPath>,
787
84
    req: Request,
788
84
) -> impl IntoResponse {
789
    const FN_NAME: &'static str = "get_application";
790
84
    let broker_base = state.broker_base.as_str();
791
84
    let client = state.client.clone();
792
84
    let token = match req.headers().get(header::AUTHORIZATION) {
793
        None => {
794
4
            let e = "missing Authorization".to_string();
795
4
            return ErrResp::ErrParam(Some(e)).into_response();
796
        }
797
80
        Some(value) => value.clone(),
798
    };
799

            
800
80
    let (application, uri, host) = match get_application_inner(
801
80
        FN_NAME,
802
80
        &client,
803
80
        broker_base,
804
80
        param.application_id.as_str(),
805
80
        &token,
806
    )
807
80
    .await
808
    {
809
4
        Err(e) => return e,
810
76
        Ok((application, uri, host)) => (application, uri, host),
811
    };
812

            
813
76
    let host = host.as_str();
814
76
    let scheme = uri.scheme();
815
76
    let data = match scheme {
816
76
        "amqp" | "amqps" => {
817
60
            let AmqpState::RabbitMq(opts) = &state.amqp;
818
60
            let username = mq::to_username(
819
60
                QueueType::Application,
820
60
                application.unit_code.as_str(),
821
60
                application.code.as_str(),
822
            );
823
60
            let username = username.as_str();
824
            response::GetApplicationStatsData {
825
60
                uldata: match rabbitmq::stats(&client, opts, host, username, "uldata").await {
826
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
827
                        consumers: 0,
828
                        messages: 0,
829
                        publish_rate: 0.0,
830
                        deliver_rate: 0.0,
831
                    },
832
                    Err(e) => {
833
                        error!("[{}] get uldata stats error: {}", FN_NAME, e);
834
                        return e.into_response();
835
                    }
836
60
                    Ok(stats) => response::Stats {
837
60
                        consumers: stats.consumers,
838
60
                        messages: stats.messages,
839
60
                        publish_rate: stats.publish_rate,
840
60
                        deliver_rate: stats.deliver_rate,
841
60
                    },
842
                },
843
60
                dldata_resp: match rabbitmq::stats(&client, opts, host, username, "dldata-resp")
844
60
                    .await
845
                {
846
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
847
                        consumers: 0,
848
                        messages: 0,
849
                        publish_rate: 0.0,
850
                        deliver_rate: 0.0,
851
                    },
852
                    Err(e) => {
853
                        error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
854
                        return e.into_response();
855
                    }
856
60
                    Ok(stats) => response::Stats {
857
60
                        consumers: stats.consumers,
858
60
                        messages: stats.messages,
859
60
                        publish_rate: stats.publish_rate,
860
60
                        deliver_rate: stats.deliver_rate,
861
60
                    },
862
                },
863
60
                dldata_result: match rabbitmq::stats(&client, opts, host, username, "dldata-result")
864
60
                    .await
865
                {
866
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
867
                        consumers: 0,
868
                        messages: 0,
869
                        publish_rate: 0.0,
870
                        deliver_rate: 0.0,
871
                    },
872
                    Err(e) => {
873
                        error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
874
                        return e.into_response();
875
                    }
876
60
                    Ok(stats) => response::Stats {
877
60
                        consumers: stats.consumers,
878
60
                        messages: stats.messages,
879
60
                        publish_rate: stats.publish_rate,
880
60
                        deliver_rate: stats.deliver_rate,
881
60
                    },
882
                },
883
            }
884
        }
885
16
        "mqtt" | "mqtts" => match &state.mqtt {
886
14
            MqttState::Emqx(opts) => {
887
14
                let username = mq::to_username(
888
14
                    QueueType::Application,
889
14
                    application.unit_code.as_str(),
890
14
                    application.code.as_str(),
891
                );
892
14
                let username = username.as_str();
893
                response::GetApplicationStatsData {
894
14
                    uldata: match emqx::stats(&client, opts, host, username, "uldata").await {
895
                        Err(e) => {
896
                            error!("[{}] get uldata stats error: {}", FN_NAME, e);
897
                            return e.into_response();
898
                        }
899
14
                        Ok(stats) => response::Stats {
900
14
                            consumers: stats.consumers,
901
14
                            messages: stats.messages,
902
14
                            publish_rate: stats.publish_rate,
903
14
                            deliver_rate: stats.deliver_rate,
904
14
                        },
905
                    },
906
14
                    dldata_resp: match emqx::stats(&client, opts, host, username, "dldata-resp")
907
14
                        .await
908
                    {
909
                        Err(e) => {
910
                            error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
911
                            return e.into_response();
912
                        }
913
14
                        Ok(stats) => response::Stats {
914
14
                            consumers: stats.consumers,
915
14
                            messages: stats.messages,
916
14
                            publish_rate: stats.publish_rate,
917
14
                            deliver_rate: stats.deliver_rate,
918
14
                        },
919
                    },
920
14
                    dldata_result: match emqx::stats(&client, opts, host, username, "dldata-result")
921
14
                        .await
922
                    {
923
                        Err(e) => {
924
                            error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
925
                            return e.into_response();
926
                        }
927
14
                        Ok(stats) => response::Stats {
928
14
                            consumers: stats.consumers,
929
14
                            messages: stats.messages,
930
14
                            publish_rate: stats.publish_rate,
931
14
                            deliver_rate: stats.deliver_rate,
932
14
                        },
933
                    },
934
                }
935
            }
936
2
            MqttState::Rumqttd => response::GetApplicationStatsData {
937
2
                uldata: response::Stats {
938
2
                    ..Default::default()
939
2
                },
940
2
                dldata_resp: response::Stats {
941
2
                    ..Default::default()
942
2
                },
943
2
                dldata_result: response::Stats {
944
2
                    ..Default::default()
945
2
                },
946
2
            },
947
        },
948
        _ => {
949
            let e = format!("unsupport scheme {}", scheme);
950
            error!("[{}] {}", FN_NAME, e);
951
            return ErrResp::ErrUnknown(Some(e)).into_response();
952
        }
953
    };
954
76
    Json(&response::GetApplicationStats { data }).into_response()
955
84
}
956

            
957
/// `POST /{base}/api/v1/application/{applicationId}/dldata`
958
28
async fn post_application_dldata(
959
28
    state: State<AppState>,
960
28
    headers: HeaderMap,
961
28
    Path(param): Path<ApplicationIdPath>,
962
28
    Json(body): Json<request::PostApplicationDlDataBody>,
963
28
) -> impl IntoResponse {
964
    const FN_NAME: &'static str = "post_application_dldata";
965
28
    let broker_base = state.broker_base.as_str();
966
28
    let client = state.client.clone();
967
28
    let token = match headers.get(header::AUTHORIZATION) {
968
        None => {
969
4
            let e = "missing Authorization".to_string();
970
4
            return ErrResp::ErrParam(Some(e)).into_response();
971
        }
972
24
        Some(value) => value.clone(),
973
    };
974

            
975
24
    if body.data.device_id.len() == 0 {
976
4
        let e = "empty `deviceId` is invalid".to_string();
977
4
        return ErrResp::ErrParam(Some(e)).into_response();
978
20
    }
979
20
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
980
4
        let e = format!("`payload` is not hexadecimal string: {}", e);
981
4
        return ErrResp::ErrParam(Some(e)).into_response();
982
16
    }
983

            
984
16
    let (application, uri, hostname) = match get_application_inner(
985
16
        FN_NAME,
986
16
        &client,
987
16
        broker_base,
988
16
        param.application_id.as_str(),
989
16
        &token,
990
    )
991
16
    .await
992
    {
993
4
        Err(e) => return e,
994
12
        Ok((application, uri, hostname)) => (application, uri, hostname),
995
    };
996
12
    match get_device_inner(
997
12
        FN_NAME,
998
12
        &client,
999
12
        broker_base,
12
        body.data.device_id.as_str(),
12
        &token,
    )
12
    .await
    {
        Err(e) => return e,
12
        Ok(device) => match device {
            None => {
4
                return ErrResp::Custom(
4
                    ErrReq::DEVICE_NOT_EXIST.0,
4
                    ErrReq::DEVICE_NOT_EXIST.1,
4
                    None,
4
                )
4
                .into_response();
            }
8
            Some(_) => (),
        },
    };
8
    let hostname = hostname.as_str();
8
    let scheme = uri.scheme();
8
    let payload = match serde_json::to_string(&DlData {
8
        correlation_id: "1".to_string(),
8
        device_id: Some(body.data.device_id.clone()),
8
        data: body.data.payload.clone(),
8
        ..Default::default()
8
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
8
        Ok(payload) => general_purpose::STANDARD.encode(payload),
    };
8
    match scheme {
8
        "amqp" | "amqps" => {
4
            let AmqpState::RabbitMq(opts) = &state.amqp;
4
            let username = mq::to_username(
4
                QueueType::Application,
4
                application.unit_code.as_str(),
4
                application.code.as_str(),
            );
4
            let username = username.as_str();
            if let Err(e) =
4
                rabbitmq::publish_message(&client, opts, hostname, username, "dldata", payload)
4
                    .await
            {
                return e.into_response();
4
            }
        }
4
        "mqtt" | "mqtts" => match &state.mqtt {
2
            MqttState::Emqx(opts) => {
2
                let username = mq::to_username(
2
                    QueueType::Application,
2
                    application.unit_code.as_str(),
2
                    application.code.as_str(),
                );
2
                let username = username.as_str();
                if let Err(e) =
2
                    emqx::publish_message(&client, opts, hostname, username, "dldata", payload)
2
                        .await
                {
                    return e.into_response();
2
                }
            }
            MqttState::Rumqttd => {
2
                let e = "not support now".to_string();
2
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
6
    StatusCode::NO_CONTENT.into_response()
28
}
184
async fn get_application_inner(
184
    fn_name: &str,
184
    client: &reqwest::Client,
184
    broker_base: &str,
184
    application_id: &str,
184
    token: &HeaderValue,
184
) -> Result<(response::GetApplicationData, Url, String), Response> {
184
    let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
184
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
164
    let application = match resp.json::<response::GetApplication>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
164
        Ok(application) => application.data,
    };
164
    let uri = match Url::parse(application.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
164
        Ok(uri) => uri,
    };
164
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
164
        Some(host) => host.to_string(),
    };
164
    Ok((application, uri, host))
184
}
64
async fn check_application_code_inner(
64
    fn_name: &str,
64
    client: &reqwest::Client,
64
    broker_base: &str,
64
    unit_id: &str,
64
    code: &str,
64
    token: &HeaderValue,
64
) -> Result<u64, Response> {
64
    let uri = format!("{}/api/v1/application/count", broker_base);
64
    let req = match client
64
        .request(reqwest::Method::GET, uri)
64
        .header(reqwest::header::AUTHORIZATION, token)
64
        .query(&[("unit", unit_id), ("code", code)])
64
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
64
        Ok(req) => req,
    };
64
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
64
        Ok(resp) => resp,
    };
64
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
64
        Ok(data) => Ok(data.data.count),
    }
64
}