1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    response::IntoResponse,
8
    routing,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::{Deserialize, Serialize};
15
use serde_json::Deserializer;
16

            
17
use sylvia_iot_corelib::err::ErrResp;
18

            
19
use super::{super::State as AppState, ListResp, api_bridge, list_api_bridge};
20

            
21
#[derive(Deserialize)]
22
struct RouteIdPath {
23
    route_id: String,
24
}
25

            
26
#[derive(Deserialize, Serialize)]
27
struct NetworkRoute {
28
    #[serde(rename = "routeId")]
29
    route_id: String,
30
    #[serde(rename = "unitId")]
31
    unit_id: String,
32
    #[serde(rename = "applicationId")]
33
    application_id: String,
34
    #[serde(rename = "applicationCode")]
35
    application_code: String,
36
    #[serde(rename = "networkId")]
37
    network_id: String,
38
    #[serde(rename = "networkCode")]
39
    network_code: String,
40
    #[serde(rename = "createdAt")]
41
    created_at: String,
42
}
43

            
44
/// Header line written at the start of every CSV list response.
///
/// Starts with the UTF-8 byte-order mark (`EF BB BF`), followed by the column names in the same
/// order as the fields of `NetworkRoute`, and ends with a newline.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFrouteId,unitId,applicationId,applicationCode,networkId,networkCode,createdAt\n";
46

            
47
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
48
506
    Router::new().nest(
49
506
        scope_path,
50
506
        Router::new()
51
506
            .route("/", routing::post(post_network_route))
52
506
            .route("/count", routing::get(get_network_route_count))
53
506
            .route("/list", routing::get(get_network_route_list))
54
506
            .route("/{route_id}", routing::delete(delete_network_route))
55
506
            .with_state(state.clone()),
56
    )
57
506
}
58

            
59
/// `POST /{base}/api/v1/network-route`
60
4
async fn post_network_route(state: State<AppState>, req: Request) -> impl IntoResponse {
61
    const FN_NAME: &'static str = "post_network_route";
62
4
    let api_path = format!("{}/api/v1/network-route", state.broker_base);
63
4
    let client = state.client.clone();
64

            
65
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
66
4
}
67

            
68
/// `GET /{base}/api/v1/network-route/count`
69
4
async fn get_network_route_count(state: State<AppState>, req: Request) -> impl IntoResponse {
70
    const FN_NAME: &'static str = "get_network_route_count";
71
4
    let api_path = format!("{}/api/v1/network-route/count", state.broker_base.as_str());
72
4
    let client = state.client.clone();
73

            
74
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
75
4
}
76

            
77
/// `GET /{base}/api/v1/network-route/list`
78
20
async fn get_network_route_list(state: State<AppState>, req: Request) -> impl IntoResponse {
79
    const FN_NAME: &'static str = "get_network_route_list";
80
20
    let api_path = format!("{}/api/v1/network-route/list", state.broker_base.as_str());
81
20
    let api_path = api_path.as_str();
82
20
    let client = state.client.clone();
83

            
84
4
    let (api_resp, resp_builder) =
85
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "network-route").await {
86
16
            ListResp::Axum(resp) => return resp,
87
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
88
        };
89

            
90
4
    let mut resp_stream = api_resp.bytes_stream();
91
4
    let body = Body::from_stream(async_stream::stream! {
92
        yield Ok(Bytes::from(CSV_FIELDS));
93

            
94
        let mut buffer = BytesMut::new();
95
        while let Some(body) = resp_stream.next().await {
96
            match body {
97
                Err(e) => {
98
                    error!("[{}] get body error: {}", FN_NAME, e);
99
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
100
                    yield Err(err);
101
                    break;
102
                }
103
                Ok(body) => buffer.extend_from_slice(&body[..]),
104
            }
105

            
106
            let mut json_stream =
107
                Deserializer::from_slice(&buffer[..]).into_iter::<NetworkRoute>();
108
            let mut index = 0;
109
            let mut finish = false;
110
            loop {
111
                if let Some(Ok(v)) = json_stream.next() {
112
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
113
                    if let Err(e) = writer.serialize(v) {
114
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
115
                        yield Err(err);
116
                        finish = true;
117
                        break;
118
                    }
119
                    match writer.into_inner() {
120
                        Err(e) => {
121
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
122
                            yield Err(err);
123
                            finish = true;
124
                            break;
125
                        }
126
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
127
                    }
128
                    continue;
129
                }
130
                let offset = json_stream.byte_offset();
131
                if buffer.len() <= index + offset {
132
                    index = buffer.len();
133
                    break;
134
                }
135
                match buffer[index+offset] {
136
                    b'[' | b',' => {
137
                        index += offset + 1;
138
                        if buffer.len() <= index {
139
                            break;
140
                        }
141
                        json_stream =
142
                            Deserializer::from_slice(&buffer[index..]).into_iter::<NetworkRoute>();
143
                    }
144
                    b']' => {
145
                        finish = true;
146
                        break;
147
                    }
148
                    _ => break,
149
                }
150
            }
151
            if finish {
152
                break;
153
            }
154
            buffer = buffer.split_off(index);
155
        }
156
    });
157
4
    match resp_builder.body(body) {
158
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
159
4
        Ok(resp) => resp,
160
    }
161
20
}
162

            
163
/// `DELETE /{base}/api/v1/network-route/{routeId}`
164
4
async fn delete_network_route(
165
4
    state: State<AppState>,
166
4
    Path(param): Path<RouteIdPath>,
167
4
    req: Request,
168
4
) -> impl IntoResponse {
169
    const FN_NAME: &'static str = "delete_network_route";
170
4
    let api_path = format!(
171
4
        "{}/api/v1/network-route/{}",
172
4
        state.broker_base, param.route_id
173
    );
174
4
    let client = state.client.clone();
175

            
176
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
177
4
}