1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use axum::{response::IntoResponse, Router};
9
use reqwest;
10
use serde::{Deserialize, Serialize};
11
use url::Url;
12

            
13
use async_trait::async_trait;
14
use log::{error, info, warn};
15

            
16
use general_mq::{
17
    queue::{EventHandler as QueueEventHandler, GmqQueue, Status},
18
    Queue,
19
};
20
use sylvia_iot_corelib::{
21
    constants::{CacheEngine, DbEngine},
22
    http::{Json, Query},
23
};
24

            
25
use crate::{
26
    libs::{
27
        config::{self, Config},
28
        mq::{self, application::ApplicationMgr, network::NetworkMgr, Connection},
29
    },
30
    models::{
31
        self, Cache, CacheConnOptions, ConnOptions, DeviceOptions, DeviceRouteOptions, Model,
32
        MongoDbOptions, NetworkRouteOptions, SqliteOptions,
33
    },
34
};
35

            
36
pub mod middleware;
37
mod v1;
38

            
39
/// The resources used by this service.
40
#[derive(Clone)]
41
pub struct State {
42
    /// The scope root path for the service.
43
    ///
44
    /// For example `/broker`, the APIs are
45
    /// - `http://host:port/broker/api/v1/unit/xxx`
46
    /// - `http://host:port/broker/api/v1/application/xxx`
47
    pub scope_path: &'static str,
48
    /// The scopes for accessing APIs.
49
    pub api_scopes: HashMap<String, Vec<String>>,
50
    /// The database model.
51
    pub model: Arc<dyn Model>,
52
    /// The database cache.
53
    pub cache: Option<Arc<dyn Cache>>,
54
    /// The sylvia-iot-auth base API path with host.
55
    ///
56
    /// For example, `http://localhost:1080/auth`.
57
    pub auth_base: String,
58
    pub amqp_prefetch: u16,
59
    pub amqp_persistent: bool,
60
    pub mqtt_shared_prefix: String,
61
    /// The client for internal HTTP requests.
62
    pub client: reqwest::Client,
63
    /// Queue connections. Key is uri.
64
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
65
    /// Application managers. Key is `[unit-code].[application-code]`.
66
    pub application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
67
    /// Network managers. Key is `[unit-code].[network-code]`. Unit code `_` means public network.
68
    pub network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
69
    /// Control channel receivers. Key is function such as `application`, `network`, ....
70
    pub ctrl_receivers: Arc<Mutex<HashMap<String, Queue>>>,
71
    /// Control channel senders.
72
    pub ctrl_senders: CtrlSenders,
73
    /// Data channel sender.
74
    pub data_sender: Option<Queue>,
75
}
76

            
77
/// Control channel senders.
78
#[derive(Clone)]
79
pub struct CtrlSenders {
80
    pub unit: Arc<Mutex<Queue>>,
81
    pub application: Arc<Mutex<Queue>>,
82
    pub network: Arc<Mutex<Queue>>,
83
    pub device: Arc<Mutex<Queue>>,
84
    pub device_route: Arc<Mutex<Queue>>,
85
    pub network_route: Arc<Mutex<Queue>>,
86
}
87

            
88
/// The sylvia-iot module specific error codes in addition to standard
89
/// [`sylvia_iot_corelib::err::ErrResp`].
90
pub struct ErrReq;
91

            
92
struct DataSenderHandler;
93

            
94
/// Query parameters for `GET /version`
95
#[derive(Deserialize)]
96
pub struct GetVersionQuery {
97
    q: Option<String>,
98
}
99

            
100
#[derive(Serialize)]
101
struct GetVersionRes<'a> {
102
    data: GetVersionResData<'a>,
103
}
104

            
105
#[derive(Serialize)]
106
struct GetVersionResData<'a> {
107
    name: &'a str,
108
    version: &'a str,
