1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Request, State},
6
    http::{header, HeaderMap, HeaderValue, StatusCode},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use base64::{engine::general_purpose, Engine};
11
use bytes::{Bytes, BytesMut};
12
use csv::WriterBuilder;
13
use futures_util::StreamExt;
14
use hex;
15
use log::error;
16
use reqwest;
17
use serde::{Deserialize, Serialize};
18
use serde_json::{Deserializer, Map, Value};
19
use url::Url;
20

            
21
use sylvia_iot_corelib::{
22
    err::ErrResp,
23
    http::{Json, Path},
24
    strings,
25
};
26

            
27
use super::{
28
    super::{AmqpState, ErrReq, MqttState, State as AppState},
29
    api_bridge, clear_patch_host, clear_queue_rsc, cmp_host_uri, create_queue_rsc,
30
    get_device_inner, get_stream_resp, get_unit_inner, list_api_bridge, request, response,
31
    transfer_host_uri, trunc_host_uri, ClearQueueResource, CreateQueueResource, ListResp,
32
    PatchHost,
33
};
34
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
35

            
36
enum ListFormat {
37
    Array,
38
    Csv,
39
    Data,
40
}
41

            
42
#[derive(Deserialize)]
43
struct ApplicationIdPath {
44
    application_id: String,
45
}
46

            
47
#[derive(Deserialize, Serialize)]
48
struct Application {
49
    #[serde(rename = "applicationId")]
50
    application_id: String,
51
    code: String,
52
    #[serde(rename = "unitId")]
53
    unit_id: String,
54
    #[serde(rename = "unitCode")]
55
    unit_code: String,
56
    #[serde(rename = "createdAt")]
57
    created_at: String,
58
    #[serde(rename = "modifiedAt")]
59
    modified_at: String,
60
    #[serde(rename = "hostUri")]
61
    host_uri: String,
62
    name: String,
63
    info: Map<String, Value>,
64
}
65

            
66
#[derive(Deserialize, Serialize)]
67
struct CsvItem {
68
    #[serde(rename = "applicationId")]
69
    application_id: String,
70
    code: String,
71
    #[serde(rename = "unitId")]
72
    unit_id: String,
73
    #[serde(rename = "unitCode")]
74
    unit_code: String,
75
    #[serde(rename = "createdAt")]
76
    created_at: String,
77
    #[serde(rename = "modifiedAt")]
78
    modified_at: String,
79
    #[serde(rename = "hostUri")]
80
    host_uri: String,
81
    name: String,
82
    info: Option<String>,
83
}
84

            
85
/// Downlink data from application to broker.
86
#[derive(Default, Serialize)]
87
struct DlData {
88
    #[serde(rename = "correlationId")]
89
    correlation_id: String,
90
    #[serde(rename = "deviceId")]
91
    device_id: Option<String>,
92
    #[serde(rename = "networkCode")]
93
    network_code: Option<String>,
94
    #[serde(rename = "networkAddr")]
95
    network_addr: Option<String>,
96
    data: String,
97
    extension: Option<Map<String, Value>>,
98
}
99

            
100
const CSV_FIELDS: &'static [u8] =
101
    b"\xEF\xBB\xBFapplicationId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
102

            
103
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
104
506
    Router::new().nest(
105
506
        scope_path,
106
506
        Router::new()
107
506
            .route("/", routing::post(post_application))
108
506
            .route("/count", routing::get(get_application_count))
109
506
            .route("/list", routing::get(get_application_list))
110
506
            .route(
111
506
                "/{application_id}",
112
506
                routing::get(get_application)
113
506
                    .patch(patch_application)
114
506
                    .delete(delete_application),
115
506
            )
116
506
            .route(
117
506
                "/{application_id}/stats",
118
506
                routing::get(get_application_stats),
119
506
            )
120
506
            .route(
121
506
                "/{application_id}/dldata",
122
506
                routing::post(post_application_dldata),
123
506
            )
124
506
            .with_state(state.clone()),
125
506
    )
126
506
}
127

            
128
/// `POST /{base}/api/v1/application`
129
80
async fn post_application(
130
80
    State(state): State<AppState>,
131
80
    mut headers: HeaderMap,
132
80
    Json(mut body): Json<request::PostApplicationBody>,
133
80
) -> impl IntoResponse {
134
    const FN_NAME: &'static str = "post_application";
135
80
    let broker_base = state.broker_base.as_str();
136
80
    let api_path = format!("{}/api/v1/application", broker_base);
137
80
    let client = state.client.clone();
138
80
    let token = match headers.get(header::AUTHORIZATION) {
139
        None => {
140
4
            let e = "missing Authorization".to_string();
141
4
            return ErrResp::ErrParam(Some(e)).into_response();
142
        }
143
76
        Some(value) => value.clone(),
144
76
    };
145
76

            
146
76
    // Get unit information to create queue information.
147
76
    let unit_id = body.data.unit_id.as_str();
148
76
    if unit_id.len() == 0 {
149
4
        return ErrResp::ErrParam(Some(
150
4
            "`unitId` must with at least one character".to_string(),
151
4
        ))
152
4
        .into_response();
153
72
    }
154
72
    let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
155
        Err(e) => return e,
156
72
        Ok(unit) => match unit {
157
            None => {
158
4
                return ErrResp::Custom(ErrReq::UNIT_NOT_EXIST.0, ErrReq::UNIT_NOT_EXIST.1, None)
159
4
                    .into_response()
160
            }
161
68
            Some(unit) => unit,
162
68
        },
163
68
    };
