1
use axum::{
2
    body::{self, Body},
3
    extract::Request,
4
    http::{header, response::Builder, HeaderValue, StatusCode},
5
    response::{IntoResponse, Response},
6
};
7
use log::error;
8
use reqwest::{self, Client, Method};
9
use serde_urlencoded;
10
use url::Url;
11

            
12
use sylvia_iot_corelib::err::ErrResp;
13

            
14
use super::{AmqpState, MqttState, State as AppState};
15
use crate::libs::mq::{emqx, rabbitmq, QueueType};
16

            
17
pub mod application;
18
pub mod auth;
19
pub mod client;
20
pub mod device;
21
pub mod device_route;
22
pub mod dldata_buffer;
23
pub mod network;
24
pub mod network_route;
25
mod request;
26
mod response;
27
pub mod unit;
28
pub mod user;
29

            
30
enum ListResp {
31
    /// The complete response. Return this directly.
32
    Axum(Response),
33
    /// Get body from [`reqwest::Response`].
34
    /// Use [`axum::http::response::Builder`] to build response body.
35
    ArrayStream(reqwest::Response, Builder),
36
}
37

            
38
struct CreateQueueResource<'a> {
39
    scheme: &'a str,
40
    host: &'a str,
41
    username: &'a str,
42
    password: &'a str,
43
    ttl: Option<usize>,
44
    length: Option<usize>,
45
    q_type: QueueType,
46
}
47

            
48
struct ClearQueueResource<'a> {
49
    scheme: &'a str,
50
    host: &'a str,
51
    username: &'a str,
52
}
53

            
54
struct PatchHost {
55
    host_uri: Url,
56
    username: String,
57
}
58

            
59
/// To launch a HTTP request as bridge from coremgr to auth/broker.
60
98
async fn api_bridge(fn_name: &str, client: &Client, req: Request, api_path: &str) -> Response {
61
98
    let (mut parts, body) = req.into_parts();
62
98

            
63
98
    parts.headers.remove(header::CONTENT_LENGTH);
64
98
    let mut builder = client
65
98
        .request(parts.method, api_path)
66
98
        .headers(parts.headers);
67
98
    if let Some(query_str) = parts.uri.query() {
68
2
        match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
69
            Err(e) => {
70
                let e = format!("parse query error: {}", e);
71
                return ErrResp::ErrParam(Some(e)).into_response();
72
            }
73
2
            Ok(query) => builder = builder.query(&query),
74
        }
75
96
    }
76
98
    match body::to_bytes(body, usize::MAX).await {
77
        Err(e) => {
78
            let e = format!("convert body error: {}", e);
79
            return ErrResp::ErrParam(Some(e)).into_response();
80
        }
81
98
        Ok(body) => builder = builder.body(body),
82
    }
83
98
    let api_req = match builder.build() {
84
        Err(e) => {
85
            let e = format!("generate request error: {}", e);
86
            error!("[{}] {}", fn_name, e);
87
            return ErrResp::ErrRsc(Some(e)).into_response();
88
        }
89
98
        Ok(req) => req,
90
    };
91
106
    let api_resp = match client.execute(api_req).await {
92
        Err(e) => {
93
            let e = format!("execute request error: {}", e);
94
            error!("[{}] {}", fn_name, e);
95
            return ErrResp::ErrIntMsg(Some(e)).into_response();
96
        }
97
98
        Ok(resp) => resp,
98
98
    };
99
98

            
100
98
    let mut resp_builder = Response::builder().status(api_resp.status());
101
274
    for (k, v) in api_resp.headers() {
102
274
        if k == reqwest::header::CONTENT_LENGTH {
103
88
            continue;
104
186
        }
105
186
        resp_builder = resp_builder.header(k, v);
106
    }
107
98
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
108
        Err(e) => {
109
            let e = format!("wrap response body error: {}", e);
110
            error!("[{}] {}", fn_name, e);
111
            ErrResp::ErrIntMsg(Some(e)).into_response()
112
        }
113
98
        Ok(resp) => resp,
114
    }
