1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    http::{HeaderValue, header},
8
    response::{IntoResponse, Response},
9
    routing,
10
};
11
use bytes::{Bytes, BytesMut};
12
use csv::WriterBuilder;
13
use futures_util::StreamExt;
14
use log::error;
15
use serde::Deserialize;
16
use serde_json::Deserializer;
17
use url::Url;
18

            
19
use sylvia_iot_corelib::err::ErrResp;
20

            
21
use super::{
22
    super::{AmqpState, MqttState, State as AppState},
23
    ListResp, api_bridge, get_stream_resp, get_unit_inner, list_api_bridge, response,
24
};
25
use crate::libs::mq::{self, QueueType, emqx, rabbitmq};
26

            
27
#[derive(Deserialize)]
28
struct UnitIdPath {
29
    unit_id: String,
30
}
31

            
32
#[derive(Deserialize)]
33
struct Application {
34
    code: String,
35
    #[serde(rename = "hostUri")]
36
    host_uri: String,
37
}
38

            
39
#[derive(Deserialize)]
40
struct Network {
41
    code: String,
42
    #[serde(rename = "hostUri")]
43
    host_uri: String,
44
}
45

            
46
const CSV_FIELDS: &'static [u8] =
47
    b"\xEF\xBB\xBFunitId,code,createdAt,modifiedAt,ownerId,memberIds,name,info\n";
48

            
49
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
50
506
    Router::new().nest(
51
506
        scope_path,
52
506
        Router::new()
53
506
            .route("/", routing::post(post_unit))
54
506
            .route("/count", routing::get(get_unit_count))
55
506
            .route("/list", routing::get(get_unit_list))
56
506
            .route(
57
506
                "/{unit_id}",
58
506
                routing::get(get_unit).patch(patch_unit).delete(delete_unit),
59
506
            )
60
506
            .with_state(state.clone()),
61
506
    )
62
506
}
63

            
64
/// `POST /{base}/api/v1/unit`
65
4
async fn post_unit(state: State<AppState>, req: Request) -> impl IntoResponse {
66
    const FN_NAME: &'static str = "post_unit";
67
4
    let api_path = format!("{}/api/v1/unit", state.broker_base);
68
4
    let client = state.client.clone();
69
4

            
70
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
71
4
}
72

            
73
/// `GET /{base}/api/v1/unit/count`
74
4
async fn get_unit_count(state: State<AppState>, req: Request) -> impl IntoResponse {
75
    const FN_NAME: &'static str = "get_unit_count";
76
4
    let api_path = format!("{}/api/v1/unit/count", state.broker_base.as_str());
77
4
    let client = state.client.clone();
78
4

            
79
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
80
4
}
81

            
82
/// `GET /{base}/api/v1/unit/list`
83
20
async fn get_unit_list(state: State<AppState>, req: Request) -> impl IntoResponse {
84
    const FN_NAME: &'static str = "get_unit_list";
85
20
    let api_path = format!("{}/api/v1/unit/list", state.broker_base.as_str());
86
20
    let api_path = api_path.as_str();
87
20
    let client = state.client.clone();
88

            
89
4
    let (api_resp, resp_builder) =
90
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "unit").await {
91
16
            ListResp::Axum(resp) => return resp,
92
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
93
4
        };
94
4

            
95
4
    let mut resp_stream = api_resp.bytes_stream();