164
68
    let unit_code = unit.code.as_str();
165
68
    let code = body.data.code.as_str();
166
68
    if !strings::is_code(code) {
167
4
        return ErrResp::ErrParam(Some(
168
4
            "`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
169
4
        ))
170
4
        .into_response();
171
64
    }
172
64
    match check_application_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
173
        Err(e) => return e,
174
64
        Ok(count) => match count {
175
60
            0 => (),
176
            _ => {
177
4
                return ErrResp::Custom(
178
4
                    ErrReq::APPLICATION_EXIST.0,
179
4
                    ErrReq::APPLICATION_EXIST.1,
180
4
                    None,
181
4
                )
182
4
                .into_response()
183
            }
184
        },
185
    }
186
60
    let q_type = QueueType::Application;
187
60
    let username = mq::to_username(q_type, unit_code, code);
188
60
    let password = strings::randomstring(8);
189
60
    let uri = match Url::parse(body.data.host_uri.as_str()) {
190
4
        Err(e) => {
191
4
            return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
192
        }
193
56
        Ok(uri) => uri,
194
    };
195
56
    let host = match uri.host() {
196
        None => {
197
4
            let e = "invalid `hostUri`".to_string();
198
4
            return ErrResp::ErrParam(Some(e)).into_response();
199
        }
200
52
        Some(host) => host.to_string(),
201
52
    };
202
52
    let scheme = uri.scheme();
203
52
    let host = host.as_str();
204
52
    let username = username.as_str();
205
52
    let password = password.as_str();
206
52

            
207
52
    // Create message broker resources.
208
52
    let create_rsc = CreateQueueResource {
209
52
        scheme,
210
52
        host,
211
52
        username,
212
52
        password,
213
52
        ttl: body.data.ttl,
214
52
        length: body.data.length,
215
52
        q_type: QueueType::Application,
216
52
    };
217
52
    if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
218
        return e;
219
52
    }
220
52
    let clear_rsc = ClearQueueResource {
221
52
        scheme,
222
52
        host,
223
52
        username,
224
52
    };
225
52

            
226
52
    // Create application instance.
227
52
    let mut body_uri = uri.clone();
228
52
    transfer_host_uri(&state, &mut body_uri, username);
229
52
    body.data.host_uri = body_uri.to_string();
230
52
    headers.remove(header::CONTENT_LENGTH);
231
52
    let builder = client
232
52
        .request(reqwest::Method::POST, api_path)
233
52
        .headers(headers)
234
52
        .json(&body);
235
52
    let api_req = match builder.build() {
236
        Err(e) => {
237
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
238
            let e = format!("generate request error: {}", e);
239
            error!("[{}] {}", FN_NAME, e);
240
            return ErrResp::ErrRsc(Some(e)).into_response();
241
        }
242
52
        Ok(req) => req,
243
    };
244
52
    let api_resp = match client.execute(api_req).await {
245
        Err(e) => {
246
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
247
            let e = format!("execute request error: {}", e);
248
            error!("[{}] {}", FN_NAME, e);
249
            return ErrResp::ErrIntMsg(Some(e)).into_response();
250
        }
251
52
        Ok(resp) => match resp.status() {
252
52
            reqwest::StatusCode::OK => resp,
253
            _ => {
254
                let mut resp_builder = Response::builder().status(resp.status());
255
                for (k, v) in resp.headers() {
256
                    resp_builder = resp_builder.header(k, v);
257
                }
258
                match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
259
                    Err(e) => {
260
                        let e = format!("wrap response body error: {}", e);
261
                        error!("[{}] {}", FN_NAME, e);
262
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
263
                    }
264
                    Ok(resp) => return resp,
265
                }
266
            }
267
        },
268
    };
269
52
    let mut body = match api_resp.json::<response::PostApplication>().await {
270
        Err(e) => {
271
            let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
272
            let e = format!("unexpected response: {}", e);
273
            return ErrResp::ErrUnknown(Some(e)).into_response();
274
        }
275
52
        Ok(body) => body,
276
52
    };
277
52
    body.data.password = Some(password.to_string());
278
52

            
279
52
    Json(&body).into_response()
280
80
}
281

            
282
/// `GET /{base}/api/v1/application/count`
283
4
async fn get_application_count(state: State<AppState>, req: Request) -> impl IntoResponse {
284
    const FN_NAME: &'static str = "get_application_count";
285
4
    let api_path = format!("{}/api/v1/application/count", state.broker_base.as_str());
286
4
    let client = state.client.clone();
287
4

            
288
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
289
4
}
290

            
291
/// `GET /{base}/api/v1/application/list`
292
20
async fn get_application_list(state: State<AppState>, req: Request) -> impl IntoResponse {
293
    const FN_NAME: &'static str = "get_application_list";
294
20
    let api_path = format!("{}/api/v1/application/list", state.broker_base.as_str());
295
20
    let api_path = api_path.as_str();
296
20
    let client = state.client.clone();
297
20

            
298
20
    let mut list_format = ListFormat::Data;
299
20
    if let Some(query_str) = req.uri().query() {
300
16
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
301
            Err(e) => {
302
                let e = format!("parse query error: {}", e);
303
                return ErrResp::ErrParam(Some(e)).into_response();
304
            }
305
16
            Ok(query) => query,
306
        };
307
24
        for (k, v) in query.iter() {
308
24
            if k.as_str().eq("format") {
309
12
                if v.as_str().eq("array") {
310
4
                    list_format = ListFormat::Array;
311
8
                } else if v.as_str().eq("csv") {
312
4
                    list_format = ListFormat::Csv;
313
4
                }
314
12
            }
315
        }
316
4
    }
