1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    http::{header, HeaderValue},
7
    response::{IntoResponse, Response},
8
    routing, Router,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::Deserialize;
15
use serde_json::Deserializer;
16
use url::Url;
17

            
18
use sylvia_iot_corelib::err::ErrResp;
19

            
20
use super::{
21
    super::{AmqpState, MqttState, State as AppState},
22
    api_bridge, get_stream_resp, get_unit_inner, list_api_bridge, response, ListResp,
23
};
24
use crate::libs::mq::{self, emqx, rabbitmq, QueueType};
25

            
26
#[derive(Deserialize)]
27
struct UnitIdPath {
28
    unit_id: String,
29
}
30

            
31
#[derive(Deserialize)]
32
struct Application {
33
    code: String,
34
    #[serde(rename = "hostUri")]
35
    host_uri: String,
36
}
37

            
38
#[derive(Deserialize)]
39
struct Network {
40
    code: String,
41
    #[serde(rename = "hostUri")]
42
    host_uri: String,
43
}
44

            
45
const CSV_FIELDS: &'static [u8] =
46
    b"\xEF\xBB\xBFunitId,code,createdAt,modifiedAt,ownerId,memberIds,name,info\n";
47

            
48
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
49
506
    Router::new().nest(
50
506
        scope_path,
51
506
        Router::new()
52
506
            .route("/", routing::post(post_unit))
53
506
            .route("/count", routing::get(get_unit_count))
54
506
            .route("/list", routing::get(get_unit_list))
55
506
            .route(
56
506
                "/{unit_id}",
57
506
                routing::get(get_unit).patch(patch_unit).delete(delete_unit),
58
506
            )
59
506
            .with_state(state.clone()),
60
506
    )
61
506
}
62

            
63
/// `POST /{base}/api/v1/unit`
64
4
async fn post_unit(state: State<AppState>, req: Request) -> impl IntoResponse {
65
    const FN_NAME: &'static str = "post_unit";
66
4
    let api_path = format!("{}/api/v1/unit", state.broker_base);
67
4
    let client = state.client.clone();
68
4

            
69
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
70
4
}
71

            
72
/// `GET /{base}/api/v1/unit/count`
73
4
async fn get_unit_count(state: State<AppState>, req: Request) -> impl IntoResponse {
74
    const FN_NAME: &'static str = "get_unit_count";
75
4
    let api_path = format!("{}/api/v1/unit/count", state.broker_base.as_str());
76
4
    let client = state.client.clone();
77
4

            
78
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
79
4
}
80

            
81
/// `GET /{base}/api/v1/unit/list`
82
20
async fn get_unit_list(state: State<AppState>, req: Request) -> impl IntoResponse {
83
    const FN_NAME: &'static str = "get_unit_list";
84
20
    let api_path = format!("{}/api/v1/unit/list", state.broker_base.as_str());
85
20
    let api_path = api_path.as_str();
86
20
    let client = state.client.clone();
87

            
88
4
    let (api_resp, resp_builder) =
89
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "unit").await {
90
16
            ListResp::Axum(resp) => return resp,
91
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
92
4
        };
93
4

            
94
4
    let mut resp_stream = api_resp.bytes_stream();
