1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    http::{header, HeaderValue},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::Deserialize;
15
use serde_json::Deserializer;
16
use url::Url;
17

            
18
use sylvia_iot_corelib::err::ErrResp;
19

            
20
use super::{
21
    super::{AmqpState, MqttState, State as AppState},
22
    api_bridge, get_stream_resp, get_unit_inner, list_api_bridge, response, ListResp,
23
};
24
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
25

            
26
#[derive(Deserialize)]
27
struct UnitIdPath {
28
    unit_id: String,
29
}
30

            
31
#[derive(Deserialize)]
32
struct Application {
33
    code: String,
34
    #[serde(rename = "hostUri")]
35
    host_uri: String,
36
}
37

            
38
#[derive(Deserialize)]
39
struct Network {
40
    code: String,
41
    #[serde(rename = "hostUri")]
42
    host_uri: String,
43
}
44

            
45
/// First chunk of every CSV list response: a UTF-8 byte order mark followed by the column
/// header row (terminated by `\n`).
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFunitId,code,createdAt,modifiedAt,ownerId,memberIds,name,info\n";
47

            
48
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
49
253
    Router::new().nest(
50
253
        scope_path,
51
253
        Router::new()
52
253
            .route("/", routing::post(post_unit))
53
253
            .route("/count", routing::get(get_unit_count))
54
253
            .route("/list", routing::get(get_unit_list))
55
253
            .route(
56
253
                "/:unit_id",
57
253
                routing::get(get_unit).patch(patch_unit).delete(delete_unit),
58
253
            )
59
253
            .with_state(state.clone()),
60
253
    )
61
253
}
62

            
63
/// `POST /{base}/api/v1/unit`
64
2
async fn post_unit(state: State<AppState>, req: Request) -> impl IntoResponse {
65
    const FN_NAME: &'static str = "post_unit";
66
2
    let api_path = format!("{}/api/v1/unit", state.broker_base);
67
2
    let client = state.client.clone();
68
2

            
69
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
70
2
}
71

            
72
/// `GET /{base}/api/v1/unit/count`
73
2
async fn get_unit_count(state: State<AppState>, req: Request) -> impl IntoResponse {
74
    const FN_NAME: &'static str = "get_unit_count";
75
2
    let api_path = format!("{}/api/v1/unit/count", state.broker_base.as_str());
76
2
    let client = state.client.clone();
77
2

            
78
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
79
2
}
80

            
81
/// `GET /{base}/api/v1/unit/list`
82
10
async fn get_unit_list(state: State<AppState>, req: Request) -> impl IntoResponse {
83
    const FN_NAME: &'static str = "get_unit_list";
84
10
    let api_path = format!("{}/api/v1/unit/list", state.broker_base.as_str());
85
10
    let api_path = api_path.as_str();
86
10
    let client = state.client.clone();
87

            
88
2
    let (api_resp, resp_builder) =
89
10
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "unit").await {
90
8
            ListResp::Axum(resp) => return resp,
91
2
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
92
2
        };
93
2

            
94
2
    let mut resp_stream = api_resp.bytes_stream();
