1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use async_trait::async_trait;
9
use axum::{response::IntoResponse, Router};
10
use log::{error, info, warn};
11
use reqwest;
12
use serde::{Deserialize, Serialize};
13
use url::Url;
14

            
15
use general_mq::{
16
    queue::{EventHandler as QueueEventHandler, GmqQueue, Message, MessageHandler, Status},
17
    Queue,
18
};
19
use sylvia_iot_corelib::{
20
    constants::MqEngine,
21
    http::{Json, Query},
22
};
23

            
24
use crate::libs::{
25
    config::{self, Config},
26
    mq::{
27
        self, emqx::ManagementOpts as EmqxOpts, rabbitmq::ManagementOpts as RabbitMqOpts,
28
        Connection,
29
    },
30
};
31

            
32
pub mod middleware;
33
mod v1;
34

            
35
/// The resources used by this service.
36
#[derive(Clone)]
37
pub struct State {
38
    /// The scope root path for the service.
39
    ///
40
    /// For example `/coremgr`, the APIs are
41
    /// - `http://host:port/coremgr/api/v1/user/xxx`
42
    /// - `http://host:port/coremgr/api/v1/unit/xxx`
43
    pub scope_path: &'static str,
44
    /// The sylvia-iot-auth base API path with host.
45
    ///
46
    /// For example, `http://localhost:1080/auth`.
47
    pub auth_base: String,
48
    /// The sylvia-iot-broker base API path with host.
49
    ///
50
    /// For example, `http://localhost:2080/broker`.
51
    pub broker_base: String,
52
    /// The client for internal HTTP requests.
53
    pub client: reqwest::Client,
54
    /// AMQP broker management information.
55
    pub amqp: AmqpState,
56
    /// MQTT broker management information.
57
    pub mqtt: MqttState,
58
    /// Queue connections. Key is uri.
59
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
60
    /// Data channel sender.
61
    pub data_sender: Option<Queue>,
62
}
63

            
64
/// AMQP broker management information.
65
#[derive(Clone)]
66
pub enum AmqpState {
67
    /// For RabbitMQ.
68
    RabbitMq(RabbitMqOpts),
69
}
70

            
71
/// MQTT broker management information.
72
#[derive(Clone)]
73
pub enum MqttState {
74
    /// For EMQX.
75
    Emqx(EmqxOpts),
76
    /// For rumqttd.
77
    Rumqttd,
78
}
79

            
80
/// Namespace for the error codes specific to this sylvia-iot module, complementing the standard
/// [`ErrResp`] codes.
pub struct ErrReq;
82

            
83
/// Queue event and message handler attached to the data channel sender queue.
struct DataSenderHandler;
84

            
85
/// Query parameters for `GET /version`
86
#[derive(Deserialize)]
87
pub struct GetVersionQuery {
88
    q: Option<String>,
89
}
90

            
91
#[derive(Serialize)]
92
struct GetVersionRes<'a> {
93
    data: GetVersionResData<'a>,
94
}
95

            
96
#[derive(Serialize)]
97
struct GetVersionResData<'a> {
98
    name: &'a str,
99
    version: &'a str,
100
}
101

            
102
const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
103
const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
104

            
105
impl ErrReq {
106
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
107
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
108
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
109
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
110
}
111

            
112
#[async_trait]
113
impl QueueEventHandler for DataSenderHandler {
114
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
115
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
116
        let queue_name = queue.name();
117
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
118
    }
119

            
120
6
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
121
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
122
6
        let queue_name = queue.name();
123
6

            
124
6
        match status {
125
3
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
126
3
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
127
        }
128
12
    }
