1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    response::IntoResponse,
7
    routing, Router,
8
};
9
use bytes::{Bytes, BytesMut};
10
use csv::WriterBuilder;
11
use futures_util::StreamExt;
12
use log::error;
13
use serde::{Deserialize, Serialize};
14
use serde_json::{Deserializer, Map, Value};
15

            
16
use sylvia_iot_corelib::err::ErrResp;
17

            
18
use super::{super::State as AppState, api_bridge, list_api_bridge, ListResp};
19

            
20
#[derive(Deserialize)]
21
struct DeviceIdPath {
22
    device_id: String,
23
}
24

            
25
#[derive(Deserialize, Serialize)]
26
struct Device {
27
    #[serde(rename = "deviceId")]
28
    device_id: String,
29
    #[serde(rename = "unitId")]
30
    unit_id: String,
31
    #[serde(rename = "unitCode")]
32
    unit_code: Option<String>,
33
    #[serde(rename = "networkId")]
34
    network_id: String,
35
    #[serde(rename = "networkCode")]
36
    network_code: String,
37
    #[serde(rename = "networkAddr")]
38
    network_addr: String,
39
    #[serde(rename = "createdAt")]
40
    created_at: String,
41
    #[serde(rename = "modifiedAt")]
42
    modified_at: String,
43
    profile: String,
44
    name: String,
45
    #[serde(skip_serializing)]
46
    info: Map<String, Value>,
47
    #[serde(rename(serialize = "info"))]
48
    info_str: Option<String>,
49
}
50

            
51
/// First chunk of the CSV body: a UTF-8 byte order mark followed by the column names, in the
/// same order as the serialized fields of `Device`.
const CSV_FIELDS: &[u8] = b"\xEF\xBB\xBF\
    deviceId,unitId,unitCode,networkId,networkCode,networkAddr,\
    createdAt,modifiedAt,profile,name,info\n";
53

            
54
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
55
253
    Router::new().nest(
56
253
        scope_path,
57
253
        Router::new()
58
253
            .route("/", routing::post(post_device))
59
253
            .route("/bulk", routing::post(post_device_bulk))
60
253
            .route("/bulk-delete", routing::post(post_device_bulk_del))
61
253
            .route("/range", routing::post(post_device_range))
62
253
            .route("/range-delete", routing::post(post_device_range_del))
63
253
            .route("/count", routing::get(get_device_count))
64
253
            .route("/list", routing::get(get_device_list))
65
253
            .route(
66
253
                "/:device_id",
67
253
                routing::get(get_device)
68
253
                    .patch(patch_device)
69
253
                    .delete(delete_device),
70
253
            )
71
253
            .with_state(state.clone()),
72
253
    )
73
253
}
74

            
75
/// `POST /{base}/api/v1/device`
76
2
async fn post_device(state: State<AppState>, req: Request) -> impl IntoResponse {
77
    const FN_NAME: &'static str = "post_device";
78
2
    let api_path = format!("{}/api/v1/device", state.broker_base);
79
2
    let client = state.client.clone();
80
2

            
81
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
82
2
}
83

            
84
/// `POST /{base}/api/v1/device/bulk`
85
2
async fn post_device_bulk(state: State<AppState>, req: Request) -> impl IntoResponse {
86
    const FN_NAME: &'static str = "post_device_bulk";
87
2
    let api_path = format!("{}/api/v1/device/bulk", state.broker_base);
88
2
    let client = state.client.clone();
89
2

            
90
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
91
2
}
92

            
93
/// `POST /{base}/api/v1/device/bulk-delete`
94
2
async fn post_device_bulk_del(state: State<AppState>, req: Request) -> impl IntoResponse {
95
    const FN_NAME: &'static str = "post_device_bulk_del";
96
2
    let api_path = format!("{}/api/v1/device/bulk-delete", state.broker_base);
97
2
    let client = state.client.clone();
98
2

            
99
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
100
2
}
101

            
102
/// `POST /{base}/api/v1/device/range`
103
2
async fn post_device_range(state: State<AppState>, req: Request) -> impl IntoResponse {
104
    const FN_NAME: &'static str = "post_device_range";
105
2
    let api_path = format!("{}/api/v1/device/range", state.broker_base);
106
2
    let client = state.client.clone();
107
2

            
108
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
109
2
}
110

            
111
/// `POST /{base}/api/v1/device/range-delete`
112
2
async fn post_device_range_del(state: State<AppState>, req: Request) -> impl IntoResponse {
113
    const FN_NAME: &'static str = "post_device_range_del";
114
2
    let api_path = format!("{}/api/v1/device/range-delete", state.broker_base);
115
2
    let client = state.client.clone();
116
2

            
117
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
118
2
}
119

            
120
/// `GET /{base}/api/v1/device/count`
121
2
async fn get_device_count(state: State<AppState>, req: Request) -> impl IntoResponse {
122
    const FN_NAME: &'static str = "get_device_count";
123
2
    let api_path = format!("{}/api/v1/device/count", state.broker_base.as_str());
124
2
    let client = state.client.clone();
125
2

            
126
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
127
2
}
128

            
129
/// `GET /{base}/api/v1/device/list`
130
10
async fn get_device_list(state: State<AppState>, req: Request) -> impl IntoResponse {
131
    const FN_NAME: &'static str = "get_device_list";
132
10
    let api_path = format!("{}/api/v1/device/list", state.broker_base.as_str());
133
10
    let api_path = api_path.as_str();
134
10
    let client = state.client.clone();
135

            
136
2
    let (api_resp, resp_builder) =
137
10
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "device").await {
138
8
            ListResp::Axum(resp) => return resp,
139
2
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
140
2
        };
