1
use axum::{
2
    body::{self, Body},
3
    extract::Request,
4
    http::{HeaderValue, StatusCode, header, response::Builder},
5
    response::{IntoResponse, Response},
6
};
7
use log::error;
8
use reqwest::{self, Client, Method};
9
use serde_urlencoded;
10
use url::Url;
11

            
12
use sylvia_iot_corelib::err::ErrResp;
13

            
14
use super::{AmqpState, MqttState, State as AppState};
15
use crate::libs::mq::{QueueType, emqx, rabbitmq};
16

            
17
pub mod application;
18
pub mod auth;
19
pub mod client;
20
pub mod device;
21
pub mod device_route;
22
pub mod dldata_buffer;
23
pub mod network;
24
pub mod network_route;
25
mod request;
26
mod response;
27
pub mod unit;
28
pub mod user;
29

            
30
enum ListResp {
31
    /// The complete response. Return this directly.
32
    Axum(Response),
33
    /// Get body from [`reqwest::Response`].
34
    /// Use [`axum::http::response::Builder`] to build response body.
35
    ArrayStream(reqwest::Response, Builder),
36
}
37

            
38
struct CreateQueueResource<'a> {
39
    scheme: &'a str,
40
    host: &'a str,
41
    username: &'a str,
42
    password: &'a str,
43
    ttl: Option<usize>,
44
    length: Option<usize>,
45
    q_type: QueueType,
46
}
47

            
48
struct ClearQueueResource<'a> {
49
    scheme: &'a str,
50
    host: &'a str,
51
    username: &'a str,
52
    q_type: QueueType,
53
}
54

            
55
struct PatchHost {
56
    host_uri: Url,
57
    username: String,
58
}
59

            
60
/// To launch a HTTP request as bridge from coremgr to auth/broker.
61
196
async fn api_bridge(fn_name: &str, client: &Client, req: Request, api_path: &str) -> Response {
62
196
    let (mut parts, body) = req.into_parts();
63

            
64
196
    parts.headers.remove(header::CONTENT_LENGTH);
65
196
    let mut builder = client
66
196
        .request(parts.method, api_path)
67
196
        .headers(parts.headers);
68
196
    if let Some(query_str) = parts.uri.query() {
69
4
        match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
70
            Err(e) => {
71
                let e = format!("parse query error: {}", e);
72
                return ErrResp::ErrParam(Some(e)).into_response();
73
            }
74
4
            Ok(query) => builder = builder.query(&query),
75
        }
76
192
    }
77
196
    match body::to_bytes(body, usize::MAX).await {
78
        Err(e) => {
79
            let e = format!("convert body error: {}", e);
80
            return ErrResp::ErrParam(Some(e)).into_response();
81
        }
82
196
        Ok(body) => builder = builder.body(body),
83
    }
84
196
    let api_req = match builder.build() {
85
        Err(e) => {
86
            let e = format!("generate request error: {}", e);
87
            error!("[{}] {}", fn_name, e);
88
            return ErrResp::ErrRsc(Some(e)).into_response();
89
        }
90
196
        Ok(req) => req,
91
    };
92
196
    let api_resp = match client.execute(api_req).await {
93
        Err(e) => {
94
            let e = format!("execute request error: {}", e);
95
            error!("[{}] {}", fn_name, e);
96
            return ErrResp::ErrIntMsg(Some(e)).into_response();
97
        }
98
196
        Ok(resp) => resp,
99
    };
100

            
101
196
    let mut resp_builder = Response::builder().status(api_resp.status());
102
548
    for (k, v) in api_resp.headers() {
103
548
        if k == reqwest::header::CONTENT_LENGTH {
104
176
            continue;
105
372
        }
106
372
        resp_builder = resp_builder.header(k, v);
107
    }
108
196
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
109
        Err(e) => {
110
            let e = format!("wrap response body error: {}", e);
111
            error!("[{}] {}", fn_name, e);
112
            ErrResp::ErrIntMsg(Some(e)).into_response()
113
        }
114
196
        Ok(resp) => resp,
115
    }