129
}
130

            
131
#[async_trait]
132
impl MessageHandler for DataSenderHandler {
133
    async fn on_message(&self, _queue: Arc<dyn GmqQueue>, _msg: Box<dyn Message>) {}
134
}
135

            
136
/// To create resources for the service.
137
6
pub async fn new_state(
138
6
    scope_path: &'static str,
139
6
    conf: &Config,
140
6
) -> Result<State, Box<dyn StdError>> {
141
6
    let conf = config::apply_default(conf);
142
6
    let auth_base = conf.auth.as_ref().unwrap().clone();
143
6
    let broker_base = conf.broker.as_ref().unwrap().clone();
144
6
    let mq_engine = conf.mq.as_ref().unwrap().engine.as_ref().unwrap();
145
6
    let amqp = {
146
6
        let rabbitmq = conf.mq.as_ref().unwrap().rabbitmq.as_ref().unwrap();
147
6
        AmqpState::RabbitMq(RabbitMqOpts {
148
6
            username: rabbitmq.username.as_ref().unwrap().clone(),
149
6
            password: rabbitmq.password.as_ref().unwrap().clone(),
150
6
            ttl: rabbitmq.ttl,
151
6
            length: rabbitmq.length,
152
6
        })
153
    };
154
6
    let mqtt = match mq_engine.mqtt.as_ref().unwrap().as_str() {
155
6
        MqEngine::RUMQTTD => MqttState::Rumqttd,
156
        _ => {
157
4
            let emqx = conf.mq.as_ref().unwrap().emqx.as_ref().unwrap();
158
4
            MqttState::Emqx(EmqxOpts {
159
4
                api_key: emqx.api_key.as_ref().unwrap().clone(),
160
4
                api_secret: emqx.api_secret.as_ref().unwrap().clone(),
161
4
            })
162
        }
163
    };
164
6
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
165
6
    let ch_conf = conf.mq_channels.as_ref().unwrap();
166
6
    let data_sender = match ch_conf.data.as_ref() {
167
3
        None => None,
168
3
        Some(conf) => match conf.url.as_ref() {
169
            None => None,
170
3
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
171
        },
172
    };
173
6
    let state = State {
174
6
        scope_path: match scope_path.len() {
175
            0 => "/",
176
6
            _ => scope_path,
177
        },
178
6
        auth_base,
179
6
        broker_base,
180
6
        client: reqwest::Client::new(),
181
6
        amqp,
182
6
        mqtt,
183
6
        mq_conns,
184
6
        data_sender,
185
6
    };
186
6
    Ok(state)
187
6
}
188

            
189
/// To register service URIs in the specified root path.
190
253
pub fn new_service(state: &State) -> Router {
191
253
    let auth_uri = format!("{}/api/v1/auth/tokeninfo", state.auth_base.as_str());
192
253
    Router::new().nest(
193
253
        &state.scope_path,
194
253
        Router::new()
195
253
            .merge(v1::auth::new_service("/api/v1/auth", state))
196
253
            .merge(v1::user::new_service("/api/v1/user", state))
197
253
            .merge(v1::client::new_service("/api/v1/client", state))
198
253
            .merge(v1::unit::new_service("/api/v1/unit", state))
199
253
            .merge(v1::application::new_service("/api/v1/application", state))
200
253
            .merge(v1::network::new_service("/api/v1/network", state))
201
253
            .merge(v1::device::new_service("/api/v1/device", state))
202
253
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
203
253
            .merge(v1::network_route::new_service(
204
253
                "/api/v1/network-route",
205
253
                state,
206
253
            ))
207
253
            .merge(v1::dldata_buffer::new_service(
208
253
                "/api/v1/dldata-buffer",
209
253
                state,
210
253
            ))
211
253
            .layer(middleware::LogService::new(
212
253
                auth_uri,
213
253
                state.data_sender.clone(),
214
253
            )),
215
253
    )
216
253
}
217

            
218
/// Create data channel sender queue.
219
3
fn new_data_sender(
220
3
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
221
3
    config: &config::CoremgrData,
222
3
) -> Result<Queue, Box<dyn StdError>> {
223
3
    let url = match config.url.as_ref() {
224
        None => {
225
            return Err(Box::new(IoError::new(
226
                ErrorKind::InvalidInput,
227
                "empty control url",
228
            )))
229
        }
230
3
        Some(url) => match Url::parse(url.as_str()) {
231
            Err(e) => return Err(Box::new(e)),
232
3
            Ok(url) => url,
233
        },
234
    };
235
3
    let persistent = match config.persistent {
236
        None => false,
237
3
        Some(persistent) => persistent,
238
    };
239

            
240
3
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
241
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
242
3
        Ok(q) => Ok(q),
243
    }
244
3
}
245

            
246
4
pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
247
4
    if let Some(q) = query.q.as_ref() {
248
3
        match q.as_str() {
249
3
            "name" => return SERV_NAME.into_response(),
250
2
            "version" => return SERV_VER.into_response(),
251
1
            _ => (),
252
        }
253
1
    }
254

            
255
2
    Json(GetVersionRes {
256
2
        data: GetVersionResData {
257
2
            name: SERV_NAME,
258
2
            version: SERV_VER,
259
2
        },
260
2
    })
261
2
    .into_response()
262
4
}