1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use axum::{response::IntoResponse, Router};
9
use reqwest;
10
use serde::{Deserialize, Serialize};
11
use url::Url;
12

            
13
use async_trait::async_trait;
14
use log::{error, info, warn};
15

            
16
use general_mq::{
17
    queue::{EventHandler as QueueEventHandler, GmqQueue, Status},
18
    Queue,
19
};
20
use sylvia_iot_corelib::{
21
    constants::{CacheEngine, DbEngine},
22
    http::{Json, Query},
23
};
24

            
25
use crate::{
26
    libs::{
27
        config::{self, Config},
28
        mq::{self, application::ApplicationMgr, network::NetworkMgr, Connection},
29
    },
30
    models::{
31
        self, Cache, CacheConnOptions, ConnOptions, DeviceOptions, DeviceRouteOptions, Model,
32
        MongoDbOptions, NetworkRouteOptions, SqliteOptions,
33
    },
34
};
35

            
36
pub mod middleware;
37
mod v1;
38

            
39
/// The resources used by this service.
40
#[derive(Clone)]
41
pub struct State {
42
    /// The scope root path for the service.
43
    ///
44
    /// For example `/broker`, the APIs are
45
    /// - `http://host:port/broker/api/v1/unit/xxx`
46
    /// - `http://host:port/broker/api/v1/application/xxx`
47
    pub scope_path: &'static str,
48
    /// The scopes for accessing APIs.
49
    pub api_scopes: HashMap<String, Vec<String>>,
50
    /// The database model.
51
    pub model: Arc<dyn Model>,
52
    /// The database cache.
53
    pub cache: Option<Arc<dyn Cache>>,
54
    /// The sylvia-iot-auth base API path with host.
55
    ///
56
    /// For example, `http://localhost:1080/auth`.
57
    pub auth_base: String,
58
    pub amqp_prefetch: u16,
59
    pub amqp_persistent: bool,
60
    pub mqtt_shared_prefix: String,
61
    /// The client for internal HTTP requests.
62
    pub client: reqwest::Client,
63
    /// Queue connections. Key is uri.
64
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
65
    /// Application managers. Key is `[unit-code].[application-code]`.
66
    pub application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
67
    /// Network managers. Key is `[unit-code].[network-code]`. Unit code `_` means public network.
68
    pub network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
69
    /// Control channel receivers. Key is function such as `application`, `network`, ....
70
    pub ctrl_receivers: Arc<Mutex<HashMap<String, Queue>>>,
71
    /// Control channel senders.
72
    pub ctrl_senders: CtrlSenders,
73
    /// Data channel sender.
74
    pub data_sender: Option<Queue>,
75
}
76

            
77
/// Control channel senders.
78
#[derive(Clone)]
79
pub struct CtrlSenders {
80
    pub unit: Arc<Mutex<Queue>>,
81
    pub application: Arc<Mutex<Queue>>,
82
    pub network: Arc<Mutex<Queue>>,
83
    pub device: Arc<Mutex<Queue>>,
84
    pub device_route: Arc<Mutex<Queue>>,
85
    pub network_route: Arc<Mutex<Queue>>,
86
}
87

            
88
/// Namespace for the error codes specific to this sylvia-iot module; they complement the
/// standard `ErrResp` codes.
pub struct ErrReq;
90

            
91
/// Event handler attached to the data channel sender queue. It only logs errors and status
/// changes.
struct DataSenderHandler;
92

            
93
/// Query parameters for `GET /version`
94
#[derive(Deserialize)]
95
pub struct GetVersionQuery {
96
    q: Option<String>,
97
}
98

            
99
#[derive(Serialize)]
100
struct GetVersionRes<'a> {
101
    data: GetVersionResData<'a>,
102
}
103

            
104
#[derive(Serialize)]
105
struct GetVersionResData<'a> {
106
    name: &'a str,
107
    version: &'a str,