116
196
}
117

            
118
/// To launch a HTTP request for one `/list` API as bridge from coremgr to auth/broker.
119
208
async fn list_api_bridge(
120
208
    fn_name: &str,
121
208
    client: &Client,
122
208
    req: Request,
123
208
    api_path: &str,
124
208
    force_array: bool,
125
208
    csv_file: &str,
126
208
) -> ListResp {
127
208
    let (mut parts, _body) = req.into_parts();
128
208
    parts.headers.remove(header::CONTENT_LENGTH);
129

            
130
208
    let mut is_csv = false;
131
208
    let mut builder = client
132
208
        .request(parts.method, api_path)
133
208
        .headers(parts.headers);
134
208
    if let Some(query_str) = parts.uri.query() {
135
164
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
136
            Err(e) => {
137
                let e = format!("parse query error: {}", e);
138
                return ListResp::Axum(ErrResp::ErrParam(Some(e)).into_response());
139
            }
140
164
            Ok(query) => query,
141
        };
142
164
        let mut has_format = false;
143
164
        let mut query: Vec<_> = query
144
164
            .iter()
145
244
            .map(|(k, v)| {
146
244
                if k.as_str().eq("format") {
147
120
                    has_format = true;
148
120
                    if v.as_str().eq("csv") {
149
40
                        is_csv = true;
150
40
                        ("format".to_string(), "array".to_string())
151
                    } else {
152
80
                        (k.clone(), v.clone())
153
                    }
154
                } else {
155
124
                    (k.clone(), v.clone())
156
                }
157
244
            })
158
164
            .collect();
159
164
        if force_array && !has_format {
160
8
            query.push(("format".to_string(), "array".to_string()));
161
156
        }
162
164
        builder = builder.query(&query);
163
44
    } else if force_array {
164
8
        builder = builder.query(&vec![("format", "array")]);
165
36
    }
166
208
    let api_req = match builder.build() {
167
        Err(e) => {
168
            let e = format!("generate request error: {}", e);
169
            error!("[{}] {}", fn_name, e);
170
            return ListResp::Axum(ErrResp::ErrRsc(Some(e)).into_response());
171
        }
172
208
        Ok(req) => req,
173
    };
174
208
    let api_resp = match client.execute(api_req).await {
175
        Err(e) => {
176
            let e = format!("execute request error: {}", e);
177
            error!("[{}] {}", fn_name, e);
178
            return ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response());
179
        }
180
208
        Ok(resp) => resp,
181
    };
182

            
183
208
    let mut resp_builder = Response::builder().status(api_resp.status());
184
208
    if is_csv {
185
40
        resp_builder = resp_builder
186
40
            .header(header::CONTENT_TYPE, "text/csv")
187
40
            .header(
188
40
                header::CONTENT_DISPOSITION,
189
40
                format!("attachment;filename={}.csv", csv_file),
190
            );
191
40
        if let Some(auth) = api_resp.headers().get(header::WWW_AUTHENTICATE) {
192
            resp_builder = resp_builder.header(header::WWW_AUTHENTICATE, auth.clone());
193
40
        }
194
    } else {
195
504
        for (k, v) in api_resp.headers() {
196
504
            if k == reqwest::header::CONTENT_LENGTH {
197
48
                continue;
198
456
            }
199
456
            resp_builder = resp_builder.header(k, v);
200
        }
201
    }
202
208
    if api_resp.status() == reqwest::StatusCode::OK && (is_csv || force_array) {
203
64
        return ListResp::ArrayStream(api_resp, resp_builder);
204
144
    }
205
144
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
206
        Err(e) => {
207
            let e = format!("wrap response body error: {}", e);
208
            error!("[{}] {}", fn_name, e);
209
            ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response())
210
        }
211
144
        Ok(resp) => ListResp::Axum(resp),
212
    }
213
208
}
214

            
215
16
async fn get_tokeninfo_inner(
216
16
    fn_name: &str,
217
16
    client: &Client,
218
16
    auth_base: &str,
219
16
    token: &HeaderValue,
220
16
) -> Result<response::TokenInfo, Response> {
221
16
    let uri = format!("{}/api/v1/auth/tokeninfo", auth_base);
222
16
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
223
16
    match resp.json::<response::GetTokenInfo>().await {
224
        Err(e) => {
225
            let e = format!("wrong response of token info: {}", e);
226
            error!("[{}] {}", fn_name, e);
227
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
228
        }
229
16
        Ok(info) => Ok(info.data),
230
    }
231
16
}
232

            
233
144
async fn get_unit_inner(
234
144
    fn_name: &str,
235
144
    client: &Client,
236
144
    broker_base: &str,
237
144
    unit_id: &str,
238
144
    token: &HeaderValue,
239
144
) -> Result<Option<response::Unit>, Response> {
240
144
    let uri = format!("{}/api/v1/unit/{}", broker_base, unit_id);
241
144
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
242
8
        Err(resp) => match resp.status() {
243
8
            StatusCode::NOT_FOUND => Ok(None),
244
            _ => Err(resp),
245
        },
246
136
        Ok(resp) => match resp.json::<response::GetUnit>().await {
247
            Err(e) => {
248
                let e = format!("wrong response of unit: {}", e);
249
                error!("[{}] {}", fn_name, e);
250
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
251
            }
252
136
            Ok(unit) => Ok(Some(unit.data)),
253
        },
254
    }
