1
use std::{
2
    collections::HashMap,
3
    error::Error as StdError,
4
    io::{Error as IoError, ErrorKind},
5
    sync::{Arc, Mutex},
6
};
7

            
8
use axum::Router;
9
use reqwest;
10
use url::Url;
11

            
12
use async_trait::async_trait;
13
use log::{error, info, warn};
14

            
15
use general_mq::{
16
    Queue,
17
    queue::{EventHandler as QueueEventHandler, GmqQueue, Status},
18
};
19
use sylvia_iot_corelib::constants::{CacheEngine, DbEngine};
20

            
21
use crate::{
22
    libs::{
23
        config::{self, Config},
24
        mq::{self, Connection, application::ApplicationMgr, network::NetworkMgr},
25
    },
26
    models::{
27
        self, Cache, CacheConnOptions, ConnOptions, DeviceOptions, DeviceRouteOptions, Model,
28
        MongoDbOptions, NetworkRouteOptions, SqliteOptions,
29
    },
30
};
31

            
32
pub mod middleware;
33
mod v1;
34

            
35
/// The resources used by this service.
36
#[derive(Clone)]
37
pub struct State {
38
    /// The scope root path for the service.
39
    ///
40
    /// For example `/broker`, the APIs are
41
    /// - `http://host:port/broker/api/v1/unit/xxx`
42
    /// - `http://host:port/broker/api/v1/application/xxx`
43
    pub scope_path: &'static str,
44
    /// The scopes for accessing APIs.
45
    pub api_scopes: HashMap<String, Vec<String>>,
46
    /// The database model.
47
    pub model: Arc<dyn Model>,
48
    /// The database cache.
49
    pub cache: Option<Arc<dyn Cache>>,
50
    /// The sylvia-iot-auth base API path with host.
51
    ///
52
    /// For example, `http://localhost:1080/auth`.
53
    pub auth_base: String,
54
    pub amqp_prefetch: u16,
55
    pub amqp_persistent: bool,
56
    pub mqtt_shared_prefix: String,
57
    /// The client for internal HTTP requests.
58
    pub client: reqwest::Client,
59
    /// Queue connections. Key is uri.
60
    pub mq_conns: Arc<Mutex<HashMap<String, Connection>>>,
61
    /// Application managers. Key is `[unit-code].[application-code]`.
62
    pub application_mgrs: Arc<Mutex<HashMap<String, ApplicationMgr>>>,
63
    /// Network managers. Key is `[unit-code].[network-code]`. Unit code `_` means public network.
64
    pub network_mgrs: Arc<Mutex<HashMap<String, NetworkMgr>>>,
65
    /// Control channel receivers. Key is function such as `application`, `network`, ....
66
    pub ctrl_receivers: Arc<Mutex<HashMap<String, Queue>>>,
67
    /// Control channel senders.
68
    pub ctrl_senders: CtrlSenders,
69
    /// Data channel sender.
70
    pub data_sender: Option<Queue>,
71
}
72

            
73
/// Control channel senders.
74
#[derive(Clone)]
75
pub struct CtrlSenders {
76
    pub unit: Arc<Mutex<Queue>>,
77
    pub application: Arc<Mutex<Queue>>,
78
    pub network: Arc<Mutex<Queue>>,
79
    pub device: Arc<Mutex<Queue>>,
80
    pub device_route: Arc<Mutex<Queue>>,
81
    pub network_route: Arc<Mutex<Queue>>,
82
}
83

            
84
/// The sylvia-iot module specific error codes in addition to standard
85
/// [`sylvia_iot_corelib::err::ErrResp`].
86
pub struct ErrReq;
87

            
88
struct DataSenderHandler;
89

            
90
impl ErrReq {
91
    pub const APPLICATION_EXIST: (u16, &'static str) = (400, "err_broker_application_exist");
92
    pub const APPLICATION_NOT_EXIST: (u16, &'static str) =
93
        (400, "err_broker_application_not_exist");
94
    pub const DEVICE_NOT_EXIST: (u16, &'static str) = (400, "err_broker_device_not_exist");
95
    pub const MEMBER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_member_not_exist");
96
    pub const NETWORK_ADDR_EXIST: (u16, &'static str) = (400, "err_broker_network_addr_exist");
97
    pub const NETWORK_EXIST: (u16, &'static str) = (400, "err_broker_network_exist");
98
    pub const NETWORK_NOT_EXIST: (u16, &'static str) = (400, "err_broker_network_not_exist");
99
    pub const OWNER_NOT_EXIST: (u16, &'static str) = (400, "err_broker_owner_not_exist");
100
    pub const ROUTE_EXIST: (u16, &'static str) = (400, "err_broker_route_exist");
101
    pub const UNIT_EXIST: (u16, &'static str) = (400, "err_broker_unit_exist");
102
    pub const UNIT_NOT_EXIST: (u16, &'static str) = (400, "err_broker_unit_not_exist");
103
    pub const UNIT_NOT_MATCH: (u16, &'static str) = (400, "err_broker_unit_not_match");
104
}
105

            
106
#[async_trait]
107
impl QueueEventHandler for DataSenderHandler {
108
    async fn on_error(&self, queue: Arc<dyn GmqQueue>, err: Box<dyn StdError + Send + Sync>) {
109
        const FN_NAME: &'static str = "DataSenderHandler::on_error";
110
        let queue_name = queue.name();
111
        error!("[{}] {} error: {}", FN_NAME, queue_name, err);
112
    }
113

            
114
34
    async fn on_status(&self, queue: Arc<dyn GmqQueue>, status: Status) {
115
        const FN_NAME: &'static str = "DataSenderHandler::on_status";
116
        let queue_name = queue.name();
117
        match status {
118
            Status::Connected => info!("[{}] {} connected", queue_name, FN_NAME),
119
            _ => warn!("[{}] {} status to {:?}", FN_NAME, queue_name, status),
120
        }
121
34
    }
122
}
123

            
124
/// To create resources for the service.
125
30
pub async fn new_state(
126
30
    scope_path: &'static str,
127
30
    conf: &Config,
128
30
) -> Result<State, Box<dyn StdError>> {
129
30
    let conf = config::apply_default(conf);
130
30
    let db_opts = match conf.db.as_ref().unwrap().engine.as_ref().unwrap().as_str() {
131
30
        DbEngine::MONGODB => {
132
10
            let conf = conf.db.as_ref().unwrap().mongodb.as_ref().unwrap();
133
10
            ConnOptions::MongoDB(MongoDbOptions {
134
10
                url: conf.url.as_ref().unwrap().to_string(),
135
10
                db: conf.database.as_ref().unwrap().to_string(),
136
10
                pool_size: conf.pool_size,
137
10
            })
138
        }
139
        _ => {
140
20
            let conf = conf.db.as_ref().unwrap().sqlite.as_ref().unwrap();
141
20
            ConnOptions::Sqlite(SqliteOptions {
142
20
                path: conf.path.as_ref().unwrap().to_string(),
143
20
            })
144
        }
145
    };
146
30
    let cache_opts = match conf.cache.as_ref().unwrap().engine.as_ref() {
147
        None => None,
148
30
        Some(engine) => match engine.as_str() {
149
30
            CacheEngine::MEMORY => {
150
6
                let conf = conf.cache.as_ref().unwrap().memory.as_ref().unwrap();
151
6
                Some(CacheConnOptions::Memory {
152
6
                    device: DeviceOptions {
153
6
                        uldata_size: conf.device.unwrap(),
154
6
                    },
155
6
                    device_route: DeviceRouteOptions {
156
6
                        uldata_size: conf.device_route.unwrap(),
157
6
                        dldata_size: conf.device_route.unwrap(),
158
6
                        dldata_pub_size: conf.device_route.unwrap(),
159
6
                    },
160
6
                    network_route: NetworkRouteOptions {
161
6
                        uldata_size: conf.network_route.unwrap(),
162
6
                    },
163
6
                })
164
            }
165
24
            _ => None,
166
        },
167
    };
168
30
    let mq_conf = conf.mq.as_ref().unwrap();
169
30
    let model = models::new(&db_opts).await?;
170
30
    let cache = match cache_opts {
171
24
        None => None,
172
6
        Some(opts) => Some(models::new_cache(&opts, &model).await?),
173
    };
174
30
    let auth_base = conf.auth.as_ref().unwrap().clone();
175
30
    let mq_conns = Arc::new(Mutex::new(HashMap::new()));
176
30
    let ch_conf = conf.mq_channels.as_ref().unwrap();
177
30
    let ctrl_senders = new_ctrl_senders(&mq_conns, &ch_conf, cache.clone())?;
178
30
    let data_sender = match ch_conf.data.as_ref() {
179
14
        None => None,
180
16
        Some(conf) => match conf.url.as_ref() {
181
            None => None,
182
16
            Some(_) => Some(new_data_sender(&mq_conns, conf)?),
183
        },
184
    };
185
30
    let state = State {
186
30
        scope_path: match scope_path.len() {
187
            0 => "/",
188
30
            _ => scope_path,
189
        },
190
30
        api_scopes: conf.api_scopes.as_ref().unwrap().clone(),
191
30
        model,
192
30
        cache,
193
30
        auth_base,
194
30
        amqp_prefetch: mq_conf.prefetch.unwrap(),
195
30
        amqp_persistent: mq_conf.persistent.unwrap(),
196
30
        mqtt_shared_prefix: mq_conf.shared_prefix.as_ref().unwrap().to_string(),
197
30
        client: reqwest::Client::new(),
198
30
        mq_conns,
199
30
        application_mgrs: Arc::new(Mutex::new(HashMap::new())),
200
30
        network_mgrs: Arc::new(Mutex::new(HashMap::new())),
201
30
        ctrl_receivers: Arc::new(Mutex::new(HashMap::new())),
202
30
        ctrl_senders,
203
30
        data_sender,
204
    };
205
30
    let (r1, r2, r3, r4, r5, r6) = tokio::join!(
206
30
        v1::unit::init(&state, &ch_conf.unit.as_ref().unwrap()),
207
30
        v1::application::init(&state, &ch_conf.application.as_ref().unwrap()),
208
30
        v1::network::init(&state, &ch_conf.network.as_ref().unwrap()),
209
30
        v1::device::init(&state, &ch_conf.device.as_ref().unwrap()),
210
30
        v1::device_route::init(&state, &ch_conf.device_route.as_ref().unwrap()),
211
30
        v1::network_route::init(&state, &ch_conf.network_route.as_ref().unwrap())
212
    );
213
30
    r1?;
214
30
    r2?;
215
30
    r3?;
216
30
    r4?;
217
30
    r5?;
218
30
    r6?;
219
30
    Ok(state)
220
30
}
221

            
222
/// To register service URIs in the specified root path.
223
6682
pub fn new_service(state: &State) -> Router {
224
6682
    Router::new().nest(
225
6682
        &state.scope_path,
226
6682
        Router::new()
227
6682
            .merge(v1::unit::new_service("/api/v1/unit", state))
228
6682
            .merge(v1::application::new_service("/api/v1/application", state))
229
6682
            .merge(v1::network::new_service("/api/v1/network", state))
230
6682
            .merge(v1::device::new_service("/api/v1/device", state))
231
6682
            .merge(v1::device_route::new_service("/api/v1/device-route", state))
232
6682
            .merge(v1::network_route::new_service(
233
6682
                "/api/v1/network-route",
234
6682
                state,
235
            ))
236
6682
            .merge(v1::dldata_buffer::new_service(
237
6682
                "/api/v1/dldata-buffer",
238
6682
                state,
239
            )),
240
    )
241
6682
}
242

            
243
30
pub fn new_ctrl_senders(
244
30
    mq_conns: &Arc<Mutex<HashMap<String, Connection>>>,
245
30
    ch_conf: &config::MqChannels,
246
30
    cache: Option<Arc<dyn Cache>>,
247
30
) -> Result<CtrlSenders, Box<dyn StdError>> {
248
30
    let unit_ctrl_cfg = ch_conf.unit.as_ref().unwrap();
249
30
    let app_ctrl_cfg = ch_conf.application.as_ref().unwrap();
250
30
    let net_ctrl_cfg = ch_conf.network.as_ref().unwrap();
251
30
    let dev_ctrl_cfg = ch_conf.device.as_ref().unwrap();
252
30
    let devr_ctrl_cfg = ch_conf.device_route.as_ref().unwrap();
253
30
    let netr_ctrl_cfg = ch_conf.network_route.as_ref().unwrap();
254

            
255
    Ok(CtrlSenders {
256
30
        unit: v1::unit::new_ctrl_sender(mq_conns, unit_ctrl_cfg)?,
257
30
        application: v1::application::new_ctrl_sender(mq_conns, app_ctrl_cfg)?,
258
30
        network: v1::network::new_ctrl_sender(mq_conns, net_ctrl_cfg, cache.clone())?,
259
30
        device: v1::device::new_ctrl_sender(mq_conns, dev_ctrl_cfg, cache.clone())?,
260
30
        device_route: v1::device_route::new_ctrl_sender(mq_conns, devr_ctrl_cfg, cache.clone())?,
261
30
        network_route: v1::network_route::new_ctrl_sender(mq_conns, netr_ctrl_cfg, cache.clone())?,
262
    })
263
30
}
264

            
265
/// Create data channel sender queue.
266
16
pub fn new_data_sender(
267
16
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
268
16
    config: &config::BrokerData,
269
16
) -> Result<Queue, Box<dyn StdError>> {
270
16
    let url = match config.url.as_ref() {
271
        None => {
272
            return Err(Box::new(IoError::new(
273
                ErrorKind::InvalidInput,
274
                "empty control url",
275
            )));
276
        }
277
16
        Some(url) => match Url::parse(url.as_str()) {
278
            Err(e) => return Err(Box::new(e)),
279
16
            Ok(url) => url,
280
        },
281
    };
282
16
    let persistent = match config.persistent {
283
        None => false,
284
16
        Some(persistent) => persistent,
285
    };
286

            
287
16
    match mq::data::new(conn_pool, &url, persistent, Arc::new(DataSenderHandler {})) {
288
        Err(e) => Err(Box::new(IoError::new(ErrorKind::InvalidInput, e))),
289
16
        Ok(q) => Ok(q),
290
    }
291
16
}