108
}
109

            
110
const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
111
const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
112

            
113
impl ErrReq {
114
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
115
    pub const APPLICATION_NOT_EXIST: (u16, &'static str) =
116
        (400, "err_broker_application_not_exist");
117
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
118
    pub const MEMBER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_member_not_exist");
119
    pub const NETWORK_ADDR_EXIST: (u16, &'static str) = (400, "err_broker_network_addr_exist");
120
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
121
    pub const NETWORK_NOT_EXIST: (u16, &'static str) = (400, "err_broker_network_not_exist");
122
    pub const OWNER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_owner_not_exist");
123
    pub const ROUTE_EXIST: (u16, &'static str) = (400, "err_broker_route_exist");
124
    pub const UNIT_EXIST: (u16, &'static str) = (400, "err_broker_unit_exist");
125
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
126
    pub const UNIT_NOT_MATCH: (u16, &'static str) = (400, "err_broker_unit_not_match");
127
}
128

            
129
#[async_trait]
130
impl QueueEventHandler for DataSenderHandler {
131
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
132
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
133
        let queue_name = queue.name();
134
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
135
    }
136

            
137
16
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
138
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
139
16
        let queue_name = queue.name();
140
16
        match status {
141
8
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
142
8
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
143
        }
144
32
    }
145
}
146

            
147
/// To create resources for the service.
148
15
pub async fn new_state(
149
15
    scope_path: &'static str,
150
15
    conf: &Config,
151
15
) -> Result<State, Box<dyn StdError>> {
152
15
    let conf = config::apply_default(conf);
153
15
    let db_opts = match conf.db.as_ref().unwrap().engine.as_ref().unwrap().as_str() {
154
15
        DbEngine::MONGODB => {
155
5
            let conf = conf.db.as_ref().unwrap().mongodb.as_ref().unwrap();
156
5
            ConnOptions::MongoDB(MongoDbOptions {
157
5
                url: conf.url.as_ref().unwrap().to_string(),
158
5
                db: conf.database.as_ref().unwrap().to_string(),
159
5
                pool_size: conf.pool_size,
160
5
            })
161
        }
162
        _ => {
163
10
            let conf = conf.db.as_ref().unwrap().sqlite.as_ref().unwrap();
164
10
            ConnOptions::Sqlite(SqliteOptions {
165
10
                path: conf.path.as_ref().unwrap().to_string(),
166
10
            })
167
        }
168
    };
169
15
    let cache_opts = match conf.cache.as_ref().unwrap().engine.as_ref() {
170
        None => None,
171
15
        Some(engine) => match engine.as_str() {
172
15
            CacheEngine::MEMORY => {
173
3
                let conf = conf.cache.as_ref().unwrap().memory.as_ref().unwrap();
174
3
                Some(CacheConnOptions::Memory {
175
3
                    device: DeviceOptions {
176
3
                        uldata_size: conf.device.unwrap(),
177
3
                    },
178
3
                    device_route: DeviceRouteOptions {
179
3
                        uldata_size: conf.device_route.unwrap(),
180
3
                        dldata_size: conf.device_route.unwrap(),
181
3
                        dldata_pub_size: conf.device_route.unwrap(),
182
3
                    },
183
3
                    network_route: NetworkRouteOptions {
184
3
                        uldata_size: conf.device_route.unwrap(),
185
3
                    },
186
3
                })
187
            }
188
12
            _ => None,
189
        },
190
    };
191
15
    let mq_conf = conf.mq.as_ref().unwrap();
192
15
    let model = models::new(&db_opts).await?;
193
15
    let cache = match cache_opts {
194
12
        None => None,
195
3
        Some(opts) => Some(models::new_cache(&opts, &model).await?),
196
    };
197
15
    let auth_base = conf.auth.as_ref().unwrap().clone();
198
15
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
199
15
    let ch_conf = conf.mq_channels.as_ref().unwrap();
200
15
    let ctrl_senders = new_ctrl_senders(&mq_conns, &ch_conf, cache.clone())?;
201
15
    let data_sender = match ch_conf.data.as_ref() {
202
7
        None => None,
203
8
        Some(conf) => match conf.url.as_ref() {
204
            None => None,
205
8
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
206
        },
207
    };
208
15
    let state = State {
209
15
        scope_path: match scope_path.len() {
210
            0 => "/",
211
15
            _ => scope_path,
212
        },
213
15
        api_scopes: conf.api_scopes.as_ref().unwrap().clone(),
214
15
        model,
215
15
        cache,
216
15
        auth_base,
217
15
        amqp_prefetch: mq_conf.prefetch.unwrap(),
218
15
        amqp_persistent: mq_conf.persistent.unwrap(),
219
15
        mqtt_shared_prefix: mq_conf.shared_prefix.as_ref().unwrap().to_string(),
220
15
        client: reqwest::Client::new(),
221
15
        mq_conns,
222
15
        application_mgrs: Arc::new(Mutex::new(HashMap::new())),
223
15
        network_mgrs: Arc::new(Mutex::new(HashMap::new())),
224
15
        ctrl_receivers: Arc::new(Mutex::new(HashMap::new())),
225
15
        ctrl_senders,
226
15
        data_sender,
227
    };
228
15
    let (r1, r2, r3, r4, r5, r6) = tokio::join!(
229
15
        v1::unit::init(&state, &ch_conf.unit.as_ref().unwrap()),
230
15
        v1::application::init(&state, &ch_conf.application.as_ref().unwrap()),
231
15
        v1::network::init(&state, &ch_conf.network.as_ref().unwrap()),
232
15
        v1::device::init(&state, &ch_conf.device.as_ref().unwrap()),
233
15
        v1::device_route::init(&state, &ch_conf.device_route.as_ref().unwrap()),
234
15
        v1::network_route::init(&state, &ch_conf.network_route.as_ref().unwrap())
235
15
    );
236
15
    r1?;
237
15
    r2?;
238
15
    r3?;
239
15
    r4?;
240
15
    r5?;
241
15
    r6?;
242
15
    Ok(state)
243
15
}
244

            
245
/// To register service URIs in the specified root path.
246
3341
pub fn new_service(state: &State) -> Router {
247
3341
    Router::new().nest(
248
3341
        &state.scope_path,
249
3341
        Router::new()
250
3341
            .merge(v1::unit::new_service("/api/v1/unit", state))
251
3341
            .merge(v1::application::new_service("/api/v1/application", state))
252
3341
            .merge(v1::network::new_service("/api/v1/network", state))
253
3341
            .merge(v1::device::new_service("/api/v1/device", state))
254
3341
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
255
3341
            .merge(v1::network_route::new_service(
256
3341
                "/api/v1/network-route",
257
3341
                state,
258
3341
            ))
259
3341
            .merge(v1::dldata_buffer::new_service(
260
3341
                "/api/v1/dldata-buffer",
261
3341
                state,
262
3341
            )),
263
3341
    )
264
3341
}
265

            
266
15
pub fn new_ctrl_senders(
267
15
    mq_conns: &Arc<Mutex<HashMap<String, Connection>>>,
268
15
    ch_conf: &config::MqChannels,
269
15
    cache: Option<Arc<dyn Cache>>,
270
15
) -> Result<CtrlSenders, Box<dyn StdError>> {
271
15
    let unit_ctrl_cfg = ch_conf.unit.as_ref().unwrap();
272
15
    let app_ctrl_cfg = ch_conf.application.as_ref().unwrap();
273
15
    let net_ctrl_cfg = ch_conf.network.as_ref().unwrap();
274
15
    let dev_ctrl_cfg = ch_conf.device.as_ref().unwrap();
275
15
    let devr_ctrl_cfg = ch_conf.device_route.as_ref().unwrap();
276
15
    let netr_ctrl_cfg = ch_conf.network_route.as_ref().unwrap();
277
15

            
278
15
    Ok(CtrlSenders {
279
15
        unit: v1::unit::new_ctrl_sender(mq_conns, unit_ctrl_cfg)?,
280
15
        application: v1::application::new_ctrl_sender(mq_conns, app_ctrl_cfg)?,
281
15
        network: v1::network::new_ctrl_sender(mq_conns, net_ctrl_cfg, cache.clone())?,
282
15
        device: v1::device::new_ctrl_sender(mq_conns, dev_ctrl_cfg, cache.clone())?,
283
15
        device_route: v1::device_route::new_ctrl_sender(mq_conns, devr_ctrl_cfg, cache.clone())?,
284
15
        network_route: v1::network_route::new_ctrl_sender(mq_conns, netr_ctrl_cfg, cache.clone())?,
285
    })
286
15
}
287

            
288
/// Create data channel sender queue.
289
8
pub fn new_data_sender(
290
8
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
291
8
    config: &config::BrokerData,
292
8
) -> Result<Queue, Box<dyn StdError>> {
293
8
    let url = match config.url.as_ref() {
294
        None => {
295
            return Err(Box::new(IoError::new(
296
                ErrorKind::InvalidInput,
297
                "empty control url",
298
            )))
299
        }
300
8
        Some(url) => match Url::parse(url.as_str()) {
301
            Err(e) => return Err(Box::new(e)),
302
8
            Ok(url) => url,
303
        },
304
    };
305
8
    let persistent = match config.persistent {
306
        None => false,
307
8
        Some(persistent) => persistent,
308
    };
309

            
310
8
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
311
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
312
8
        Ok(q) => Ok(q),
313
    }
314
8
}
315

            
316
4
pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
317
4
    if let Some(q) = query.q.as_ref() {
318
3
        match q.as_str() {
319
3
            "name" => return SERV_NAME.into_response(),
320
2
            "version" => return SERV_VER.into_response(),
321
1
            _ => (),
322
        }
323
1
    }
324

            
325
2
    Json(GetVersionRes {
326
2
        data: GetVersionResData {
327
2
            name: SERV_NAME,
328
2
            version: SERV_VER,
329
2
        },
330
2
    })
331
2
    .into_response()
332
4
}