255
144
}
256

            
257
24
async fn get_device_inner(
258
24
    fn_name: &str,
259
24
    client: &Client,
260
24
    broker_base: &str,
261
24
    device_id: &str,
262
24
    token: &HeaderValue,
263
24
) -> Result<Option<response::Device>, Response> {
264
24
    let uri = format!("{}/api/v1/device/{}", broker_base, device_id);
265
24
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
266
8
        Err(resp) => match resp.status() {
267
8
            StatusCode::NOT_FOUND => Ok(None),
268
            _ => Err(resp),
269
        },
270
16
        Ok(resp) => match resp.json::<response::GetDevice>().await {
271
            Err(e) => {
272
                let e = format!("wrong response of device: {}", e);
273
                error!("[{}] {}", fn_name, e);
274
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
275
            }
276
16
            Ok(device) => Ok(Some(device.data)),
277
        },
278
    }
279
24
}
280

            
281
604
async fn get_stream_resp(
282
604
    fn_name: &str,
283
604
    token: &HeaderValue,
284
604
    client: &Client,
285
604
    uri: &str,
286
604
) -> Result<reqwest::Response, Response> {
287
604
    match client
288
604
        .request(Method::GET, uri)
289
604
        .header(reqwest::header::AUTHORIZATION, token)
290
604
        .build()
291
    {
292
        Err(e) => {
293
            let e = format!("generate request error: {}", e);
294
            error!("[{}] {}", fn_name, e);
295
            Err(ErrResp::ErrRsc(Some(e)).into_response())
296
        }
297
604
        Ok(req) => match client.execute(req).await {
298
            Err(e) => {
299
                let e = format!("execute request error: {}", e);
300
                error!("[{}] {}", fn_name, e);
301
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
302
            }
303
604
            Ok(resp) => match resp.status() {
304
548
                StatusCode::OK => Ok(resp),
305
                _ => {
306
56
                    let mut resp_builder = Response::builder().status(resp.status());
307
168
                    for (k, v) in resp.headers() {
308
168
                        resp_builder = resp_builder.header(k, v);
309
168
                    }
310
56
                    match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
311
                        Err(e) => {
312
                            let e = format!("wrap response body error: {}", e);
313
                            error!("[{}] {}", fn_name, e);
314
                            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
315
                        }
316
56
                        Ok(resp) => Err(resp),
317
                    }
318
                }
319
            },
320
        },
321
    }
322
604
}
323

            
324
/// To compare if two broker hosts are the same.
325
///
326
/// For example:
327
/// - `amqp://localhost` is the same as `amqp://localhost:5672`
328
/// - `mqtts://localhost` is the same as `mqtts://localhost:8883`
329
40
fn cmp_host_uri(src: &str, dst: &str) -> bool {
330
40
    let src_uri = match Url::parse(src) {
331
        Err(_) => return false,
332
40
        Ok(uri) => uri,
333
    };
334
40
    let dst_uri = match Url::parse(dst) {
335
        Err(_) => return false,
336
40
        Ok(uri) => uri,
337
    };
338
40
    if src_uri.scheme() != dst_uri.scheme() || src_uri.host_str() != dst_uri.host_str() {
339
40
        return false;
340
    }
341
    let src_port = match src_uri.port() {
342
        None => match src_uri.scheme() {
343
            "amqp" => 5672,
344
            "amqps" => 5671,
345
            "mqtt" => 1883,
346
            "mqtts" => 8883,
347
            _ => return false,
348
        },
349
        Some(port) => port,
350
    };
351
    let dst_port = match dst_uri.port() {
352
        None => match dst_uri.scheme() {
353
            "amqp" => 5672,
354
            "amqps" => 5671,
355
            "mqtt" => 1883,
356
            "mqtts" => 8883,
357
            _ => return false,
358
        },
359
        Some(port) => port,
360
    };
361
    src_port == dst_port
362
40
}
363

            
364
/// To set-up queue resources (vhost, ACL, ...) in the broker.
365
128
async fn create_queue_rsc<'a>(
366
128
    fn_name: &str,