95
4
    let body = Body::from_stream(async_stream::stream! {
96
4
        yield Ok(Bytes::from(CSV_FIELDS));
97
4

            
98
4
        let mut buffer = BytesMut::new();
99
4
        while let Some(body) = resp_stream.next().await {
100
4
            match body {
101
4
                Err(e) => {
102
4
                    error!("[{}] get body error: {}", FN_NAME, e);
103
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
104
4
                    yield Err(err);
105
4
                    break;
106
4
                }
107
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
108
4
            }
109
4

            
110
4
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<response::Unit>();
111
4
            let mut index = 0;
112
4
            let mut finish = false;
113
4
            loop {
114
4
                if let Some(Ok(mut v)) = json_stream.next() {
115
4
                    if let Ok(member_ids_str) = serde_json::to_string(&v.member_ids) {
116
4
                        v.member_ids_str = Some(member_ids_str);
117
4
                    }
118
4
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
119
4
                        v.info_str = Some(info_str);
120
4
                    }
121
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
122
4
                    if let Err(e) = writer.serialize(v) {
123
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
124
4
                        yield Err(err);
125
4
                        finish = true;
126
4
                        break;
127
4
                    }
128
4
                    match writer.into_inner() {
129
4
                        Err(e) => {
130
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
131
4
                            yield Err(err);
132
4
                            finish = true;
133
4
                            break;
134
4
                        }
135
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
136
4
                    }
137
4
                    continue;
138
4
                }
139
4
                let offset = json_stream.byte_offset();
140
4
                if buffer.len() <= index + offset {
141
4
                    index = buffer.len();
142
4
                    break;
143
4
                }
144
4
                match buffer[index+offset] {
145
4
                    b'[' | b',' => {
146
4
                        index += offset + 1;
147
4
                        if buffer.len() <= index {
148
4
                            break;
149
4
                        }
150
4
                        json_stream =
151
4
                            Deserializer::from_slice(&buffer[index..])
152
4
                                .into_iter::<response::Unit>();
153
4
                    }
154
4
                    b']' => {
155
4
                        finish = true;
156
4
                        break;
157
4
                    }
158
4
                    _ => break,
159
4
                }
160
4
            }
161
4
            if finish {
162
4
                break;
163
4
            }
164
4
            buffer = buffer.split_off(index);
165
4
        }
166
4
    });
167
4
    match resp_builder.body(body) {
168
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
169
4
        Ok(resp) => resp,
170
    }
