1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    response::IntoResponse,
8
    routing,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Deserializer, Map, Value};
16

            
17
use sylvia_iot_corelib::err::ErrResp;
18

            
19
use super::{super::State as AppState, ListResp, api_bridge, list_api_bridge};
20

            
21
#[derive(Deserialize)]
22
struct DeviceIdPath {
23
    device_id: String,
24
}
25

            
26
#[derive(Deserialize, Serialize)]
27
struct Device {
28
    #[serde(rename = "deviceId")]
29
    device_id: String,
30
    #[serde(rename = "unitId")]
31
    unit_id: String,
32
    #[serde(rename = "unitCode")]
33
    unit_code: Option<String>,
34
    #[serde(rename = "networkId")]
35
    network_id: String,
36
    #[serde(rename = "networkCode")]
37
    network_code: String,
38
    #[serde(rename = "networkAddr")]
39
    network_addr: String,
40
    #[serde(rename = "createdAt")]
41
    created_at: String,
42
    #[serde(rename = "modifiedAt")]
43
    modified_at: String,
44
    profile: String,
45
    name: String,
46
    #[serde(skip_serializing)]
47
    info: Map<String, Value>,
48
    #[serde(rename(serialize = "info"))]
49
    info_str: Option<String>,
50
}
51

            
52
const CSV_FIELDS: &'static [u8] =
53
    b"\xEF\xBB\xBFdeviceId,unitId,unitCode,networkId,networkCode,networkAddr,createdAt,modifiedAt,profile,name,info\n";
54

            
55
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
56
506
    Router::new().nest(
57
506
        scope_path,
58
506
        Router::new()
59
506
            .route("/", routing::post(post_device))
60
506
            .route("/bulk", routing::post(post_device_bulk))
61
506
            .route("/bulk-delete", routing::post(post_device_bulk_del))
62
506
            .route("/range", routing::post(post_device_range))
63
506
            .route("/range-delete", routing::post(post_device_range_del))
64
506
            .route("/count", routing::get(get_device_count))
65
506
            .route("/list", routing::get(get_device_list))
66
506
            .route(
67
506
                "/{device_id}",
68
506
                routing::get(get_device)
69
506
                    .patch(patch_device)
70
506
                    .delete(delete_device),
71
506
            )
72
506
            .with_state(state.clone()),
73
506
    )
74
506
}
75

            
76
/// `POST /{base}/api/v1/device`
77
4
async fn post_device(state: State<AppState>, req: Request) -> impl IntoResponse {
78
    const FN_NAME: &'static str = "post_device";
79
4
    let api_path = format!("{}/api/v1/device", state.broker_base);
80
4
    let client = state.client.clone();
81
4

            
82
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
83
4
}
84

            
85
/// `POST /{base}/api/v1/device/bulk`
86
4
async fn post_device_bulk(state: State<AppState>, req: Request) -> impl IntoResponse {
87
    const FN_NAME: &'static str = "post_device_bulk";
88
4
    let api_path = format!("{}/api/v1/device/bulk", state.broker_base);
89
4
    let client = state.client.clone();
90
4

            
91
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
92
4
}
93

            
94
/// `POST /{base}/api/v1/device/bulk-delete`
95
4
async fn post_device_bulk_del(state: State<AppState>, req: Request) -> impl IntoResponse {
96
    const FN_NAME: &'static str = "post_device_bulk_del";
97
4
    let api_path = format!("{}/api/v1/device/bulk-delete", state.broker_base);
98
4
    let client = state.client.clone();
99
4

            
100
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
101
4
}
102

            
103
/// `POST /{base}/api/v1/device/range`
104
4
async fn post_device_range(state: State<AppState>, req: Request) -> impl IntoResponse {
105
    const FN_NAME: &'static str = "post_device_range";
106
4
    let api_path = format!("{}/api/v1/device/range", state.broker_base);
107
4
    let client = state.client.clone();
108
4

            
109
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
110
4
}
111

            
112
/// `POST /{base}/api/v1/device/range-delete`
113
4
async fn post_device_range_del(state: State<AppState>, req: Request) -> impl IntoResponse {
114
    const FN_NAME: &'static str = "post_device_range_del";
115
4
    let api_path = format!("{}/api/v1/device/range-delete", state.broker_base);
116
4
    let client = state.client.clone();
117
4

            
118
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
119
4
}
120

            
121
/// `GET /{base}/api/v1/device/count`
122
4
async fn get_device_count(state: State<AppState>, req: Request) -> impl IntoResponse {
123
    const FN_NAME: &'static str = "get_device_count";
124
4
    let api_path = format!("{}/api/v1/device/count", state.broker_base.as_str());
125
4
    let client = state.client.clone();
126
4

            
127
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
128
4
}
129

            
130
/// `GET /{base}/api/v1/device/list`
131
20
async fn get_device_list(state: State<AppState>, req: Request) -> impl IntoResponse {
132
    const FN_NAME: &'static str = "get_device_list";
133
20
    let api_path = format!("{}/api/v1/device/list", state.broker_base.as_str());
134
20
    let api_path = api_path.as_str();
135
20
    let client = state.client.clone();
136

            
137
4
    let (api_resp, resp_builder) =
138
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "device").await {
139
16
            ListResp::Axum(resp) => return resp,
140
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
141
4
        };