367
128
    state: &AppState,
368
128
    rsc: &CreateQueueResource<'a>,
369
128
) -> Result<(), Response> {
370
128
    let scheme = rsc.scheme;
371
128
    match scheme {
372
128
        "amqp" | "amqps" => match &state.amqp {
373
60
            AmqpState::RabbitMq(opts) => {
374
60
                let client = state.client.clone();
375
60
                let host = rsc.host;
376
60
                let username = rsc.username;
377
60
                let password = rsc.password;
378
60
                let clear_rsc = ClearQueueResource {
379
60
                    scheme,
380
60
                    host,
381
60
                    username,
382
60
                    q_type: rsc.q_type,
383
60
                };
384
60
                if let Err(e) = rabbitmq::put_user(&client, opts, host, username, password).await {
385
                    error!("[{}] add RabbitMQ user {} error: {}", fn_name, username, e);
386
                    return Err(e.into_response());
387
60
                }
388
60
                if let Err(e) = rabbitmq::put_vhost(&client, opts, host, username).await {
389
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
390
                    error!("[{}] add RabbitMQ vhost {} error: {}", fn_name, username, e);
391
                    return Err(e.into_response());
392
60
                }
393
                if let Err(e) =
394
60
                    rabbitmq::put_permissions(&client, opts, host, rsc.q_type, username).await
395
                {
396
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
397
                    error!(
398
                        "[{}] add RabbitMQ permission {} error: {}",
399
                        fn_name, username, e
400
                    );
401
                    return Err(e.into_response());
402
60
                }
403
60
                if rsc.ttl.is_some() && rsc.length.is_some() {
404
28
                    let policies = rabbitmq::BrokerPolicies {
405
28
                        ttl: rsc.ttl,
406
28
                        length: rsc.length,
407
28
                    };
408
                    if let Err(e) =
409
28
                        rabbitmq::put_policies(&client, opts, host, username, &policies).await
410
                    {
411
                        let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
412
                        error!("[{}] patch RabbitMQ {} error: {}", fn_name, username, e);
413
                        return Err(e.into_response());
414
28
                    }
415
32
                }
416
            }
417
        },
418
68
        "mqtt" | "mqtts" => match &state.mqtt {
419
34
            MqttState::Emqx(opts) => {
420
34
                let client = state.client.clone();
421
34
                let host = rsc.host;
422
34
                let username = rsc.username;
423
34
                let password = rsc.password;
424
34
                let clear_rsc = ClearQueueResource {
425
34
                    scheme,
426
34
                    host,
427
34
                    username,
428
34
                    q_type: rsc.q_type,
429
34
                };
430
34
                if let Err(e) = emqx::post_user(
431
34
                    &client,
432
34
                    opts,
433
34
                    host,
434
34
                    opts.api_key.as_str(),
435
34
                    opts.api_secret.as_str(),
436
                    true,
437
                )
438
34
                .await
439
                {
440
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
441
                    return Err(e.into_response());
442
34
                }
443
                if let Err(e) =
444
34
                    emqx::post_user(&client, opts, host, username, password, false).await
445
                {
446
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
447
                    return Err(e.into_response());
448
34
                }
449
34
                if let Err(e) = emqx::post_acl(&client, opts, host, rsc.q_type, username).await {
450
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
451
                    error!("[{}] add EMQX ACL {} error: {}", fn_name, username, e);
452
                    return Err(e.into_response());
453
34
                }
454
                if let Err(e) =
455
34
                    emqx::post_topic_metrics(&client, opts, host, rsc.q_type, username).await
456
                {
457
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
458
                    error!("[{}] add EMQX metrics {} error: {}", fn_name, username, e);
459
                    return Err(e.into_response());
460
34
                }
461
            }
462
34
            MqttState::Rumqttd => {}
463
        },
464
        _ => return Err(ErrResp::ErrParam(Some("unsupport scheme".to_string())).into_response()),
465
    }
466
128
    Ok(())
467
128
}
468

            
469
/// To clear queue resources (vhost, ACL, ...) in the broker.
470
40
async fn clear_queue_rsc<'a>(
471
40
    fn_name: &str,