115
98
}
116

            
117
/// To launch a HTTP request for one `/list` API as bridge from coremgr to auth/broker.
118
104
async fn list_api_bridge(
119
104
    fn_name: &str,
120
104
    client: &Client,
121
104
    req: Request,
122
104
    api_path: &str,
123
104
    force_array: bool,
124
104
    csv_file: &str,
125
104
) -> ListResp {
126
104
    let (mut parts, _body) = req.into_parts();
127
104
    parts.headers.remove(header::CONTENT_LENGTH);
128
104

            
129
104
    let mut is_csv = false;
130
104
    let mut builder = client
131
104
        .request(parts.method, api_path)
132
104
        .headers(parts.headers);
133
104
    if let Some(query_str) = parts.uri.query() {
134
82
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
135
            Err(e) => {
136
                let e = format!("parse query error: {}", e);
137
                return ListResp::Axum(ErrResp::ErrParam(Some(e)).into_response());
138
            }
139
82
            Ok(query) => query,
140
82
        };
141
82
        let mut has_format = false;
142
82
        let mut query: Vec<_> = query
143
82
            .iter()
144
122
            .map(|(k, v)| {
145
122
                if k.as_str().eq("format") {
146
60
                    has_format = true;
147
60
                    if v.as_str().eq("csv") {
148
20
                        is_csv = true;
149
20
                        ("format".to_string(), "array".to_string())
150
                    } else {
151
40
                        (k.clone(), v.clone())
152
                    }
153
                } else {
154
62
                    (k.clone(), v.clone())
155
                }
156
122
            })
157
82
            .collect();
158
82
        if force_array && !has_format {
159
4
            query.push(("format".to_string(), "array".to_string()));
160
78
        }
161
82
        builder = builder.query(&query);
162
22
    } else if force_array {
163
4
        builder = builder.query(&vec![("format", "array")]);
164
18
    }
165
104
    let api_req = match builder.build() {
166
        Err(e) => {
167
            let e = format!("generate request error: {}", e);
168
            error!("[{}] {}", fn_name, e);
169
            return ListResp::Axum(ErrResp::ErrRsc(Some(e)).into_response());
170
        }
171
104
        Ok(req) => req,
172
    };
173
104
    let api_resp = match client.execute(api_req).await {
174
        Err(e) => {
175
            let e = format!("execute request error: {}", e);
176
            error!("[{}] {}", fn_name, e);
177
            return ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response());
178
        }
179
104
        Ok(resp) => resp,
180
104
    };
181
104

            
182
104
    let mut resp_builder = Response::builder().status(api_resp.status());
183
104
    if is_csv {
184
20
        resp_builder = resp_builder
185
20
            .header(header::CONTENT_TYPE, "text/csv")
186
20
            .header(
187
20
                header::CONTENT_DISPOSITION,
188
20
                format!("attachment;filename={}.csv", csv_file),
189
20
            );
190
20
        if let Some(auth) = api_resp.headers().get(header::WWW_AUTHENTICATE) {
191
            resp_builder = resp_builder.header(header::WWW_AUTHENTICATE, auth.clone());
192
20
        }
193
    } else {
194
252
        for (k, v) in api_resp.headers() {
195
252
            if k == reqwest::header::CONTENT_LENGTH {
196
24
                continue;
197
228
            }
198
228
            resp_builder = resp_builder.header(k, v);
199
        }
200
    }
201
104
    if api_resp.status() == reqwest::StatusCode::OK && (is_csv || force_array) {
202
32
        return ListResp::ArrayStream(api_resp, resp_builder);
203
72
    }
204
72
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
205
        Err(e) => {
206
            let e = format!("wrap response body error: {}", e);
207
            error!("[{}] {}", fn_name, e);
208
            ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response())
209
        }
210
72
        Ok(resp) => ListResp::Axum(resp),
211
    }
212
104
}
213

            
214
8
async fn get_tokeninfo_inner(
215
8
    fn_name: &str,
216
8
    client: &Client,
217
8
    auth_base: &str,
218
8
    token: &HeaderValue,
219
8
) -> Result<response::TokenInfo, Response> {
220
8
    let uri = format!("{}/api/v1/auth/tokeninfo", auth_base);
221
8
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
222
8
    match resp.json::<response::GetTokenInfo>().await {
223
        Err(e) => {
224
            let e = format!("wrong response of token info: {}", e);
225
            error!("[{}] {}", fn_name, e);
226
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
227
        }
228
8
        Ok(info) => Ok(info.data),
229
    }
230
8
}
231

            
232
72
async fn get_unit_inner(
233
72
    fn_name: &str,
234
72
    client: &Client,
235
72
    broker_base: &str,
236
72
    unit_id: &str,
237
72
    token: &HeaderValue,
238
72
) -> Result<Option<response::Unit>, Response> {
239
72
    let uri = format!("{}/api/v1/unit/{}", broker_base, unit_id);
240
72
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
241
4
        Err(resp) => match resp.status() {
242
4
            StatusCode::NOT_FOUND => Ok(None),
243
            _ => Err(resp),
244
        },
245
68
        Ok(resp) => match resp.json::<response::GetUnit>().await {
246
            Err(e) => {
247
                let e = format!("wrong response of unit: {}", e);
248
                error!("[{}] {}", fn_name, e);
249
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
250
            }
251
68
            Ok(unit) => Ok(Some(unit.data)),
252
        },
253
    }