317

            
318
16
    let (api_resp, resp_builder) =
319
20
        match list_api_bridge(FN_NAME, &client, req, api_path, true, "application").await {
320
4
            ListResp::Axum(resp) => return resp,
321
16
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
322
16
        };
323
16

            
324
16
    let mut resp_stream = api_resp.bytes_stream();
325
16
    let body = Body::from_stream(async_stream::stream! {
326
16
        match list_format {
327
16
            ListFormat::Array => yield Ok(Bytes::from("[")),
328
16
            ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
329
16
            ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
330
16
        }
331
16
        let mut first_sent = false;
332
16

            
333
16
        let mut buffer = BytesMut::new();
334
16
        while let Some(body) = resp_stream.next().await {
335
16
            match body {
336
16
                Err(e) => {
337
16
                    error!("[{}] get body error: {}", FN_NAME, e);
338
16
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
339
16
                    yield Err(err);
340
16
                    break;
341
16
                }
342
16
                Ok(body) => buffer.extend_from_slice(&body[..]),
343
16
            }
344
16

            
345
16
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
346
16
            let mut index = 0;
347
16
            let mut finish = false;
348
16
            loop {
349
16
                if let Some(Ok(mut v)) = json_stream.next() {
350
16
                    v.host_uri = match Url::parse(v.host_uri.as_str()) {
351
16
                        Err(e) => {
352
16
                            error!("[{}] parse body hostUri error: {}", FN_NAME, e);
353
16
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
354
16
                            yield Err(err);
355
16
                            finish = true;
356
16
                            break;
357
16
                        }
358
16
                        Ok(uri) => trunc_host_uri(&uri),
359
16
                    };
360
16
                    match list_format {
361
16
                        ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
362
16
                            Err(e) =>{
363
16
                                error!("[{}] serialize JSON error: {}", FN_NAME, e);
364
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
365
16
                                yield Err(err);
366
16
                                finish = true;
367
16
                                break;
368
16
                            }
369
16
                            Ok(v) => {
370
16
                                match first_sent {
371
16
                                    false => first_sent = true,
372
16
                                    true => yield Ok(Bytes::from(",")),
373
16
                                }
374
16
                                yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
375
16
                            }
376
16
                        }
377
16
                        ListFormat::Csv => {
378
16
                            let mut item = CsvItem{
379
16
                                application_id: v.application_id,
380
16
                                code: v.code,
381
16
                                unit_id: v.unit_id,
382
16
                                unit_code: v.unit_code,
383
16
                                created_at: v.created_at,
384
16
                                modified_at: v.modified_at,
385
16
                                host_uri: v.host_uri,
386
16
                                name: v.name,
387
16
                                info: None,
388
16
                            };
389
16
                            if let Ok(info_str) = serde_json::to_string(&v.info) {
390
16
                                item.info = Some(info_str);
391
16
                            }
392
16
                            let mut writer =
393
16
                                WriterBuilder::new().has_headers(false).from_writer(vec![]);
394
16
                            if let Err(e) = writer.serialize(item) {
395
16
                                error!("[{}] serialize CSV error: {}", FN_NAME, e);
396
16
                                let err: Box<dyn StdError + Send + Sync> = Box::new(e);
397
16
                                yield Err(err);
398
16
                                finish = true;
399
16
                                break;
400
16
                            }
401
16
                            match writer.into_inner() {
402
16
                                Err(e) => {
403
16
                                    error!("[{}] serialize bytes error: {}", FN_NAME, e);
404
16
                                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
405
16
                                    yield Err(err);
406
16
                                    finish = true;
407
16
                                    break;
408
16
                                }
409
16
                                Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
410
16
                            }
411
16
                        }
412
16
                    }
413
16
                    continue;
414
16
                }
415
16
                let offset = json_stream.byte_offset();
416
16
                if buffer.len() <= index + offset {
417
16
                    index = buffer.len();
418
16
                    break;
419
16
                }
420
16
                match buffer[index+offset] {
421
16
                    b'[' | b',' => {
422
16
                        index += offset + 1;
423
16
                        if buffer.len() <= index {
424
16
                            break;
425
16
                        }
426
16
                        json_stream =
427
16
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
428
16
                    }
429
16
                    b']' => {
430
16
                        finish = true;
431
16
                        break;
432
16
                    }
433
16
                    _ => break,
434
16
                }
435
16
            }
436
16
            if finish {
437
16
                match list_format {
438
16
                    ListFormat::Array => yield Ok(Bytes::from("]")),
439
16
                    ListFormat::Csv => (),
440
16
                    ListFormat::Data => yield Ok(Bytes::from("]}")),
441
16
                }
442
16
                break;
443
16
            }
444
16
            buffer = buffer.split_off(index);
445
16
        }
446
16
    });
