Lines
85.17 %
Functions
100 %
Branches
//! Wrapper APIs for controlling EMQX.
//!
//! - `hostname` of all APIs are host name or IP address of the broker.
use reqwest::{self, Client, Method, StatusCode};
use serde::{Deserialize, Serialize};
use sylvia_iot_corelib::{err::ErrResp, strings::randomstring};
use super::QueueType;
/// EMQX management information.
#[derive(Clone)]
pub struct ManagementOpts {
/// Management plugin API key.
pub api_key: String,
/// Management plugin API secret.
pub api_secret: String,
}
/// Statistics.
#[derive(Default)]
pub struct Stats {
/// Number of queue consumers.
pub consumers: usize,
/// Number of ready/unacked messages.
pub messages: usize,
/// Publish rate from the producer.
pub publish_rate: f64,
/// Deliver rate to the consumer.
pub deliver_rate: f64,
#[derive(Deserialize)]
struct Meta {
count: usize,
#[derive(Serialize)]
struct PostAuthUsersBody<'a> {
user_id: &'a str,
password: &'a str,
is_superuser: bool,
struct PutAuthUsersBody<'a> {
struct PostAclBodyItem<'a> {
username: &'a str,
rules: Vec<PostAclRuleItem<'a>>,
#[derive(Clone, Serialize)]
struct PostAclRuleItem<'a> {
topic: String,
action: &'a str,
permission: &'a str,
struct PostPublishBody<'a> {
clientid: String,
payload: String,
payload_encoding: &'a str,
qos: usize,
struct PostTopicMetricsBody {
struct GetSubscriptionsResBody {
meta: Meta,
#[derive(Default, Deserialize)]
struct GetTopicMetricsResBody {
metrics: TopicMetrics,
struct TopicMetrics {
#[serde(rename = "messages.in.rate")]
messages_in_rate: Option<f64>,
#[serde(rename = "messages.out.rate")]
messages_out_rate: Option<f64>,
struct ErrResBody {
code: String,
message: Option<String>,
/// Authenticator ID.
const AUTH_ID: &'static str = "password_based:built_in_database";
/// To create an account.
pub async fn post_user(
client: &Client,
opts: &ManagementOpts,
hostname: &str,
username: &str,
password: &str,
) -> Result<(), ErrResp> {
let uri = format!(
"http://{}:18083/api/v5/authentication/{}/users",
hostname, AUTH_ID
);
let req = match client
.request(Method::POST, uri)
.basic_auth(opts.api_key.as_str(), Some(opts.api_secret.as_str()))
.json(&PostAuthUsersBody {
user_id: username,
password,
is_superuser,
})
.build()
{
Err(e) => {
let e = format!("generate user request error: {}", e);
return Err(ErrResp::ErrRsc(Some(e)));
Ok(req) => req,
};
match client.execute(req).await {
let e = format!("execute user request error: {}", e);
Err(ErrResp::ErrIntMsg(Some(e)))
Ok(resp) => match resp.status() {
StatusCode::CREATED => Ok(()),
StatusCode::CONFLICT => put_user(client, opts, hostname, username, password).await,
_ => {
let e = format!("execute user request with status: {}", resp.status());
},
/// To update the user's password.
pub async fn put_user(
"http://{}:18083/api/v5/authentication/{}/users/{}",
hostname, AUTH_ID, username
.request(Method::PUT, uri)
.json(&PutAuthUsersBody { password })
StatusCode::OK => Ok(()),
/// To delete a user.
pub async fn delete_user(
.request(Method::DELETE, uri)
StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
/// To create an ACL rule of a topic for the user.
pub async fn post_acl(
q_type: QueueType,
"http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users",
hostname
let rules = match q_type {
QueueType::Application => vec![
PostAclRuleItem {
topic: format!("broker.{}.uldata", username),
action: "subscribe",
permission: "allow",
topic: format!("broker.{}.dldata", username),
action: "publish",
topic: format!("broker.{}.dldata-resp", username),
topic: format!("broker.{}.dldata-result", username),
],
QueueType::Network => vec![
topic: format!("broker.{}.ctrl", username),
.request(Method::POST, uri.clone())
.json(&vec![PostAclBodyItem {
username,
rules: rules.clone(),
}])
let e = format!("generate acl request error: {}", e);
let e = format!("execute acl request error: {}", e);
return Err(ErrResp::ErrIntMsg(Some(e)));
StatusCode::NO_CONTENT => return Ok(()),
StatusCode::CONFLICT => (),
let e = format!("execute acl request with status: {}", resp.status());
.request(Method::PUT, format!("{}/{}", uri, username))
.json(&PostAclBodyItem { username, rules })
let e = format!("generate put acl request error: {}", e);
let e = format!("execute put acl request error: {}", e);
StatusCode::NO_CONTENT => Ok(()),
let e = format!("execute put acl request with status: {}", resp.status());
/// To delete an ACL rule of a group of topics of an application/network for the user.
pub async fn delete_acl(
"http://{}:18083/api/v5/authorization/sources/built_in_database/rules/users/{}",
hostname, username
/// To publish a message to the specified queue (such as `uldata` and `dldata`).
///
/// The `payload` MUST be Base64 encoded string.
pub async fn publish_message(
queue: &str, // uldata,dldata
payload: String, // Base64
let uri = format!("http://{}:18083/api/v5/publish", hostname);
let body = PostPublishBody {
topic: format!("broker.{}.{}", username, queue),
clientid: format!("sylvia-{}", randomstring(12)),
payload,
payload_encoding: "base64",
qos: 0,
.json(&body)
let e = format!("generate publish request error: {}", e);
let e = format!("execute publish request error: {}", e);
StatusCode::OK | StatusCode::ACCEPTED => Ok(()), // 200 for <= 5.0.8, 202 for >= 5.0.9
let e = format!("execute publish request with status: {}", resp.status());
/// To enable metrics for a queue.
pub async fn post_topic_metrics(
let uri = format!("http://{}:18083/api/v5/mqtt/topic_metrics", hostname);
let q_name_prefix = format!("broker.{}.", username);
let queues = match q_type {
QueueType::Application => vec!["uldata", "dldata", "dldata-resp", "dldata-result"],
QueueType::Network => vec!["uldata", "dldata", "dldata-result", "ctrl"],
for queue in queues {
.request(Method::POST, uri.as_str())
.json(&PostTopicMetricsBody {
topic: format!("{}{}", q_name_prefix, queue),
let e = format!("generate topic_metrics request error: {}", e);
let e = format!("execute topic_metrics request error: {}", e);
StatusCode::OK => (),
StatusCode::BAD_REQUEST => {
match resp.json::<ErrResBody>().await {
let e = format!("execute topic_metrics read 400 body error: {}", e);
Ok(body) => match body.code.as_str() {
"BAD_TOPIC" => (),
let e = format!(
"execute topic_metrics request with unexpected 400 code: {}, message: {:?}",
body.code, body.message
"execute topic_metrics request with status: {}",
resp.status()
Ok(())
/// To disable metrics for a queue.
pub async fn delete_topic_metrics(
let uri_prefix = format!(
"http://{}:18083/api/v5/mqtt/topic_metrics/broker.{}.",
.request(Method::DELETE, format!("{}{}", uri_prefix, queue).as_str())
StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => (),
/// Get statistics of a queue.
pub async fn stats(
queue: &str, // uldata,dldata,dldata-resp,dldata-result,ctrl
) -> Result<Stats, ErrResp> {
let queue_name = format!("broker.{}.{}", username, queue);
"http://{}:18083/api/v5/subscriptions?topic={}",
hostname, queue_name
.request(Method::GET, uri)
let e = format!("generate stats subscriptions request error: {}", e);
let resp = match client.execute(req).await {
let e = format!("execute stats subscriptions request error: {}", e);
StatusCode::OK => resp,
"execute stats subscriptions request with status: {}",
let resp_stats = match resp.json::<GetSubscriptionsResBody>().await {
let e = format!("read stats subscriptions body error: {}", e);
Ok(stats) => stats,
let mut stats = Stats {
consumers: resp_stats.meta.count,
..Default::default()
"http://{}:18083/api/v5/mqtt/topic_metrics/{}",
.request(Method::GET, uri.as_str())
let e = format!("generate stats topic_metrics request error: {}", e);
let resp_stats = match client.execute(req).await {
let e = format!("execute stats topic_metrics request error: {}", e);
StatusCode::OK => match resp.json::<GetTopicMetricsResBody>().await {
let e = format!("read stats topic_metrics body error: {}", e);
StatusCode::NOT_FOUND => GetTopicMetricsResBody::default(),
"execute stats topic_metrics request with status: {}",
stats.publish_rate = match resp_stats.metrics.messages_in_rate {
None => 0.0,
Some(rate) => rate,
stats.deliver_rate = match resp_stats.metrics.messages_out_rate {
Ok(stats)