254
72
}
255

            
256
12
async fn get_device_inner(
257
12
    fn_name: &str,
258
12
    client: &Client,
259
12
    broker_base: &str,
260
12
    device_id: &str,
261
12
    token: &HeaderValue,
262
12
) -> Result<Option<response::Device>, Response> {
263
12
    let uri = format!("{}/api/v1/device/{}", broker_base, device_id);
264
12
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
265
4
        Err(resp) => match resp.status() {
266
4
            StatusCode::NOT_FOUND => Ok(None),
267
            _ => Err(resp),
268
        },
269
8
        Ok(resp) => match resp.json::<response::GetDevice>().await {
270
            Err(e) => {
271
                let e = format!("wrong response of device: {}", e);
272
                error!("[{}] {}", fn_name, e);
273
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
274
            }
275
8
            Ok(device) => Ok(Some(device.data)),
276
        },
277
    }
278
12
}
279

            
280
316
async fn get_stream_resp(
281
316
    fn_name: &str,
282
316
    token: &HeaderValue,
283
316
    client: &Client,
284
316
    uri: &str,
285
316
) -> Result<reqwest::Response, Response> {
286
316
    match client
287
316
        .request(Method::GET, uri)
288
316
        .header(reqwest::header::AUTHORIZATION, token)
289
316
        .build()
290
    {
291
        Err(e) => {
292
            let e = format!("generate request error: {}", e);
293
            error!("[{}] {}", fn_name, e);
294
            Err(ErrResp::ErrRsc(Some(e)).into_response())
295
        }
296
316
        Ok(req) => match client.execute(req).await {
297
            Err(e) => {
298
                let e = format!("execute request error: {}", e);
299
                error!("[{}] {}", fn_name, e);
300
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
301
            }
302
316
            Ok(resp) => match resp.status() {
303
288
                StatusCode::OK => Ok(resp),
304
                _ => {
305
28
                    let mut resp_builder = Response::builder().status(resp.status());
306
84
                    for (k, v) in resp.headers() {
307
84
                        resp_builder = resp_builder.header(k, v);
308
84
                    }
309
28
                    match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
310
                        Err(e) => {
311
                            let e = format!("wrap response body error: {}", e);
312
                            error!("[{}] {}", fn_name, e);
313
                            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
314
                        }
315
28
                        Ok(resp) => Err(resp),
316
                    }
317
                }
318
            },
319
        },
320
    }
321
316
}
322

            
323
/// To compare if two broker hosts are the same.
324
///
325
/// For example:
326
/// - `amqp://localhost` is the same as `amqp://localhost:5672`
327
/// - `mqtts://localhost` is the same as `mqtts://localhost:8883`
328
20
fn cmp_host_uri(src: &str, dst: &str) -> bool {
329
20
    let src_uri = match Url::parse(src) {
330
        Err(_) => return false,
331
20
        Ok(uri) => uri,
332
    };
333
20
    let dst_uri = match Url::parse(dst) {
334
        Err(_) => return false,
335
20
        Ok(uri) => uri,
336
20
    };
337
20
    if src_uri.scheme() != dst_uri.scheme() || src_uri.host_str() != dst_uri.host_str() {
338
20
        return false;
339
    }
340
    let src_port = match src_uri.port() {
341
        None => match src_uri.scheme() {
342
            "amqp" => 5672,
343
            "amqps" => 5671,
344
            "mqtt" => 1883,
345
            "mqtts" => 8883,
346
            _ => return false,
347
        },
348
        Some(port) => port,
349
    };
350
    let dst_port = match dst_uri.port() {
351
        None => match dst_uri.scheme() {
352
            "amqp" => 5672,
353
            "amqps" => 5671,
354
            "mqtt" => 1883,
355
            "mqtts" => 8883,
356
            _ => return false,
357
        },
358
        Some(port) => port,
359
    };
360
    src_port == dst_port
361
20
}
362

            
363
/// To set-up queue resources (vhost, ACL, ...) in the broker.
364
64
async fn create_queue_rsc<'a>(
365
64
    fn_name: &str,
