1
//! Wrapper APIs for controlling EMQX.
2
//!
3
//! - `hostname` of all APIs are host name or IP address of the broker.
4

            
5
use reqwest::{self, Client, Method, StatusCode};
6
use serde::{Deserialize, Serialize};
7

            
8
use sylvia_iot_corelib::{err::ErrResp, strings::randomstring};
9

            
10
use super::QueueType;
11

            
12
/// EMQX management information.
13
#[derive(Clone)]
14
pub struct ManagementOpts {
15
    /// Management plugin API key.
16
    pub api_key: String,
17
    /// Management plugin API secret.
18
    pub api_secret: String,
19
}
20

            
21
/// Statistics.
22
#[derive(Default)]
23
pub struct Stats {
24
    /// Number of queue consumers.
25
    pub consumers: usize,
26
    /// Number of ready/unacked messages.
27
    pub messages: usize,
28
    /// Publish rate from the producer.
29
    pub publish_rate: f64,
30
    /// Deliver rate to the consumer.
31
    pub deliver_rate: f64,
32
}
33

            
34
#[derive(Deserialize)]
35
struct Meta {
36
    count: usize,
37
}
38

            
39
#[derive(Serialize)]
40
struct PostAuthUsersBody<'a> {
41
    user_id: &'a str,
42
    password: &'a str,
43
    is_superuser: bool,
44
}
45

            
46
#[derive(Serialize)]
47
struct PutAuthUsersBody<'a> {
48
    password: &'a str,
49
}
50

            
51
#[derive(Serialize)]
52
struct PostAclBodyItem<'a> {
53
    username: &'a str,
54
    rules: Vec<PostAclRuleItem<'a>>,
55
}
56

            
57
#[derive(Clone, Serialize)]
58
struct PostAclRuleItem<'a> {
59
    topic: String,
60
    action: &'a str,
61
    permission: &'a str,
62
}
63

            
64
#[derive(Serialize)]
65
struct PostPublishBody<'a> {
66
    topic: String,
67
    clientid: String,
68
    payload: String,
69
    payload_encoding: &'a str,
70
    qos: usize,