95
2
    let body = Body::from_stream(async_stream::stream! {
96
2
        yield Ok(Bytes::from(CSV_FIELDS));
97
2

            
98
2
        let mut buffer = BytesMut::new();
99
2
        while let Some(body) = resp_stream.next().await {
100
2
            match body {
101
2
                Err(e) => {
102
2
                    error!("[{}] get body error: {}", FN_NAME, e);
103
2
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
104
2
                    yield Err(err);
105
2
                    break;
106
2
                }
107
2
                Ok(body) => buffer.extend_from_slice(&body[..]),
108
2
            }
109
2

            
110
2
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<response::Unit>();
111
2
            let mut index = 0;
112
2
            let mut finish = false;
113
2
            loop {
114
2
                if let Some(Ok(mut v)) = json_stream.next() {
115
2
                    if let Ok(member_ids_str) = serde_json::to_string(&v.member_ids) {
116
2
                        v.member_ids_str = Some(member_ids_str);
117
2
                    }
118
2
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
119
2
                        v.info_str = Some(info_str);
120
2
                    }
121
2
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
122
2
                    if let Err(e) = writer.serialize(v) {
123
2
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
124
2
                        yield Err(err);
125
2
                        finish = true;
126
2
                        break;
127
2
                    }
128
2
                    match writer.into_inner() {
129
2
                        Err(e) => {
130
2
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
131
2
                            yield Err(err);
132
2
                            finish = true;
133
2
                            break;
134
2
                        }
135
2
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
136
2
                    }
137
2
                    continue;
138
2
                }
139
2
                let offset = json_stream.byte_offset();
140
2
                if buffer.len() <= index + offset {
141
2
                    index = buffer.len();
142
2
                    break;
143
2
                }
144
2
                match buffer[index+offset] {
145
2
                    b'[' | b',' => {
146
2
                        index += offset + 1;
147
2
                        if buffer.len() <= index {
148
2
                            break;
149
2
                        }
150
2
                        json_stream =
151
2
                            Deserializer::from_slice(&buffer[index..])
152
2
                                .into_iter::<response::Unit>();
153
2
                    }
154
2
                    b']' => {
155
2
                        finish = true;
156
2
                        break;
157
2
                    }
158
2
                    _ => break,
159
2
                }
160
2
            }
161
2
            if finish {
162
2
                break;
163
2
            }
164
2
            buffer = buffer.split_off(index);
165
2
        }
166
2
    });
167
2
    match resp_builder.body(body) {
168
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
169
2
        Ok(resp) => resp,
170
    }