447
16
    match resp_builder.body(body) {
448
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
449
16
        Ok(resp) => resp,
450
    }
451
20
}
452

            
453
/// `GET /{base}/api/v1/application/{applicationId}`
454
36
async fn get_application(
455
36
    state: State<AppState>,
456
36
    Path(param): Path<ApplicationIdPath>,
457
36
    req: Request,
458
36
) -> impl IntoResponse {
459
    const FN_NAME: &'static str = "get_application";
460
36
    let broker_base = state.broker_base.as_str();
461
36
    let client = state.client.clone();
462
36
    let token = match req.headers().get(header::AUTHORIZATION) {
463
        None => {
464
4
            let e = "missing Authorization".to_string();
465
4
            return ErrResp::ErrParam(Some(e)).into_response();
466
        }
467
32
        Some(value) => value.clone(),
468
    };
469

            
470
32
    let (mut application, uri, host) = match get_application_inner(
471
32
        FN_NAME,
472
32
        &client,
473
32
        broker_base,
474
32
        param.application_id.as_str(),
475
32
        &token,
476
32
    )
477
32
    .await
478
    {
479
4
        Err(e) => return e,
480
28
        Ok((application, uri, host)) => (application, uri, host),
481
28
    };
482
28

            
483
28
    let host = host.as_str();
484
28
    let scheme = uri.scheme();
485
28
    if scheme.eq("amqp") || scheme.eq("amqps") {
486
16
        let AmqpState::RabbitMq(opts) = &state.amqp;
487
16
        let username = mq::to_username(
488
16
            QueueType::Application,
489
16
            application.unit_code.as_str(),
490
16
            application.code.as_str(),
491
16
        );
492
16
        let username = username.as_str();
493
16
        match rabbitmq::get_policies(&client, opts, host, username).await {
494
            Err(e) => {
495
                error!("[{}] get {} policies error: {}", FN_NAME, username, e);
496
                return e.into_response();
497
            }
498
16
            Ok(policies) => {
499
16
                application.ttl = policies.ttl;
500
16
                application.length = policies.length;
501
16
            }
502
        }
503
12
    }
504
28
    application.host_uri = trunc_host_uri(&uri);
505
28
    Json(&response::GetApplication { data: application }).into_response()
506
36
}
507

            
508
/// `PATCH /{base}/api/v1/application/{applicationId}`
509
52
async fn patch_application(
510
52
    state: State<AppState>,
511
52
    headers: HeaderMap,
512
52
    Path(param): Path<ApplicationIdPath>,
513
52
    Json(body): Json<request::PatchApplicationBody>,
514
52
) -> impl IntoResponse {
515
    const FN_NAME: &'static str = "patch_application";
516
52
    let broker_base = state.broker_base.as_str();
517
52
    let client = state.client.clone();
518
52
    let token = match headers.get(header::AUTHORIZATION) {
519
        None => {
520
4
            let e = "missing Authorization".to_string();
521
4
            return ErrResp::ErrParam(Some(e)).into_response();
522
        }
523
48
        Some(value) => value.clone(),
524
48
    };
525
48

            
526
48
    let data = &body.data;
527
48
    if data.host_uri.is_none()
528
24
        && data.name.is_none()
529
16
        && data.info.is_none()
530
16
        && data.ttl.is_none()
531
12
        && data.length.is_none()
532
12
        && data.password.is_none()
533
    {
534
4
        return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
535
44
    }
536

            
537
44
    let (application, uri, hostname) = match get_application_inner(
538
44
        FN_NAME,
539
44
        &client,
540
44
        broker_base,
541
44
        param.application_id.as_str(),
542
44
        &token,
543
44
    )
544
44
    .await
545
    {
546
4
        Err(e) => return e,
547
40
        Ok((application, uri, hostname)) => (application, uri, hostname),
548
40
    };
549
40

            
550
40
    let mut patch_data = request::PatchApplicationData {
551
40
        name: data.name.clone(),
552
40
        info: data.info.clone(),
553
40
        ..Default::default()
554
40
    };
555
40
    let mut patch_host: Option<PatchHost> = None;
556
40
    if let Some(host) = data.host_uri.as_ref() {
557
24
        if !strings::is_uri(host) {
558
4
            return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
559
20
        }
560
20
        // Change to the new broker host.
561
20
        if !cmp_host_uri(application.host_uri.as_str(), host.as_str()) {
562
20
            let password = match data.password.as_ref() {
563
                None => {
564
4
                    let e = "missing `password`".to_string();
565
4
                    return ErrResp::ErrParam(Some(e)).into_response();
566
                }
567
16
                Some(password) => match password.len() {
568
                    0 => {
569
4
                        let e = "missing `password`".to_string();
570
4
                        return ErrResp::ErrParam(Some(e)).into_response();
571
                    }
572
12
                    _ => password,
573
                },
574
            };
575
12
            let mut new_host_uri = match Url::parse(host.as_str()) {
576
                Err(e) => {
577
                    let e = format!("invalid `hostUri`: {}", e);
578
                    return ErrResp::ErrParam(Some(e)).into_response();
579
                }
580
12
                Ok(uri) => match uri.host_str() {
581
                    None => {
582
4
                        let e = "invalid `hostUri`".to_string();
583
4
                        return ErrResp::ErrParam(Some(e)).into_response();
584
                    }
585
8
                    Some(_) => uri,
586
8
                },
587
8
            };
588
8

            
589
8
            let unit_code = application.unit_code.as_str();
590
8
            let code = application.code.as_str();
591
8
            let username = mq::to_username(QueueType::Application, unit_code, code);
592
8
            let resource = CreateQueueResource {
593
8
                scheme: new_host_uri.scheme(),
594
8
                host: new_host_uri.host_str().unwrap(),
595
8
                username: username.as_str(),
596
8
                password: password.as_str(),
597
8
                ttl: data.ttl,
598
8
                length: data.length,
599
8
                q_type: QueueType::Application,
600
8
            };
601
8
            if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
602
                return e;
603
8
            }
604
8

            
605
8
            transfer_host_uri(&state, &mut new_host_uri, username.as_str());
606
8
            patch_data.host_uri = Some(new_host_uri.to_string());
607
8
            patch_host = Some(PatchHost {
608
8
                host_uri: new_host_uri,
609
8
                username,
610
8
            });
611
        }
612
16
    }