141
2

            
142
2
    let mut resp_stream = api_resp.bytes_stream();
143
2
    let body = Body::from_stream(async_stream::stream! {
144
2
        yield Ok(Bytes::from(CSV_FIELDS));
145
2

            
146
2
        let mut buffer = BytesMut::new();
147
2
        while let Some(body) = resp_stream.next().await {
148
2
            match body {
149
2
                Err(e) => {
150
2
                    error!("[{}] get body error: {}", FN_NAME, e);
151
2
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
152
2
                    yield Err(err);
153
2
                    break;
154
2
                }
155
2
                Ok(body) => buffer.extend_from_slice(&body[..]),
156
2
            }
157
2

            
158
2
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Device>();
159
2
            let mut index = 0;
160
2
            let mut finish = false;
161
2
            loop {
162
2
                if let Some(Ok(mut v)) = json_stream.next() {
163
2
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
164
2
                        v.info_str = Some(info_str);
165
2
                    }
166
2
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
167
2
                    if let Err(e) = writer.serialize(v) {
168
2
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
169
2
                        yield Err(err);
170
2
                        finish = true;
171
2
                        break;
172
2
                    }
173
2
                    match writer.into_inner() {
174
2
                        Err(e) => {
175
2
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
176
2
                            yield Err(err);
177
2
                            finish = true;
178
2
                            break;
179
2
                        }
180
2
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
181
2
                    }
182
2
                    continue;
183
2
                }
184
2
                let offset = json_stream.byte_offset();
185
2
                if buffer.len() <= index + offset {
186
2
                    index = buffer.len();
187
2
                    break;
188
2
                }
189
2
                match buffer[index+offset] {
190
2
                    b'[' | b',' => {
191
2
                        index += offset + 1;
192
2
                        if buffer.len() <= index {
193
2
                            break;
194
2
                        }
195
2
                        json_stream =
196
2
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Device>();
197
2
                    }
198
2
                    b']' => {
199
2
                        finish = true;
200
2
                        break;
201
2
                    }
202
2
                    _ => break,
203
2
                }
204
2
            }
205
2
            if finish {
206
2
                break;
207
2
            }
208
2
            buffer = buffer.split_off(index);
209
2
        }
210
2
    });
211
2
    match resp_builder.body(body) {
212
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
213
2
        Ok(resp) => resp,
214
    }
215
10
}
216

            
217
/// `GET /{base}/api/v1/device/{deviceId}`
218
2
async fn get_device(
219
2
    state: State<AppState>,
220
2
    Path(param): Path<DeviceIdPath>,
221
2
    req: Request,
222
2
) -> impl IntoResponse {
223
    const FN_NAME: &'static str = "get_device";
224
2
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
225
2
    let client = state.client.clone();
226
2

            
227
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
228
2
}
229

            
230
/// `PATCH /{base}/api/v1/device/{deviceId}`
231
2
async fn patch_device(
232
2
    state: State<AppState>,
233
2
    Path(param): Path<DeviceIdPath>,
234
2
    req: Request,
235
2
) -> impl IntoResponse {
236
    const FN_NAME: &'static str = "patch_device";
237
2
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
238
2
    let client = state.client.clone();
239
2

            
240
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
241
2
}
242

            
243
/// `DELETE /{base}/api/v1/device/{deviceId}`
244
2
async fn delete_device(
245
2
    state: State<AppState>,
246
2
    Path(param): Path<DeviceIdPath>,
247
2
    req: Request,
248
2
) -> impl IntoResponse {
249
    const FN_NAME: &'static str = "delete_device";
250
2
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
251
2
    let client = state.client.clone();
252
2

            
253
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
254
2
}