1
use std::error::Error as StdError;
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    response::IntoResponse,
8
    routing,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::{Deserialize, Serialize};
15
use serde_json::Deserializer;
16

            
17
use sylvia_iot_corelib::err::ErrResp;
18

            
19
use super::{super::State as AppState, ListResp, api_bridge, list_api_bridge};
20

            
21
#[derive(Deserialize)]
22
struct DataIdPath {
23
    data_id: String,
24
}
25

            
26
#[derive(Deserialize, Serialize)]
27
struct DlDataBuffer {
28
    #[serde(rename = "dataId")]
29
    data_id: String,
30
    #[serde(rename = "unitId")]
31
    unit_id: String,
32
    #[serde(rename = "applicationId")]
33
    application_id: String,
34
    #[serde(rename = "applicationCode")]
35
    application_code: String,
36
    #[serde(rename = "deviceId")]
37
    device_id: String,
38
    #[serde(rename = "networkId")]
39
    network_id: String,
40
    #[serde(rename = "createdAt")]
41
    created_at: String,
42
    #[serde(rename = "expiredAt")]
43
    expired_at: String,
44
}
45

            
46
/// First chunk of every CSV download: the UTF-8 byte order mark (`EF BB BF`) followed by
/// the header row and a trailing newline.
///
/// The column names are in the same order as the fields of `DlDataBuffer`.
const CSV_FIELDS: &[u8] = b"\xEF\xBB\xBF\
    dataId,unitId,applicationId,applicationCode,deviceId,networkId,createdAt,expiredAt\n";
48

            
49
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
50
506
    Router::new().nest(
51
506
        scope_path,
52
506
        Router::new()
53
506
            .route("/count", routing::get(get_dldata_buffer_count))
54
506
            .route("/list", routing::get(get_dldata_buffer_list))
55
506
            .route("/{data_id}", routing::delete(delete_dldata_buffer))
56
506
            .with_state(state.clone()),
57
    )
58
506
}
59

            
60
/// `GET /{base}/api/v1/dldata-buffer/count`
61
4
async fn get_dldata_buffer_count(state: State<AppState>, req: Request) -> impl IntoResponse {
62
    const FN_NAME: &'static str = "get_dldata_buffer_count";
63
4
    let api_path = format!("{}/api/v1/dldata-buffer/count", state.broker_base.as_str());
64
4
    let client = state.client.clone();
65

            
66
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
67
4
}
68

            
69
/// `GET /{base}/api/v1/dldata-buffer/list`
70
20
async fn get_dldata_buffer_list(state: State<AppState>, req: Request) -> impl IntoResponse {
71
    const FN_NAME: &'static str = "get_dldata_buffer_list";
72
20
    let api_path = format!("{}/api/v1/dldata-buffer/list", state.broker_base.as_str());
73
20
    let api_path = api_path.as_str();
74
20
    let client = state.client.clone();
75

            
76
4
    let (api_resp, resp_builder) =
77
20
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "dldata-buffer").await {
78
16
            ListResp::Axum(resp) => return resp,
79
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
80
        };
81

            
82
4
    let mut resp_stream = api_resp.bytes_stream();
83
4
    let body = Body::from_stream(async_stream::stream! {
84
        yield Ok(Bytes::from(CSV_FIELDS));
85

            
86
        let mut buffer = BytesMut::new();
87
        while let Some(body) = resp_stream.next().await {
88
            match body {
89
                Err(e) => {
90
                    error!("[{}] get body error: {}", FN_NAME, e);
91
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
92
                    yield Err(err);
93
                    break;
94
                }
95
                Ok(body) => buffer.extend_from_slice(&body[..]),
96
            }
97

            
98
            let mut json_stream =
99
                Deserializer::from_slice(&buffer[..]).into_iter::<DlDataBuffer>();
100
            let mut index = 0;
101
            let mut finish = false;
102
            loop {
103
                if let Some(Ok(v)) = json_stream.next() {
104
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
105
                    if let Err(e) = writer.serialize(v) {
106
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
107
                        yield Err(err);
108
                        finish = true;
109
                        break;
110
                    }
111
                    match writer.into_inner() {
112
                        Err(e) => {
113
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
114
                            yield Err(err);
115
                            finish = true;
116
                            break;
117
                        }
118
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
119
                    }
120
                    continue;
121
                }
122
                let offset = json_stream.byte_offset();
123
                if buffer.len() <= index + offset {
124
                    index = buffer.len();
125
                    break;
126
                }
127
                match buffer[index+offset] {
128
                    b'[' | b',' => {
129
                        index += offset + 1;
130
                        if buffer.len() <= index {
131
                            break;
132
                        }
133
                        json_stream =
134
                            Deserializer::from_slice(&buffer[index..]).into_iter::<DlDataBuffer>();
135
                    }
136
                    b']' => {
137
                        finish = true;
138
                        break;
139
                    }
140
                    _ => break,
141
                }
142
            }
143
            if finish {
144
                break;
145
            }
146
            buffer = buffer.split_off(index);
147
        }
148
    });
149
4
    match resp_builder.body(body) {
150
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
151
4
        Ok(resp) => resp,
152
    }
153
20
}
154

            
155
/// `DELETE /{base}/api/v1/dldata-buffer/{dataId}`
156
4
async fn delete_dldata_buffer(
157
4
    state: State<AppState>,
158
4
    Path(param): Path<DataIdPath>,
159
4
    req: Request,
160
4
) -> impl IntoResponse {
161
    const FN_NAME: &'static str = "delete_dldata_buffer";
162
4
    let api_path = format!(
163
4
        "{}/api/v1/dldata-buffer/{}",
164
4
        state.broker_base, param.data_id
165
    );
166
4
    let client = state.client.clone();
167

            
168
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
169
4
}