613

            
614
    // Send request body to the sylvia-iot-broker.
615
24
    if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
616
12
        let application_id = param.application_id.as_str();
617
12
        let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
618
12
        let mut builder = client
619
12
            .request(reqwest::Method::PATCH, uri)
620
12
            .header(reqwest::header::AUTHORIZATION, &token)
621
12
            .json(&request::PatchApplicationBody { data: patch_data });
622
12
        if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
623
12
            builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
624
12
        }
625
12
        let api_req = match builder.build() {
626
            Err(e) => {
627
                clear_patch_host(FN_NAME, &state, &patch_host).await;
628
                let e = format!("generate request error: {}", e);
629
                error!("[{}] {}", FN_NAME, e);
630
                return ErrResp::ErrRsc(Some(e)).into_response();
631
            }
632
12
            Ok(req) => req,
633
        };
634
12
        let api_resp = match client.execute(api_req).await {
635
            Err(e) => {
636
                clear_patch_host(FN_NAME, &state, &patch_host).await;
637
                let e = format!("execute request error: {}", e);
638
                error!("[{}] {}", FN_NAME, e);
639
                return ErrResp::ErrIntMsg(Some(e)).into_response();
640
            }
641
12
            Ok(resp) => resp,
642
12
        };
643
12

            
644
12
        let status_code = api_resp.status();
645
12
        if status_code != StatusCode::NO_CONTENT {
646
            clear_patch_host(FN_NAME, &state, &patch_host).await;
647
            let mut resp_builder = Response::builder().status(status_code);
648
            for (k, v) in api_resp.headers() {
649
                resp_builder = resp_builder.header(k, v);
650
            }
651
            match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
652
                Err(e) => {
653
                    let e = format!("wrap response body error: {}", e);
654
                    error!("[{}] {}", FN_NAME, e);
655
                    return ErrResp::ErrIntMsg(Some(e)).into_response();
656
                }
657
                Ok(resp) => return resp,
658
            }
659
12
        }
660
12
    }
661

            
662
24
    if let Some(host) = patch_host {
663
8
        let resource = ClearQueueResource {
664
8
            scheme: uri.scheme(),
665
8
            host: uri.host_str().unwrap(),
666
8
            username: host.username.as_str(),
667
8
        };
668
8
        let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
669
8
        return StatusCode::NO_CONTENT.into_response();
670
16
    } else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
671
4
        return StatusCode::NO_CONTENT.into_response();
672
12
    }
673

            
674
    // Update broker information without changing hostUri.
675
12
    if let Some(password) = data.password.as_ref() {
676
12
        if password.len() == 0 {
677
4
            let e = "missing `password`".to_string();
678
4
            return ErrResp::ErrParam(Some(e)).into_response();
679
8
        }
680
    }
681
8
    let unit_code = application.unit_code.as_str();
682
8
    let code = application.code.as_str();
683
8
    let hostname = hostname.as_str();
684
8
    let username = mq::to_username(QueueType::Application, unit_code, code);
685
8
    let username = username.as_str();