171
10
}
172

            
173
/// `GET /{base}/api/v1/unit/{unitId}`
174
2
async fn get_unit(
175
2
    state: State<AppState>,
176
2
    Path(param): Path<UnitIdPath>,
177
2
    req: Request,
178
2
) -> impl IntoResponse {
179
    const FN_NAME: &'static str = "get_unit";
180
2
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
181
2
    let client = state.client.clone();
182
2

            
183
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
184
2
}
185

            
186
/// `PATCH /{base}/api/v1/unit/{unitId}`
187
2
async fn patch_unit(
188
2
    state: State<AppState>,
189
2
    Path(param): Path<UnitIdPath>,
190
2
    req: Request,
191
2
) -> impl IntoResponse {
192
    const FN_NAME: &'static str = "patch_unit";
193
2
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
194
2
    let client = state.client.clone();
195
2

            
196
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
197
2
}
198

            
199
/// `DELETE /{base}/api/v1/unit/{unitId}`
200
4
async fn delete_unit(
201
4
    state: State<AppState>,
202
4
    Path(param): Path<UnitIdPath>,
203
4
    req: Request,
204
4
) -> impl IntoResponse {
205
    const FN_NAME: &'static str = "delete_unit";
206
4
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
207
4
    let client = state.client.clone();
208

            
209
    // Delete all underlaying broker resources before deleting the unit.
210
4
    let token = match req.headers().get(header::AUTHORIZATION) {
211
        None => {
212
2
            let msg = "missing Authorization".to_string();
213
2
            return ErrResp::ErrParam(Some(msg)).into_response();
214
        }
215
2
        Some(value) => value.clone(),
216
    };
217
2
    let unit = match get_unit_inner(
218
2
        FN_NAME,
219
2
        &client,
220
2
        state.broker_base.as_str(),
221
2
        param.unit_id.as_str(),
222
2
        &token,
223
2
    )
224
2
    .await
225
    {
226
        Err(e) => return e,
227
2
        Ok(unit) => unit,
228
    };
229
2
    if let Some(unit) = unit {
230
2
        let unit_id = param.unit_id.as_str();
231
2
        let unit_code = unit.code.as_str();
232
        if let Err(e) =
233
2
            delete_application_resources(FN_NAME, &token, &state, unit_id, unit_code).await
234
        {
235
            return e;
236
2
        }
237
2
        if let Err(e) = delete_network_resources(FN_NAME, &token, &state, unit_id, unit_code).await
238
        {
239
            return e;
240
2
        }
241
    }
242

            
243
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
244
4
}
245

            
246
2
async fn delete_application_resources(
247
2
    fn_name: &str,
248
2
    token: &HeaderValue,
249
2
    state: &AppState,
250
2
    unit_id: &str,
251
2
    unit_code: &str,
252
2
) -> Result<(), Response> {
253
2
    // Get application from stream and delete broker resources.
254
2
    let client = state.client.clone();
255
2
    let uri = format!(
256
2
        "{}/api/v1/application/list?limit=0&format=array&unit={}",
257
2
        state.broker_base.as_str(),
258
2
        unit_id
259
2
    );
260
2

            
261
2
    let mut buffer = BytesMut::new();
262
2
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
263
2
        .await?
264
2
        .bytes_stream();
265
6
    while let Some(body) = stream.next().await {
266
6
        match body {
267
            Err(e) => {
268
                let msg = format!("get application body error: {}", e);
269
                error!("[{}] {}", fn_name, msg);
270
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
271
            }
272
6
            Ok(body) => buffer.extend_from_slice(&body[..]),
273
6
        }
274
6

            
275
6
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
276
6
        let mut index = 0;
277
6
        let mut finish = false;
278
        loop {
279
208
            if let Some(Ok(v)) = json_stream.next() {
280
202
                if v.host_uri.starts_with("amqp") {
281
100
                    match &state.amqp {
282
100
                        AmqpState::RabbitMq(opts) => {
283
100
                            let host = match Url::parse(v.host_uri.as_str()) {
284
                                Err(e) => {
285
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
286
                                    error!("[{}] {}", fn_name, msg);
287
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
288
                                }
289
100
                                Ok(url) => match url.host_str() {
290
                                    None => {
291
                                        let msg = format!("{} is not valid URI", v.host_uri);
292
                                        error!("[{}] {}", fn_name, msg);
293
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
294
                                    }
295
100
                                    Some(host) => host.to_string(),
296
100
                                },
297
100
                            };
298
100
                            let username =
299
100
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
300
100
                            if let Err(e) = rabbitmq::delete_user(
301
100
                                &client,
302
100
                                opts,
303
100
                                host.as_str(),
304
100
                                username.as_str(),
305
100
                            )
306
100
                            .await
307
                            {
308
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
309
                                error!("[{}] {}", fn_name, msg);
310
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
311
100
                            }
312
                        }
313
                    }
314
102
                } else if v.host_uri.starts_with("mqtt") {
315
102
                    match &state.mqtt {
316
51
                        MqttState::Emqx(opts) => {
317
51
                            let host = match Url::parse(v.host_uri.as_str()) {
318
                                Err(e) => {
319
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
320
                                    error!("[{}] {}", fn_name, msg);
321
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
322
                                }
323
51
                                Ok(url) => match url.host_str() {
324
                                    None => {
325
                                        let msg = format!("{} is not valid URI", v.host_uri);
326
                                        error!("[{}] {}", fn_name, msg);
327
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
328
                                    }
329
51
                                    Some(host) => host.to_string(),
330
51
                                },
331
51
                            };
332
51
                            let username =
333
51
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
334
                            if let Err(e) =
335
51
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
336
51
                                    .await
337
                            {
338
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
339
                                error!("[{}] {}", fn_name, msg);
340
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
341
51
                            }
342
                        }
343
51
                        MqttState::Rumqttd => {}
344
                    }
345
                }
346
6
            }
347
208
            let offset = json_stream.byte_offset();
348
208
            if buffer.len() <= index + offset {
349
2
                index = buffer.len();
350
2
                break;
351
206
            }
352
206
            match buffer[index + offset] {
353
                b'[' | b',' => {
354
202
                    index += offset + 1;
355
202
                    if buffer.len() <= index {
356
                        break;
357
202
                    }
358
202
                    json_stream =
359
202
                        Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
360
                }
361
                b']' => {
362
2
                    finish = true;
363
2
                    break;
364
                }
365
2
                _ => break,
366
            }
367
        }
368
6
        if finish {
369
2
            break;
370
4
        }
371
4
        buffer = buffer.split_off(index);
372
    }
373

            
374
2
    Ok(())
375
2
}
376

            
377
2
async fn delete_network_resources(
378
2
    fn_name: &str,
379
2
    token: &HeaderValue,
380
2
    state: &AppState,
381
2
    unit_id: &str,
382
2
    unit_code: &str,
383
2
) -> Result<(), Response> {
384
2
    // Get network from stream and delete broker resources.
385
2
    let client = state.client.clone();
386
2
    let uri = format!(
387
2
        "{}/api/v1/network/list?limit=0&format=array&unit={}",
388
2
        state.broker_base.as_str(),
389
2
        unit_id
390
2
    );
391
2

            
392
2
    let mut buffer = BytesMut::new();
393
2
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
394
2
        .await?
395
2
        .bytes_stream();
396
4
    while let Some(body) = stream.next().await {
397
4
        match body {
398
            Err(e) => {
399
                let msg = format!("get network body error: {}", e);
400
                error!("[{}] {}", fn_name, msg);
401
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
402
            }
403
4
            Ok(body) => buffer.extend_from_slice(&body[..]),
404
4
        }
405
4

            
406
4
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
407
4
        let mut index = 0;
408
4
        let mut finish = false;
409
        loop {
410
206
            if let Some(Ok(v)) = json_stream.next() {
411
202
                if v.host_uri.starts_with("amqp") {
412
100
                    match &state.amqp {
413
100
                        AmqpState::RabbitMq(opts) => {
414
100
                            let host = match Url::parse(v.host_uri.as_str()) {
415
                                Err(e) => {
416
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
417
                                    error!("[{}] {}", fn_name, msg);
418
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
419
                                }
420
100
                                Ok(url) => match url.host_str() {
421
                                    None => {
422
                                        let msg = format!("{} is not valid URI", v.host_uri);
423
                                        error!("[{}] {}", fn_name, msg);
424
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
425
                                    }
426
100
                                    Some(host) => host.to_string(),
427
100
                                },
428
100
                            };
429
100
                            let username =
430
100
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
431
100
                            if let Err(e) = rabbitmq::delete_user(
432
100
                                &client,
433
100
                                opts,
434
100
                                host.as_str(),
435
100
                                username.as_str(),
436
100
                            )
437
100
                            .await
438
                            {
439
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
440
                                error!("[{}] {}", fn_name, msg);
441
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
442
100
                            }
443
                        }
444
                    }
445
102
                } else if v.host_uri.starts_with("mqtt") {
446
102
                    match &state.mqtt {
447
51
                        MqttState::Emqx(opts) => {
448
51
                            let host = match Url::parse(v.host_uri.as_str()) {
449
                                Err(e) => {
450
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
451
                                    error!("[{}] {}", fn_name, msg);
452
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
453
                                }
454
51
                                Ok(url) => match url.host_str() {
455
                                    None => {
456
                                        let msg = format!("{} is not valid URI", v.host_uri);
457
                                        error!("[{}] {}", fn_name, msg);
458
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
459
                                    }
460
51
                                    Some(host) => host.to_string(),
461
51
                                },
462
51
                            };
463
51
                            let username =
464
51
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
465
                            if let Err(e) =
466
51
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
467
51
                                    .await
468
                            {
469
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
470
                                error!("[{}] {}", fn_name, msg);
471
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
472
51
                            }
473
                        }
474
51
                        MqttState::Rumqttd => {}
475
                    }
476
                }
477
4
            }
478
206
            let offset = json_stream.byte_offset();
479
206
            if buffer.len() <= index + offset {
480
2
                index = buffer.len();
481
2
                break;
482
204
            }
483
204
            match buffer[index + offset] {
484
                b'[' | b',' => {
485
202
                    index += offset + 1;
486
202
                    if buffer.len() <= index {
487
                        break;
488
202
                    }
489
202
                    json_stream = Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
490
                }
491
                b']' => {
492
2
                    finish = true;
493
2
                    break;
494
                }
495
                _ => break,
496
            }
497
        }
498
4
        if finish {
499
2
            break;
500
2
        }
501
2
        buffer = buffer.split_off(index);
502
    }
503

            
504
2
    Ok(())
505
2
}