1
use axum::{
2
    body::{self, Body},
3
    extract::Request,
4
    http::{header, response::Builder, HeaderValue, StatusCode},
5
    response::{IntoResponse, Response},
6
};
7
use log::error;
8
use reqwest::{self, Client, Method};
9
use serde_urlencoded;
10
use url::Url;
11

            
12
use sylvia_iot_corelib::err::ErrResp;
13

            
14
use super::{AmqpState, MqttState, State as AppState};
15
use crate::libs::mq::{emqx, rabbitmq, QueueType};
16

            
17
pub mod application;
18
pub mod auth;
19
pub mod client;
20
pub mod device;
21
pub mod device_route;
22
pub mod dldata_buffer;
23
pub mod network;
24
pub mod network_route;
25
mod request;
26
mod response;
27
pub mod unit;
28
pub mod user;
29

            
30
enum ListResp {
31
    /// The complete response. Return this directly.
32
    Axum(Response),
33
    /// Get body from [`reqwest::Response`].
34
    /// Use [`axum::http::response::Builder`] to build response body.
35
    ArrayStream(reqwest::Response, Builder),
36
}
37

            
38
struct CreateQueueResource<'a> {
39
    scheme: &'a str,
40
    host: &'a str,
41
    username: &'a str,
42
    password: &'a str,
43
    ttl: Option<usize>,
44
    length: Option<usize>,
45
    q_type: QueueType,
46
}
47

            
48
struct ClearQueueResource<'a> {
49
    scheme: &'a str,
50
    host: &'a str,
51
    username: &'a str,
52
}
53

            
54
struct PatchHost {
55
    host_uri: Url,
56
    username: String,
57
}
58

            
59
/// To launch a HTTP request as bridge from coremgr to auth/broker.
60
196
async fn api_bridge(fn_name: &str, client: &Client, req: Request, api_path: &str) -> Response {
61
196
    let (mut parts, body) = req.into_parts();
62
196

            
63
196
    parts.headers.remove(header::CONTENT_LENGTH);
64
196
    let mut builder = client
65
196
        .request(parts.method, api_path)
66
196
        .headers(parts.headers);
67
196
    if let Some(query_str) = parts.uri.query() {
68
4
        match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
69
            Err(e) => {
70
                let e = format!("parse query error: {}", e);
71
                return ErrResp::ErrParam(Some(e)).into_response();
72
            }
73
4
            Ok(query) => builder = builder.query(&query),
74
        }
75
192
    }
76
196
    match body::to_bytes(body, usize::MAX).await {
77
        Err(e) => {
78
            let e = format!("convert body error: {}", e);
79
            return ErrResp::ErrParam(Some(e)).into_response();
80
        }
81
196
        Ok(body) => builder = builder.body(body),
82
    }
83
196
    let api_req = match builder.build() {
84
        Err(e) => {
85
            let e = format!("generate request error: {}", e);
86
            error!("[{}] {}", fn_name, e);
87
            return ErrResp::ErrRsc(Some(e)).into_response();
88
        }
89
196
        Ok(req) => req,
90
    };
91
196
    let api_resp = match client.execute(api_req).await {
92
        Err(e) => {
93
            let e = format!("execute request error: {}", e);
94
            error!("[{}] {}", fn_name, e);
95
            return ErrResp::ErrIntMsg(Some(e)).into_response();
96
        }
97
196
        Ok(resp) => resp,
98
196
    };
99
196

            
100
196
    let mut resp_builder = Response::builder().status(api_resp.status());
101
548
    for (k, v) in api_resp.headers() {
102
548
        if k == reqwest::header::CONTENT_LENGTH {
103
176
            continue;
104
372
        }
105
372
        resp_builder = resp_builder.header(k, v);
106
    }
107
196
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
108
        Err(e) => {
109
            let e = format!("wrap response body error: {}", e);
110
            error!("[{}] {}", fn_name, e);
111
            ErrResp::ErrIntMsg(Some(e)).into_response()
112
        }
113
196
        Ok(resp) => resp,
114
    }