686
8
    match uri.scheme() {
687
8
        "amqp" | "amqps" => match &state.amqp {
688
4
            AmqpState::RabbitMq(opts) => {
689
4
                if data.ttl.is_some() || data.length.is_some() {
690
4
                    let policies = rabbitmq::BrokerPolicies {
691
4
                        ttl: data.ttl,
692
4
                        length: data.length,
693
4
                    };
694
                    if let Err(e) =
695
4
                        rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
696
                    {
697
                        let e = format!("patch RabbitMQ error: {}", e);
698
                        error!("[{}] {}", FN_NAME, e);
699
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
700
4
                    }
701
                }
702
4
                if let Some(password) = data.password.as_ref() {
703
4
                    let password = password.as_str();
704
                    if let Err(e) =
705
4
                        rabbitmq::put_user(&client, opts, hostname, username, password).await
706
                    {
707
                        let e = format!("patch RabbitMQ password error: {}", e);
708
                        error!("[{}] {}", FN_NAME, e);
709
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
710
4
                    }
711
                }
712
            }
713
        },
714
4
        "mqtt" | "mqtts" => match &state.mqtt {
715
2
            MqttState::Emqx(opts) => {
716
2
                if let Some(password) = data.password.as_ref() {
717
2
                    let password = password.as_str();
718
                    if let Err(e) =
719
2
                        emqx::put_user(&client, opts, hostname, username, password).await
720
                    {
721
                        let e = format!("patch EMQX password error: {}", e);
722
                        error!("[{}] {}", FN_NAME, e);
723
                        return ErrResp::ErrIntMsg(Some(e)).into_response();
724
2
                    }
725
                }
726
            }
727
2
            MqttState::Rumqttd => {}
728
        },
729
        _ => {}
730
    }
731

            
732
8
    StatusCode::NO_CONTENT.into_response()
733
52
}
734

            
735
/// `DELETE /{base}/api/v1/application/{applicationId}`
736
16
async fn delete_application(
737
16
    state: State<AppState>,
738
16
    Path(param): Path<ApplicationIdPath>,
739
16
    req: Request,
740
16
) -> impl IntoResponse {
741
    const FN_NAME: &'static str = "delete_application";
742
16
    let broker_base = state.broker_base.as_str();
743
16
    let application_id = param.application_id.as_str();
744
16
    let api_path = format!("{}/api/v1/application/{}", broker_base, application_id);
745
16
    let client = state.client.clone();
746
16
    let token = match req.headers().get(header::AUTHORIZATION) {
747
        None => {
748
4
            let e = "missing Authorization".to_string();
749
4
            return ErrResp::ErrParam(Some(e)).into_response();
750
        }
751
12
        Some(value) => value.clone(),
752
    };
753

            
754
8
    let (application, uri, host) =
755
12
        match get_application_inner(FN_NAME, &client, broker_base, application_id, &token).await {
756
4
            Err(e) => return e,
757
8
            Ok((application, uri, host)) => (application, uri, host),
758
        };
759

            
760
8
    let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
761
8
    if !resp.status().is_success() {
762
        return resp;
763
8
    }
764
8

            
765
8
    let unit_code = application.unit_code.as_str();
766
8
    let code = application.code.as_str();
767
8
    let username = mq::to_username(QueueType::Application, unit_code, code);
768
8
    let clear_rsc = ClearQueueResource {
769
8
        scheme: uri.scheme(),
770
8
        host: host.as_str(),
771
8
        username: username.as_str(),
772
8
    };
773
8
    if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
774
        return e;
775
8
    }
776
8

            
777
8
    StatusCode::NO_CONTENT.into_response()
778
16
}
779

            
780
/// `GET /{base}/api/v1/application/{applicationId}/stats`
781
80
async fn get_application_stats(
782
80
    state: State<AppState>,
783
80
    Path(param): Path<ApplicationIdPath>,
784
80
    req: Request,
785
80
) -> impl IntoResponse {
786
    const FN_NAME: &'static str = "get_application";
787
80
    let broker_base = state.broker_base.as_str();
788
80
    let client = state.client.clone();
789
80
    let token = match req.headers().get(header::AUTHORIZATION) {
790
        None => {
791
4
            let e = "missing Authorization".to_string();
792
4
            return ErrResp::ErrParam(Some(e)).into_response();
793
        }
794
76
        Some(value) => value.clone(),
795
    };
796

            
797
76
    let (application, uri, host) = match get_application_inner(
798
76
        FN_NAME,
799
76
        &client,
800
76
        broker_base,
801
76
        param.application_id.as_str(),
802
76
        &token,
803
76
    )
804
76
    .await
805
    {
806
4
        Err(e) => return e,
807
72
        Ok((application, uri, host)) => (application, uri, host),
808
72
    };
809
72

            
810
72
    let host = host.as_str();
811
72
    let scheme = uri.scheme();
812
72
    let data = match scheme {
813
72
        "amqp" | "amqps" => {
814
58
            let AmqpState::RabbitMq(opts) = &state.amqp;
815
58
            let username = mq::to_username(
816
58
                QueueType::Application,
817
58
                application.unit_code.as_str(),
818
58
                application.code.as_str(),
819
58
            );
820
58
            let username = username.as_str();
821
58
            response::GetApplicationStatsData {
822
58
                uldata: match rabbitmq::stats(&client, opts, host, username, "uldata").await {
823
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
824
                        consumers: 0,
825
                        messages: 0,
826
                        publish_rate: 0.0,
827
                        deliver_rate: 0.0,
828
                    },
829
                    Err(e) => {
830
                        error!("[{}] get uldata stats error: {}", FN_NAME, e);
831
                        return e.into_response();
832
                    }
833
58
                    Ok(stats) => response::Stats {
834
58
                        consumers: stats.consumers,
835
58
                        messages: stats.messages,
836
58
                        publish_rate: stats.publish_rate,
837
58
                        deliver_rate: stats.deliver_rate,
838
58
                    },
839
                },
840
58
                dldata_resp: match rabbitmq::stats(&client, opts, host, username, "dldata-resp")
841
58
                    .await
842
                {
843
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
844
                        consumers: 0,
845
                        messages: 0,
846
                        publish_rate: 0.0,
847
                        deliver_rate: 0.0,
848
                    },
849
                    Err(e) => {
850
                        error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
851
                        return e.into_response();
852
                    }
853
58
                    Ok(stats) => response::Stats {
854
58
                        consumers: stats.consumers,
855
58
                        messages: stats.messages,
856
58
                        publish_rate: stats.publish_rate,
857
58
                        deliver_rate: stats.deliver_rate,
858
58
                    },
859
                },
860
58
                dldata_result: match rabbitmq::stats(&client, opts, host, username, "dldata-result")
861
58
                    .await
862
                {
863
                    Err(ErrResp::ErrNotFound(_)) => response::Stats {
864
                        consumers: 0,
865
                        messages: 0,
866
                        publish_rate: 0.0,
867
                        deliver_rate: 0.0,
868
                    },
869
                    Err(e) => {
870
                        error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
871
                        return e.into_response();
872
                    }
873
58
                    Ok(stats) => response::Stats {
874
58
                        consumers: stats.consumers,
875
58
                        messages: stats.messages,
876
58
                        publish_rate: stats.publish_rate,
877
58
                        deliver_rate: stats.deliver_rate,
878
58
                    },
879
                },
880
            }
881
        }
