1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    response::IntoResponse,
7
    routing, Router,
8
};
9
use bytes::{Bytes, BytesMut};
10
use csv::WriterBuilder;
11
use futures_util::StreamExt;
12
use log::error;
13
use serde::{Deserialize, Serialize};
14
use serde_json::Deserializer;
15

            
16
use sylvia_iot_corelib::err::ErrResp;
17

            
18
use super::{super::State as AppState, api_bridge, list_api_bridge, ListResp};
19

            
20
#[derive(Deserialize)]
21
struct RouteIdPath {
22
    route_id: String,
23
}
24

            
25
#[derive(Deserialize, Serialize)]
26
struct DeviceRoute {
27
    #[serde(rename = "routeId")]
28
    route_id: String,
29
    #[serde(rename = "unitId")]
30
    unit_id: String,
31
    #[serde(rename = "applicationId")]
32
    application_id: String,
33
    #[serde(rename = "applicationCode")]
34
    application_code: String,
35
    #[serde(rename = "deviceId")]
36
    device_id: String,
37
    #[serde(rename = "networkId")]
38
    network_id: String,
39
    #[serde(rename = "networkCode")]
40
    network_code: String,
41
    #[serde(rename = "networkAddr")]
42
    network_addr: String,
43
    profile: String,
44
    #[serde(rename = "createdAt")]
45
    created_at: String,
46
    #[serde(rename = "modifiedAt")]
47
    modified_at: String,
48
}
49

            
50
/// CSV header line sent before any data row: a UTF-8 byte-order mark followed
/// by the column names (in `DeviceRoute` field order) and a trailing newline.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFrouteId,unitId,applicationId,applicationCode,deviceId,networkId,networkCode,networkAddr,profile,createdAt,modifiedAt\n";
52

            
53
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
54
253
    Router::new().nest(
55
253
        scope_path,
56
253
        Router::new()
57
253
            .route("/", routing::post(post_device_route))
58
253
            .route("/bulk", routing::post(post_device_route_bulk))
59
253
            .route("/bulk-delete", routing::post(post_device_route_bulk_del))
60
253
            .route("/range", routing::post(post_device_route_range))
61
253
            .route("/range-delete", routing::post(post_device_route_range_del))
62
253
            .route("/count", routing::get(get_device_route_count))
63
253
            .route("/list", routing::get(get_device_route_list))
64
253
            .route("/:route_id", routing::delete(delete_device_route))
65
253
            .with_state(state.clone()),
66
253
    )
67
253
}
68

            
69
/// `POST /{base}/api/v1/device-route`
70
2
async fn post_device_route(state: State<AppState>, req: Request) -> impl IntoResponse {
71
    const FN_NAME: &'static str = "post_device_route";
72
2
    let api_path = format!("{}/api/v1/device-route", state.broker_base);
73
2
    let client = state.client.clone();
74
2

            
75
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
76
2
}
77

            
78
/// `POST /{base}/api/v1/device-route/bulk`
79
2
async fn post_device_route_bulk(state: State<AppState>, req: Request) -> impl IntoResponse {
80
    const FN_NAME: &'static str = "post_device_route_bulk";
81
2
    let api_path = format!("{}/api/v1/device-route/bulk", state.broker_base);
82
2
    let client = state.client.clone();
83
2

            
84
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
85
2
}
86

            
87
/// `POST /{base}/api/v1/device-route/bulk-delete`
88
2
async fn post_device_route_bulk_del(state: State<AppState>, req: Request) -> impl IntoResponse {
89
    const FN_NAME: &'static str = "post_device_route_bulk_del";
90
2
    let api_path = format!("{}/api/v1/device-route/bulk-delete", state.broker_base);
91
2
    let client = state.client.clone();
92
2

            
93
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
94
2
}
95

            
96
/// `POST /{base}/api/v1/device-route/range`
97
2
async fn post_device_route_range(state: State<AppState>, req: Request) -> impl IntoResponse {
98
    const FN_NAME: &'static str = "post_device_route_range";
99
2
    let api_path = format!("{}/api/v1/device-route/range", state.broker_base);
100
2
    let client = state.client.clone();
101
2

            
102
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
103
2
}
104

            
105
/// `POST /{base}/api/v1/device-route/range-delete`
106
2
async fn post_device_route_range_del(state: State<AppState>, req: Request) -> impl IntoResponse {
107
    const FN_NAME: &'static str = "post_device_route_range_del";
108
2
    let api_path = format!("{}/api/v1/device-route/range-delete", state.broker_base);
109
2
    let client = state.client.clone();
110
2

            
111
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
112
2
}
113

            
114
/// `GET /{base}/api/v1/device-route/count`
115
2
async fn get_device_route_count(state: State<AppState>, req: Request) -> impl IntoResponse {
116
    const FN_NAME: &'static str = "get_device_route_count";
117
2
    let api_path = format!("{}/api/v1/device-route/count", state.broker_base.as_str());
118
2
    let client = state.client.clone();
119
2

            
120
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
121
2
}
122

            
123
/// `GET /{base}/api/v1/device-route/list`
124
20
async fn get_device_route_list(state: State<AppState>, req: Request) -> impl IntoResponse {
125
    const FN_NAME: &'static str = "get_device_route_list";
126
20
    let api_path = format!("{}/api/v1/device-route/list", state.broker_base.as_str());
127
20
    let api_path = api_path.as_str();
128
20
    let client = state.client.clone();
129

            
130
4
    let (api_resp, resp_builder) =
131
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "device-route").await {
132
16
            ListResp::Axum(resp) => return resp,
133
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
134
4
        };