115
196
}
116

            
117
/// To launch a HTTP request for one `/list` API as bridge from coremgr to auth/broker.
118
208
async fn list_api_bridge(
119
208
    fn_name: &str,
120
208
    client: &Client,
121
208
    req: Request,
122
208
    api_path: &str,
123
208
    force_array: bool,
124
208
    csv_file: &str,
125
208
) -> ListResp {
126
208
    let (mut parts, _body) = req.into_parts();
127
208
    parts.headers.remove(header::CONTENT_LENGTH);
128
208

            
129
208
    let mut is_csv = false;
130
208
    let mut builder = client
131
208
        .request(parts.method, api_path)
132
208
        .headers(parts.headers);
133
208
    if let Some(query_str) = parts.uri.query() {
134
164
        let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
135
            Err(e) => {
136
                let e = format!("parse query error: {}", e);
137
                return ListResp::Axum(ErrResp::ErrParam(Some(e)).into_response());
138
            }
139
164
            Ok(query) => query,
140
164
        };
141
164
        let mut has_format = false;
142
164
        let mut query: Vec<_> = query
143
164
            .iter()
144
244
            .map(|(k, v)| {
145
244
                if k.as_str().eq("format") {
146
120
                    has_format = true;
147
120
                    if v.as_str().eq("csv") {
148
40
                        is_csv = true;
149
40
                        ("format".to_string(), "array".to_string())
150
                    } else {
151
80
                        (k.clone(), v.clone())
152
                    }
153
                } else {
154
124
                    (k.clone(), v.clone())
155
                }
156
244
            })
157
164
            .collect();
158
164
        if force_array && !has_format {
159
8
            query.push(("format".to_string(), "array".to_string()));
160
156
        }
161
164
        builder = builder.query(&query);
162
44
    } else if force_array {
163
8
        builder = builder.query(&vec![("format", "array")]);
164
36
    }
165
208
    let api_req = match builder.build() {
166
        Err(e) => {
167
            let e = format!("generate request error: {}", e);
168
            error!("[{}] {}", fn_name, e);
169
            return ListResp::Axum(ErrResp::ErrRsc(Some(e)).into_response());
170
        }
171
208
        Ok(req) => req,
172
    };
173
208
    let api_resp = match client.execute(api_req).await {
174
        Err(e) => {
175
            let e = format!("execute request error: {}", e);
176
            error!("[{}] {}", fn_name, e);
177
            return ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response());
178
        }
179
208
        Ok(resp) => resp,
180
208
    };
181
208

            
182
208
    let mut resp_builder = Response::builder().status(api_resp.status());
183
208
    if is_csv {
184
40
        resp_builder = resp_builder
185
40
            .header(header::CONTENT_TYPE, "text/csv")
186
40
            .header(
187
40
                header::CONTENT_DISPOSITION,
188
40
                format!("attachment;filename={}.csv", csv_file),
189
40
            );
190
40
        if let Some(auth) = api_resp.headers().get(header::WWW_AUTHENTICATE) {
191
            resp_builder = resp_builder.header(header::WWW_AUTHENTICATE, auth.clone());
192
40
        }
193
    } else {
194
504
        for (k, v) in api_resp.headers() {
195
504
            if k == reqwest::header::CONTENT_LENGTH {
196
48
                continue;
197
456
            }
198
456
            resp_builder = resp_builder.header(k, v);
199
        }
200
    }
201
208
    if api_resp.status() == reqwest::StatusCode::OK && (is_csv || force_array) {
202
64
        return ListResp::ArrayStream(api_resp, resp_builder);
203
144
    }
204
144
    match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
205
        Err(e) => {
206
            let e = format!("wrap response body error: {}", e);
207
            error!("[{}] {}", fn_name, e);
208
            ListResp::Axum(ErrResp::ErrIntMsg(Some(e)).into_response())
209
        }
210
144
        Ok(resp) => ListResp::Axum(resp),
211
    }
212
208
}
213

            
214
16
async fn get_tokeninfo_inner(
215
16
    fn_name: &str,
216
16
    client: &Client,
217
16
    auth_base: &str,
218
16
    token: &HeaderValue,
219
16
) -> Result<response::TokenInfo, Response> {
220
16
    let uri = format!("{}/api/v1/auth/tokeninfo", auth_base);
221
16
    let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
222
16
    match resp.json::<response::GetTokenInfo>().await {
223
        Err(e) => {
224
            let e = format!("wrong response of token info: {}", e);
225
            error!("[{}] {}", fn_name, e);
226
            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
227
        }
228
16
        Ok(info) => Ok(info.data),
229
    }
230
16
}
231

            
232
144
async fn get_unit_inner(
233
144
    fn_name: &str,
234
144
    client: &Client,
235
144
    broker_base: &str,
236
144
    unit_id: &str,
237
144
    token: &HeaderValue,
238
144
) -> Result<Option<response::Unit>, Response> {
239
144
    let uri = format!("{}/api/v1/unit/{}", broker_base, unit_id);
240
144
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
241
8
        Err(resp) => match resp.status() {
242
8
            StatusCode::NOT_FOUND => Ok(None),
243
            _ => Err(resp),
244
        },
245
136
        Ok(resp) => match resp.json::<response::GetUnit>().await {
246
            Err(e) => {
247
                let e = format!("wrong response of unit: {}", e);
248
                error!("[{}] {}", fn_name, e);
249
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
250
            }
251
136
            Ok(unit) => Ok(Some(unit.data)),
252
        },
253
    }