366
64
    state: &AppState,
367
64
    rsc: &CreateQueueResource<'a>,
368
64
) -> Result<(), Response> {
369
64
    let scheme = rsc.scheme;
370
64
    match scheme {
371
64
        "amqp" | "amqps" => match &state.amqp {
372
30
            AmqpState::RabbitMq(opts) => {
373
30
                let client = state.client.clone();
374
30
                let host = rsc.host;
375
30
                let username = rsc.username;
376
30
                let password = rsc.password;
377
30
                let clear_rsc = ClearQueueResource {
378
30
                    scheme,
379
30
                    host,
380
30
                    username,
381
30
                };
382
78
                if let Err(e) = rabbitmq::put_user(&client, opts, host, username, password).await {
383
                    error!("[{}] add RabbitMQ user {} error: {}", fn_name, username, e);
384
                    return Err(e.into_response());
385
30
                }
386
30
                if let Err(e) = rabbitmq::put_vhost(&client, opts, host, username).await {
387
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
388
                    error!("[{}] add RabbitMQ vhost {} error: {}", fn_name, username, e);
389
                    return Err(e.into_response());
390
30
                }
391
                if let Err(e) =
392
30
                    rabbitmq::put_permissions(&client, opts, host, rsc.q_type, username).await
393
                {
394
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
395
                    error!(
396
                        "[{}] add RabbitMQ permission {} error: {}",
397
                        fn_name, username, e
398
                    );
399
                    return Err(e.into_response());
400
30
                }
401
30
                if rsc.ttl.is_some() && rsc.ttl.is_some() {
402
14
                    let policies = rabbitmq::BrokerPolicies {
403
14
                        ttl: rsc.ttl,
404
14
                        length: rsc.length,
405
14
                    };
406
                    if let Err(e) =
407
14
                        rabbitmq::put_policies(&client, opts, host, username, &policies).await
408
                    {
409
                        error!("[{}] patch RabbitMQ {} error: {}", fn_name, username, e);
410
                        return Err(e.into_response());
411
14
                    }
412
16
                }
413
            }
414
        },
415
34
        "mqtt" | "mqtts" => match &state.mqtt {
416
17
            MqttState::Emqx(opts) => {
417
17
                let client = state.client.clone();
418
17
                let host = rsc.host;
419
17
                let username = rsc.username;
420
17
                let password = rsc.password;
421
17
                let clear_rsc = ClearQueueResource {
422
17
                    scheme,
423
17
                    host,
424
17
                    username,
425
17
                };
426
17
                if let Err(e) = emqx::post_user(
427
17
                    &client,
428
17
                    opts,
429
17
                    host,
430
17
                    opts.api_key.as_str(),
431
17
                    opts.api_secret.as_str(),
432
17
                    true,
433
17
                )
434
69
                .await
435
                {
436
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
437
                    return Err(e.into_response());
438
17
                }
439
                if let Err(e) =
440
17
                    emqx::post_user(&client, opts, host, username, password, false).await
441
                {
442
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
443
                    return Err(e.into_response());
444
17
                }
445
17
                if let Err(e) = emqx::post_acl(&client, opts, host, rsc.q_type, username).await {
446
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
447
                    error!("[{}] add EMQX ACL {} error: {}", fn_name, username, e);
448
                    return Err(e.into_response());
449
17
                }
450
                if let Err(e) =
451
68
                    emqx::post_topic_metrics(&client, opts, host, rsc.q_type, username).await
452
                {
453
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
454
                    error!("[{}] add EMQX metrics {} error: {}", fn_name, username, e);
455
                    return Err(e.into_response());
456
17
                }
457
            }
458
17
            MqttState::Rumqttd => {}
459
        },
460
        _ => return Err(ErrResp::ErrParam(Some("unsupport scheme".to_string())).into_response()),
461
    }
462
64
    Ok(())
463
64
}
464

            
465
/// To clear queue resources (vhost, ACL, ...) in the broker.
466
20
async fn clear_queue_rsc<'a>(
467
20
    fn_name: &str,
