1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use async_trait::async_trait;
9
use axum::{response::IntoResponse, Router};
10
use log::{error, info, warn};
11
use reqwest;
12
use serde::{Deserialize, Serialize};
13
use url::Url;
14

            
15
use general_mq::{
16
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
17
    Queue,
18
};
19
use sylvia_iot_corelib::{
20
    constants::MqEngine,
21
    http::{Json, Query},
22
};
23

            
24
use crate::libs::{
25
    config::{self, Config},
26
    mq::{
27
        self, emqx::ManagementOpts as EmqxOpts, rabbitmq::ManagementOpts as RabbitMqOpts,
28
        Connection,
29
    },
30
};
31

            
32
pub mod middleware;
33
mod v1;
34

            
35
/// The resources used by this service.
36
#[derive(Clone)]
37
pub struct State {
38
    /// The scope root path for the service.
39
    ///
40
    /// For example `/coremgr`, the APIs are
41
    /// - `http://host:port/coremgr/api/v1/user/xxx`
42
    /// - `http://host:port/coremgr/api/v1/unit/xxx`
43
    pub scope_path: &'static str,
44
    /// The sylvia-iot-auth base API path with host.
45
    ///
46
    /// For example, `http://localhost:1080/auth`.
47
    pub auth_base: String,
48
    /// The sylvia-iot-broker base API path with host.
49
    ///
50
    /// For example, `http://localhost:2080/broker`.
51
    pub broker_base: String,
52
    /// The client for internal HTTP requests.
53
    pub client: reqwest::Client,
54
    /// AMQP broker management information.
55
    pub amqp: AmqpState,
56
    /// MQTT broker management information.
57
    pub mqtt: MqttState,
58
    /// Queue connections. Key is uri.
59
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
60
    /// Data channel sender.
61
    pub data_sender: Option<Queue>,
62
}
63

            
64
/// AMQP broker management information.
65
#[derive(Clone)]
66
pub enum AmqpState {
67
    /// For RabbitMQ.
68
    RabbitMq(RabbitMqOpts),
69
}
70

            
71
/// MQTT broker management information.
72
#[derive(Clone)]
73
pub enum MqttState {
74
    /// For EMQX.
75
    Emqx(EmqxOpts),
76
    /// For rumqttd.
77
    Rumqttd,
78
}
79

            
80
/// The sylvia-iot module specific error codes in addition to standard
81
/// [`sylvia_iot_corelib::err::ErrResp`].
82
pub struct ErrReq;
83

            
84
struct DataSenderHandler;
85

            
86
/// Query parameters for `GET /version`
87
#[derive(Deserialize)]
88
pub struct GetVersionQuery {
89
    q: Option<String>,
90
}
91

            
92
#[derive(Serialize)]
93
struct GetVersionRes<'a> {
94
    data: GetVersionResData<'a>,
95
}
96

            
97
#[derive(Serialize)]
98
struct GetVersionResData<'a> {
99
    name: &'a str,
100
    version: &'a str,
101
}
102

            
103
const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
104
const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
105

            
106
impl ErrReq {
107
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
108
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
109
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
110
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
111
}
112

            
113
#[async_trait]
114
impl QueueEventHandler for DataSenderHandler {
115
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
116
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
117
        let queue_name = queue.name();
118
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
119
    }
120

            
121
6
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
122
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
123
6
        let queue_name = queue.name();
124
6

            
125
6
        match status {
126
3
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
127
3
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
128
        }
129
12
    }