254
144
}
255

            
256
24
async fn get_device_inner(
257
24
    fn_name: &str,
258
24
    client: &Client,
259
24
    broker_base: &str,
260
24
    device_id: &str,
261
24
    token: &HeaderValue,
262
24
) -> Result<Option<response::Device>, Response> {
263
24
    let uri = format!("{}/api/v1/device/{}", broker_base, device_id);
264
24
    match get_stream_resp(fn_name, token, &client, uri.as_str()).await {
265
8
        Err(resp) => match resp.status() {
266
8
            StatusCode::NOT_FOUND => Ok(None),
267
            _ => Err(resp),
268
        },
269
16
        Ok(resp) => match resp.json::<response::GetDevice>().await {
270
            Err(e) => {
271
                let e = format!("wrong response of device: {}", e);
272
                error!("[{}] {}", fn_name, e);
273
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
274
            }
275
16
            Ok(device) => Ok(Some(device.data)),
276
        },
277
    }
278
24
}
279

            
280
620
async fn get_stream_resp(
281
620
    fn_name: &str,
282
620
    token: &HeaderValue,
283
620
    client: &Client,
284
620
    uri: &str,
285
620
) -> Result<reqwest::Response, Response> {
286
620
    match client
287
620
        .request(Method::GET, uri)
288
620
        .header(reqwest::header::AUTHORIZATION, token)
289
620
        .build()
290
    {
291
        Err(e) => {
292
            let e = format!("generate request error: {}", e);
293
            error!("[{}] {}", fn_name, e);
294
            Err(ErrResp::ErrRsc(Some(e)).into_response())
295
        }
296
620
        Ok(req) => match client.execute(req).await {
297
            Err(e) => {
298
                let e = format!("execute request error: {}", e);
299
                error!("[{}] {}", fn_name, e);
300
                Err(ErrResp::ErrIntMsg(Some(e)).into_response())
301
            }
302
620
            Ok(resp) => match resp.status() {
303
564
                StatusCode::OK => Ok(resp),
304
                _ => {
305
56
                    let mut resp_builder = Response::builder().status(resp.status());
306
168
                    for (k, v) in resp.headers() {
307
168
                        resp_builder = resp_builder.header(k, v);
308
168
                    }
309
56
                    match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
310
                        Err(e) => {
311
                            let e = format!("wrap response body error: {}", e);
312
                            error!("[{}] {}", fn_name, e);
313
                            Err(ErrResp::ErrIntMsg(Some(e)).into_response())
314
                        }
315
56
                        Ok(resp) => Err(resp),
316
                    }
317
                }
318
            },
319
        },
320
    }
321
620
}
322

            
323
/// To compare if two broker hosts are the same.
324
///
325
/// For example:
326
/// - `amqp://localhost` is the same as `amqp://localhost:5672`
327
/// - `mqtts://localhost` is the same as `mqtts://localhost:8883`
328
40
fn cmp_host_uri(src: &str, dst: &str) -> bool {
329
40
    let src_uri = match Url::parse(src) {
330
        Err(_) => return false,
331
40
        Ok(uri) => uri,
332
    };
333
40
    let dst_uri = match Url::parse(dst) {
334
        Err(_) => return false,
335
40
        Ok(uri) => uri,
336
40
    };
337
40
    if src_uri.scheme() != dst_uri.scheme() || src_uri.host_str() != dst_uri.host_str() {
338
40
        return false;
339
    }
340
    let src_port = match src_uri.port() {
341
        None => match src_uri.scheme() {
342
            "amqp" => 5672,
343
            "amqps" => 5671,
344
            "mqtt" => 1883,
345
            "mqtts" => 8883,
346
            _ => return false,
347
        },
348
        Some(port) => port,
349
    };
350
    let dst_port = match dst_uri.port() {
351
        None => match dst_uri.scheme() {
352
            "amqp" => 5672,
353
            "amqps" => 5671,
354
            "mqtt" => 1883,
355
            "mqtts" => 8883,
356
            _ => return false,
357
        },
358
        Some(port) => port,
359
    };
360
    src_port == dst_port
361
40
}
362

            
363
/// To set-up queue resources (vhost, ACL, ...) in the broker.
364
128
async fn create_queue_rsc<'a>(
365
128
    fn_name: &str,