472
40
    state: &AppState,
473
40
    rsc: &ClearQueueResource<'a>,
474
40
) -> Result<(), Response> {
475
40
    match rsc.scheme {
476
40
        "amqp" | "amqps" => match &state.amqp {
477
20
            AmqpState::RabbitMq(opts) => {
478
20
                let client = state.client.clone();
479
20
                if let Err(e) = rabbitmq::delete_user(&client, opts, rsc.host, rsc.username).await {
480
                    error!(
481
                        "[{}] clear RabbitMQ user {} error: {}",
482
                        fn_name, rsc.username, e
483
                    );
484
                    return Err(e.into_response());
485
20
                }
486
20
                if let Err(e) = rabbitmq::delete_vhost(&client, opts, rsc.host, rsc.username).await
487
                {
488
                    error!(
489
                        "[{}] clear RabbitMQ vhost {} error: {}",
490
                        fn_name, rsc.username, e
491
                    );
492
                    return Err(e.into_response());
493
20
                }
494
            }
495
        },
496
20
        "mqtt" | "mqtts" => match &state.mqtt {
497
10
            MqttState::Emqx(opts) => {
498
10
                let client = state.client.clone();
499
10
                if let Err(e) = emqx::delete_user(&client, opts, rsc.host, rsc.username).await {
500
                    error!(
501
                        "[{}] clear EMQX user {} error: {}",
502
                        fn_name, rsc.username, e
503
                    );
504
                    return Err(e.into_response());
505
10
                }
506
10
                if let Err(e) = emqx::delete_acl(&client, opts, rsc.host, rsc.username).await {
507
                    error!("[{}] clear EMQX ACL {} error: {}", fn_name, rsc.username, e);
508
                    return Err(e.into_response());
509
10
                }
510
                if let Err(e) =
511
10
                    emqx::delete_topic_metrics(&client, opts, rsc.host, rsc.q_type, rsc.username)
512
10
                        .await
513
                {
514
                    error!(
515
                        "[{}] clear EMQX topic metrics {} error: {}",
516
                        fn_name, rsc.username, e
517
                    );
518
                    return Err(e.into_response());
519
10
                }
520
            }
521
10
            MqttState::Rumqttd => {}
522
        },
523
        _ => {}
524
    }
525
40
    Ok(())
526
40
}
527

            
528
/// To clear new resources after something wrong when patching the application/network.
529
async fn clear_patch_host(
530
    fn_name: &str,
531
    state: &AppState,
532
    patch_host: &Option<PatchHost>,
533
    q_type: QueueType,
534
) {
535
    if let Some(patch_host) = patch_host {
536
        if let Some(host) = patch_host.host_uri.host_str() {
537
            let clear_rsc = ClearQueueResource {
538
                scheme: patch_host.host_uri.scheme(),
539
                host,
540
                username: patch_host.username.as_str(),
541
                q_type,
542
            };
543
            let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
544
        }
545
    }
546
}
547

            
548
/// To composite management plugin's information in the URI for sylvia-iot-broker.
549
128
fn transfer_host_uri(state: &AppState, host_uri: &mut Url, mq_username: &str) {
550
128
    match host_uri.scheme() {
551
128
        "amqp" | "amqps" => match &state.amqp {
552
60
            AmqpState::RabbitMq(opts) => {
553
60
                let _ = host_uri.set_username(opts.username.as_str());
554
60
                let _ = host_uri.set_password(Some(opts.password.as_str()));
555
60
                let _ = host_uri.set_path(mq_username);
556
60
            }
557
        },
558
68
        "mqtt" | "mqtts" => match &state.mqtt {
559
34
            MqttState::Emqx(opts) => {
560
34
                let _ = host_uri.set_username(opts.api_key.as_str());
561
34
                let _ = host_uri.set_password(Some(opts.api_secret.as_str()));
562
34
            }
563
34
            MqttState::Rumqttd => {}
564
        },
565
        _ => {}
566
    }
567
128
}
568

            
569
/// Truncates the host (from sylvia-iot-broker) to `scheme://host:port`.
570
3280
fn trunc_host_uri(host_uri: &Url) -> String {
571
3280
    let mut new_uri = host_uri.clone();
572
3280
    let _ = new_uri.set_username("");
573
3280
    let _ = new_uri.set_password(None);
574
3280
    new_uri.set_path("");
575
3280
    new_uri.to_string()
576
3280
}