1
//! Wrapper APIs for controlling EMQX.
2
//!
3
//! - `hostname` of all APIs are host name or IP address of the broker.
4

            
5
use reqwest::{self, Client, Method, StatusCode};
6
use serde::{Deserialize, Serialize};
7

            
8
use sylvia_iot_corelib::{err::ErrResp, strings::randomstring};
9

            
10
use super::QueueType;
11

            
12
/// EMQX management information.
13
#[derive(Clone)]
14
pub struct ManagementOpts {
15
    /// Management plugin API key.
16
    pub api_key: String,
17
    /// Management plugin API secret.
18
    pub api_secret: String,
19
}
20

            
21
/// Statistics.
22
#[derive(Default)]
23
pub struct Stats {
24
    /// Number of queue consumers.
25
    pub consumers: usize,
26
    /// Number of ready/unacked messages.
27
    pub messages: usize,
28
    /// Publish rate from the producer.
29
    pub publish_rate: f64,
30
    /// Deliver rate to the consumer.
31
    pub deliver_rate: f64,
32
}
33

            
34
2760
#[derive(Deserialize)]
35
struct Meta {
36
    count: usize,
37
}
38

            
39
#[derive(Serialize)]
40
struct PostAuthUsersBody<'a> {
41
    user_id: &'a str,
42
    password: &'a str,
43
    is_superuser: bool,
44
}
45

            
46
#[derive(Serialize)]
47
struct PutAuthUsersBody<'a> {
48
    password: &'a str,
49
}
50

            
51
#[derive(Serialize)]
52
struct PostAclBodyItem<'a> {
53
    username: &'a str,
54
    rules: Vec<PostAclRuleItem<'a>>,
55
}
56

            
57
#[derive(Clone, Serialize)]
58
struct PostAclRuleItem<'a> {
59
    topic: String,
60
    action: &'a str,
61
    permission: &'a str,
62
}
63

            
64
#[derive(Serialize)]
65
struct PostPublishBody<'a> {
66
    topic: String,
67
    clientid: String,
68
    payload: String,
69
    payload_encoding: &'a str,
70
    qos: usize,