882
14
        "mqtt" | "mqtts" => match &state.mqtt {
883
12
            MqttState::Emqx(opts) => {
884
12
                let username = mq::to_username(
885
12
                    QueueType::Application,
886
12
                    application.unit_code.as_str(),
887
12
                    application.code.as_str(),
888
12
                );
889
12
                let username = username.as_str();
890
12
                response::GetApplicationStatsData {
891
12
                    uldata: match emqx::stats(&client, opts, host, username, "uldata").await {
892
                        Err(e) => {
893
                            error!("[{}] get uldata stats error: {}", FN_NAME, e);
894
                            return e.into_response();
895
                        }
896
12
                        Ok(stats) => response::Stats {
897
12
                            consumers: stats.consumers,
898
12
                            messages: stats.messages,
899
12
                            publish_rate: stats.publish_rate,
900
12
                            deliver_rate: stats.deliver_rate,
901
12
                        },
902
12
                    },
903
12
                    dldata_resp: match emqx::stats(&client, opts, host, username, "dldata-resp")
904
12
                        .await
905
                    {
906
                        Err(e) => {
907
                            error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
908
                            return e.into_response();
909
                        }
910
12
                        Ok(stats) => response::Stats {
911
12
                            consumers: stats.consumers,
912
12
                            messages: stats.messages,
913
12
                            publish_rate: stats.publish_rate,
914
12
                            deliver_rate: stats.deliver_rate,
915
12
                        },
916
12
                    },
917
12
                    dldata_result: match emqx::stats(&client, opts, host, username, "dldata-result")
918
12
                        .await
919
                    {
920
                        Err(e) => {
921
                            error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
922
                            return e.into_response();
923
                        }
924
12
                        Ok(stats) => response::Stats {
925
12
                            consumers: stats.consumers,
926
12
                            messages: stats.messages,
927
12
                            publish_rate: stats.publish_rate,
928
12
                            deliver_rate: stats.deliver_rate,
929
12
                        },
930
                    },
931
                }
932
            }
933
2
            MqttState::Rumqttd => response::GetApplicationStatsData {
934
2
                uldata: response::Stats {
935
2
                    ..Default::default()
936
2
                },
937
2
                dldata_resp: response::Stats {
938
2
                    ..Default::default()
939
2
                },
940
2
                dldata_result: response::Stats {
941
2
                    ..Default::default()
942
2
                },
943
2
            },
944
        },
945
        _ => {
946
            let e = format!("unsupport scheme {}", scheme);
947
            error!("[{}] {}", FN_NAME, e);
948
            return ErrResp::ErrUnknown(Some(e)).into_response();
949
        }
950
    };
951
72
    Json(&response::GetApplicationStats { data }).into_response()