135
4

            
136
4
    let mut resp_stream = api_resp.bytes_stream();
137
4
    let body = Body::from_stream(async_stream::stream! {
138
4
        yield Ok(Bytes::from(CSV_FIELDS));
139
4

            
140
4
        let mut buffer = BytesMut::new();
141
4
        while let Some(body) = resp_stream.next().await {
142
4
            match body {
143
4
                Err(e) => {
144
4
                    error!("[{}] get body error: {}", FN_NAME, e);
145
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
146
4
                    yield Err(err);
147
4
                    break;
148
4
                }
149
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
150
4
            }
151
4

            
152
4
            let mut json_stream =
153
4
                Deserializer::from_slice(&buffer[..]).into_iter::<DeviceRoute>();
154
4
            let mut index = 0;
155
4
            let mut finish = false;
156
4
            loop {
157
4
                if let Some(Ok(v)) = json_stream.next() {
158
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
159
4
                    if let Err(e) = writer.serialize(v) {
160
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
161
4
                        yield Err(err);
162
4
                        finish = true;
163
4
                        break;
164
4
                    }
165
4
                    match writer.into_inner() {
166
4
                        Err(e) => {
167
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
168
4
                            yield Err(err);
169
4
                            finish = true;
170
4
                            break;
171
4
                        }
172
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
173
4
                    }
174
4
                    continue;
175
4
                }
176
4
                let offset = json_stream.byte_offset();
177
4
                if buffer.len() <= index + offset {
178
4
                    index = buffer.len();
179
4
                    break;
180
4
                }
181
4
                match buffer[index+offset] {
182
4
                    b'[' | b',' => {
183
4
                        index += offset + 1;
184
4
                        if buffer.len() <= index {
185
4
                            break;
186
4
                        }
187
4
                        json_stream =
188
4
                            Deserializer::from_slice(&buffer[index..]).into_iter::<DeviceRoute>();
189
4
                    }
190
4
                    b']' => {
191
4
                        finish = true;
192
4
                        break;
193
4
                    }
194
4
                    _ => break,
195
4
                }
196
4
            }
197
4
            if finish {
198
4
                break;
199
4
            }
200
4
            buffer = buffer.split_off(index);
201
4
        }
202
4
    });
203
4
    match resp_builder.body(body) {
204
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
205
4
        Ok(resp) => resp,
206
    }
207
20
}
208

            
209
/// `DELETE /{base}/api/v1/device-route/{routeId}`
210
2
async fn delete_device_route(
211
2
    state: State<AppState>,
212
2
    Path(param): Path<RouteIdPath>,
213
2
    req: Request,
214
2
) -> impl IntoResponse {
215
    const FN_NAME: &'static str = "delete_device_route";
216
2
    let api_path = format!(
217
2
        "{}/api/v1/device-route/{}",
218
2
        state.broker_base, param.route_id
219
2
    );
220
2
    let client = state.client.clone();
221
2

            
222
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
223
2
}