96
4
    let body = Body::from_stream(async_stream::stream! {
97
4
        yield Ok(Bytes::from(CSV_FIELDS));
98
4

            
99
4
        let mut buffer = BytesMut::new();
100
4
        while let Some(body) = resp_stream.next().await {
101
4
            match body {
102
4
                Err(e) => {
103
4
                    error!("[{}] get body error: {}", FN_NAME, e);
104
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
105
4
                    yield Err(err);
106
4
                    break;
107
4
                }
108
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
109
4
            }
110
4

            
111
4
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<response::Unit>();
112
4
            let mut index = 0;
113
4
            let mut finish = false;
114
4
            loop {
115
4
                if let Some(Ok(mut v)) = json_stream.next() {
116
4
                    if let Ok(member_ids_str) = serde_json::to_string(&v.member_ids) {
117
4
                        v.member_ids_str = Some(member_ids_str);
118
4
                    }
119
4
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
120
4
                        v.info_str = Some(info_str);
121
4
                    }
122
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
123
4
                    if let Err(e) = writer.serialize(v) {
124
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
125
4
                        yield Err(err);
126
4
                        finish = true;
127
4
                        break;
128
4
                    }
129
4
                    match writer.into_inner() {
130
4
                        Err(e) => {
131
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
132
4
                            yield Err(err);
133
4
                            finish = true;
134
4
                            break;
135
4
                        }
136
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
137
4
                    }
138
4
                    continue;
139
4
                }
140
4
                let offset = json_stream.byte_offset();
141
4
                if buffer.len() <= index + offset {
142
4
                    index = buffer.len();
143
4
                    break;
144
4
                }
145
4
                match buffer[index+offset] {
146
4
                    b'[' | b',' => {
147
4
                        index += offset + 1;
148
4
                        if buffer.len() <= index {
149
4
                            break;
150
4
                        }
151
4
                        json_stream =
152
4
                            Deserializer::from_slice(&buffer[index..])
153
4
                                .into_iter::<response::Unit>();
154
4
                    }
155
4
                    b']' => {
156
4
                        finish = true;
157
4
                        break;
158
4
                    }
159
4
                    _ => break,
160
4
                }
161
4
            }
162
4
            if finish {
163
4
                break;
164
4
            }
165
4
            buffer = buffer.split_off(index);
166
4
        }
167
4
    });
168
4
    match resp_builder.body(body) {
169
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
170
4
        Ok(resp) => resp,
171
    }