366
128
    state: &AppState,
367
128
    rsc: &CreateQueueResource<'a>,
368
128
) -> Result<(), Response> {
369
128
    let scheme = rsc.scheme;
370
128
    match scheme {
371
128
        "amqp" | "amqps" => match &state.amqp {
372
60
            AmqpState::RabbitMq(opts) => {
373
60
                let client = state.client.clone();
374
60
                let host = rsc.host;
375
60
                let username = rsc.username;
376
60
                let password = rsc.password;
377
60
                let clear_rsc = ClearQueueResource {
378
60
                    scheme,
379
60
                    host,
380
60
                    username,
381
60
                };
382
60
                if let Err(e) = rabbitmq::put_user(&client, opts, host, username, password).await {
383
                    error!("[{}] add RabbitMQ user {} error: {}", fn_name, username, e);
384
                    return Err(e.into_response());
385
60
                }
386
60
                if let Err(e) = rabbitmq::put_vhost(&client, opts, host, username).await {
387
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
388
                    error!("[{}] add RabbitMQ vhost {} error: {}", fn_name, username, e);
389
                    return Err(e.into_response());
390
60
                }
391
                if let Err(e) =
392
60
                    rabbitmq::put_permissions(&client, opts, host, rsc.q_type, username).await
393
                {
394
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
395
                    error!(
396
                        "[{}] add RabbitMQ permission {} error: {}",
397
                        fn_name, username, e
398
                    );
399
                    return Err(e.into_response());
400
60
                }
401
60
                if rsc.ttl.is_some() && rsc.ttl.is_some() {
402
28
                    let policies = rabbitmq::BrokerPolicies {
403
28
                        ttl: rsc.ttl,
404
28
                        length: rsc.length,
405
28
                    };
406
                    if let Err(e) =
407
28
                        rabbitmq::put_policies(&client, opts, host, username, &policies).await
408
                    {
409
                        error!("[{}] patch RabbitMQ {} error: {}", fn_name, username, e);
410
                        return Err(e.into_response());
411
28
                    }
412
32
                }
413
            }
414
        },
415
68
        "mqtt" | "mqtts" => match &state.mqtt {
416
34
            MqttState::Emqx(opts) => {
417
34
                let client = state.client.clone();
418
34
                let host = rsc.host;
419
34
                let username = rsc.username;
420
34
                let password = rsc.password;
421
34
                let clear_rsc = ClearQueueResource {
422
34
                    scheme,
423
34
                    host,
424
34
                    username,
425
34
                };
426
34
                if let Err(e) = emqx::post_user(
427
34
                    &client,
428
34
                    opts,
429
34
                    host,
430
34
                    opts.api_key.as_str(),
431
34
                    opts.api_secret.as_str(),
432
34
                    true,
433
34
                )
434
34
                .await
435
                {
436
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
437
                    return Err(e.into_response());
438
34
                }
439
                if let Err(e) =
440
34
                    emqx::post_user(&client, opts, host, username, password, false).await
441
                {
442
                    error!("[{}] add EMQX user {} error: {}", fn_name, username, e);
443
                    return Err(e.into_response());
444
34
                }
445
34
                if let Err(e) = emqx::post_acl(&client, opts, host, rsc.q_type, username).await {
446
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
447
                    error!("[{}] add EMQX ACL {} error: {}", fn_name, username, e);
448
                    return Err(e.into_response());
449
34
                }
450
                if let Err(e) =
451
34
                    emqx::post_topic_metrics(&client, opts, host, rsc.q_type, username).await
452
                {
453
                    let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
454
                    error!("[{}] add EMQX metrics {} error: {}", fn_name, username, e);
455
                    return Err(e.into_response());
456
34
                }
457
            }
458
34
            MqttState::Rumqttd => {}
459
        },
460
        _ => return Err(ErrResp::ErrParam(Some("unsupport scheme".to_string())).into_response()),
461
    }
462
128
    Ok(())
463
128
}
464

            
465
/// To clear queue resources (vhost, ACL, ...) in the broker.
466
40
async fn clear_queue_rsc<'a>(
467
40
    fn_name: &str,
