1
use std::{collections::HashMap, error::Error as StdError};
2

            
3
use axum::{
4
    Router,
5
    body::Body,
6
    extract::{Path, Request, State},
7
    response::IntoResponse,
8
    routing,
9
};
10
use bytes::{Bytes, BytesMut};
11
use csv::WriterBuilder;
12
use futures_util::StreamExt;
13
use log::error;
14
use serde::{Deserialize, Serialize};
15
use serde_json::{Deserializer, Map, Value};
16

            
17
use sylvia_iot_corelib::err::ErrResp;
18

            
19
use super::{super::State as AppState, ListResp, api_bridge, list_api_bridge};
20

            
21
#[derive(Deserialize)]
22
struct UserIdPath {
23
    user_id: String,
24
}
25

            
26
#[derive(Deserialize, Serialize)]
27
struct User {
28
    account: String,
29
    #[serde(rename = "createdAt")]
30
    created_at: String,
31
    #[serde(rename = "modifiedAt")]
32
    modified_at: String,
33
    #[serde(rename = "verifiedAt")]
34
    verified_at: Option<String>,
35
    #[serde(skip_serializing)]
36
    roles: HashMap<String, bool>,
37
    #[serde(rename(serialize = "role"))]
38
    roles_str: Option<String>,
39
    name: String,
40
    #[serde(skip_serializing)]
41
    info: Map<String, Value>,
42
    #[serde(rename(serialize = "info"))]
43
    info_str: Option<String>,
44
}
45

            
46
/// First chunk of the CSV download: the UTF-8 byte-order mark (`EF BB BF`) followed by the
/// header row and a trailing newline.
const CSV_FIELDS: &[u8] =
    b"\xEF\xBB\xBFaccount,createdAt,modifiedAt,verifiedAt,roles,name,info\n";
48

            
49
506
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
50
506
    Router::new().nest(
51
506
        scope_path,
52
506
        Router::new()
53
506
            .route(
54
506
                "/",
55
506
                routing::get(get_user)
56
506
                    .patch(patch_user)
57
506
                    .post(post_admin_user),
58
506
            )
59
506
            .route("/count", routing::get(get_admin_user_count))
60
506
            .route("/list", routing::get(get_admin_user_list))
61
506
            .route(
62
506
                "/{user_id}",
63
506
                routing::get(get_admin_user)
64
506
                    .patch(patch_admin_user)
65
506
                    .delete(delete_admin_user),
66
506
            )
67
506
            .with_state(state.clone()),
68
506
    )
69
506
}
70

            
71
/// `GET /{base}/api/v1/user`
72
4
async fn get_user(state: State<AppState>, req: Request) -> impl IntoResponse {
73
    const FN_NAME: &'static str = "get_user";
74
4
    let api_path = format!("{}/api/v1/user", state.auth_base);
75
4
    let client = state.client.clone();
76
4

            
77
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
78
4
}
79

            
80
/// `PATCH /{base}/api/v1/user`
81
4
async fn patch_user(state: State<AppState>, req: Request) -> impl IntoResponse {
82
    const FN_NAME: &'static str = "patch_user";
83
4
    let api_path = format!("{}/api/v1/user", state.auth_base);
84
4
    let client = state.client.clone();
85
4

            
86
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
87
4
}
88

            
89
/// `POST /{base}/api/v1/user`
90
4
async fn post_admin_user(state: State<AppState>, req: Request) -> impl IntoResponse {
91
    const FN_NAME: &'static str = "post_admin_user";
92
4
    let api_path = format!("{}/api/v1/user", state.auth_base);
93
4
    let client = state.client.clone();
94
4

            
95
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
96
4
}
97

            
98
/// `GET /{base}/api/v1/user/count`
99
12
async fn get_admin_user_count(state: State<AppState>, req: Request) -> impl IntoResponse {
100
    const FN_NAME: &'static str = "get_admin_user_count";
101
12
    let api_path = format!("{}/api/v1/user/count", state.auth_base.as_str());
102
12
    let client = state.client.clone();
103
12

            
104
12
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
105
12
}
106

            
107
/// `GET /{base}/api/v1/user/list`
108
28
async fn get_admin_user_list(state: State<AppState>, req: Request) -> impl IntoResponse {
109
    const FN_NAME: &'static str = "get_admin_user_list";
110
28
    let api_path = format!("{}/api/v1/user/list", state.auth_base.as_str());
111
28
    let api_path = api_path.as_str();
112
28
    let client = state.client.clone();
113

            
114
4
    let (api_resp, resp_builder) =
115
28
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "user").await {
116
24
            ListResp::Axum(resp) => return resp,
117
4
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
118
4
        };
