1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use async_trait::async_trait;
9
use axum::Router;
10
use log::{error, info, warn};
11
use reqwest;
12
use url::Url;
13

            
14
use general_mq::{
15
    Queue,
16
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
17
};
18
use sylvia_iot_corelib::constants::MqEngine;
19

            
20
use crate::libs::{
21
    config::{self, Config},
22
    mq::{
23
        self, Connection, emqx::ManagementOpts as EmqxOpts,
24
        rabbitmq::ManagementOpts as RabbitMqOpts,
25
    },
26
};
27

            
28
pub mod middleware;
29
mod v1;
30

            
31
/// The resources used by this service.
32
#[derive(Clone)]
33
pub struct State {
34
    /// The scope root path for the service.
35
    ///
36
    /// For example `/coremgr`, the APIs are
37
    /// - `http://host:port/coremgr/api/v1/user/xxx`
38
    /// - `http://host:port/coremgr/api/v1/unit/xxx`
39
    pub scope_path: &'static str,
40
    /// The sylvia-iot-auth base API path with host.
41
    ///
42
    /// For example, `http://localhost:1080/auth`.
43
    pub auth_base: String,
44
    /// The sylvia-iot-broker base API path with host.
45
    ///
46
    /// For example, `http://localhost:2080/broker`.
47
    pub broker_base: String,
48
    /// The client for internal HTTP requests.
49
    pub client: reqwest::Client,
50
    /// AMQP broker management information.
51
    pub amqp: AmqpState,
52
    /// MQTT broker management information.
53
    pub mqtt: MqttState,
54
    /// Queue connections. Key is uri.
55
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
56
    /// Data channel sender.
57
    pub data_sender: Option<Queue>,
58
}
59

            
60
/// AMQP broker management information.
61
#[derive(Clone)]
62
pub enum AmqpState {
63
    /// For RabbitMQ.
64
    RabbitMq(RabbitMqOpts),
65
}
66

            
67
/// MQTT broker management information.
68
#[derive(Clone)]
69
pub enum MqttState {
70
    /// For EMQX.
71
    Emqx(EmqxOpts),
72
    /// For rumqttd.
73
    Rumqttd,
74
}
75

            
76
/// The sylvia-iot module specific error codes in addition to standard
77
/// [`sylvia_iot_corelib::err::ErrResp`].
78
pub struct ErrReq;
79

            
80
struct DataSenderHandler;
81

            
82
impl ErrReq {
83
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
84
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
85
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
86
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
87
}
88

            
89
#[async_trait]
90
impl QueueEventHandler for DataSenderHandler {
91
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
92
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
93
        let queue_name = queue.name();
94
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
95
    }
96

            
97
12
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
98
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
99
        let queue_name = queue.name();
100

            
101
        match status {
102
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
103
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
104
        }
105
12
    }
106
}
107

            
108
#[async_trait]
109
impl MessageHandler for DataSenderHandler {
110
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
111
}
112

            
113
/// To create resources for the service.
114
12
pub async fn new_state(
115
12
    scope_path: &'static str,
116
12
    conf: &Config,
117
12
) -> Result<State, Box<dyn StdError>> {
118
12
    let conf = config::apply_default(conf);
119
12
    let auth_base = conf.auth.as_ref().unwrap().clone();
120
12
    let broker_base = conf.broker.as_ref().unwrap().clone();
121
12
    let mq_engine = conf.mq.as_ref().unwrap().engine.as_ref().unwrap();
122
12
    let amqp = {
123
12
        let rabbitmq = conf.mq.as_ref().unwrap().rabbitmq.as_ref().unwrap();
124
12
        AmqpState::RabbitMq(RabbitMqOpts {
125
12
            username: rabbitmq.username.as_ref().unwrap().clone(),
126
12
            password: rabbitmq.password.as_ref().unwrap().clone(),
127
12
            ttl: rabbitmq.ttl,
128
12
            length: rabbitmq.length,
129
12
        })
130
    };
131
12
    let mqtt = match mq_engine.mqtt.as_ref().unwrap().as_str() {
132
12
        MqEngine::RUMQTTD => MqttState::Rumqttd,
133
        _ => {
134
8
            let emqx = conf.mq.as_ref().unwrap().emqx.as_ref().unwrap();
135
8
            MqttState::Emqx(EmqxOpts {
136
8
                api_key: emqx.api_key.as_ref().unwrap().clone(),
137
8
                api_secret: emqx.api_secret.as_ref().unwrap().clone(),
138
8
            })
139
        }
140
    };
141
12
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
142
12
    let ch_conf = conf.mq_channels.as_ref().unwrap();
143
12
    let data_sender = match ch_conf.data.as_ref() {
144
6
        None => None,
145
6
        Some(conf) => match conf.url.as_ref() {
146
            None => None,
147
6
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
148
        },
149
    };
150
12
    let state = State {
151
12
        scope_path: match scope_path.len() {
152
            0 => "/",
153
12
            _ => scope_path,
154
        },
155
12
        auth_base,
156
12
        broker_base,
157
12
        client: reqwest::Client::new(),
158
12
        amqp,
159
12
        mqtt,
160
12
        mq_conns,
161
12
        data_sender,
162
    };
163
12
    Ok(state)
164
12
}
165

            
166
/// To register service URIs in the specified root path.
167
506
pub fn new_service(state: &State) -> Router {
168
506
    let auth_uri = format!("{}/api/v1/auth/tokeninfo", state.auth_base.as_str());
169
506
    Router::new().nest(
170
506
        &state.scope_path,
171
506
        Router::new()
172
506
            .merge(v1::auth::new_service("/api/v1/auth", state))
173
506
            .merge(v1::user::new_service("/api/v1/user", state))
174
506
            .merge(v1::client::new_service("/api/v1/client", state))
175
506
            .merge(v1::unit::new_service("/api/v1/unit", state))
176
506
            .merge(v1::application::new_service("/api/v1/application", state))
177
506
            .merge(v1::network::new_service("/api/v1/network", state))
178
506
            .merge(v1::device::new_service("/api/v1/device", state))
179
506
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
180
506
            .merge(v1::network_route::new_service(
181
506
                "/api/v1/network-route",
182
506
                state,
183
            ))
184
506
            .merge(v1::dldata_buffer::new_service(
185
506
                "/api/v1/dldata-buffer",
186
506
                state,
187
            ))
188
506
            .layer(middleware::LogService::new(
189
506
                state.client.clone(),
190
506
                auth_uri,
191
506
                state.data_sender.clone(),
192
            )),
193
    )
194
506
}
195

            
196
/// Create data channel sender queue.
197
6
fn new_data_sender(
198
6
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
199
6
    config: &config::CoremgrData,
200
6
) -> Result<Queue, Box<dyn StdError>> {
201
6
    let url = match config.url.as_ref() {
202
        None => {
203
            return Err(Box::new(IoError::new(
204
                ErrorKind::InvalidInput,
205
                "empty control url",
206
            )));
207
        }
208
6
        Some(url) => match Url::parse(url.as_str()) {
209
            Err(e) => return Err(Box::new(e)),
210
6
            Ok(url) => url,
211
        },
212
    };
213
6
    let persistent = match config.persistent {
214
        None => false,
215
6
        Some(persistent) => persistent,
216
    };
217

            
218
6
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
219
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
220
6
        Ok(q) => Ok(q),
221
    }
222
6
}