171
20
}
172

            
173
/// `GET /{base}/api/v1/unit/{unitId}`
174
4
async fn get_unit(
175
4
    state: State<AppState>,
176
4
    Path(param): Path<UnitIdPath>,
177
4
    req: Request,
178
4
) -> impl IntoResponse {
179
    const FN_NAME: &'static str = "get_unit";
180
4
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
181
4
    let client = state.client.clone();
182
4

            
183
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
184
4
}
185

            
186
/// `PATCH /{base}/api/v1/unit/{unitId}`
187
4
async fn patch_unit(
188
4
    state: State<AppState>,
189
4
    Path(param): Path<UnitIdPath>,
190
4
    req: Request,
191
4
) -> impl IntoResponse {
192
    const FN_NAME: &'static str = "patch_unit";
193
4
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
194
4
    let client = state.client.clone();
195
4

            
196
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
197
4
}
198

            
199
/// `DELETE /{base}/api/v1/unit/{unitId}`
200
8
async fn delete_unit(
201
8
    state: State<AppState>,
202
8
    Path(param): Path<UnitIdPath>,
203
8
    req: Request,
204
8
) -> impl IntoResponse {
205
    const FN_NAME: &'static str = "delete_unit";
206
8
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
207
8
    let client = state.client.clone();
208

            
209
    // Delete all underlaying broker resources before deleting the unit.
210
8
    let token = match req.headers().get(header::AUTHORIZATION) {
211
        None => {
212
4
            let msg = "missing Authorization".to_string();
213
4
            return ErrResp::ErrParam(Some(msg)).into_response();
214
        }
215
4
        Some(value) => value.clone(),
216
    };
217
4
    let unit = match get_unit_inner(
218
4
        FN_NAME,
219
4
        &client,
220
4
        state.broker_base.as_str(),
221
4
        param.unit_id.as_str(),
222
4
        &token,
223
4
    )
224
4
    .await
225
    {
226
        Err(e) => return e,
227
4
        Ok(unit) => unit,
228
    };
229
4
    if let Some(unit) = unit {
230
4
        let unit_id = param.unit_id.as_str();
231
4
        let unit_code = unit.code.as_str();
232
        if let Err(e) =
233
4
            delete_application_resources(FN_NAME, &token, &state, unit_id, unit_code).await
234
        {
235
            return e;
236
4
        }
237
4
        if let Err(e) = delete_network_resources(FN_NAME, &token, &state, unit_id, unit_code).await
238
        {
239
            return e;
240
4
        }
241
    }
242

            
243
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
244
8
}
245

            
246
4
async fn delete_application_resources(
247
4
    fn_name: &str,
248
4
    token: &HeaderValue,
249
4
    state: &AppState,
250
4
    unit_id: &str,
251
4
    unit_code: &str,
252
4
) -> Result<(), Response> {
253
4
    // Get application from stream and delete broker resources.
254
4
    let client = state.client.clone();
255
4
    let uri = format!(
256
4
        "{}/api/v1/application/list?limit=0&format=array&unit={}",
257
4
        state.broker_base.as_str(),
258
4
        unit_id
259
4
    );
260
4

            
261
4
    let mut buffer = BytesMut::new();
262
4
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
263
4
        .await?
264
4
        .bytes_stream();
265
12
    while let Some(body) = stream.next().await {
266
12
        match body {
267
            Err(e) => {
268
                let msg = format!("get application body error: {}", e);
269
                error!("[{}] {}", fn_name, msg);
270
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
271
            }
272
12
            Ok(body) => buffer.extend_from_slice(&body[..]),
273
12
        }
274
12

            
275
12
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
276
12
        let mut index = 0;
277
12
        let mut finish = false;
278
        loop {
279
416
            if let Some(Ok(v)) = json_stream.next() {
280
404
                if v.host_uri.starts_with("amqp") {
281
200
                    match &state.amqp {
282
200
                        AmqpState::RabbitMq(opts) => {
283
200
                            let host = match Url::parse(v.host_uri.as_str()) {
284
                                Err(e) => {
285
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
286
                                    error!("[{}] {}", fn_name, msg);
287
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
288
                                }
289
200
                                Ok(url) => match url.host_str() {
290
                                    None => {
291
                                        let msg = format!("{} is not valid URI", v.host_uri);
292
                                        error!("[{}] {}", fn_name, msg);
293
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
294
                                    }
295
200
                                    Some(host) => host.to_string(),
296
200
                                },
297
200
                            };
298
200
                            let username =
299
200
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
300
200
                            if let Err(e) = rabbitmq::delete_user(
301
200
                                &client,
302
200
                                opts,
303
200
                                host.as_str(),
304
200
                                username.as_str(),
305
200
                            )
306
200
                            .await
307
                            {
308
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
309
                                error!("[{}] {}", fn_name, msg);
310
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
311
200
                            }
312
                        }
313
                    }
314
204
                } else if v.host_uri.starts_with("mqtt") {
315
204
                    match &state.mqtt {
316
102
                        MqttState::Emqx(opts) => {
317
102
                            let host = match Url::parse(v.host_uri.as_str()) {
318
                                Err(e) => {
319
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
320
                                    error!("[{}] {}", fn_name, msg);
321
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
322
                                }
323
102
                                Ok(url) => match url.host_str() {
324
                                    None => {
325
                                        let msg = format!("{} is not valid URI", v.host_uri);
326
                                        error!("[{}] {}", fn_name, msg);
327
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
328
                                    }
329
102
                                    Some(host) => host.to_string(),
330
102
                                },
331
102
                            };
332
102
                            let username =
333
102
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
334
                            if let Err(e) =
335
102
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
336
102
                                    .await
337
                            {
338
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
339
                                error!("[{}] {}", fn_name, msg);
340
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
341
102
                            }
342
                        }
343
102
                        MqttState::Rumqttd => {}
344
                    }
345
                }
346
12
            }
347
416
            let offset = json_stream.byte_offset();
348
416
            if buffer.len() <= index + offset {
349
4
                index = buffer.len();
350
4
                break;
351
412
            }
352
412
            match buffer[index + offset] {
353
                b'[' | b',' => {
354
404
                    index += offset + 1;
355
404
                    if buffer.len() <= index {
356
                        break;
357
404
                    }
358
404
                    json_stream =
359
404
                        Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
360
                }
361
                b']' => {
362
4
                    finish = true;
363
4
                    break;
364
                }
365
4
                _ => break,
366
            }
367
        }
368
12
        if finish {
369
4
            break;
370
8
        }
371
8
        buffer = buffer.split_off(index);
372
    }
373

            
374
4
    Ok(())
375
4
}
376

            
377
4
async fn delete_network_resources(
378
4
    fn_name: &str,
379
4
    token: &HeaderValue,
380
4
    state: &AppState,
381
4
    unit_id: &str,
382
4
    unit_code: &str,
383
4
) -> Result<(), Response> {
384
4
    // Get network from stream and delete broker resources.
385
4
    let client = state.client.clone();
386
4
    let uri = format!(
387
4
        "{}/api/v1/network/list?limit=0&format=array&unit={}",
388
4
        state.broker_base.as_str(),
389
4
        unit_id
390
4
    );
391
4

            
392
4
    let mut buffer = BytesMut::new();
393
4
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
394
4
        .await?
395
4
        .bytes_stream();
396
8
    while let Some(body) = stream.next().await {
397
8
        match body {
398
            Err(e) => {
399
                let msg = format!("get network body error: {}", e);
400
                error!("[{}] {}", fn_name, msg);
401
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
402
            }
403
8
            Ok(body) => buffer.extend_from_slice(&body[..]),
404
8
        }
405
8

            
406
8
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
407
8
        let mut index = 0;
408
8
        let mut finish = false;
409
        loop {
410
412
            if let Some(Ok(v)) = json_stream.next() {
411
404
                if v.host_uri.starts_with("amqp") {
412
200
                    match &state.amqp {
413
200
                        AmqpState::RabbitMq(opts) => {
414
200
                            let host = match Url::parse(v.host_uri.as_str()) {
415
                                Err(e) => {
416
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
417
                                    error!("[{}] {}", fn_name, msg);
418
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
419
                                }
420
200
                                Ok(url) => match url.host_str() {
421
                                    None => {
422
                                        let msg = format!("{} is not valid URI", v.host_uri);
423
                                        error!("[{}] {}", fn_name, msg);
424
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
425
                                    }
426
200
                                    Some(host) => host.to_string(),
427
200
                                },
428
200
                            };
429
200
                            let username =
430
200
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
431
200
                            if let Err(e) = rabbitmq::delete_user(
432
200
                                &client,
433
200
                                opts,
434
200
                                host.as_str(),
435
200
                                username.as_str(),
436
200
                            )
437
200
                            .await
438
                            {
439
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
440
                                error!("[{}] {}", fn_name, msg);
441
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
442
200
                            }
443
                        }
444
                    }
445
204
                } else if v.host_uri.starts_with("mqtt") {
446
204
                    match &state.mqtt {
447
102
                        MqttState::Emqx(opts) => {
448
102
                            let host = match Url::parse(v.host_uri.as_str()) {
449
                                Err(e) => {
450
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
451
                                    error!("[{}] {}", fn_name, msg);
452
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
453
                                }
454
102
                                Ok(url) => match url.host_str() {
455
                                    None => {
456
                                        let msg = format!("{} is not valid URI", v.host_uri);
457
                                        error!("[{}] {}", fn_name, msg);
458
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
459
                                    }
460
102
                                    Some(host) => host.to_string(),
461
102
                                },
462
102
                            };
463
102
                            let username =
464
102
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
465
                            if let Err(e) =
466
102
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
467
102
                                    .await
468
                            {
469
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
470
                                error!("[{}] {}", fn_name, msg);
471
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
472
102
                            }
473
                        }
474
102
                        MqttState::Rumqttd => {}
475
                    }
476
                }
477
8
            }
478
412
            let offset = json_stream.byte_offset();
479
412
            if buffer.len() <= index + offset {
480
4
                index = buffer.len();
481
4
                break;
482
408
            }
483
408
            match buffer[index + offset] {
484
                b'[' | b',' => {
485
404
                    index += offset + 1;
486
404
                    if buffer.len() <= index {
487
                        break;
488
404
                    }
489
404
                    json_stream = Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
490
                }
491
                b']' => {
492
4
                    finish = true;
493
4
                    break;
494
                }
495
                _ => break,
496
            }
497
        }
498
8
        if finish {
499
4
            break;
500
4
        }
501
4
        buffer = buffer.split_off(index);
502
    }
503

            
504
4
    Ok(())
505
4
}