1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    response::IntoResponse,
7
    routing, Router,
8
};
9
use bytes::{Bytes, BytesMut};
10
use csv::WriterBuilder;
11
use futures_util::StreamExt;
12
use log::error;
13
use serde::{Deserialize, Serialize};
14
use serde_json::Deserializer;
15

            
16
use sylvia_iot_corelib::err::ErrResp;
17

            
18
use super::{super::State as AppState, api_bridge, list_api_bridge, ListResp};
19

            
20
#[derive(Deserialize)]
21
struct DataIdPath {
22
    data_id: String,
23
}
24

            
25
#[derive(Deserialize, Serialize)]
26
struct DlDataBuffer {
27
    #[serde(rename = "dataId")]
28
    data_id: String,
29
    #[serde(rename = "unitId")]
30
    unit_id: String,
31
    #[serde(rename = "applicationId")]
32
    application_id: String,
33
    #[serde(rename = "applicationCode")]
34
    application_code: String,
35
    #[serde(rename = "deviceId")]
36
    device_id: String,
37
    #[serde(rename = "networkId")]
38
    network_id: String,
39
    #[serde(rename = "createdAt")]
40
    created_at: String,
41
    #[serde(rename = "expiredAt")]
42
    expired_at: String,
43
}
44

            
45
/// CSV header line sent before any data row: a UTF-8 byte-order mark followed by the
/// column names and a trailing newline.
///
/// References in `const` items are implicitly `'static`, so the lifetime is not spelled out.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFdataId,unitId,applicationId,applicationCode,deviceId,networkId,createdAt,expiredAt\n";
47

            
48
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
49
506
    Router::new().nest(
50
506
        scope_path,
51
506
        Router::new()
52
506
            .route("/count", routing::get(get_dldata_buffer_count))
53
506
            .route("/list", routing::get(get_dldata_buffer_list))
54
506
            .route("/{data_id}", routing::delete(delete_dldata_buffer))
55
506
            .with_state(state.clone()),
56
506
    )
57
506
}
58

            
59
/// `GET /{base}/api/v1/dldata-buffer/count`
60
4
async fn get_dldata_buffer_count(state: State<AppState>, req: Request) -> impl IntoResponse {
61
    const FN_NAME: &'static str = "get_dldata_buffer_count";
62
4
    let api_path = format!("{}/api/v1/dldata-buffer/count", state.broker_base.as_str());
63
4
    let client = state.client.clone();
64
4

            
65
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
66
4
}
67

            
68
/// `GET /{base}/api/v1/dldata-buffer/list`
69
20
async fn get_dldata_buffer_list(state: State<AppState>, req: Request) -> impl IntoResponse {
70
    const FN_NAME: &'static str = "get_dldata_buffer_list";
71
20
    let api_path = format!("{}/api/v1/dldata-buffer/list", state.broker_base.as_str());
72
20
    let api_path = api_path.as_str();
73
20
    let client = state.client.clone();
74

            
75
4
    let (api_resp, resp_builder) =
76
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "dldata-buffer").await {
77
16
            ListResp::Axum(resp) => return resp,
78
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
79
4
        };
80
4

            
81
4
    let mut resp_stream = api_resp.bytes_stream();
82
4
    let body = Body::from_stream(async_stream::stream! {
83
4
        yield Ok(Bytes::from(CSV_FIELDS));
84
4

            
85
4
        let mut buffer = BytesMut::new();
86
4
        while let Some(body) = resp_stream.next().await {
87
4
            match body {
88
4
                Err(e) => {
89
4
                    error!("[{}] get body error: {}", FN_NAME, e);
90
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
91
4
                    yield Err(err);
92
4
                    break;
93
4
                }
94
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
95
4
            }
96
4

            
97
4
            let mut json_stream =
98
4
                Deserializer::from_slice(&buffer[..]).into_iter::<DlDataBuffer>();
99
4
            let mut index = 0;
100
4
            let mut finish = false;
101
4
            loop {
102
4
                if let Some(Ok(v)) = json_stream.next() {
103
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
104
4
                    if let Err(e) = writer.serialize(v) {
105
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
106
4
                        yield Err(err);
107
4
                        finish = true;
108
4
                        break;
109
4
                    }
110
4
                    match writer.into_inner() {
111
4
                        Err(e) => {
112
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
113
4
                            yield Err(err);
114
4
                            finish = true;
115
4
                            break;
116
4
                        }
117
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
118
4
                    }
119
4
                    continue;
120
4
                }
121
4
                let offset = json_stream.byte_offset();
122
4
                if buffer.len() <= index + offset {
123
4
                    index = buffer.len();
124
4
                    break;
125
4
                }
126
4
                match buffer[index+offset] {
127
4
                    b'[' | b',' => {
128
4
                        index += offset + 1;
129
4
                        if buffer.len() <= index {
130
4
                            break;
131
4
                        }
132
4
                        json_stream =
133
4
                            Deserializer::from_slice(&buffer[index..]).into_iter::<DlDataBuffer>();
134
4
                    }
135
4
                    b']' => {
136
4
                        finish = true;
137
4
                        break;
138
4
                    }
139
4
                    _ => break,
140
4
                }
141
4
            }
142
4
            if finish {
143
4
                break;
144
4
            }
145
4
            buffer = buffer.split_off(index);
146
4
        }
147
4
    });
148
4
    match resp_builder.body(body) {
149
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
150
4
        Ok(resp) => resp,
151
    }
152
20
}
153

            
154
/// `DELETE /{base}/api/v1/dldata-buffer/{dataId}`
155
4
async fn delete_dldata_buffer(
156
4
    state: State<AppState>,
157
4
    Path(param): Path<DataIdPath>,
158
4
    req: Request,
159
4
) -> impl IntoResponse {
160
    const FN_NAME: &'static str = "delete_dldata_buffer";
161
4
    let api_path = format!(
162
4
        "{}/api/v1/dldata-buffer/{}",
163
4
        state.broker_base, param.data_id
164
4
    );
165
4
    let client = state.client.clone();
166
4

            
167
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
168
4
}