1
use std::{collections::HashMap, error::Error as StdError};
2

            
3
use axum::{
4
    body::Body,
5
    extract::{Path, Request, State},
6
    response::IntoResponse,
7
    routing, Router,
8
};
9
use bytes::{Bytes, BytesMut};
10
use csv::WriterBuilder;
11
use futures_util::StreamExt;
12
use log::error;
13
use serde::{Deserialize, Serialize};
14
use serde_json::{Deserializer, Map, Value};
15

            
16
use sylvia_iot_corelib::err::ErrResp;
17

            
18
use super::{super::State as AppState, api_bridge, list_api_bridge, ListResp};
19

            
20
#[derive(Deserialize)]
21
struct UserIdPath {
22
    user_id: String,
23
}
24

            
25
#[derive(Deserialize, Serialize)]
26
struct User {
27
    account: String,
28
    #[serde(rename = "createdAt")]
29
    created_at: String,
30
    #[serde(rename = "modifiedAt")]
31
    modified_at: String,
32
    #[serde(rename = "verifiedAt")]
33
    verified_at: Option<String>,
34
    #[serde(skip_serializing)]
35
    roles: HashMap<String, bool>,
36
    #[serde(rename(serialize = "role"))]
37
    roles_str: Option<String>,
38
    name: String,
39
    #[serde(skip_serializing)]
40
    info: Map<String, Value>,
41
    #[serde(rename(serialize = "info"))]
42
    info_str: Option<String>,
43
}
44

            
45
const CSV_FIELDS: &'static [u8] =
46
    b"\xEF\xBB\xBFaccount,createdAt,modifiedAt,verifiedAt,roles,name,info\n";