468
40
    state: &AppState,
469
40
    rsc: &ClearQueueResource<'a>,
470
40
) -> Result<(), Response> {
471
40
    match rsc.scheme {
472
40
        "amqp" | "amqps" => match &state.amqp {
473
20
            AmqpState::RabbitMq(opts) => {
474
20
                let client = state.client.clone();
475
20
                if let Err(e) = rabbitmq::delete_user(&client, opts, rsc.host, rsc.username).await {
476
                    error!(
477
                        "[{}] clear RabbitMQ user {} error: {}",
478
                        fn_name, rsc.username, e
479
                    );
480
                    return Err(e.into_response());
481
20
                }
482
20
                if let Err(e) = rabbitmq::delete_vhost(&client, opts, rsc.host, rsc.username).await
483
                {
484
                    error!(
485
                        "[{}] clear RabbitMQ vhost {} error: {}",
486
                        fn_name, rsc.username, e
487
                    );
488
                    return Err(e.into_response());
489
20
                }
490
            }
491
        },
492
20
        "mqtt" | "mqtts" => match &state.mqtt {
493
10
            MqttState::Emqx(opts) => {
494
10
                let client = state.client.clone();
495
10
                if let Err(e) = emqx::delete_user(&client, opts, rsc.host, rsc.username).await {
496
                    error!(
497
                        "[{}] clear EMQX user {} error: {}",
498
                        fn_name, rsc.username, e
499
                    );
500
                    return Err(e.into_response());
501
10
                }
502
10
                let q_type = QueueType::Application;
503
10
                if let Err(e) = emqx::delete_acl(&client, opts, rsc.host, rsc.username).await {
504
                    error!("[{}] clear EMQX ACL {} error: {}", fn_name, rsc.username, e);
505
                    return Err(e.into_response());
506
10
                }
507
                if let Err(e) =
508
10
                    emqx::delete_topic_metrics(&client, opts, rsc.host, q_type, rsc.username).await
509
                {
510
                    error!(
511
                        "[{}] clear EMQX topic metrics {} error: {}",
512
                        fn_name, rsc.username, e
513
                    );
514
                    return Err(e.into_response());
515
10
                }
516
            }
517
10
            MqttState::Rumqttd => {}
518
        },
519
        _ => {}
520
    }
521
40
    Ok(())
522
40
}
523

            
524
/// To clear new resources after something wrong when patching the application/network.
525
async fn clear_patch_host(fn_name: &str, state: &AppState, patch_host: &Option<PatchHost>) {
526
    if let Some(patch_host) = patch_host {
527
        if let Some(host) = patch_host.host_uri.host_str() {
528
            let clear_rsc = ClearQueueResource {
529
                scheme: patch_host.host_uri.scheme(),
530
                host,
531
                username: patch_host.username.as_str(),
532
            };
533
            let _ = clear_queue_rsc(fn_name, &state, &clear_rsc);
534
        }
535
    }
536
}
537

            
538
/// To composite management plugin's information in the URI for sylvia-iot-broker.
539
128
fn transfer_host_uri(state: &AppState, host_uri: &mut Url, mq_username: &str) {
540
128
    match host_uri.scheme() {
541
128
        "amqp" | "amqps" => match &state.amqp {
542
60
            AmqpState::RabbitMq(opts) => {
543
60
                let _ = host_uri.set_username(opts.username.as_str());
544
60
                let _ = host_uri.set_password(Some(opts.password.as_str()));
545
60
                let _ = host_uri.set_path(mq_username);
546
60
            }
547
        },
548
68
        "mqtt" | "mqtts" => match &state.mqtt {
549
34
            MqttState::Emqx(opts) => {
550
34
                let _ = host_uri.set_username(opts.api_key.as_str());
551
34
                let _ = host_uri.set_password(Some(opts.api_secret.as_str()));
552
34
            }
553
34
            MqttState::Rumqttd => {}
554
        },
555
        _ => {}
556
    }
557
128
}
558

            
559
/// Truncates the host (from sylvia-iot-broker) to `scheme://host:port`.
560
3280
fn trunc_host_uri(host_uri: &Url) -> String {
561
3280
    let mut new_uri = host_uri.clone();
562
3280
    let _ = new_uri.set_username("");
563
3280
    let _ = new_uri.set_password(None);
564
3280
    new_uri.set_path("");
565
3280
    new_uri.to_string()
566
3280
}