71
}
72

            
73
#[derive(Serialize)]
74
struct PostTopicMetricsBody {
75
    topic: String,
76
}
77

            
78
#[derive(Deserialize)]
79
struct GetSubscriptionsResBody {
80
    meta: Meta,
81
}
82

            
83
#[derive(Default, Deserialize)]
84
struct GetTopicMetricsResBody {
85
    metrics: TopicMetrics,
86
}
87

            
88
#[derive(Default, Deserialize)]
89
struct TopicMetrics {
90
    #[serde(rename = "messages.in.rate")]
91
    messages_in_rate: Option<f64>,
92
    #[serde(rename = "messages.out.rate")]
93
    messages_out_rate: Option<f64>,
94
}
95

            
96
#[derive(Deserialize)]
97
struct ErrResBody {
98
    code: String,
99
    message: Option<String>,
100
}
101

            
102
/// Authenticator ID.
103
const AUTH_ID: &'static str = "password_based:built_in_database";
104

            
105
/// To create an account.
106
92
pub async fn post_user(
107
92
    client: &Client,
108
92
    opts: &ManagementOpts,
109
92
    hostname: &str,
110
92
    username: &str,
111
92
    password: &str,
112
92
    is_superuser: bool,
113
92
) -> Result<(), ErrResp> {
114
92
    let uri = format!(
115
92
        "http://{}:18083/api/v5/authentication/{}/users",
116
92
        hostname, AUTH_ID
117
92
    );
118
92
    let req = match client
119
92
        .request(Method::POST, uri)
120
92
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
121
92
        .json(&PostAuthUsersBody {
122
92
            user_id: username,
123
92
            password,
124
92
            is_superuser,
125
92
        })
126
92
        .build()
127
    {
128
        Err(e) => {
129
            let e = format!("generate user request error: {}", e);
130
            return Err(ErrResp::ErrRsc(Some(e)));
131
        }
132
92
        Ok(req) => req,
133
92
    };
134
92
    match client.execute(req).await {
135
4
        Err(e) => {
136
4
            let e = format!("execute user request error: {}", e);
137
4
            Err(ErrResp::ErrIntMsg(Some(e)))
138
        }
139
88
        Ok(resp) => match resp.status() {
140
50
            StatusCode::CREATED => Ok(()),
141
34
            StatusCode::CONFLICT => put_user(client, opts, hostname, username, password).await,
142
            _ => {
143
4
                let e = format!("execute user request with status: {}", resp.status());
144
4
                Err(ErrResp::ErrIntMsg(Some(e)))
145
            }
146
        },
147
    }
148
92
}
149

            
150
/// To update the user's password.
151
46
pub async fn put_user(
152
46
    client: &Client,
153
46
    opts: &ManagementOpts,
154
46
    hostname: &str,
155
46
    username: &str,
156
46
    password: &str,
157
46
) -> Result<(), ErrResp> {
158
46
    let uri = format!(
159
46
        "http://{}:18083/api/v5/authentication/{}/users/{}",
160
46
        hostname, AUTH_ID, username
161
46
    );
162
46
    let req = match client
163
46
        .request(Method::PUT, uri)
164
46
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
165
46
        .json(&PutAuthUsersBody { password })
166
46
        .build()
167
    {
168
        Err(e) => {
169
            let e = format!("generate user request error: {}", e);
170
            return Err(ErrResp::ErrRsc(Some(e)));
171
        }
172
46
        Ok(req) => req,
173
46
    };
174
46
    match client.execute(req).await {
175
2
        Err(e) => {
176
2
            let e = format!("execute user request error: {}", e);
177
2
            Err(ErrResp::ErrIntMsg(Some(e)))
178
        }
179
44
        Ok(resp) => match resp.status() {
180
40
            StatusCode::OK => Ok(()),
181
            _ => {
182
4
                let e = format!("execute user request with status: {}", resp.status());
183
4
                Err(ErrResp::ErrIntMsg(Some(e)))
184
            }
185
        },
186
    }
187
46
}
188

            
189
/// To delete a user.
190
366
pub async fn delete_user(
191
366
    client: &Client,
192
366
    opts: &ManagementOpts,
193
366
    hostname: &str,
194
366
    username: &str,
195
366
) -> Result<(), ErrResp> {
196
366
    let uri = format!(
197
366
        "http://{}:18083/api/v5/authentication/{}/users/{}",
198
366
        hostname, AUTH_ID, username
199
366
    );
200
366
    let req = match client
201
366
        .request(Method::DELETE, uri)
202
366
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
203
366
        .build()
204
    {
205
        Err(e) => {
206
            let e = format!("generate user request error: {}", e);
207
            return Err(ErrResp::ErrRsc(Some(e)));
208
        }
209
366
        Ok(req) => req,
210
366
    };
211
366
    match client.execute(req).await {
212
2
        Err(e) => {
213
2
            let e = format!("execute user request error: {}", e);
214
2
            Err(ErrResp::ErrIntMsg(Some(e)))
215
        }
216
364
        Ok(resp) => match resp.status() {
217
362
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
218
            _ => {
219
2
                let e = format!("execute user request with status: {}", resp.status());
220
2
                Err(ErrResp::ErrIntMsg(Some(e)))
221
            }
222
        },
223
    }
224
366
}
225

            
226
/// To create an ACL rule of a topic for the user.
227
52
pub async fn post_acl(
228
52
    client: &Client,
229
52
    opts: &ManagementOpts,
230
52
    hostname: &str,
231
52
    q_type: QueueType,
232
52
    username: &str,
233
52
) -> Result<(), ErrResp> {
234
52
    let uri = format!(
235
52
        "http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users",
236
52
        hostname
237
52
    );
238
52
    let rules = match q_type {
239
28
        QueueType::Application => vec![
240
28
            PostAclRuleItem {
241
28
                topic: format!("broker.{}.uldata", username),
242
28
                action: "subscribe",
243
28
                permission: "allow",
244
28
            },
245
28
            PostAclRuleItem {
246
28
                topic: format!("broker.{}.dldata", username),
247
28
                action: "publish",
248
28
                permission: "allow",
249
28
            },
250
28
            PostAclRuleItem {
251
28
                topic: format!("broker.{}.dldata-resp", username),
252
28
                action: "subscribe",
253
28
                permission: "allow",
254
28
            },
255
28
            PostAclRuleItem {
256
28
                topic: format!("broker.{}.dldata-result", username),
257
28
                action: "subscribe",
258
28
                permission: "allow",
259
28
            },
260
28
        ],
261
24
        QueueType::Network => vec![
262
24
            PostAclRuleItem {
263
24
                topic: format!("broker.{}.uldata", username),
264
24
                action: "publish",
265
24
                permission: "allow",
266
24
            },
267
24
            PostAclRuleItem {
268
24
                topic: format!("broker.{}.dldata", username),
269
24
                action: "subscribe",
270
24
                permission: "allow",
271
24
            },
272
24
            PostAclRuleItem {
273
24
                topic: format!("broker.{}.dldata-result", username),
274
24
                action: "publish",
275
24
                permission: "allow",
276
24
            },
277
24
            PostAclRuleItem {
278
24
                topic: format!("broker.{}.ctrl", username),
279
24
                action: "subscribe",
280
24
                permission: "allow",
281
24
            },
282
24
        ],
283
    };
284
52
    let req = match client
285
52
        .request(Method::POST, uri.clone())
286
52
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
287
52
        .json(&vec![PostAclBodyItem {
288
52
            username,
289
52
            rules: rules.clone(),
290
52
        }])
291
52
        .build()
292
    {
293
        Err(e) => {
294
            let e = format!("generate acl request error: {}", e);
295
            return Err(ErrResp::ErrRsc(Some(e)));
296
        }
297
52
        Ok(req) => req,
298
52
    };
299
52
    match client.execute(req).await {
300
2
        Err(e) => {
301
2
            let e = format!("execute acl request error: {}", e);
302
2
            return Err(ErrResp::ErrIntMsg(Some(e)));
303
        }
304
50
        Ok(resp) => match resp.status() {
305
42
            StatusCode::NO_CONTENT => return Ok(()),
306
6
            StatusCode::CONFLICT => (),
307
            _ => {
308
2
                let e = format!("execute acl request with status: {}", resp.status());
309
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
310
            }
311
        },
312
    }
313

            
314
6
    let req = match client
315
6
        .request(Method::PUT, format!("{}/{}", uri, username))
316
6
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
317
6
        .json(&PostAclBodyItem { username, rules })
318
6
        .build()
319
    {
320
        Err(e) => {
321
            let e = format!("generate put acl request error: {}", e);
322
            return Err(ErrResp::ErrRsc(Some(e)));
323
        }
324
6
        Ok(req) => req,
325
6
    };
326
6
    match client.execute(req).await {
327
        Err(e) => {
328
            let e = format!("execute put acl request error: {}", e);
329
            Err(ErrResp::ErrIntMsg(Some(e)))
330
        }
331
6
        Ok(resp) => match resp.status() {
332
6
            StatusCode::NO_CONTENT => Ok(()),
333
            _ => {
334
                let e = format!("execute put acl request with status: {}", resp.status());
335
                Err(ErrResp::ErrIntMsg(Some(e)))
336
            }
337
        },
338
    }
339
52
}
340

            
341
/// To delete an ACL rule of a group of topics of an application/network for the user.
342
162
pub async fn delete_acl(
343
162
    client: &Client,
344
162
    opts: &ManagementOpts,
345
162
    hostname: &str,
346
162
    username: &str,
347
162
) -> Result<(), ErrResp> {
348
162
    let uri = format!(
349
162
        "http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users/{}",
350
162
        hostname, username
351
162
    );
352
162
    let req = match client
353
162
        .request(Method::DELETE, uri)
354
162
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
355
162
        .build()
356
    {
357
        Err(e) => {
358
            let e = format!("generate acl request error: {}", e);
359
            return Err(ErrResp::ErrRsc(Some(e)));
360
        }
361
162
        Ok(req) => req,
362
162
    };
363
162
    match client.execute(req).await {
364
2
        Err(e) => {
365
2
            let e = format!("execute acl request error: {}", e);
366
2
            Err(ErrResp::ErrIntMsg(Some(e)))
367
        }
368
160
        Ok(resp) => match resp.status() {
369
158
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
370
            _ => {
371
2
                let e = format!("execute acl request with status: {}", resp.status());
372
2
                Err(ErrResp::ErrIntMsg(Some(e)))
373
            }
374
        },
375
    }
376
162
}
377

            
378
/// To publish a message to the specified queue (such as `uldata` and `dldata`).
379
///
380
/// The `payload` MUST be Base64 encoded string.
381
18
pub async fn publish_message(
382
18
    client: &Client,
383
18
    opts: &ManagementOpts,
384
18
    hostname: &str,
385
18
    username: &str,
386
18
    queue: &str,     // uldata,dldata
387
18
    payload: String, // Base64
388
18
) -> Result<(), ErrResp> {
389
18
    let uri = format!("http://{}:18083/api/v5/publish", hostname);
390
18
    let body = PostPublishBody {
391
18
        topic: format!("broker.{}.{}", username, queue),
392
18
        clientid: format!("sylvia-{}", randomstring(12)),
393
18
        payload,
394
18
        payload_encoding: "base64",
395
18
        qos: 0,
396
18
    };
397
18
    let req = match client
398
18
        .request(Method::POST, uri)
399
18
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
400
18
        .json(&body)
401
18
        .build()
402
    {
403
        Err(e) => {
404
            let e = format!("generate publish request error: {}", e);
405
            return Err(ErrResp::ErrRsc(Some(e)));
406
        }
407
18
        Ok(req) => req,
408
18
    };
409
18
    match client.execute(req).await {
410
2
        Err(e) => {
411
2
            let e = format!("execute publish request error: {}", e);
412
2
            Err(ErrResp::ErrIntMsg(Some(e)))
413
        }
414
16
        Ok(resp) => match resp.status() {
415
16
            StatusCode::OK | StatusCode::ACCEPTED => Ok(()), // 200 for <= 5.0.8, 202 for >= 5.0.9
416
            _ => {
417
                let e = format!("execute publish request with status: {}", resp.status());
418
                Err(ErrResp::ErrIntMsg(Some(e)))
419
            }
420
        },
421
    }
422
18
}
423

            
424
/// To enable metrics for a queue.
425
46
pub async fn post_topic_metrics(
426
46
    client: &Client,
427
46
    opts: &ManagementOpts,
428
46
    hostname: &str,
429
46
    q_type: QueueType,
430
46
    username: &str,
431
46
) -> Result<(), ErrResp> {
432
46
    let uri = format!("http://{}:18083/api/v5/mqtt/topic_metrics", hostname);
433
46
    let q_name_prefix = format!("broker.{}.", username);
434
46
    let queues = match q_type {
435
26
        QueueType::Application => vec!["uldata", "dldata", "dldata-resp", "dldata-result"],
436
20
        QueueType::Network => vec!["uldata", "dldata", "dldata-result", "ctrl"],
437
    };
438
214
    for queue in queues {
439
172
        let req = match client
440
172
            .request(Method::POST, uri.as_str())
441
172
            .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
442
172
            .json(&PostTopicMetricsBody {
443
172
                topic: format!("{}{}", q_name_prefix, queue),
444
172
            })
445
172
            .build()
446
        {
447
            Err(e) => {
448
                let e = format!("generate topic_metrics request error: {}", e);
449
                return Err(ErrResp::ErrRsc(Some(e)));
450
            }
451
172
            Ok(req) => req,
452
172
        };
453
172
        match client.execute(req).await {
454
2
            Err(e) => {
455
2
                let e = format!("execute topic_metrics request error: {}", e);
456
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
457
            }
458
170
            Ok(resp) => match resp.status() {
459
152
                StatusCode::OK => (),
460
                StatusCode::BAD_REQUEST => {
461
16
                    match resp.json::<ErrResBody>().await {
462
                        Err(e) => {
463
                            let e = format!("execute topic_metrics read 400 body error: {}", e);
464
                            return Err(ErrResp::ErrIntMsg(Some(e)));
465
                        }
466
16
                        Ok(body) => match body.code.as_str() {
467
16
                            "BAD_TOPIC" => (),
468
                            _ => {
469
                                let e = format!(
470
                                    "execute topic_metrics request with unexpected 400 code: {}, message: {:?}",
471
                                    body.code, body.message
472
                                );
473
                                return Err(ErrResp::ErrIntMsg(Some(e)));
474
                            }
475
                        },
476
                    };
477
                }
478
                _ => {
479
2
                    let e = format!(
480
2
                        "execute topic_metrics request with status: {}",
481
2
                        resp.status()
482
2
                    );
483
2
                    return Err(ErrResp::ErrIntMsg(Some(e)));
484
                }
485
            },
486
        }
487
    }
488
42
    Ok(())
489
46
}
490

            
491
/// To disable metrics for a queue.
492
164
pub async fn delete_topic_metrics(
493
164
    client: &Client,
494
164
    opts: &ManagementOpts,
495
164
    hostname: &str,
496
164
    q_type: QueueType,
497
164
    username: &str,
498
164
) -> Result<(), ErrResp> {
499
164
    let uri_prefix = format!(
500
164
        "http://{}:18083/api/v5/mqtt/topic_metrics/broker.{}.",
501
164
        hostname, username
502
164
    );
503
164
    let queues = match q_type {
504
66
        QueueType::Application => vec!["uldata", "dldata", "dldata-resp", "dldata-result"],
505
98
        QueueType::Network => vec!["uldata", "dldata", "dldata-result", "ctrl"],
506
    };
507
804
    for queue in queues {
508
644
        let req = match client
509
644
            .request(Method::DELETE, format!("{}{}", uri_prefix, queue).as_str())
510
644
            .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
511
644
            .build()
512
        {
513
            Err(e) => {
514
                let e = format!("generate topic_metrics request error: {}", e);
515
                return Err(ErrResp::ErrRsc(Some(e)));
516
            }
517
644
            Ok(req) => req,
518
644
        };
519
644
        match client.execute(req).await {
520
2
            Err(e) => {
521
2
                let e = format!("execute topic_metrics request error: {}", e);
522
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
523
            }
524
642
            Ok(resp) => match resp.status() {
525
640
                StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => (),
526
                _ => {
527
2
                    let e = format!(
528
2
                        "execute topic_metrics request with status: {}",
529
2
                        resp.status()
530
2
                    );
531
2
                    return Err(ErrResp::ErrIntMsg(Some(e)));
532
                }
533
            },
534
        }
535
    }
536
160
    Ok(())
537
164
}
538

            
539
/// Get statistics of a queue.
540
1228
pub async fn stats(
541
1228
    client: &Client,
542
1228
    opts: &ManagementOpts,
543
1228
    hostname: &str,
544
1228
    username: &str,
545
1228
    queue: &str, // uldata,dldata,dldata-resp,dldata-result,ctrl
546
1228
) -> Result<Stats, ErrResp> {
547
1228
    let queue_name = format!("broker.{}.{}", username, queue);
548
1228
    let uri = format!(
549
1228
        "http://{}:18083/api/v5/subscriptions?topic={}",
550
1228
        hostname, queue_name
551
1228
    );
552
1228
    let req = match client
553
1228
        .request(Method::GET, uri)
554
1228
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
555
1228
        .build()
556
    {
557
        Err(e) => {
558
            let e = format!("generate stats subscriptions request error: {}", e);
559
            return Err(ErrResp::ErrRsc(Some(e)));
560
        }
561
1228
        Ok(req) => req,
562
    };
563
1228
    let resp = match client.execute(req).await {
564
2
        Err(e) => {
565
2
            let e = format!("execute stats subscriptions request error: {}", e);
566
2
            return Err(ErrResp::ErrIntMsg(Some(e)));
567
        }
568
1226
        Ok(resp) => match resp.status() {
569
1224
            StatusCode::OK => resp,
570
            _ => {
571
2
                let e = format!(
572
2
                    "execute stats subscriptions request with status: {}",
573
2
                    resp.status()
574
2
                );
575
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
576
            }
577
        },
578
    };
579
1224
    let resp_stats = match resp.json::<GetSubscriptionsResBody>().await {
580
        Err(e) => {
581
            let e = format!("read stats subscriptions body error: {}", e);
582
            return Err(ErrResp::ErrIntMsg(Some(e)));
583
        }
584
1224
        Ok(stats) => stats,
585
1224
    };
586
1224
    let mut stats = Stats {
587
1224
        consumers: resp_stats.meta.count,
588
1224
        ..Default::default()
589
1224
    };
590
1224

            
591
1224
    let uri = format!(
592
1224
        "http://{}:18083/api/v5/mqtt/topic_metrics/{}",
593
1224
        hostname, queue_name
594
1224
    );
595
1224
    let req = match client
596
1224
        .request(Method::GET, uri.as_str())
597
1224
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
598
1224
        .build()
599
    {
600
        Err(e) => {
601
            let e = format!("generate stats topic_metrics request error: {}", e);
602
            return Err(ErrResp::ErrRsc(Some(e)));
603
        }
604
1224
        Ok(req) => req,
605
    };
606
1224
    let resp_stats = match client.execute(req).await {
607
        Err(e) => {
608
            let e = format!("execute stats topic_metrics request error: {}", e);
609
            return Err(ErrResp::ErrIntMsg(Some(e)));
610
        }
611
1224
        Ok(resp) => match resp.status() {
612
224
            StatusCode::OK => match resp.json::<GetTopicMetricsResBody>().await {
613
                Err(e) => {
614
                    let e = format!("read stats topic_metrics body error: {}", e);
615
                    return Err(ErrResp::ErrIntMsg(Some(e)));
616
                }
617
224
                Ok(stats) => stats,
618
            },
619
1000
            StatusCode::NOT_FOUND => GetTopicMetricsResBody::default(),
620
            _ => {
621
                let e = format!(
622
                    "execute stats topic_metrics request with status: {}",
623
                    resp.status()
624
                );
625
                return Err(ErrResp::ErrIntMsg(Some(e)));
626
            }
627
        },
628
    };
629
1224
    stats.publish_rate = match resp_stats.metrics.messages_in_rate {
630
1000
        None => 0.0,
631
224
        Some(rate) => rate,
632
    };
633
1224
    stats.deliver_rate = match resp_stats.metrics.messages_out_rate {
634
1000
        None => 0.0,
635
224
        Some(rate) => rate,
636
    };
637

            
638
1224
    Ok(stats)
639
1228
}