468
20
    state: &AppState,
469
20
    rsc: &ClearQueueResource<'a>,
470
20
) -> Result<(), Response> {
471
20
    match rsc.scheme {
472
20
        "amqp" | "amqps" => match &state.amqp {
473
10
            AmqpState::RabbitMq(opts) => {
474
10
                let client = state.client.clone();
475
18
                if let Err(e) = rabbitmq::delete_user(&client, opts, rsc.host, rsc.username).await {
476
                    error!(
477
                        "[{}] clear RabbitMQ user {} error: {}",
478
                        fn_name, rsc.username, e
479
                    );
480
                    return Err(e.into_response());
481
10
                }
482
10
                if let Err(e) = rabbitmq::delete_vhost(&client, opts, rsc.host, rsc.username).await
483
                {
484
                    error!(
485
                        "[{}] clear RabbitMQ vhost {} error: {}",
486
                        fn_name, rsc.username, e
487
                    );
488
                    return Err(e.into_response());
489
10
                }
490
            }
491
        },
492
10
        "mqtt" | "mqtts" => match &state.mqtt {
493
5
            MqttState::Emqx(opts) => {
494
5
                let client = state.client.clone();
495
5
                if let Err(e) = emqx::delete_user(&client, opts, rsc.host, rsc.username).await {
496
                    error!(
497
                        "[{}] clear EMQX user {} error: {}",
498
                        fn_name, rsc.username, e
499
                    );
500
                    return Err(e.into_response());
501
5
                }
502
5
                let q_type = QueueType::Application;
503
5
                if let Err(e) = emqx::delete_acl(&client, opts, rsc.host, rsc.username).await {
504
                    error!("[{}] clear EMQX ACL {} error: {}", fn_name, rsc.username, e);
505
                    return Err(e.into_response());
506
5
                }
507
                if let Err(e) =
508
20
                    emqx::delete_topic_metrics(&client, opts, rsc.host, q_type, rsc.username).await
509
                {
510
                    error!(
511
                        "[{}] clear EMQX topic metrics {} error: {}",
512
                        fn_name, rsc.username, e
513
                    );
514
                    return Err(e.into_response());
515
5
                }
516
            }
517
5
            MqttState::Rumqttd => {}
518
        },
519
        _ => {}
520
    }
521
20
    Ok(())
522
20
}
523

            
524
/// To clear new resources after something wrong when patching the application/network.
525
async fn clear_patch_host(fn_name: &str, state: &AppState, patch_host: &Option<PatchHost>) {
526
    if let Some(patch_host) = patch_host {
527
        if let Some(host) = patch_host.host_uri.host_str() {
528
            let clear_rsc = ClearQueueResource {
529
                scheme: patch_host.host_uri.scheme(),
530
                host,
531
                username: patch_host.username.as_str(),
532
            };
533
            let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
534
        }
535
    }
536
}
537

            
538
/// To composite management plugin's information in the URI for sylvia-iot-broker.
539
64
fn transfer_host_uri(state: &AppState, host_uri: &mut Url, mq_username: &str) {
540
64
    match host_uri.scheme() {
541
64
        "amqp" | "amqps" => match &state.amqp {
542
30
            AmqpState::RabbitMq(opts) => {
543
30
                let _ = host_uri.set_username(opts.username.as_str());
544
30
                let _ = host_uri.set_password(Some(opts.password.as_str()));
545
30
                let _ = host_uri.set_path(mq_username);
546
30
            }
547
        },
548
34
        "mqtt" | "mqtts" => match &state.mqtt {
549
17
            MqttState::Emqx(opts) => {
550
17
                let _ = host_uri.set_username(opts.api_key.as_str());
551
17
                let _ = host_uri.set_password(Some(opts.api_secret.as_str()));
552
17
            }
553
17
            MqttState::Rumqttd => {}
554
        },
555
        _ => {}
556
    }
557
64
}
558

            
559
/// Truncates the host (from sylvia-iot-broker) to `scheme://host:port`.
560
1640
fn trunc_host_uri(host_uri: &Url) -> String {
561
1640
    let mut new_uri = host_uri.clone();
562
1640
    let _ = new_uri.set_username("");
563
1640
    let _ = new_uri.set_password(None);
564
1640
    new_uri.set_path("");
565
1640
    new_uri.to_string()
566
1640
}