1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    response::IntoResponse,
8
    routing,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::{Deserialize, Serialize};
15
use serde_json::Deserializer;
16

            
17
use sylvia_iot_corelib::err::ErrResp;
18

            
19
use super::{super::State as AppState, ListResp, api_bridge, list_api_bridge};
20

            
21
#[derive(Deserialize)]
22
struct RouteIdPath {
23
    route_id: String,
24
}
25

            
26
#[derive(Deserialize, Serialize)]
27
struct DeviceRoute {
28
    #[serde(rename = "routeId")]
29
    route_id: String,
30
    #[serde(rename = "unitId")]
31
    unit_id: String,
32
    #[serde(rename = "applicationId")]
33
    application_id: String,
34
    #[serde(rename = "applicationCode")]
35
    application_code: String,
36
    #[serde(rename = "deviceId")]
37
    device_id: String,
38
    #[serde(rename = "networkId")]
39
    network_id: String,
40
    #[serde(rename = "networkCode")]
41
    network_code: String,
42
    #[serde(rename = "networkAddr")]
43
    network_addr: String,
44
    profile: String,
45
    #[serde(rename = "createdAt")]
46
    created_at: String,
47
    #[serde(rename = "modifiedAt")]
48
    modified_at: String,
49
}
50

            
51
const CSV_FIELDS: &'static [u8] =
52
    b"\xEF\xBB\xBFrouteId,unitId,applicationId,applicationCode,deviceId,networkId,networkCode,networkAddr,profile,createdAt,modifiedAt\n";
53

            
54
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
55
506
    Router::new().nest(
56
506
        scope_path,
57
506
        Router::new()
58
506
            .route("/", routing::post(post_device_route))
59
506
            .route("/bulk", routing::post(post_device_route_bulk))
60
506
            .route("/bulk-delete", routing::post(post_device_route_bulk_del))
61
506
            .route("/range", routing::post(post_device_route_range))
62
506
            .route("/range-delete", routing::post(post_device_route_range_del))
63
506
            .route("/count", routing::get(get_device_route_count))
64
506
            .route("/list", routing::get(get_device_route_list))
65
506
            .route("/{route_id}", routing::delete(delete_device_route))
66
506
            .with_state(state.clone()),
67
    )
68
506
}
69

            
70
/// `POST /{base}/api/v1/device-route`
71
4
async fn post_device_route(state: State<AppState>, req: Request) -> impl IntoResponse {
72
    const FN_NAME: &'static str = "post_device_route";
73
4
    let api_path = format!("{}/api/v1/device-route", state.broker_base);
74
4
    let client = state.client.clone();
75

            
76
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
77
4
}
78

            
79
/// `POST /{base}/api/v1/device-route/bulk`
80
4
async fn post_device_route_bulk(state: State<AppState>, req: Request) -> impl IntoResponse {
81
    const FN_NAME: &'static str = "post_device_route_bulk";
82
4
    let api_path = format!("{}/api/v1/device-route/bulk", state.broker_base);
83
4
    let client = state.client.clone();
84

            
85
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
86
4
}
87

            
88
/// `POST /{base}/api/v1/device-route/bulk-delete`
89
4
async fn post_device_route_bulk_del(state: State<AppState>, req: Request) -> impl IntoResponse {
90
    const FN_NAME: &'static str = "post_device_route_bulk_del";
91
4
    let api_path = format!("{}/api/v1/device-route/bulk-delete", state.broker_base);
92
4
    let client = state.client.clone();
93

            
94
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
95
4
}
96

            
97
/// `POST /{base}/api/v1/device-route/range`
98
4
async fn post_device_route_range(state: State<AppState>, req: Request) -> impl IntoResponse {
99
    const FN_NAME: &'static str = "post_device_route_range";
100
4
    let api_path = format!("{}/api/v1/device-route/range", state.broker_base);
101
4
    let client = state.client.clone();
102

            
103
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
104
4
}
105

            
106
/// `POST /{base}/api/v1/device-route/range-delete`
107
4
async fn post_device_route_range_del(state: State<AppState>, req: Request) -> impl IntoResponse {
108
    const FN_NAME: &'static str = "post_device_route_range_del";
109
4
    let api_path = format!("{}/api/v1/device-route/range-delete", state.broker_base);
110
4
    let client = state.client.clone();
111

            
112
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
113
4
}
114

            
115
/// `GET /{base}/api/v1/device-route/count`
116
4
async fn get_device_route_count(state: State<AppState>, req: Request) -> impl IntoResponse {
117
    const FN_NAME: &'static str = "get_device_route_count";
118
4
    let api_path = format!("{}/api/v1/device-route/count", state.broker_base.as_str());
119
4
    let client = state.client.clone();
120

            
121
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
122
4
}
123

            
124
/// `GET /{base}/api/v1/device-route/list`
125
40
async fn get_device_route_list(state: State<AppState>, req: Request) -> impl IntoResponse {
126
    const FN_NAME: &'static str = "get_device_route_list";
127
40
    let api_path = format!("{}/api/v1/device-route/list", state.broker_base.as_str());
128
40
    let api_path = api_path.as_str();
129
40
    let client = state.client.clone();
130

            
131
8
    let (api_resp, resp_builder) =
132
40
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "device-route").await {
133
32
            ListResp::Axum(resp) => return resp,
134
8
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
135
        };