952
80
}
953

            
954
/// `POST /{base}/api/v1/application/{applicationId}/dldata`
955
28
async fn post_application_dldata(
956
28
    state: State<AppState>,
957
28
    headers: HeaderMap,
958
28
    Path(param): Path<ApplicationIdPath>,
959
28
    Json(body): Json<request::PostApplicationDlDataBody>,
960
28
) -> impl IntoResponse {
961
    const FN_NAME: &'static str = "post_application_dldata";
962
28
    let broker_base = state.broker_base.as_str();
963
28
    let client = state.client.clone();
964
28
    let token = match headers.get(header::AUTHORIZATION) {
965
        None => {
966
4
            let e = "missing Authorization".to_string();
967
4
            return ErrResp::ErrParam(Some(e)).into_response();
968
        }
969
24
        Some(value) => value.clone(),
970
24
    };
971
24

            
972
24
    if body.data.device_id.len() == 0 {
973
4
        let e = "empty `deviceId` is invalid".to_string();
974
4
        return ErrResp::ErrParam(Some(e)).into_response();
975
20
    }
976
20
    if let Err(e) = hex::decode(body.data.payload.as_str()) {
977
4
        let e = format!("`payload` is not hexadecimal string: {}", e);
978
4
        return ErrResp::ErrParam(Some(e)).into_response();
979
16
    }
980

            
981
16
    let (application, uri, hostname) = match get_application_inner(
982
16
        FN_NAME,
983
16
        &client,
984
16
        broker_base,
985
16
        param.application_id.as_str(),
986
16
        &token,
987
16
    )
988
16
    .await
989
    {
990
4
        Err(e) => return e,
991
12
        Ok((application, uri, hostname)) => (application, uri, hostname),
992
12
    };
993
12
    match get_device_inner(
994
12
        FN_NAME,
995
12
        &client,
996
12
        broker_base,
997
12
        body.data.device_id.as_str(),
998
12
        &token,
999
12
    )
12
    .await
    {
        Err(e) => return e,
12
        Ok(device) => match device {
            None => {
4
                return ErrResp::Custom(
4
                    ErrReq::DEVICE_NOT_EXIST.0,
4
                    ErrReq::DEVICE_NOT_EXIST.1,
4
                    None,
4
                )
4
                .into_response()
            }
8
            Some(_) => (),
8
        },
8
    };
8

            
8
    let hostname = hostname.as_str();
8
    let scheme = uri.scheme();
8
    let payload = match serde_json::to_string(&DlData {
8
        correlation_id: "1".to_string(),
8
        device_id: Some(body.data.device_id.clone()),
8
        data: body.data.payload.clone(),
8
        ..Default::default()
8
    }) {
        Err(e) => {
            let e = format!("encode JSON error: {}", e);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrRsc(Some(e)).into_response();
        }
8
        Ok(payload) => general_purpose::STANDARD.encode(payload),
8
    };
8
    match scheme {
8
        "amqp" | "amqps" => {
4
            let AmqpState::RabbitMq(opts) = &state.amqp;
4
            let username = mq::to_username(
4
                QueueType::Application,
4
                application.unit_code.as_str(),
4
                application.code.as_str(),
4
            );
4
            let username = username.as_str();
            if let Err(e) =
4
                rabbitmq::publish_message(&client, opts, hostname, username, "dldata", payload)
4
                    .await
            {
                return e.into_response();
4
            }
        }
4
        "mqtt" | "mqtts" => match &state.mqtt {
2
            MqttState::Emqx(opts) => {
2
                let username = mq::to_username(
2
                    QueueType::Application,
2
                    application.unit_code.as_str(),
2
                    application.code.as_str(),
2
                );
2
                let username = username.as_str();
                if let Err(e) =
2
                    emqx::publish_message(&client, opts, hostname, username, "dldata", payload)
2
                        .await
                {
                    return e.into_response();
2
                }
            }
            MqttState::Rumqttd => {
2
                let e = "not support now".to_string();
2
                return ErrResp::ErrParam(Some(e)).into_response();
            }
        },
        _ => {
            let e = format!("unsupport scheme {}", scheme);
            error!("[{}] {}", FN_NAME, e);
            return ErrResp::ErrUnknown(Some(e)).into_response();
        }
    }
6
    StatusCode::NO_CONTENT.into_response()
28
}
180
async fn get_application_inner(
180
    fn_name: &str,
180
    client: &reqwest::Client,
180
    broker_base: &str,
180
    application_id: &str,
180
    token: &HeaderValue,
180
) -> Result<(response::GetApplicationData, Url, String), Response> {
180
    let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
180
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
160
    let application = match resp.json::<response::GetApplication>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
160
        Ok(application) => application.data,
    };
160
    let uri = match Url::parse(application.host_uri.as_str()) {
        Err(e) => {
            let e = format!("unexpected hostUri: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
160
        Ok(uri) => uri,
    };
160
    let host = match uri.host_str() {
        None => {
            let e = "unexpected hostUri".to_string();
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrUnknown(Some(e)).into_response());
        }
160
        Some(host) => host.to_string(),
160
    };
160
    Ok((application, uri, host))
180
}
64
async fn check_application_code_inner(
64
    fn_name: &str,
64
    client: &reqwest::Client,
64
    broker_base: &str,
64
    unit_id: &str,
64
    code: &str,
64
    token: &HeaderValue,
64
) -> Result<u64, Response> {
64
    let uri = format!("{}/api/v1/application/count", broker_base);
64
    let req = match client
64
        .request(reqwest::Method::GET, uri)
64
        .header(reqwest::header::AUTHORIZATION, token)
64
        .query(&[("unit", unit_id), ("code", code)])
64
        .build()
    {
        Err(e) => {
            let e = format!("generate request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrRsc(Some(e)).into_response());
        }
64
        Ok(req) => req,
    };
64
    let resp = match client.execute(req).await {
        Err(e) => {
            let e = format!("execute request error: {}", e);
            error!("[{}] {}", fn_name, e);
            return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
        }
64
        Ok(resp) => resp,
64
    };
64

            
64
    match resp.json::<response::GetCount>().await {
        Err(e) => {
            let e = format!("wrong response of application: {}", e);
            error!("[{}] {}", fn_name, e);
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
        }
64
        Ok(data) => Ok(data.data.count),
    }
64
}