47

            
48
253
pub fn new_service(scope_path: &str, state: &AppState) -> Router {
49
253
    Router::new().nest(
50
253
        scope_path,
51
253
        Router::new()
52
253
            .route(
53
253
                "/",
54
253
                routing::get(get_user)
55
253
                    .patch(patch_user)
56
253
                    .post(post_admin_user),
57
253
            )
58
253
            .route("/count", routing::get(get_admin_user_count))
59
253
            .route("/list", routing::get(get_admin_user_list))
60
253
            .route(
61
253
                "/:user_id",
62
253
                routing::get(get_admin_user)
63
253
                    .patch(patch_admin_user)
64
253
                    .delete(delete_admin_user),
65
253
            )
66
253
            .with_state(state.clone()),
67
253
    )
68
253
}
69

            
70
/// `GET /{base}/api/v1/user`
71
2
async fn get_user(state: State<AppState>, req: Request) -> impl IntoResponse {
72
    const FN_NAME: &'static str = "get_user";
73
2
    let api_path = format!("{}/api/v1/user", state.auth_base);
74
2
    let client = state.client.clone();
75
2

            
76
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
77
2
}
78

            
79
/// `PATCH /{base}/api/v1/user`
80
2
async fn patch_user(state: State<AppState>, req: Request) -> impl IntoResponse {
81
    const FN_NAME: &'static str = "patch_user";
82
2
    let api_path = format!("{}/api/v1/user", state.auth_base);
83
2
    let client = state.client.clone();
84
2

            
85
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
86
2
}
87

            
88
/// `POST /{base}/api/v1/user`
89
2
async fn post_admin_user(state: State<AppState>, req: Request) -> impl IntoResponse {
90
    const FN_NAME: &'static str = "post_admin_user";
91
2
    let api_path = format!("{}/api/v1/user", state.auth_base);
92
2
    let client = state.client.clone();
93
2

            
94
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
95
2
}
96

            
97
/// `GET /{base}/api/v1/user/count`
98
6
async fn get_admin_user_count(state: State<AppState>, req: Request) -> impl IntoResponse {
99
    const FN_NAME: &'static str = "get_admin_user_count";
100
6
    let api_path = format!("{}/api/v1/user/count", state.auth_base.as_str());
101
6
    let client = state.client.clone();
102
6

            
103
6
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
104
6
}
105

            
106
/// `GET /{base}/api/v1/user/list`
107
14
async fn get_admin_user_list(state: State<AppState>, req: Request) -> impl IntoResponse {
108
    const FN_NAME: &'static str = "get_admin_user_list";
109
14
    let api_path = format!("{}/api/v1/user/list", state.auth_base.as_str());
110
14
    let api_path = api_path.as_str();
111
14
    let client = state.client.clone();
112

            
113
2
    let (api_resp, resp_builder) =
114
14
        match list_api_bridge(FN_NAME, &client, req, api_path, false, "user").await {
115
12
            ListResp::Axum(resp) => return resp,
116
2
            ListResp::ArrayStream(api_resp, resp_builder) => (api_resp, resp_builder),
117
2
        };
118
2

            
119
2
    let mut resp_stream = api_resp.bytes_stream();
120
2
    let body = Body::from_stream(async_stream::stream! {
121
2
        yield Ok(Bytes::from(CSV_FIELDS));
122
2

            
123
2
        let mut buffer = BytesMut::new();
124
2
        while let Some(body) = resp_stream.next().await {
125
2
            match body {
126
2
                Err(e) => {
127
2
                    error!("[{}] get body error: {}", FN_NAME, e);
128
2
                    let err: Box<dyn StdError + Send + Sync> = Box::new(e);
129
2
                    yield Err(err);
130
2
                    break;
131
2
                }
132
2
                Ok(body) => buffer.extend_from_slice(&body[..]),
133
2
            }
134
2

            
135
2
            let mut json_stream = Deserializer::from_slice(&buffer[..]).into_iter::<User>();
136
2
            let mut index = 0;
137
2
            let mut finish = false;
138
2
            loop {
139
2
                if let Some(Ok(mut v)) = json_stream.next() {
140
2
                    if let Ok(roles_str) = serde_json::to_string(&v.roles) {
141
2
                        v.roles_str = Some(roles_str);
142
2
                    }
143
2
                    if let Ok(info_str) = serde_json::to_string(&v.info) {
144
2
                        v.info_str = Some(info_str);
145
2
                    }
146
2
                    let mut writer = WriterBuilder::new().has_headers(false).from_writer(vec![]);
147
2
                    if let Err(e) = writer.serialize(v) {
148
2
                        let err: Box<dyn StdError + Send + Sync> = Box::new(e);
149
2
                        yield Err(err);
150
2
                        finish = true;
151
2
                        break;
152
2
                    }
153
2
                    match writer.into_inner() {
154
2
                        Err(e) => {
155
2
                            let err: Box<dyn StdError + Send + Sync> = Box::new(e);
156
2
                            yield Err(err);
157
2
                            finish = true;
158
2
                            break;
159
2
                        }
160
2
                        Ok(row) => yield Ok(Bytes::copy_from_slice(row.as_slice())),
161
2
                    }
162
2
                    continue;
163
2
                }
164
2
                let offset = json_stream.byte_offset();
165
2
                if buffer.len() <= index + offset {
166
2
                    index = buffer.len();
167
2
                    break;
168
2
                }
169
2
                match buffer[index+offset] {
170
2
                    b'[' | b',' => {
171
2
                        index += offset + 1;
172
2
                        if buffer.len() <= index {
173
2
                            break;
174
2
                        }
175
2
                        json_stream =
176
2
                            Deserializer::from_slice(&buffer[index..]).into_iter::<User>();
177
2
                    }
178
2
                    b']' => {
179
2
                        finish = true;
180
2
                        break;
181
2
                    }
182
2
                    _ => break,
183
2
                }
184
2
            }
185
2
            if finish {
186
2
                break;
187
2
            }
188
2
            buffer = buffer.split_off(index);
189
2
        }
190
2
    });
191
2
    match resp_builder.body(body) {
192
        Err(e) => ErrResp::ErrRsc(Some(e.to_string())).into_response(),
193
2
        Ok(resp) => resp,
194
    }
195
14
}
196

            
197
/// `GET /{base}/api/v1/user/{userId}`
198
2
async fn get_admin_user(
199
2
    state: State<AppState>,
200
2
    Path(param): Path<UserIdPath>,
201
2
    req: Request,
202
2
) -> impl IntoResponse {
203
    const FN_NAME: &'static str = "get_admin_user";
204
2
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
205
2
    let client = state.client.clone();
206
2

            
207
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
208
2
}
209

            
210
/// `PATCH /{base}/api/v1/user/{userId}`
211
2
async fn patch_admin_user(
212
2
    state: State<AppState>,
213
2
    Path(param): Path<UserIdPath>,
214
2
    req: Request,
215
2
) -> impl IntoResponse {
216
    const FN_NAME: &'static str = "patch_admin_user";
217
2
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
218
2
    let client = state.client.clone();
219
2

            
220
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
221
2
}
222

            
223
/// `DELETE /{base}/api/v1/user/{userId}`
224
2
async fn delete_admin_user(
225
2
    state: State<AppState>,
226
2
    Path(param): Path<UserIdPath>,
227
2
    req: Request,
228
2
) -> impl IntoResponse {
229
    const FN_NAME: &'static str = "delete_admin_user";
230
2
    let api_path = format!("{}/api/v1/user/{}", state.auth_base, param.user_id);
231
2
    let client = state.client.clone();
232
2

            
233
2
    api_bridge(FN_NAME, &client, req, api_path.as_str()).await
234
2
}