109
}
110

            
111
const SERV_NAME: &'static str = env!("CARGO_PKG_NAME");
112
const SERV_VER: &'static str = env!("CARGO_PKG_VERSION");
113

            
114
impl ErrReq {
115
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
116
    pub const APPLICATION_NOT_EXIST: (u16, &'static str) =
117
        (400, "err_broker_application_not_exist");
118
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
119
    pub const MEMBER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_member_not_exist");
120
    pub const NETWORK_ADDR_EXIST: (u16, &'static str) = (400, "err_broker_network_addr_exist");
121
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
122
    pub const NETWORK_NOT_EXIST: (u16, &'static str) = (400, "err_broker_network_not_exist");
123
    pub const OWNER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_owner_not_exist");
124
    pub const ROUTE_EXIST: (u16, &'static str) = (400, "err_broker_route_exist");
125
    pub const UNIT_EXIST: (u16, &'static str) = (400, "err_broker_unit_exist");
126
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
127
    pub const UNIT_NOT_MATCH: (u16, &'static str) = (400, "err_broker_unit_not_match");
128
}
129

            
130
#[async_trait]
131
impl QueueEventHandler for DataSenderHandler {
132
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
133
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
134
        let queue_name = queue.name();
135
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
136
    }
137

            
138
16
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
139
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
140
16
        let queue_name = queue.name();
141
16
        match status {
142
8
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
143
8
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
144
        }
145
32
    }
146
}
147

            
148
/// To create resources for the service.
149
15
pub async fn new_state(
150
15
    scope_path: &'static str,
151
15
    conf: &Config,
152
15
) -> Result<State, Box<dyn StdError>> {
153
15
    let conf = config::apply_default(conf);
154
15
    let db_opts = match conf.db.as_ref().unwrap().engine.as_ref().unwrap().as_str() {
155
15
        DbEngine::MONGODB => {
156
5
            let conf = conf.db.as_ref().unwrap().mongodb.as_ref().unwrap();
157
5
            ConnOptions::MongoDB(MongoDbOptions {
158
5
                url: conf.url.as_ref().unwrap().to_string(),
159
5
                db: conf.database.as_ref().unwrap().to_string(),
160
5
                pool_size: conf.pool_size,
161
5
            })
162
        }
163
        _ => {
164
10
            let conf = conf.db.as_ref().unwrap().sqlite.as_ref().unwrap();
165
10
            ConnOptions::Sqlite(SqliteOptions {
166
10
                path: conf.path.as_ref().unwrap().to_string(),
167
10
            })
168
        }
169
    };
170
15
    let cache_opts = match conf.cache.as_ref().unwrap().engine.as_ref() {
171
        None => None,
172
15
        Some(engine) => match engine.as_str() {
173
15
            CacheEngine::MEMORY => {
174
3
                let conf = conf.cache.as_ref().unwrap().memory.as_ref().unwrap();
175
3
                Some(CacheConnOptions::Memory {
176
3
                    device: DeviceOptions {
177
3
                        uldata_size: conf.device.unwrap(),
178
3
                    },
179
3
                    device_route: DeviceRouteOptions {
180
3
                        uldata_size: conf.device_route.unwrap(),
181
3
                        dldata_size: conf.device_route.unwrap(),
182
3
                        dldata_pub_size: conf.device_route.unwrap(),
183
3
                    },
184
3
                    network_route: NetworkRouteOptions {
185
3
                        uldata_size: conf.device_route.unwrap(),
186
3
                    },
187
3
                })
188
            }
189
12
            _ => None,
190
        },
191
    };
192
15
    let mq_conf = conf.mq.as_ref().unwrap();
193
15
    let model = models::new(&db_opts).await?;
194
15
    let cache = match cache_opts {
195
12
        None => None,
196
3
        Some(opts) => Some(models::new_cache(&opts, &model).await?),
197
    };
198
15
    let auth_base = conf.auth.as_ref().unwrap().clone();
199
15
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
200
15
    let ch_conf = conf.mq_channels.as_ref().unwrap();
201
15
    let ctrl_senders = new_ctrl_senders(&mq_conns, &ch_conf, cache.clone())?;
202
15
    let data_sender = match ch_conf.data.as_ref() {
203
7
        None => None,
204
8
        Some(conf) => match conf.url.as_ref() {
205
            None => None,
206
8
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
207
        },
208
    };
209
15
    let state = State {
210
15
        scope_path: match scope_path.len() {
211
            0 => "/",
212
15
            _ => scope_path,
213
        },
214
15
        api_scopes: conf.api_scopes.as_ref().unwrap().clone(),
215
15
        model,
216
15
        cache,
217
15
        auth_base,
218
15
        amqp_prefetch: mq_conf.prefetch.unwrap(),
219
15
        amqp_persistent: mq_conf.persistent.unwrap(),
220
15
        mqtt_shared_prefix: mq_conf.shared_prefix.as_ref().unwrap().to_string(),
221
15
        client: reqwest::Client::new(),
222
15
        mq_conns,
223
15
        application_mgrs: Arc::new(Mutex::new(HashMap::new())),
224
15
        network_mgrs: Arc::new(Mutex::new(HashMap::new())),
225
15
        ctrl_receivers: Arc::new(Mutex::new(HashMap::new())),
226
15
        ctrl_senders,
227
15
        data_sender,
228
    };
229
15
    let (r1, r2, r3, r4, r5, r6) = tokio::join!(
230
15
        v1::unit::init(&state, &ch_conf.unit.as_ref().unwrap()),
231
15
        v1::application::init(&state, &ch_conf.application.as_ref().unwrap()),
232
15
        v1::network::init(&state, &ch_conf.network.as_ref().unwrap()),
233
15
        v1::device::init(&state, &ch_conf.device.as_ref().unwrap()),
234
15
        v1::device_route::init(&state, &ch_conf.device_route.as_ref().unwrap()),
235
15
        v1::network_route::init(&state, &ch_conf.network_route.as_ref().unwrap())
236
15
    );
237
15
    r1?;
238
15
    r2?;
239
15
    r3?;
240
15
    r4?;
241
15
    r5?;
242
15
    r6?;
243
15
    Ok(state)
244
15
}
245

            
246
/// To register service URIs in the specified root path.
247
3341
pub fn new_service(state: &State) -> Router {
248
3341
    Router::new().nest(
249
3341
        &state.scope_path,
250
3341
        Router::new()
251
3341
            .merge(v1::unit::new_service("/api/v1/unit", state))
252
3341
            .merge(v1::application::new_service("/api/v1/application", state))
253
3341
            .merge(v1::network::new_service("/api/v1/network", state))
254
3341
            .merge(v1::device::new_service("/api/v1/device", state))
255
3341
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
256
3341
            .merge(v1::network_route::new_service(
257
3341
                "/api/v1/network-route",
258
3341
                state,
259
3341
            ))
260
3341
            .merge(v1::dldata_buffer::new_service(
261
3341
                "/api/v1/dldata-buffer",
262
3341
                state,
263
3341
            )),
264
3341
    )
265
3341
}
266

            
267
15
pub fn new_ctrl_senders(
268
15
    mq_conns: &Arc<Mutex<HashMap<String, Connection>>>,
269
15
    ch_conf: &config::MqChannels,
270
15
    cache: Option<Arc<dyn Cache>>,
271
15
) -> Result<CtrlSenders, Box<dyn StdError>> {
272
15
    let unit_ctrl_cfg = ch_conf.unit.as_ref().unwrap();
273
15
    let app_ctrl_cfg = ch_conf.application.as_ref().unwrap();
274
15
    let net_ctrl_cfg = ch_conf.network.as_ref().unwrap();
275
15
    let dev_ctrl_cfg = ch_conf.device.as_ref().unwrap();
276
15
    let devr_ctrl_cfg = ch_conf.device_route.as_ref().unwrap();
277
15
    let netr_ctrl_cfg = ch_conf.network_route.as_ref().unwrap();
278
15

            
279
15
    Ok(CtrlSenders {
280
15
        unit: v1::unit::new_ctrl_sender(mq_conns, unit_ctrl_cfg)?,
281
15
        application: v1::application::new_ctrl_sender(mq_conns, app_ctrl_cfg)?,
282
15
        network: v1::network::new_ctrl_sender(mq_conns, net_ctrl_cfg, cache.clone())?,
283
15
        device: v1::device::new_ctrl_sender(mq_conns, dev_ctrl_cfg, cache.clone())?,
284
15
        device_route: v1::device_route::new_ctrl_sender(mq_conns, devr_ctrl_cfg, cache.clone())?,
285
15
        network_route: v1::network_route::new_ctrl_sender(mq_conns, netr_ctrl_cfg, cache.clone())?,
286
    })
287
15
}
288

            
289
/// Create data channel sender queue.
290
8
pub fn new_data_sender(
291
8
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
292
8
    config: &config::BrokerData,
293
8
) -> Result<Queue, Box<dyn StdError>> {
294
8
    let url = match config.url.as_ref() {
295
        None => {
296
            return Err(Box::new(IoError::new(
297
                ErrorKind::InvalidInput,
298
                "empty control url",
299
            )))
300
        }
301
8
        Some(url) => match Url::parse(url.as_str()) {
302
            Err(e) => return Err(Box::new(e)),
303
8
            Ok(url) => url,
304
        },
305
    };
306
8
    let persistent = match config.persistent {
307
        None => false,
308
8
        Some(persistent) => persistent,
309
    };
310

            
311
8
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
312
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
313
8
        Ok(q) => Ok(q),
314
    }
315
8
}
316

            
317
4
pub async fn get_version(Query(query): Query<GetVersionQuery>) -> impl IntoResponse {
318
4
    if let Some(q) = query.q.as_ref() {
319
3
        match q.as_str() {
320
3
            "name" => return SERV_NAME.into_response(),
321
2
            "version" => return SERV_VER.into_response(),
322
1
            _ => (),
323
        }
324
1
    }
325

            
326
2
    Json(GetVersionRes {
327
2
        data: GetVersionResData {
328
2
            name: SERV_NAME,
329
2
            version: SERV_VER,
330
2
        },
331
2
    })
332
2
    .into_response()
333
4
}