1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    response::IntoResponse,
7
    routing, Router,
8
};
9
use bytes::{Bytes, BytesMut};
10
use csv::WriterBuilder;
11
use futures_util::StreamExt;
12
use log::error;
13
use serde::{Deserialize, Serialize};
14
use serde_json::Deserializer;
15

            
16
use sylvia_iot_corelib::err::ErrResp;
17

            
18
use super::{super::State as AppState, api_bridge, list_api_bridge, ListResp};
19

            
20
#[derive(Deserialize)]
21
struct RouteIdPath {
22
    route_id: String,
23
}
24

            
25
#[derive(Deserialize, Serialize)]
26
struct NetworkRoute {
27
    #[serde(rename = "routeId")]
28
    route_id: String,
29
    #[serde(rename = "unitId")]
30
    unit_id: String,
31
    #[serde(rename = "applicationId")]
32
    application_id: String,
33
    #[serde(rename = "applicationCode")]
34
    application_code: String,
35
    #[serde(rename = "networkId")]
36
    network_id: String,
37
    #[serde(rename = "networkCode")]
38
    network_code: String,
39
    #[serde(rename = "createdAt")]
40
    created_at: String,
41
}
42

            
43
/// First chunk of the CSV response: a UTF-8 byte-order mark followed by the column names and a
/// trailing newline.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFrouteId,unitId,applicationId,applicationCode,networkId,networkCode,createdAt\n";
45

            
46
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
47
253
    Router::new().nest(
48
253
        scope_path,
49
253
        Router::new()
50
253
            .route("/", routing::post(post_network_route))
51
253
            .route("/count", routing::get(get_network_route_count))
52
253
            .route("/list", routing::get(get_network_route_list))
53
253
            .route("/:route_id", routing::delete(delete_network_route))
54
253
            .with_state(state.clone()),
55
253
    )
56
253
}
57

            
58
/// `POST /{base}/api/v1/network-route`
59
2
async fn post_network_route(state: State<AppState>, req: Request) -> impl IntoResponse {
60
    const FN_NAME: &'static str = "post_network_route";
61
2
    let api_path = format!("{}/api/v1/network-route", state.broker_base);
62
2
    let client = state.client.clone();
63
2

            
64
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
65
2
}
66

            
67
/// `GET /{base}/api/v1/network-route/count`
68
2
async fn get_network_route_count(state: State<AppState>, req: Request) -> impl IntoResponse {
69
    const FN_NAME: &'static str = "get_network_route_count";
70
2
    let api_path = format!("{}/api/v1/network-route/count", state.broker_base.as_str());
71
2
    let client = state.client.clone();
72
2

            
73
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
74
2
}
75

            
76
/// `GET /{base}/api/v1/network-route/list`
77
10
async fn get_network_route_list(state: State<AppState>, req: Request) -> impl IntoResponse {
78
    const FN_NAME: &'static str = "get_network_route_list";
79
10
    let api_path = format!("{}/api/v1/network-route/list", state.broker_base.as_str());
80
10
    let api_path = api_path.as_str();
81
10
    let client = state.client.clone();
82

            
83
2
    let (api_resp, resp_builder) =
84
10
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "network-route").await {
85
8
            ListResp::Axum(resp) => return resp,
86
2
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
87
2
        };
88
2

            
89
2
    let mut resp_stream = api_resp.bytes_stream();
90
2
    let body = Body::from_stream(async_stream::stream! {
91
2
        yield Ok(Bytes::from(CSV_FIELDS));
92
2

            
93
2
        let mut buffer = BytesMut::new();
94
2
        while let Some(body) = resp_stream.next().await {
95
2
            match body {
96
2
                Err(e) => {
97
2
                    error!("[{}] get body error: {}", FN_NAME, e);
98
2
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
99
2
                    yield Err(err);
100
2
                    break;
101
2
                }
102
2
                Ok(body) => buffer.extend_from_slice(&body[..]),
103
2
            }
104
2

            
105
2
            let mut json_stream =
106
2
                Deserializer::from_slice(&buffer[..]).into_iter::<NetworkRoute>();
107
2
            let mut index = 0;
108
2
            let mut finish = false;
109
2
            loop {
110
2
                if let Some(Ok(v)) = json_stream.next() {
111
2
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
112
2
                    if let Err(e) = writer.serialize(v) {
113
2
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
114
2
                        yield Err(err);
115
2
                        finish = true;
116
2
                        break;
117
2
                    }
118
2
                    match writer.into_inner() {
119
2
                        Err(e) => {
120
2
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
121
2
                            yield Err(err);
122
2
                            finish = true;
123
2
                            break;
124
2
                        }
125
2
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
126
2
                    }
127
2
                    continue;
128
2
                }
129
2
                let offset = json_stream.byte_offset();
130
2
                if buffer.len() <= index + offset {
131
2
                    index = buffer.len();
132
2
                    break;
133
2
                }
134
2
                match buffer[index+offset] {
135
2
                    b'[' | b',' => {
136
2
                        index += offset + 1;
137
2
                        if buffer.len() <= index {
138
2
                            break;
139
2
                        }
140
2
                        json_stream =
141
2
                            Deserializer::from_slice(&buffer[index..]).into_iter::<NetworkRoute>();
142
2
                    }
143
2
                    b']' => {
144
2
                        finish = true;
145
2
                        break;
146
2
                    }
147
2
                    _ => break,
148
2
                }
149
2
            }
150
2
            if finish {
151
2
                break;
152
2
            }
153
2
            buffer = buffer.split_off(index);
154
2
        }
155
2
    });
156
2
    match resp_builder.body(body) {
157
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
158
2
        Ok(resp) => resp,
159
    }
160
10
}
161

            
162
/// `DELETE /{base}/api/v1/network-route/{routeId}`
163
2
async fn delete_network_route(
164
2
    state: State<AppState>,
165
2
    Path(param): Path<RouteIdPath>,
166
2
    req: Request,
167
2
) -> impl IntoResponse {
168
    const FN_NAME: &'static str = "delete_network_route";
169
2
    let api_path = format!(
170
2
        "{}/api/v1/network-route/{}",
171
2
        state.broker_base, param.route_id
172
2
    );
173
2
    let client = state.client.clone();
174
2

            
175
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
176
2
}