71
}
72

            
73
#[derive(Serialize)]
74
struct PostTopicMetricsBody {
75
    topic: String,
76
}
77

            
78
1656
#[derive(Deserialize)]
79
struct GetSubscriptionsResBody {
80
    meta: Meta,
81
}
82

            
83
208
#[derive(Default, Deserialize)]
84
struct GetTopicMetricsResBody {
85
    metrics: TopicMetrics,
86
}
87

            
88
988
#[derive(Default, Deserialize)]
89
struct TopicMetrics {
90
    #[serde(rename = "messages.in.rate")]
91
    messages_in_rate: Option<f64>,
92
    #[serde(rename = "messages.out.rate")]
93
    messages_out_rate: Option<f64>,
94
}
95

            
96
24
#[derive(Deserialize)]
97
struct ErrResBody {
98
    code: String,
99
    message: Option<String>,
100
}
101

            
102
/// Authenticator ID.
103
const AUTH_ID: &'static str = "password_based:built_in_database";
104

            
105
/// To create an account.
106
46
pub async fn post_user(
107
46
    client: &Client,
108
46
    opts: &ManagementOpts,
109
46
    hostname: &str,
110
46
    username: &str,
111
46
    password: &str,
112
46
    is_superuser: bool,
113
46
) -> Result<(), ErrResp> {
114
46
    let uri = format!(
115
46
        "http://{}:18083/api/v5/authentication/{}/users",
116
46
        hostname, AUTH_ID
117
46
    );
118
46
    let req = match client
119
46
        .request(Method::POST, uri)
120
46
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
121
46
        .json(&PostAuthUsersBody {
122
46
            user_id: username,
123
46
            password,
124
46
            is_superuser,
125
46
        })
126
46
        .build()
127
    {
128
        Err(e) => {
129
            let e = format!("generate user request error: {}", e);
130
            return Err(ErrResp::ErrRsc(Some(e)));
131
        }
132
46
        Ok(req) => req,
133
46
    };
134
85
    match client.execute(req).await {
135
2
        Err(e) => {
136
2
            let e = format!("execute user request error: {}", e);
137
2
            Err(ErrResp::ErrIntMsg(Some(e)))
138
        }
139
44
        Ok(resp) => match resp.status() {
140
25
            StatusCode::CREATED => Ok(()),
141
17
            StatusCode::CONFLICT => put_user(client, opts, hostname, username, password).await,
142
            _ => {
143
2
                let e = format!("execute user request with status: {}", resp.status());
144
2
                Err(ErrResp::ErrIntMsg(Some(e)))
145
            }
146
        },
147
    }
148
46
}
149

            
150
/// To update the user's password.
151
23
pub async fn put_user(
152
23
    client: &Client,
153
23
    opts: &ManagementOpts,
154
23
    hostname: &str,
155
23
    username: &str,
156
23
    password: &str,
157
23
) -> Result<(), ErrResp> {
158
23
    let uri = format!(
159
23
        "http://{}:18083/api/v5/authentication/{}/users/{}",
160
23
        hostname, AUTH_ID, username
161
23
    );
162
23
    let req = match client
163
23
        .request(Method::PUT, uri)
164
23
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
165
23
        .json(&PutAuthUsersBody { password })
166
23
        .build()
167
    {
168
        Err(e) => {
169
            let e = format!("generate user request error: {}", e);
170
            return Err(ErrResp::ErrRsc(Some(e)));
171
        }
172
23
        Ok(req) => req,
173
23
    };
174
23
    match client.execute(req).await {
175
1
        Err(e) => {
176
1
            let e = format!("execute user request error: {}", e);
177
1
            Err(ErrResp::ErrIntMsg(Some(e)))
178
        }
179
22
        Ok(resp) => match resp.status() {
180
20
            StatusCode::OK => Ok(()),
181
            _ => {
182
2
                let e = format!("execute user request with status: {}", resp.status());
183
2
                Err(ErrResp::ErrIntMsg(Some(e)))
184
            }
185
        },
186
    }
187
23
}
188

            
189
/// To delete a user.
190
183
pub async fn delete_user(
191
183
    client: &Client,
192
183
    opts: &ManagementOpts,
193
183
    hostname: &str,
194
183
    username: &str,
195
183
) -> Result<(), ErrResp> {
196
183
    let uri = format!(
197
183
        "http://{}:18083/api/v5/authentication/{}/users/{}",
198
183
        hostname, AUTH_ID, username
199
183
    );
200
183
    let req = match client
201
183
        .request(Method::DELETE, uri)
202
183
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
203
183
        .build()
204
    {
205
        Err(e) => {
206
            let e = format!("generate user request error: {}", e);
207
            return Err(ErrResp::ErrRsc(Some(e)));
208
        }
209
183
        Ok(req) => req,
210
183
    };
211
207
    match client.execute(req).await {
212
1
        Err(e) => {
213
1
            let e = format!("execute user request error: {}", e);
214
1
            Err(ErrResp::ErrIntMsg(Some(e)))
215
        }
216
182
        Ok(resp) => match resp.status() {
217
181
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
218
            _ => {
219
1
                let e = format!("execute user request with status: {}", resp.status());
220
1
                Err(ErrResp::ErrIntMsg(Some(e)))
221
            }
222
        },
223
    }
224
183
}
225

            
226
/// To create an ACL rule of a topic for the user.
227
26
pub async fn post_acl(
228
26
    client: &Client,
229
26
    opts: &ManagementOpts,
230
26
    hostname: &str,
231
26
    q_type: QueueType,
232
26
    username: &str,
233
26
) -> Result<(), ErrResp> {
234
26
    let uri = format!(
235
26
        "http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users",
236
26
        hostname
237
26
    );
238
26
    let rules = match q_type {
239
14
        QueueType::Application => vec![
240
14
            PostAclRuleItem {
241
14
                topic: format!("broker.{}.uldata", username),
242
14
                action: "subscribe",
243
14
                permission: "allow",
244
14
            },
245
14
            PostAclRuleItem {
246
14
                topic: format!("broker.{}.dldata", username),
247
14
                action: "publish",
248
14
                permission: "allow",
249
14
            },
250
14
            PostAclRuleItem {
251
14
                topic: format!("broker.{}.dldata-resp", username),
252
14
                action: "subscribe",
253
14
                permission: "allow",
254
14
            },
255
14
            PostAclRuleItem {
256
14
                topic: format!("broker.{}.dldata-result", username),
257
14
                action: "subscribe",
258
14
                permission: "allow",
259
14
            },
260
14
        ],
261
12
        QueueType::Network => vec![
262
12
            PostAclRuleItem {
263
12
                topic: format!("broker.{}.uldata", username),
264
12
                action: "publish",
265
12
                permission: "allow",
266
12
            },
267
12
            PostAclRuleItem {
268
12
                topic: format!("broker.{}.dldata", username),
269
12
                action: "subscribe",
270
12
                permission: "allow",
271
12
            },
272
12
            PostAclRuleItem {
273
12
                topic: format!("broker.{}.dldata-result", username),
274
12
                action: "publish",
275
12
                permission: "allow",
276
12
            },
277
12
            PostAclRuleItem {
278
12
                topic: format!("broker.{}.ctrl", username),
279
12
                action: "subscribe",
280
12
                permission: "allow",
281
12
            },
282
12
        ],
283
    };
284
26
    let req = match client
285
26
        .request(Method::POST, uri.clone())
286
26
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
287
26
        .json(&vec![PostAclBodyItem {
288
26
            username,
289
26
            rules: rules.clone(),
290
26
        }])
291
26
        .build()
292
    {
293
        Err(e) => {
294
            let e = format!("generate acl request error: {}", e);
295
            return Err(ErrResp::ErrRsc(Some(e)));
296
        }
297
26
        Ok(req) => req,
298
26
    };
299
26
    match client.execute(req).await {
300
1
        Err(e) => {
301
1
            let e = format!("execute acl request error: {}", e);
302
1
            return Err(ErrResp::ErrIntMsg(Some(e)));
303
        }
304
25
        Ok(resp) => match resp.status() {
305
21
            StatusCode::NO_CONTENT => return Ok(()),
306
3
            StatusCode::CONFLICT => (),
307
            _ => {
308
1
                let e = format!("execute acl request with status: {}", resp.status());
309
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
310
            }
311
        },
312
    }
313

            
314
3
    let req = match client
315
3
        .request(Method::PUT, format!("{}/{}", uri, username))
316
3
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
317
3
        .json(&PostAclBodyItem { username, rules })
318
3
        .build()
319
    {
320
        Err(e) => {
321
            let e = format!("generate put acl request error: {}", e);
322
            return Err(ErrResp::ErrRsc(Some(e)));
323
        }
324
3
        Ok(req) => req,
325
3
    };
326
3
    match client.execute(req).await {
327
        Err(e) => {
328
            let e = format!("execute put acl request error: {}", e);
329
            Err(ErrResp::ErrIntMsg(Some(e)))
330
        }
331
3
        Ok(resp) => match resp.status() {
332
3
            StatusCode::NO_CONTENT => Ok(()),
333
            _ => {
334
                let e = format!("execute put acl request with status: {}", resp.status());
335
                Err(ErrResp::ErrIntMsg(Some(e)))
336
            }
337
        },
338
    }
339
26
}
340

            
341
/// To delete an ACL rule of a group of topics of an application/network for the user.
342
81
pub async fn delete_acl(
343
81
    client: &Client,
344
81
    opts: &ManagementOpts,
345
81
    hostname: &str,
346
81
    username: &str,
347
81
) -> Result<(), ErrResp> {
348
81
    let uri = format!(
349
81
        "http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users/{}",
350
81
        hostname, username
351
81
    );
352
81
    let req = match client
353
81
        .request(Method::DELETE, uri)
354
81
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
355
81
        .build()
356
    {
357
        Err(e) => {
358
            let e = format!("generate acl request error: {}", e);
359
            return Err(ErrResp::ErrRsc(Some(e)));
360
        }
361
81
        Ok(req) => req,
362
81
    };
363
83
    match client.execute(req).await {
364
1
        Err(e) => {
365
1
            let e = format!("execute acl request error: {}", e);
366
1
            Err(ErrResp::ErrIntMsg(Some(e)))
367
        }
368
80
        Ok(resp) => match resp.status() {
369
79
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
370
            _ => {
371
1
                let e = format!("execute acl request with status: {}", resp.status());
372
1
                Err(ErrResp::ErrIntMsg(Some(e)))
373
            }
374
        },
375
    }
376
81
}
377

            
378
/// To publish a message to the specified queue (such as `uldata` and `dldata`).
379
///
380
/// The `payload` MUST be Base64 encoded string.
381
9
pub async fn publish_message(
382
9
    client: &Client,
383
9
    opts: &ManagementOpts,
384
9
    hostname: &str,
385
9
    username: &str,
386
9
    queue: &str,     // uldata,dldata
387
9
    payload: String, // Base64
388
9
) -> Result<(), ErrResp> {
389
9
    let uri = format!("http://{}:18083/api/v5/publish", hostname);
390
9
    let body = PostPublishBody {
391
9
        topic: format!("broker.{}.{}", username, queue),
392
9
        clientid: format!("sylvia-{}", randomstring(12)),
393
9
        payload,
394
9
        payload_encoding: "base64",
395
9
        qos: 0,
396
9
    };
397
9
    let req = match client
398
9
        .request(Method::POST, uri)
399
9
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
400
9
        .json(&body)
401
9
        .build()
402
    {
403
        Err(e) => {
404
            let e = format!("generate publish request error: {}", e);
405
            return Err(ErrResp::ErrRsc(Some(e)));
406
        }
407
9
        Ok(req) => req,
408
9
    };
409
9
    match client.execute(req).await {
410
1
        Err(e) => {
411
1
            let e = format!("execute publish request error: {}", e);
412
1
            Err(ErrResp::ErrIntMsg(Some(e)))
413
        }
414
8
        Ok(resp) => match resp.status() {
415
8
            StatusCode::OK | StatusCode::ACCEPTED => Ok(()), // 200 for <= 5.0.8, 202 for >= 5.0.9
416
            _ => {
417
                let e = format!("execute publish request with status: {}", resp.status());
418
                Err(ErrResp::ErrIntMsg(Some(e)))
419
            }
420
        },
421
    }
422
9
}
423

            
424
/// To enable metrics for a queue.
425
23
pub async fn post_topic_metrics(
426
23
    client: &Client,
427
23
    opts: &ManagementOpts,
428
23
    hostname: &str,
429
23
    q_type: QueueType,
430
23
    username: &str,
431
23
) -> Result<(), ErrResp> {
432
23
    let uri = format!("http://{}:18083/api/v5/mqtt/topic_metrics", hostname);
433
23
    let q_name_prefix = format!("broker.{}.", username);
434
23
    let queues = match q_type {
435
13
        QueueType::Application => vec!["uldata", "dldata", "dldata-resp", "dldata-result"],
436
10
        QueueType::Network => vec!["uldata", "dldata", "dldata-result", "ctrl"],
437
    };
438
107
    for queue in queues {
439
86
        let req = match client
440
86
            .request(Method::POST, uri.as_str())
441
86
            .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
442
86
            .json(&PostTopicMetricsBody {
443
86
                topic: format!("{}{}", q_name_prefix, queue),
444
86
            })
445
86
            .build()
446
        {
447
            Err(e) => {
448
                let e = format!("generate topic_metrics request error: {}", e);
449
                return Err(ErrResp::ErrRsc(Some(e)));
450
            }
451
86
            Ok(req) => req,
452
86
        };
453
86
        match client.execute(req).await {
454
1
            Err(e) => {
455
1
                let e = format!("execute topic_metrics request error: {}", e);
456
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
457
            }
458
85
            Ok(resp) => match resp.status() {
459
76
                StatusCode::OK => (),
460
                StatusCode::BAD_REQUEST => {
461
8
                    match resp.json::<ErrResBody>().await {
462
                        Err(e) => {
463
                            let e = format!("execute topic_metrics read 400 body error: {}", e);
464
                            return Err(ErrResp::ErrIntMsg(Some(e)));
465
                        }
466
8
                        Ok(body) => match body.code.as_str() {
467
8
                            "BAD_TOPIC" => (),
468
                            _ => {
469
                                let e = format!(
470
                                    "execute topic_metrics request with unexpected 400 code: {}, message: {:?}",
471
                                    body.code, body.message
472
                                );
473
                                return Err(ErrResp::ErrIntMsg(Some(e)));
474
                            }
475
                        },
476
                    };
477
                }
478
                _ => {
479
1
                    let e = format!(
480
1
                        "execute topic_metrics request with status: {}",
481
1
                        resp.status()
482
1
                    );
483
1
                    return Err(ErrResp::ErrIntMsg(Some(e)));
484
                }
485
            },
486
        }
487
    }
488
21
    Ok(())
489
23
}
490

            
491
/// To disable metrics for a queue.
492
82
pub async fn delete_topic_metrics(
493
82
    client: &Client,
494
82
    opts: &ManagementOpts,
495
82
    hostname: &str,
496
82
    q_type: QueueType,
497
82
    username: &str,
498
82
) -> Result<(), ErrResp> {
499
82
    let uri_prefix = format!(
500
82
        "http://{}:18083/api/v5/mqtt/topic_metrics/broker.{}.",
501
82
        hostname, username
502
82
    );
503
82
    let queues = match q_type {
504
33
        QueueType::Application => vec!["uldata", "dldata", "dldata-resp", "dldata-result"],
505
49
        QueueType::Network => vec!["uldata", "dldata", "dldata-result", "ctrl"],
506
    };
507
402
    for queue in queues {
508
322
        let req = match client
509
322
            .request(Method::DELETE, format!("{}{}", uri_prefix, queue).as_str())
510
322
            .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
511
322
            .build()
512
        {
513
            Err(e) => {
514
                let e = format!("generate topic_metrics request error: {}", e);
515
                return Err(ErrResp::ErrRsc(Some(e)));
516
            }
517
322
            Ok(req) => req,
518
322
        };
519
329
        match client.execute(req).await {
520
1
            Err(e) => {
521
1
                let e = format!("execute topic_metrics request error: {}", e);
522
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
523
            }
524
321
            Ok(resp) => match resp.status() {
525
320
                StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => (),
526
                _ => {
527
1
                    let e = format!(
528
1
                        "execute topic_metrics request with status: {}",
529
1
                        resp.status()
530
1
                    );
531
1
                    return Err(ErrResp::ErrIntMsg(Some(e)));
532
                }
533
            },
534
        }
535
    }
536
80
    Ok(())
537
82
}
538

            
539
/// Get statistics of a queue.
540
554
pub async fn stats(
541
554
    client: &Client,
542
554
    opts: &ManagementOpts,
543
554
    hostname: &str,
544
554
    username: &str,
545
554
    queue: &str, // uldata,dldata,dldata-resp,dldata-result,ctrl
546
554
) -> Result<Stats, ErrResp> {
547
554
    let queue_name = format!("broker.{}.{}", username, queue);
548
554
    let uri = format!(
549
554
        "http://{}:18083/api/v5/subscriptions?topic={}",
550
554
        hostname, queue_name
551
554
    );
552
554
    let req = match client
553
554
        .request(Method::GET, uri)
554
554
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
555
554
        .build()
556
    {
557
        Err(e) => {
558
            let e = format!("generate stats subscriptions request error: {}", e);
559
            return Err(ErrResp::ErrRsc(Some(e)));
560
        }
561
554
        Ok(req) => req,
562
    };
563
590
    let resp = match client.execute(req).await {
564
1
        Err(e) => {
565
1
            let e = format!("execute stats subscriptions request error: {}", e);
566
1
            return Err(ErrResp::ErrIntMsg(Some(e)));
567
        }
568
553
        Ok(resp) => match resp.status() {
569
552
            StatusCode::OK => resp,
570
            _ => {
571
1
                let e = format!(
572
1
                    "execute stats subscriptions request with status: {}",
573
1
                    resp.status()
574
1
                );
575
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
576
            }
577
        },
578
    };
579
552
    let resp_stats = match resp.json::<GetSubscriptionsResBody>().await {
580
        Err(e) => {
581
            let e = format!("read stats subscriptions body error: {}", e);
582
            return Err(ErrResp::ErrIntMsg(Some(e)));
583
        }
584
552
        Ok(stats) => stats,
585
552
    };
586
552
    let mut stats = Stats {
587
552
        consumers: resp_stats.meta.count,
588
552
        ..Default::default()
589
552
    };
590
552

            
591
552
    let uri = format!(
592
552
        "http://{}:18083/api/v5/mqtt/topic_metrics/{}",
593
552
        hostname, queue_name
594
552
    );
595
552
    let req = match client
596
552
        .request(Method::GET, uri.as_str())
597
552
        .basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
598
552
        .build()
599
    {
600
        Err(e) => {
601
            let e = format!("generate stats topic_metrics request error: {}", e);
602
            return Err(ErrResp::ErrRsc(Some(e)));
603
        }
604
552
        Ok(req) => req,
605
    };
606
552
    let resp_stats = match client.execute(req).await {
607
        Err(e) => {
608
            let e = format!("execute stats topic_metrics request error: {}", e);
609
            return Err(ErrResp::ErrIntMsg(Some(e)));
610
        }
611
552
        Ok(resp) => match resp.status() {
612
52
            StatusCode::OK => match resp.json::<GetTopicMetricsResBody>().await {
613
                Err(e) => {
614
                    let e = format!("read stats topic_metrics body error: {}", e);
615
                    return Err(ErrResp::ErrIntMsg(Some(e)));
616
                }
617
52
                Ok(stats) => stats,
618
            },
619
500
            StatusCode::NOT_FOUND => GetTopicMetricsResBody::default(),
620
            _ => {
621
                let e = format!(
622
                    "execute stats topic_metrics request with status: {}",
623
                    resp.status()
624
                );
625
                return Err(ErrResp::ErrIntMsg(Some(e)));
626
            }
627
        },
628
    };
629
552
    stats.publish_rate = match resp_stats.metrics.messages_in_rate {
630
500
        None => 0.0,
631
52
        Some(rate) => rate,
632
    };
633
552
    stats.deliver_rate = match resp_stats.metrics.messages_out_rate {
634
500
        None => 0.0,
635
52
        Some(rate) => rate,
636
    };
637

            
638
552
    Ok(stats)
639
554
}