Lines
82.87 %
Functions
100 %
Branches
use std::error::Error as StdError;
use axum::{
body::Body,
extract::{Request, State},
http::{header, HeaderMap, HeaderValue, StatusCode},
response::{IntoResponse, Response},
routing, Router,
};
use base64::{engine::general_purpose, Engine};
use bytes::{Bytes, BytesMut};
use csv::WriterBuilder;
use futures_util::StreamExt;
use hex;
use log::error;
use reqwest;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
use url::Url;
use sylvia_iot_corelib::{
err::ErrResp,
http::{Json, Path},
strings,
use super::{
super::{AmqpState, ErrReq, MqttState, State as AppState},
api_bridge, clear_patch_host, clear_queue_rsc, cmp_host_uri, create_queue_rsc,
get_device_inner, get_stream_resp, get_unit_inner, list_api_bridge, request, response,
transfer_host_uri, trunc_host_uri, ClearQueueResource, CreateQueueResource, ListResp,
PatchHost,
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
enum ListFormat {
Array,
Csv,
Data,
}
#[derive(Deserialize)]
struct ApplicationIdPath {
application_id: String,
#[derive(Deserialize, Serialize)]
struct Application {
#[serde(rename = "applicationId")]
code: String,
#[serde(rename = "unitId")]
unit_id: String,
#[serde(rename = "unitCode")]
unit_code: String,
#[serde(rename = "createdAt")]
created_at: String,
#[serde(rename = "modifiedAt")]
modified_at: String,
#[serde(rename = "hostUri")]
host_uri: String,
name: String,
info: Map<String, Value>,
struct CsvItem {
info: Option<String>,
/// Downlink data from application to broker.
#[derive(Default, Serialize)]
struct DlData {
#[serde(rename = "correlationId")]
correlation_id: String,
#[serde(rename = "deviceId")]
device_id: Option<String>,
#[serde(rename = "networkCode")]
network_code: Option<String>,
#[serde(rename = "networkAddr")]
network_addr: Option<String>,
data: String,
extension: Option<Map<String, Value>>,
const CSV_FIELDS: &'static [u8] =
b"\xEF\xBB\xBFapplicationId,code,unitId,unitCode,createdAt,modifiedAt,hostUri,name,info\n";
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
Router::new().nest(
scope_path,
Router::new()
.route("/", routing::post(post_application))
.route("/count", routing::get(get_application_count))
.route("/list", routing::get(get_application_list))
.route(
"/:application_id",
routing::get(get_application)
.patch(patch_application)
.delete(delete_application),
)
"/:application_id/stats",
routing::get(get_application_stats),
"/:application_id/dldata",
routing::post(post_application_dldata),
.with_state(state.clone()),
/// `POST /{base}/api/v1/application`
async fn post_application(
State(state): State<AppState>,
mut headers: HeaderMap,
Json(mut body): Json<request::PostApplicationBody>,
) -> impl IntoResponse {
const FN_NAME: &'static str = "post_application";
let broker_base = state.broker_base.as_str();
let api_path = format!("{}/api/v1/application", broker_base);
let client = state.client.clone();
let token = match headers.get(header::AUTHORIZATION) {
None => {
let e = "missing Authorization".to_string();
return ErrResp::ErrParam(Some(e)).into_response();
Some(value) => value.clone(),
// Get unit information to create queue information.
let unit_id = body.data.unit_id.as_str();
if unit_id.len() == 0 {
return ErrResp::ErrParam(Some(
"`unitId` must with at least one character".to_string(),
))
.into_response();
let unit = match get_unit_inner(FN_NAME, &client, broker_base, unit_id, &token).await {
Err(e) => return e,
Ok(unit) => match unit {
return ErrResp::Custom(ErrReq::UNIT_NOT_EXIST.0, ErrReq::UNIT_NOT_EXIST.1, None)
.into_response()
Some(unit) => unit,
},
let unit_code = unit.code.as_str();
let code = body.data.code.as_str();
if !strings::is_code(code) {
"`code` must be [A-Za-z0-9]{1}[A-Za-z0-9-_]*".to_string(),
match check_application_code_inner(FN_NAME, &client, broker_base, unit_id, code, &token).await {
Ok(count) => match count {
0 => (),
_ => {
return ErrResp::Custom(
ErrReq::APPLICATION_EXIST.0,
ErrReq::APPLICATION_EXIST.1,
None,
let q_type = QueueType::Application;
let username = mq::to_username(q_type, unit_code, code);
let password = strings::randomstring(8);
let uri = match Url::parse(body.data.host_uri.as_str()) {
Err(e) => {
return ErrResp::ErrParam(Some(format!("invalid `hostUri`: {}", e))).into_response();
Ok(uri) => uri,
let host = match uri.host() {
let e = "invalid `hostUri`".to_string();
Some(host) => host.to_string(),
let scheme = uri.scheme();
let host = host.as_str();
let username = username.as_str();
let password = password.as_str();
// Create message broker resources.
let create_rsc = CreateQueueResource {
scheme,
host,
username,
password,
ttl: body.data.ttl,
length: body.data.length,
q_type: QueueType::Application,
if let Err(e) = create_queue_rsc(FN_NAME, &state, &create_rsc).await {
return e;
let clear_rsc = ClearQueueResource {
// Create application instance.
let mut body_uri = uri.clone();
transfer_host_uri(&state, &mut body_uri, username);
body.data.host_uri = body_uri.to_string();
headers.remove(header::CONTENT_LENGTH);
let builder = client
.request(reqwest::Method::POST, api_path)
.headers(headers)
.json(&body);
let api_req = match builder.build() {
let _ = clear_queue_rsc(FN_NAME, &state, &clear_rsc);
let e = format!("generate request error: {}", e);
error!("[{}] {}", FN_NAME, e);
return ErrResp::ErrRsc(Some(e)).into_response();
Ok(req) => req,
let api_resp = match client.execute(api_req).await {
let e = format!("execute request error: {}", e);
return ErrResp::ErrIntMsg(Some(e)).into_response();
Ok(resp) => match resp.status() {
reqwest::StatusCode::OK => resp,
let mut resp_builder = Response::builder().status(resp.status());
for (k, v) in resp.headers() {
resp_builder = resp_builder.header(k, v);
match resp_builder.body(Body::from_stream(resp.bytes_stream())) {
let e = format!("wrap response body error: {}", e);
Ok(resp) => return resp,
let mut body = match api_resp.json::<response::PostApplication>().await {
let e = format!("unexpected response: {}", e);
return ErrResp::ErrUnknown(Some(e)).into_response();
Ok(body) => body,
body.data.password = Some(password.to_string());
Json(&body).into_response()
/// `GET /{base}/api/v1/application/count`
async fn get_application_count(state: State<AppState>, req: Request) -> impl IntoResponse {
const FN_NAME: &'static str = "get_application_count";
let api_path = format!("{}/api/v1/application/count", state.broker_base.as_str());
api_bridge(FN_NAME, &client, req, api_path.as_str()).await
/// `GET /{base}/api/v1/application/list`
async fn get_application_list(state: State<AppState>, req: Request) -> impl IntoResponse {
const FN_NAME: &'static str = "get_application_list";
let api_path = format!("{}/api/v1/application/list", state.broker_base.as_str());
let api_path = api_path.as_str();
let mut list_format = ListFormat::Data;
if let Some(query_str) = req.uri().query() {
let query = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_str) {
let e = format!("parse query error: {}", e);
Ok(query) => query,
for (k, v) in query.iter() {
if k.as_str().eq("format") {
if v.as_str().eq("array") {
list_format = ListFormat::Array;
} else if v.as_str().eq("csv") {
list_format = ListFormat::Csv;
let (api_resp, resp_builder) =
match list_api_bridge(FN_NAME, &client, req, api_path, true, "application").await {
ListResp::Axum(resp) => return resp,
ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
let mut resp_stream = api_resp.bytes_stream();
let body = Body::from_stream(async_stream::stream! {
match list_format {
ListFormat::Array => yield Ok(Bytes::from("[")),
ListFormat::Csv => yield Ok(Bytes::from(CSV_FIELDS)),
ListFormat::Data => yield Ok(Bytes::from("{\"data\":[")),
let mut first_sent = false;
let mut buffer = BytesMut::new();
while let Some(body) = resp_stream.next().await {
match body {
error!("[{}] get body error: {}", FN_NAME, e);
let err: Box<dyn StdError + Send + Sync> = Box::new(e);
yield Err(err);
break;
Ok(body) => buffer.extend_from_slice(&body[..]),
let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
let mut index = 0;
let mut finish = false;
loop {
if let Some(Ok(mut v)) = json_stream.next() {
v.host_uri = match Url::parse(v.host_uri.as_str()) {
error!("[{}] parse body hostUri error: {}", FN_NAME, e);
finish = true;
Ok(uri) => trunc_host_uri(&uri),
ListFormat::Array | ListFormat::Data => match serde_json::to_string(&v) {
Err(e) =>{
error!("[{}] serialize JSON error: {}", FN_NAME, e);
Ok(v) => {
match first_sent {
false => first_sent = true,
true => yield Ok(Bytes::from(",")),
yield Ok(Bytes::copy_from_slice(v.as_str().as_bytes()));
ListFormat::Csv => {
let mut item = CsvItem{
application_id: v.application_id,
code: v.code,
unit_id: v.unit_id,
unit_code: v.unit_code,
created_at: v.created_at,
modified_at: v.modified_at,
host_uri: v.host_uri,
name: v.name,
info: None,
if let Ok(info_str) = serde_json::to_string(&v.info) {
item.info = Some(info_str);
let mut writer =
WriterBuilder::new().has_headers(false).from_writer(vec![]);
if let Err(e) = writer.serialize(item) {
error!("[{}] serialize CSV error: {}", FN_NAME, e);
match writer.into_inner() {
error!("[{}] serialize bytes error: {}", FN_NAME, e);
Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
continue;
let offset = json_stream.byte_offset();
if buffer.len() <= index + offset {
index = buffer.len();
match buffer[index+offset] {
b'[' | b',' => {
index += offset + 1;
if buffer.len() <= index {
json_stream =
Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
b']' => {
_ => break,
if finish {
ListFormat::Array => yield Ok(Bytes::from("]")),
ListFormat::Csv => (),
ListFormat::Data => yield Ok(Bytes::from("]}")),
buffer = buffer.split_off(index);
});
match resp_builder.body(body) {
Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
Ok(resp) => resp,
/// `GET /{base}/api/v1/application/{applicationId}`
async fn get_application(
state: State<AppState>,
Path(param): Path<ApplicationIdPath>,
req: Request,
const FN_NAME: &'static str = "get_application";
let token = match req.headers().get(header::AUTHORIZATION) {
let (mut application, uri, host) = match get_application_inner(
FN_NAME,
&client,
broker_base,
param.application_id.as_str(),
&token,
.await
{
Ok((application, uri, host)) => (application, uri, host),
if scheme.eq("amqp") || scheme.eq("amqps") {
let AmqpState::RabbitMq(opts) = &state.amqp;
let username = mq::to_username(
QueueType::Application,
application.unit_code.as_str(),
application.code.as_str(),
);
match rabbitmq::get_policies(&client, opts, host, username).await {
error!("[{}] get {} policies error: {}", FN_NAME, username, e);
return e.into_response();
Ok(policies) => {
application.ttl = policies.ttl;
application.length = policies.length;
application.host_uri = trunc_host_uri(&uri);
Json(&response::GetApplication { data: application }).into_response()
/// `PATCH /{base}/api/v1/application/{applicationId}`
async fn patch_application(
headers: HeaderMap,
Json(body): Json<request::PatchApplicationBody>,
const FN_NAME: &'static str = "patch_application";
let data = &body.data;
if data.host_uri.is_none()
&& data.name.is_none()
&& data.info.is_none()
&& data.ttl.is_none()
&& data.length.is_none()
&& data.password.is_none()
return ErrResp::ErrParam(Some("at least one parameter".to_string())).into_response();
let (application, uri, hostname) = match get_application_inner(
Ok((application, uri, hostname)) => (application, uri, hostname),
let mut patch_data = request::PatchApplicationData {
name: data.name.clone(),
info: data.info.clone(),
..Default::default()
let mut patch_host: Option<PatchHost> = None;
if let Some(host) = data.host_uri.as_ref() {
if !strings::is_uri(host) {
return ErrResp::ErrParam(Some("invalid `hostUri`".to_string())).into_response();
// Change to the new broker host.
if !cmp_host_uri(application.host_uri.as_str(), host.as_str()) {
let password = match data.password.as_ref() {
let e = "missing `password`".to_string();
Some(password) => match password.len() {
0 => {
_ => password,
let mut new_host_uri = match Url::parse(host.as_str()) {
let e = format!("invalid `hostUri`: {}", e);
Ok(uri) => match uri.host_str() {
Some(_) => uri,
let unit_code = application.unit_code.as_str();
let code = application.code.as_str();
let username = mq::to_username(QueueType::Application, unit_code, code);
let resource = CreateQueueResource {
scheme: new_host_uri.scheme(),
host: new_host_uri.host_str().unwrap(),
username: username.as_str(),
password: password.as_str(),
ttl: data.ttl,
length: data.length,
if let Err(e) = create_queue_rsc(FN_NAME, &state, &resource).await {
transfer_host_uri(&state, &mut new_host_uri, username.as_str());
patch_data.host_uri = Some(new_host_uri.to_string());
patch_host = Some(PatchHost {
host_uri: new_host_uri,
// Send request body to the sylvia-iot-broker.
if patch_data.host_uri.is_some() || patch_data.name.is_some() || patch_data.info.is_some() {
let application_id = param.application_id.as_str();
let uri = format!("{}/api/v1/application/{}", broker_base, application_id);
let mut builder = client
.request(reqwest::Method::PATCH, uri)
.header(reqwest::header::AUTHORIZATION, &token)
.json(&request::PatchApplicationBody { data: patch_data });
if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
builder = builder.header(reqwest::header::CONTENT_TYPE, content_type);
clear_patch_host(FN_NAME, &state, &patch_host).await;
let status_code = api_resp.status();
if status_code != StatusCode::NO_CONTENT {
let mut resp_builder = Response::builder().status(status_code);
for (k, v) in api_resp.headers() {
match resp_builder.body(Body::from_stream(api_resp.bytes_stream())) {
if let Some(host) = patch_host {
let resource = ClearQueueResource {
scheme: uri.scheme(),
host: uri.host_str().unwrap(),
username: host.username.as_str(),
let _ = clear_queue_rsc(FN_NAME, &state, &resource).await;
return StatusCode::NO_CONTENT.into_response();
} else if data.ttl.is_none() && data.length.is_none() && data.password.is_none() {
// Update broker information without changing hostUri.
if let Some(password) = data.password.as_ref() {
if password.len() == 0 {
let hostname = hostname.as_str();
match uri.scheme() {
"amqp" | "amqps" => match &state.amqp {
AmqpState::RabbitMq(opts) => {
if data.ttl.is_some() || data.length.is_some() {
let policies = rabbitmq::BrokerPolicies {
if let Err(e) =
rabbitmq::put_policies(&client, opts, hostname, username, &policies).await
let e = format!("patch RabbitMQ error: {}", e);
rabbitmq::put_user(&client, opts, hostname, username, password).await
let e = format!("patch RabbitMQ password error: {}", e);
"mqtt" | "mqtts" => match &state.mqtt {
MqttState::Emqx(opts) => {
emqx::put_user(&client, opts, hostname, username, password).await
let e = format!("patch EMQX password error: {}", e);
MqttState::Rumqttd => {}
_ => {}
StatusCode::NO_CONTENT.into_response()
/// `DELETE /{base}/api/v1/application/{applicationId}`
async fn delete_application(
const FN_NAME: &'static str = "delete_application";
let api_path = format!("{}/api/v1/application/{}", broker_base, application_id);
let (application, uri, host) =
match get_application_inner(FN_NAME, &client, broker_base, application_id, &token).await {
let resp = api_bridge(FN_NAME, &client, req, api_path.as_str()).await;
if !resp.status().is_success() {
return resp;
host: host.as_str(),
if let Err(e) = clear_queue_rsc(FN_NAME, &state, &clear_rsc).await {
/// `GET /{base}/api/v1/application/{applicationId}/stats`
async fn get_application_stats(
let (application, uri, host) = match get_application_inner(
let data = match scheme {
"amqp" | "amqps" => {
response::GetApplicationStatsData {
uldata: match rabbitmq::stats(&client, opts, host, username, "uldata").await {
Err(ErrResp::ErrNotFound(_)) => response::Stats {
consumers: 0,
messages: 0,
publish_rate: 0.0,
deliver_rate: 0.0,
error!("[{}] get uldata stats error: {}", FN_NAME, e);
Ok(stats) => response::Stats {
consumers: stats.consumers,
messages: stats.messages,
publish_rate: stats.publish_rate,
deliver_rate: stats.deliver_rate,
dldata_resp: match rabbitmq::stats(&client, opts, host, username, "dldata-resp")
error!("[{}] get dldata-resp stats error: {}", FN_NAME, e);
dldata_result: match rabbitmq::stats(&client, opts, host, username, "dldata-result")
error!("[{}] get dldata-result stats error: {}", FN_NAME, e);
uldata: match emqx::stats(&client, opts, host, username, "uldata").await {
dldata_resp: match emqx::stats(&client, opts, host, username, "dldata-resp")
dldata_result: match emqx::stats(&client, opts, host, username, "dldata-result")
MqttState::Rumqttd => response::GetApplicationStatsData {
uldata: response::Stats {
dldata_resp: response::Stats {
dldata_result: response::Stats {
let e = format!("unsupport scheme {}", scheme);
Json(&response::GetApplicationStats { data }).into_response()
/// `POST /{base}/api/v1/application/{applicationId}/dldata`
async fn post_application_dldata(
Json(body): Json<request::PostApplicationDlDataBody>,
const FN_NAME: &'static str = "post_application_dldata";
if body.data.device_id.len() == 0 {
let e = "empty `deviceId` is invalid".to_string();
if let Err(e) = hex::decode(body.data.payload.as_str()) {
let e = format!("`payload` is not hexadecimal string: {}", e);
match get_device_inner(
body.data.device_id.as_str(),
Ok(device) => match device {
ErrReq::DEVICE_NOT_EXIST.0,
ErrReq::DEVICE_NOT_EXIST.1,
Some(_) => (),
let payload = match serde_json::to_string(&DlData {
correlation_id: "1".to_string(),
device_id: Some(body.data.device_id.clone()),
data: body.data.payload.clone(),
}) {
let e = format!("encode JSON error: {}", e);
Ok(payload) => general_purpose::STANDARD.encode(payload),
match scheme {
rabbitmq::publish_message(&client, opts, hostname, username, "dldata", payload)
emqx::publish_message(&client, opts, hostname, username, "dldata", payload)
MqttState::Rumqttd => {
let e = "not support now".to_string();
async fn get_application_inner(
fn_name: &str,
client: &reqwest::Client,
broker_base: &str,
application_id: &str,
token: &HeaderValue,
) -> Result<(response::GetApplicationData, Url, String), Response> {
let resp = get_stream_resp(fn_name, token, &client, uri.as_str()).await?;
let application = match resp.json::<response::GetApplication>().await {
let e = format!("wrong response of application: {}", e);
error!("[{}] {}", fn_name, e);
return Err(ErrResp::ErrIntMsg(Some(e)).into_response());
Ok(application) => application.data,
let uri = match Url::parse(application.host_uri.as_str()) {
let e = format!("unexpected hostUri: {}", e);
return Err(ErrResp::ErrUnknown(Some(e)).into_response());
let host = match uri.host_str() {
let e = "unexpected hostUri".to_string();
Ok((application, uri, host))
async fn check_application_code_inner(
unit_id: &str,
code: &str,
) -> Result<u64, Response> {
let uri = format!("{}/api/v1/application/count", broker_base);
let req = match client
.request(reqwest::Method::GET, uri)
.header(reqwest::header::AUTHORIZATION, token)
.query(&[("unit", unit_id), ("code", code)])
.build()
return Err(ErrResp::ErrRsc(Some(e)).into_response());
let resp = match client.execute(req).await {
match resp.json::<response::GetCount>().await {
Err(ErrResp::ErrIntMsg(Some(e)).into_response())
Ok(data) => Ok(data.data.count),