136

            
137
8
    let mut resp_stream = api_resp.bytes_stream();
138
8
    let body = Body::from_stream(async_stream::stream! {
139
        yield Ok(Bytes::from(CSV_FIELDS));
140

            
141
        let mut buffer = BytesMut::new();
142
        while let Some(body) = resp_stream.next().await {
143
            match body {
144
                Err(e) => {
145
                    error!("[{}] get body error: {}", FN_NAME, e);
146
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
147
                    yield Err(err);
148
                    break;
149
                }
150
                Ok(body) => buffer.extend_from_slice(&body[..]),
151
            }
152

            
153
            let mut json_stream =
154
                Deserializer::from_slice(&buffer[..]).into_iter::<DeviceRoute>();
155
            let mut index = 0;
156
            let mut finish = false;
157
            loop {
158
                if let Some(Ok(v)) = json_stream.next() {
159
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
160
                    if let Err(e) = writer.serialize(v) {
161
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
162
                        yield Err(err);
163
                        finish = true;
164
                        break;
165
                    }
166
                    match writer.into_inner() {
167
                        Err(e) => {
168
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
169
                            yield Err(err);
170
                            finish = true;
171
                            break;
172
                        }
173
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
174
                    }
175
                    continue;
176
                }
177
                let offset = json_stream.byte_offset();
178
                if buffer.len() <= index + offset {
179
                    index = buffer.len();
180
                    break;
181
                }
182
                match buffer[index+offset] {
183
                    b'[' | b',' => {
184
                        index += offset + 1;
185
                        if buffer.len() <= index {
186
                            break;
187
                        }
188
                        json_stream =
189
                            Deserializer::from_slice(&buffer[index..]).into_iter::<DeviceRoute>();
190
                    }
191
                    b']' => {
192
                        finish = true;
193
                        break;
194
                    }
195
                    _ => break,
196
                }
197
            }
198
            if finish {
199
                break;
200
            }
201
            buffer = buffer.split_off(index);
202
        }
203
    });
204
8
    match resp_builder.body(body) {
205
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
206
8
        Ok(resp) => resp,
207
    }
208
40
}
209

            
210
/// `DELETE /{base}/api/v1/device-route/{routeId}`
211
4
async fn delete_device_route(
212
4
    state: State<AppState>,
213
4
    Path(param): Path<RouteIdPath>,
214
4
    req: Request,
215
4
) -> impl IntoResponse {
216
    const FN_NAME: &'static str = "delete_device_route";
217
4
    let api_path = format!(
218
4
        "{}/api/v1/device-route/{}",
219
4
        state.broker_base, param.route_id
220
    );
221
4
    let client = state.client.clone();
222

            
223
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
224
4
}