142
4

            
143
4
    let mut resp_stream = api_resp.bytes_stream();
144
4
    let body = Body::from_stream(async_stream::stream! {
145
4
        yield Ok(Bytes::from(CSV_FIELDS));
146
4

            
147
4
        let mut buffer = BytesMut::new();
148
4
        while let Some(body) = resp_stream.next().await {
149
4
            match body {
150
4
                Err(e) => {
151
4
                    error!("[{}] get body error: {}", FN_NAME, e);
152
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
153
4
                    yield Err(err);
154
4
                    break;
155
4
                }
156
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
157
4
            }
158
4

            
159
4
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<Device>();
160
4
            let mut index = 0;
161
4
            let mut finish = false;
162
4
            loop {
163
4
                if let Some(Ok(mut v)) = json_stream.next() {
164
4
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
165
4
                        v.info_str = Some(info_str);
166
4
                    }
167
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
168
4
                    if let Err(e) = writer.serialize(v) {
169
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
170
4
                        yield Err(err);
171
4
                        finish = true;
172
4
                        break;
173
4
                    }
174
4
                    match writer.into_inner() {
175
4
                        Err(e) => {
176
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
177
4
                            yield Err(err);
178
4
                            finish = true;
179
4
                            break;
180
4
                        }
181
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
182
4
                    }
183
4
                    continue;
184
4
                }
185
4
                let offset = json_stream.byte_offset();
186
4
                if buffer.len() <= index + offset {
187
4
                    index = buffer.len();
188
4
                    break;
189
4
                }
190
4
                match buffer[index+offset] {
191
4
                    b'[' | b',' => {
192
4
                        index += offset + 1;
193
4
                        if buffer.len() <= index {
194
4
                            break;
195
4
                        }
196
4
                        json_stream =
197
4
                            Deserializer::from_slice(&buffer[index..]).into_iter::<Device>();
198
4
                    }
199
4
                    b']' => {
200
4
                        finish = true;
201
4
                        break;
202
4
                    }
203
4
                    _ => break,
204
4
                }
205
4
            }
206
4
            if finish {
207
4
                break;
208
4
            }
209
4
            buffer = buffer.split_off(index);
210
4
        }
211
4
    });
212
4
    match resp_builder.body(body) {
213
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
214
4
        Ok(resp) => resp,
215
    }
216
20
}
217

            
218
/// `GET /{base}/api/v1/device/{deviceId}`
219
4
async fn get_device(
220
4
    state: State<AppState>,
221
4
    Path(param): Path<DeviceIdPath>,
222
4
    req: Request,
223
4
) -> impl IntoResponse {
224
    const FN_NAME: &'static str = "get_device";
225
4
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
226
4
    let client = state.client.clone();
227
4

            
228
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
229
4
}
230

            
231
/// `PATCH /{base}/api/v1/device/{deviceId}`
232
4
async fn patch_device(
233
4
    state: State<AppState>,
234
4
    Path(param): Path<DeviceIdPath>,
235
4
    req: Request,
236
4
) -> impl IntoResponse {
237
    const FN_NAME: &'static str = "patch_device";
238
4
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
239
4
    let client = state.client.clone();
240
4

            
241
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
242
4
}
243

            
244
/// `DELETE /{base}/api/v1/device/{deviceId}`
245
4
async fn delete_device(
246
4
    state: State<AppState>,
247
4
    Path(param): Path<DeviceIdPath>,
248
4
    req: Request,
249
4
) -> impl IntoResponse {
250
    const FN_NAME: &'static str = "delete_device";
251
4
    let api_path = format!("{}/api/v1/device/{}", state.broker_base, param.device_id);
252
4
    let client = state.client.clone();
253
4

            
254
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
255
4
}