172
20
}
173

            
174
/// `GET /{base}/api/v1/unit/{unitId}`
175
4
async fn get_unit(
176
4
    state: State<AppState>,
177
4
    Path(param): Path<UnitIdPath>,
178
4
    req: Request,
179
4
) -> impl IntoResponse {
180
    const FN_NAME: &'static str = "get_unit";
181
4
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
182
4
    let client = state.client.clone();
183
4

            
184
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
185
4
}
186

            
187
/// `PATCH /{base}/api/v1/unit/{unitId}`
188
4
async fn patch_unit(
189
4
    state: State<AppState>,
190
4
    Path(param): Path<UnitIdPath>,
191
4
    req: Request,
192
4
) -> impl IntoResponse {
193
    const FN_NAME: &'static str = "patch_unit";
194
4
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
195
4
    let client = state.client.clone();
196
4

            
197
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
198
4
}
199

            
200
/// `DELETE /{base}/api/v1/unit/{unitId}`
201
8
async fn delete_unit(
202
8
    state: State<AppState>,
203
8
    Path(param): Path<UnitIdPath>,
204
8
    req: Request,
205
8
) -> impl IntoResponse {
206
    const FN_NAME: &'static str = "delete_unit";
207
8
    let api_path = format!("{}/api/v1/unit/{}", state.broker_base, param.unit_id);
208
8
    let client = state.client.clone();
209

            
210
    // Delete all underlaying broker resources before deleting the unit.
211
8
    let token = match req.headers().get(header::AUTHORIZATION) {
212
        None => {
213
4
            let msg = "missing Authorization".to_string();
214
4
            return ErrResp::ErrParam(Some(msg)).into_response();
215
        }
216
4
        Some(value) => value.clone(),
217
    };
218
4
    let unit = match get_unit_inner(
219
4
        FN_NAME,
220
4
        &client,
221
4
        state.broker_base.as_str(),
222
4
        param.unit_id.as_str(),
223
4
        &token,
224
4
    )
225
4
    .await
226
    {
227
        Err(e) => return e,
228
4
        Ok(unit) => unit,
229
    };
230
4
    if let Some(unit) = unit {
231
4
        let unit_id = param.unit_id.as_str();
232
4
        let unit_code = unit.code.as_str();
233
        if let Err(e) =
234
4
            delete_application_resources(FN_NAME, &token, &state, unit_id, unit_code).await
235
        {
236
            return e;
237
4
        }
238
4
        if let Err(e) = delete_network_resources(FN_NAME, &token, &state, unit_id, unit_code).await
239
        {
240
            return e;
241
4
        }
242
    }
243

            
244
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
245
8
}
246

            
247
4
async fn delete_application_resources(
248
4
    fn_name: &str,
249
4
    token: &HeaderValue,
250
4
    state: &AppState,
251
4
    unit_id: &str,
252
4
    unit_code: &str,
253
4
) -> Result<(), Response> {
254
4
    // Get application from stream and delete broker resources.
255
4
    let client = state.client.clone();
256
4
    let uri = format!(
257
4
        "{}/api/v1/application/list?limit=0&format=array&unit={}",
258
4
        state.broker_base.as_str(),
259
4
        unit_id
260
4
    );
261
4

            
262
4
    let mut buffer = BytesMut::new();
263
4
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
264
4
        .await?
265
4
        .bytes_stream();
266
12
    while let Some(body) = stream.next().await {
267
12
        match body {
268
            Err(e) => {
269
                let msg = format!("get application body error: {}", e);
270
                error!("[{}] {}", fn_name, msg);
271
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
272
            }
273
12
            Ok(body) => buffer.extend_from_slice(&body[..]),
274
12
        }
275
12

            
276
12
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Application>();
277
12
        let mut index = 0;
278
12
        let mut finish = false;
279
        loop {
280
416
            if let Some(Ok(v)) = json_stream.next() {
281
404
                if v.host_uri.starts_with("amqp") {
282
200
                    match &state.amqp {
283
200
                        AmqpState::RabbitMq(opts) => {
284
200
                            let host = match Url::parse(v.host_uri.as_str()) {
285
                                Err(e) => {
286
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
287
                                    error!("[{}] {}", fn_name, msg);
288
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
289
                                }
290
200
                                Ok(url) => match url.host_str() {
291
                                    None => {
292
                                        let msg = format!("{} is not valid URI", v.host_uri);
293
                                        error!("[{}] {}", fn_name, msg);
294
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
295
                                    }
296
200
                                    Some(host) => host.to_string(),
297
200
                                },
298
200
                            };
299
200
                            let username =
300
200
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
301
200
                            if let Err(e) = rabbitmq::delete_user(
302
200
                                &client,
303
200
                                opts,
304
200
                                host.as_str(),
305
200
                                username.as_str(),
306
200
                            )
307
200
                            .await
308
                            {
309
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
310
                                error!("[{}] {}", fn_name, msg);
311
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
312
200
                            }
313
                        }
314
                    }
315
204
                } else if v.host_uri.starts_with("mqtt") {
316
204
                    match &state.mqtt {
317
102
                        MqttState::Emqx(opts) => {
318
102
                            let host = match Url::parse(v.host_uri.as_str()) {
319
                                Err(e) => {
320
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
321
                                    error!("[{}] {}", fn_name, msg);
322
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
323
                                }
324
102
                                Ok(url) => match url.host_str() {
325
                                    None => {
326
                                        let msg = format!("{} is not valid URI", v.host_uri);
327
                                        error!("[{}] {}", fn_name, msg);
328
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
329
                                    }
330
102
                                    Some(host) => host.to_string(),
331
102
                                },
332
102
                            };
333
102
                            let username =
334
102
                                mq::to_username(QueueType::Application, unit_code, v.code.as_str());
335
                            if let Err(e) =
336
102
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
337
102
                                    .await
338
                            {
339
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
340
                                error!("[{}] {}", fn_name, msg);
341
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
342
102
                            }
343
                        }
344
102
                        MqttState::Rumqttd => {}
345
                    }
346
                }
347
12
            }
348
416
            let offset = json_stream.byte_offset();
349
416
            if buffer.len() <= index + offset {
350
4
                index = buffer.len();
351
4
                break;
352
412
            }
353
412
            match buffer[index + offset] {
354
                b'[' | b',' => {
355
404
                    index += offset + 1;
356
404
                    if buffer.len() <= index {
357
                        break;
358
404
                    }
359
404
                    json_stream =
360
404
                        Deserializer::from_slice(&buffer[index..]).into_iter::<Application>();
361
                }
362
                b']' => {
363
4
                    finish = true;
364
4
                    break;
365
                }
366
4
                _ => break,
367
            }
368
        }
369
12
        if finish {
370
4
            break;
371
8
        }
372
8
        buffer = buffer.split_off(index);
373
    }
374

            
375
4
    Ok(())
376
4
}
377

            
378
4
async fn delete_network_resources(
379
4
    fn_name: &str,
380
4
    token: &HeaderValue,
381
4
    state: &AppState,
382
4
    unit_id: &str,
383
4
    unit_code: &str,
384
4
) -> Result<(), Response> {
385
4
    // Get network from stream and delete broker resources.
386
4
    let client = state.client.clone();
387
4
    let uri = format!(
388
4
        "{}/api/v1/network/list?limit=0&format=array&unit={}",
389
4
        state.broker_base.as_str(),
390
4
        unit_id
391
4
    );
392
4

            
393
4
    let mut buffer = BytesMut::new();
394
4
    let mut stream = get_stream_resp(fn_name, token, &client, uri.as_str())
395
4
        .await?
396
4
        .bytes_stream();
397
8
    while let Some(body) = stream.next().await {
398
8
        match body {
399
            Err(e) => {
400
                let msg = format!("get network body error: {}", e);
401
                error!("[{}] {}", fn_name, msg);
402
                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
403
            }
404
8
            Ok(body) => buffer.extend_from_slice(&body[..]),
405
8
        }
406
8

            
407
8
        let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Network>();
408
8
        let mut index = 0;
409
8
        let mut finish = false;
410
        loop {
411
412
            if let Some(Ok(v)) = json_stream.next() {
412
404
                if v.host_uri.starts_with("amqp") {
413
200
                    match &state.amqp {
414
200
                        AmqpState::RabbitMq(opts) => {
415
200
                            let host = match Url::parse(v.host_uri.as_str()) {
416
                                Err(e) => {
417
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
418
                                    error!("[{}] {}", fn_name, msg);
419
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
420
                                }
421
200
                                Ok(url) => match url.host_str() {
422
                                    None => {
423
                                        let msg = format!("{} is not valid URI", v.host_uri);
424
                                        error!("[{}] {}", fn_name, msg);
425
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
426
                                    }
427
200
                                    Some(host) => host.to_string(),
428
200
                                },
429
200
                            };
430
200
                            let username =
431
200
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
432
200
                            if let Err(e) = rabbitmq::delete_user(
433
200
                                &client,
434
200
                                opts,
435
200
                                host.as_str(),
436
200
                                username.as_str(),
437
200
                            )
438
200
                            .await
439
                            {
440
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
441
                                error!("[{}] {}", fn_name, msg);
442
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
443
200
                            }
444
                        }
445
                    }
446
204
                } else if v.host_uri.starts_with("mqtt") {
447
204
                    match &state.mqtt {
448
102
                        MqttState::Emqx(opts) => {
449
102
                            let host = match Url::parse(v.host_uri.as_str()) {
450
                                Err(e) => {
451
                                    let msg = format!("{} is not valid URI: {}", v.host_uri, e);
452
                                    error!("[{}] {}", fn_name, msg);
453
                                    return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
454
                                }
455
102
                                Ok(url) => match url.host_str() {
456
                                    None => {
457
                                        let msg = format!("{} is not valid URI", v.host_uri);
458
                                        error!("[{}] {}", fn_name, msg);
459
                                        return Err(ErrResp::ErrUnknown(Some(msg)).into_response());
460
                                    }
461
102
                                    Some(host) => host.to_string(),
462
102
                                },
463
102
                            };
464
102
                            let username =
465
102
                                mq::to_username(QueueType::Network, unit_code, v.code.as_str());
466
                            if let Err(e) =
467
102
                                emqx::delete_user(&client, opts, host.as_str(), username.as_str())
468
102
                                    .await
469
                            {
470
                                let msg = format!("delete RabbitMQ user {} error: {}", username, e);
471
                                error!("[{}] {}", fn_name, msg);
472
                                return Err(ErrResp::ErrIntMsg(Some(msg)).into_response());
473
102
                            }
474
                        }
475
102
                        MqttState::Rumqttd => {}
476
                    }
477
                }
478
8
            }
479
412
            let offset = json_stream.byte_offset();
480
412
            if buffer.len() <= index + offset {
481
4
                index = buffer.len();
482
4
                break;
483
408
            }
484
408
            match buffer[index + offset] {
485
                b'[' | b',' => {
486
404
                    index += offset + 1;
487
404
                    if buffer.len() <= index {
488
                        break;
489
404
                    }
490
404
                    json_stream = Deserializer::from_slice(&buffer[index..]).into_iter::<Network>();
491
                }
492
                b']' => {
493
4
                    finish = true;
494
4
                    break;
495
                }
496
                _ => break,
497
            }
498
        }
499
8
        if finish {
500
4
            break;
501
4
        }
502
4
        buffer = buffer.split_off(index);
503
    }
504

            
505
4
    Ok(())
506
4
}