130
}
131

            
132
#[async_trait]
133
impl MessageHandler for DataSenderHandler {
134
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
135
}
136

            
137
/// To create resources for the service.
138
6
pub async fn new_state(
139
6
    scope_path: &'static str,
140
6
    conf: &Config,
141
6
) -> Result<State, Box<dyn StdError>> {
142
6
    let conf = config::apply_default(conf);
143
6
    let auth_base = conf.auth.as_ref().unwrap().clone();
144
6
    let broker_base = conf.broker.as_ref().unwrap().clone();
145
6
    let mq_engine = conf.mq.as_ref().unwrap().engine.as_ref().unwrap();
146
6
    let amqp = {
147
6
        let rabbitmq = conf.mq.as_ref().unwrap().rabbitmq.as_ref().unwrap();
148
6
        AmqpState::RabbitMq(RabbitMqOpts {
149
6
            username: rabbitmq.username.as_ref().unwrap().clone(),
150
6
            password: rabbitmq.password.as_ref().unwrap().clone(),
151
6
            ttl: rabbitmq.ttl,
152
6
            length: rabbitmq.length,
153
6
        })
154
    };
155
6
    let mqtt = match mq_engine.mqtt.as_ref().unwrap().as_str() {
156
6
        MqEngine::RUMQTTD => MqttState::Rumqttd,
157
        _ => {
158
4
            let emqx = conf.mq.as_ref().unwrap().emqx.as_ref().unwrap();
159
4
            MqttState::Emqx(EmqxOpts {
160
4
                api_key: emqx.api_key.as_ref().unwrap().clone(),
161
4
                api_secret: emqx.api_secret.as_ref().unwrap().clone(),
162
4
            })
163
        }
164
    };
165
6
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
166
6
    let ch_conf = conf.mq_channels.as_ref().unwrap();
167
6
    let data_sender = match ch_conf.data.as_ref() {
168
3
        None => None,
169
3
        Some(conf) => match conf.url.as_ref() {
170
            None => None,
171
3
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
172
        },
173
    };
174
6
    let state = State {
175
6
        scope_path: match scope_path.len() {
176
            0 => "/",
177
6
            _ => scope_path,
178
        },
179
6
        auth_base,
180
6
        broker_base,
181
6
        client: reqwest::Client::new(),
182
6
        amqp,
183
6
        mqtt,
184
6
        mq_conns,
185
6
        data_sender,
186
6
    };
187
6
    Ok(state)
188
6
}
189

            
190
/// To register service URIs in the specified root path.
191
253
pub fn new_service(state: &State) -> Router {
192
253
    let auth_uri = format!("{}/api/v1/auth/tokeninfo", state.auth_base.as_str());
193
253
    Router::new().nest(
194
253
        &state.scope_path,
195
253
        Router::new()
196
253
            .merge(v1::auth::new_service("/api/v1/auth", state))
197
253
            .merge(v1::user::new_service("/api/v1/user", state))
198
253
            .merge(v1::client::new_service("/api/v1/client", state))
199
253
            .merge(v1::unit::new_service("/api/v1/unit", state))
200
253
            .merge(v1::application::new_service("/api/v1/application", state))
201
253
            .merge(v1::network::new_service("/api/v1/network", state))
202
253
            .merge(v1::device::new_service("/api/v1/device", state))
203
253
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
204
253
            .merge(v1::network_route::new_service(
205
253
                "/api/v1/network-route",
206
253
                state,
207
253
            ))
208
253
            .merge(v1::dldata_buffer::new_service(
209
253
                "/api/v1/dldata-buffer",
210
253
                state,
211
253
            ))
212
253
            .layer(middleware::LogService::new(
213
253
                auth_uri,
214
253
                state.data_sender.clone(),
215
253
            )),
216
253
    )
217
253
}
218

            
219
/// Create data channel sender queue.
220
3
fn new_data_sender(
221
3
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
222
3
    config: &config::CoremgrData,
223
3
) -> Result<Queue, Box<dyn StdError>> {
224
3
    let url = match config.url.as_ref() {
225
        None => {
226
            return Err(Box::new(IoError::new(
227
                ErrorKind::InvalidInput,
228
                "empty control url",
229
            )))
230
        }
231
3
        Some(url) => match Url::parse(url.as_str()) {
232
            Err(e) => return Err(Box::new(e)),
233
3
            Ok(url) => url,
234
        },
235
    };
236
3
    let persistent = match config.persistent {
237
        None => false,
238
3
        Some(persistent) => persistent,
239
    };
240

            
241
3
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
242
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
243
3
        Ok(q) => Ok(q),
244
    }
245
3
}
246

            
247
4
pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
248
4
    if let Some(q) = query.q.as_ref() {
249
3
        match q.as_str() {
250
3
            "name" => return SERV_NAME.into_response(),
251
2
            "version" => return SERV_VER.into_response(),
252
1
            _ => (),
253
        }
254
1
    }
255

            
256
2
    Json(GetVersionRes {
257
2
        data: GetVersionResData {
258
2
            name: SERV_NAME,
259
2
            version: SERV_VER,
260
2
        },
261
2
    })
262
2
    .into_response()
263
4
}