Lines
75.09 %
Functions
71.43 %
Branches
100 %
use std::{
collections::HashMap,
error::Error as StdError,
io::{Error as IoError, ErrorKind},
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use chrono::DateTime;
use log::{error, info, warn};
use serde::Deserialize;
use serde_json::{Map, Value};
use tokio::time;
use super::{super::config::DataData as DataMqConfig, new_data_queue, Connection};
use crate::models::{
application_dldata::{
ApplicationDlData, UpdateQueryCond as ApplicationDlDataCond,
Updates as ApplicationDlDataUpdate,
},
application_uldata::ApplicationUlData,
network_dldata::{
NetworkDlData, UpdateQueryCond as NetworkDlDataCond, Updates as NetworkDlDataUpdate,
network_uldata::NetworkUlData,
Model,
use general_mq::{
queue::{EventHandler, GmqQueue, Message, MessageHandler, Status},
Queue,
#[derive(Clone)]
struct DataHandler {
model: Arc<dyn Model>,
}
#[derive(Deserialize)]
#[serde(tag = "kind")]
enum RecvDataMsg {
#[serde(rename = "application-uldata")]
AppUlData { data: AppUlData },
#[serde(rename = "application-dldata")]
AppDlData { data: AppDlData },
#[serde(rename = "application-dldata-result")]
AppDlDataResult { data: AppDlDataResult },
#[serde(rename = "network-uldata")]
NetUlData { data: NetUlData },
#[serde(rename = "network-dldata")]
NetDlData { data: NetDlData },
#[serde(rename = "network-dldata-result")]
NetDlDataResult { data: NetDlDataResult },
struct AppUlData {
#[serde(rename = "dataId")]
data_id: String,
proc: String,
#[serde(rename = "pub")]
publish: String,
#[serde(rename = "unitCode")]
unit_code: Option<String>,
#[serde(rename = "networkCode")]
network_code: String,
#[serde(rename = "networkAddr")]
network_addr: String,
#[serde(rename = "unitId")]
unit_id: String,
#[serde(rename = "deviceId")]
device_id: String,
time: String,
profile: String,
data: String,
extension: Option<Map<String, Value>>,
struct AppDlData {
status: i32,
device_id: Option<String>,
network_code: Option<String>,
network_addr: Option<String>,
struct AppDlDataResult {
resp: String,
struct NetUlData {
unit_id: Option<String>,
struct NetDlData {
struct NetDlDataResult {
const QUEUE_NAME: &'static str = "broker.data";
/// Create a receive queue to receive data from `broker.data` queue.
pub fn new(
mq_conns: &mut HashMap<String, Connection>,
config: &DataMqConfig,
) -> Result<Queue, Box<dyn StdError>> {
let handler = Arc::new(DataHandler { model });
match new_data_queue(mq_conns, config, QUEUE_NAME, handler.clone(), handler) {
Err(e) => Err(Box::new(IoError::new(ErrorKind::Other, e))),
Ok(q) => Ok(q),
#[async_trait]
impl EventHandler for DataHandler {
async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
const FN_NAME: &'static str = "DataHandler::on_error";
let queue_name = queue.name();
error!("[{}] {} error: {}", FN_NAME, queue_name, err);
async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
const FN_NAME: &'static str = "DataHandler::on_status";
match status {
Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
_ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
impl MessageHandler for DataHandler {
async fn on_message(&self, queue: Arc<dyn GmqQueue>, msg: Box<dyn Message>) {
const FN_NAME: &'static str = "DataHandler::on_message";
let data_msg = match serde_json::from_slice::<RecvDataMsg>(msg.payload()) {
Err(e) => {
let src_str: String = String::from_utf8_lossy(msg.payload()).into();
warn!(
"[{}] {} parse JSON error: {}, src: {}",
FN_NAME, queue_name, e, src_str
);
if let Err(e) = msg.ack().await {
error!("[{}] {} ACK error: {}", FN_NAME, queue_name, e);
return;
Ok(msg) => msg,
match data_msg {
RecvDataMsg::AppDlData { data } => {
let data = ApplicationDlData {
data_id: data.data_id,
proc: match DateTime::parse_from_rfc3339(data.proc.as_str()) {
"[{}] {} parse application_dldata proc \"{}\" error: {}",
FN_NAME, queue_name, data.proc, e
Ok(proc) => proc.into(),
resp: None,
status: data.status,
unit_id: data.unit_id,
device_id: data.device_id,
network_code: data.network_code,
network_addr: data.network_addr,
profile: data.profile,
data: data.data,
extension: data.extension,
let mut is_err = false;
if let Err(e) = self.model.application_dldata().add(&data).await {
error!(
"[{}] {} add application_dldata error: {}",
FN_NAME, queue_name, e
is_err = true;
if is_err {
time::sleep(Duration::from_secs(1)).await;
if let Err(e) = msg.nack().await {
error!("[{}] {} NACK error: {}", FN_NAME, queue_name, e);
RecvDataMsg::AppDlDataResult { data } => {
// FIXME: wait 1 second to wait for the associated dldata has been written in DB.
let cond = ApplicationDlDataCond {
data_id: data.data_id.as_str(),
let updates = ApplicationDlDataUpdate {
resp: match DateTime::parse_from_rfc3339(data.resp.as_str()) {
"[{}] {} parse application_dldata resp \"{}\" error: {}",
FN_NAME, queue_name, data.resp, e
Ok(resp) => resp.into(),
if let Err(e) = self
.model
.application_dldata()
.update(&cond, &updates)
.await
{
"[{}] {} update application_dldata error: {}",
RecvDataMsg::AppUlData { data } => {
let data = ApplicationUlData {
"[{}] {} parse application_uldata proc \"{}\" error: {}",
publish: match DateTime::parse_from_rfc3339(data.publish.as_str()) {
"[{}] {} parse application_uldata publish \"{}\" error: {}",
FN_NAME, queue_name, data.publish, e
Ok(publish) => publish.into(),
unit_code: data.unit_code,
time: match DateTime::parse_from_rfc3339(data.time.as_str()) {
"[{}] {} parse application_uldata time \"{}\" error: {}",
FN_NAME, queue_name, data.time, e
Ok(time) => time.into(),
if let Err(e) = self.model.application_uldata().add(&data).await {
"[{}] {} add application_uldata error: {}",
RecvDataMsg::NetDlData { data } => {
let data = NetworkDlData {
"[{}] {} parse network_dldata proc \"{}\" error: {}",
"[{}] {} parse network_dldata publish \"{}\" error: {}",
if let Err(e) = self.model.network_dldata().add(&data).await {
"[{}] {} add network_dldata error: {}",
RecvDataMsg::NetDlDataResult { data } => {
let cond = NetworkDlDataCond {
let updates = NetworkDlDataUpdate {
"[{}] {} parse network_dldata resp \"{}\" error: {}",
if let Err(e) = self.model.network_dldata().update(&cond, &updates).await {
"[{}] {} update network_dldata error: {}",
RecvDataMsg::NetUlData { data } => {
let data = NetworkUlData {
"[{}] {} parse network_uldata proc \"{}\" error: {}",
"[{}] {} parse network_uldata time \"{}\" error: {}",
if let Err(e) = self.model.network_uldata().add(&data).await {
"[{}] {} add network_uldata error: {}",