119
4

            
120
4
    let mut resp_stream = api_resp.bytes_stream();
121
4
    let body = Body::from_stream(async_stream::stream! {
122
4
        yield Ok(Bytes::from(CSV_FIELDS));
123
4

            
124
4
        let mut buffer = BytesMut::new();
125
4
        while let Some(body) = resp_stream.next().await {
126
4
            match body {
127
4
                Err(e) => {
128
4
                    error!("[{}] get body error: {}", FN_NAME, e);
129
4
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
130
4
                    yield Err(err);
131
4
                    break;
132
4
                }
133
4
                Ok(body) => buffer.extend_from_slice(&body[..]),
134
4
            }
135
4

            
136
4
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<User>();
137
4
            let mut index = 0;
138
4
            let mut finish = false;
139
4
            loop {
140
4
                if let Some(Ok(mut v)) = json_stream.next() {
141
4
                    if let Ok(roles_str) = serde_json::to_string(&v.roles) {
142
4
                        v.roles_str = Some(roles_str);
143
4
                    }
144
4
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
145
4
                        v.info_str = Some(info_str);
146
4
                    }
147
4
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
148
4
                    if let Err(e) = writer.serialize(v) {
149
4
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
150
4
                        yield Err(err);
151
4
                        finish = true;
152
4
                        break;
153
4
                    }
154
4
                    match writer.into_inner() {
155
4
                        Err(e) => {
156
4
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
157
4
                            yield Err(err);
158
4
                            finish = true;
159
4
                            break;
160
4
                        }
161
4
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
162
4
                    }
163
4
                    continue;
164
4
                }
165
4
                let offset = json_stream.byte_offset();
166
4
                if buffer.len() <= index + offset {
167
4
                    index = buffer.len();
168
4
                    break;
169
4
                }
170
4
                match buffer[index+offset] {
171
4
                    b'[' | b',' => {
172
4
                        index += offset + 1;
173
4
                        if buffer.len() <= index {
174
4
                            break;
175
4
                        }
176
4
                        json_stream =
177
4
                            Deserializer::from_slice(&buffer[index..]).into_iter::<User>();
178
4
                    }
179
4
                    b']' => {
180
4
                        finish = true;
181
4
                        break;
182
4
                    }
183
4
                    _ => break,
184
4
                }
185
4
            }
186
4
            if finish {
187
4
                break;
188
4
            }
189
4
            buffer = buffer.split_off(index);
190
4
        }
191
4
    });
192
4
    match resp_builder.body(body) {
193
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
194
4
        Ok(resp) => resp,
195
    }
196
28
}
197

            
198
/// `GET /{base}/api/v1/user/{userId}`
199
4
async fn get_admin_user(
200
4
    state: State<AppState>,
201
4
    Path(param): Path<UserIdPath>,
202
4
    req: Request,
203
4
) -> impl IntoResponse {
204
    const FN_NAME: &'static str = "get_admin_user";
205
4
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
206
4
    let client = state.client.clone();
207
4

            
208
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
209
4
}
210

            
211
/// `PATCH /{base}/api/v1/user/{userId}`
212
4
async fn patch_admin_user(
213
4
    state: State<AppState>,
214
4
    Path(param): Path<UserIdPath>,
215
4
    req: Request,
216
4
) -> impl IntoResponse {
217
    const FN_NAME: &'static str = "patch_admin_user";
218
4
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
219
4
    let client = state.client.clone();
220
4

            
221
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
222
4
}
223

            
224
/// `DELETE /{base}/api/v1/user/{userId}`
225
4
async fn delete_admin_user(
226
4
    state: State<AppState>,
227
4
    Path(param): Path<UserIdPath>,
228
4
    req: Request,
229
4
) -> impl IntoResponse {
230
    const FN_NAME: &'static str = "delete_admin_user";
231
4
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
232
4
    let client = state.client.clone();
233
4

            
234
4
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
235
4
}