1
//! Wrapper APIs for controlling RabbitMQ.
//!
//! - The `hostname` argument of every API is the host name or IP address of the broker.
4

            
5
use reqwest::{self, Client, Method, StatusCode};
6
use serde::{Deserialize, Serialize};
7
use serde_json::{Map, Value};
8

            
9
use sylvia_iot_corelib::err::ErrResp;
10

            
11
use super::QueueType;
12

            
13
/// RabbitMQ management information.
///
/// Carries the administrator credentials that the wrapper functions send as HTTP basic
/// authentication, together with default queue limits.
#[derive(Clone)]
pub struct ManagementOpts {
    /// Management plugin administrator name.
    pub username: String,
    /// Management plugin administrator password.
    pub password: String,
    /// Default message TTL in milliseconds.
    pub ttl: Option<usize>,
    /// Default queue length.
    pub length: Option<usize>,
}
25

            
26
/// Policies for `broker.*` queues.
///
/// For both fields, `None` and `Some(0)` are treated alike by the policy functions of this
/// module: the corresponding limit is not applied.
pub struct BrokerPolicies {
    /// Message TTL in milliseconds.
    pub ttl: Option<usize>,
    /// Queue length.
    pub length: Option<usize>,
}
33

            
34
/// Statistics of a single queue.
///
/// The default value has every counter and rate set to zero.
#[derive(Default)]
pub struct Stats {
    /// Number of queue consumers.
    pub consumers: usize,
    /// Number of ready/unacked messages.
    pub messages: usize,
    /// Publish rate from the producer.
    pub publish_rate: f64,
    /// Deliver rate to the consumer.
    pub deliver_rate: f64,
}
46

            
47
#[derive(Serialize)]
48
struct PutUsersBody<'a> {
49
    password: &'a str,
50
    tags: &'a str,
51
}
52

            
53
#[derive(Serialize)]
54
struct PutPermissionsBody {
55
    configure: String,
56
    write: String,
57
    read: String,
58
}
59

            
60
#[derive(Deserialize, Serialize)]
61
struct Policies {
62
    pattern: String,
63
    definition: PoliciesDefinition,
64
    #[serde(rename = "apply-to")]
65
    apply_to: String,
66
}
67

            
68
#[derive(Deserialize, Serialize)]
69
struct PoliciesDefinition {
70
    #[serde(rename = "message-ttl", skip_serializing_if = "Option::is_none")]
71
    message_ttl: Option<usize>,
72
    #[serde(rename = "max-length", skip_serializing_if = "Option::is_none")]
73
    max_length: Option<usize>,
74
}
75

            
76
#[derive(Serialize)]
77
struct PostExchangesBody<'a> {
78
    properties: Map<String, Value>,
79
    routing_key: String,
80
    payload: String,
81
    payload_encoding: &'a str,
82
}
83

            
84
#[derive(Deserialize)]
85
struct GetQueuesResBody {
86
    consumers: Option<usize>,
87
    messages: Option<usize>,
88
    message_stats: Option<MessageStats>,
89
}
90

            
91
#[derive(Deserialize)]
92
struct MessageStats {
93
    deliver_details: Option<Details>,
94
    publish_details: Option<Details>,
95
}
96

            
97
#[derive(Deserialize)]
98
struct Details {
99
    rate: f64,
100
}
101

            
102
/// To create or update user account and its password.
103
84
pub async fn put_user(
104
84
    client: &Client,
105
84
    opts: &ManagementOpts,
106
84
    hostname: &str,
107
84
    username: &str,
108
84
    password: &str,
109
84
) -> Result<(), ErrResp> {
110
84
    let uri = format!("http://{}:15672/api/users/{}", hostname, username);
111
84
    let req = match client
112
84
        .request(Method::PUT, uri)
113
84
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
114
84
        .json(&PutUsersBody { password, tags: "" })
115
84
        .build()
116
    {
117
        Err(e) => {
118
            let e = format!("generate user request error: {}", e);
119
            return Err(ErrResp::ErrRsc(Some(e)));
120
        }
121
84
        Ok(req) => req,
122
84
    };
123
84
    match client.execute(req).await {
124
2
        Err(e) => {
125
2
            let e = format!("execute user request error: {}", e);
126
2
            Err(ErrResp::ErrIntMsg(Some(e)))
127
        }
128
82
        Ok(resp) => match resp.status() {
129
80
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
130
            _ => {
131
2
                let e = format!("execute user request with status: {}", resp.status());
132
2
                Err(ErrResp::ErrIntMsg(Some(e)))
133
            }
134
        },
135
    }
136
84
}
137

            
138
/// To delete a user.
139
714
pub async fn delete_user(
140
714
    client: &Client,
141
714
    opts: &ManagementOpts,
142
714
    hostname: &str,
143
714
    username: &str,
144
714
) -> Result<(), ErrResp> {
145
714
    let uri = format!("http://{}:15672/api/users/{}", hostname, username);
146
714
    let req = match client
147
714
        .request(Method::DELETE, uri)
148
714
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
149
714
        .build()
150
    {
151
        Err(e) => {
152
            let e = format!("generate user request error: {}", e);
153
            return Err(ErrResp::ErrRsc(Some(e)));
154
        }
155
714
        Ok(req) => req,
156
714
    };
157
714
    match client.execute(req).await {
158
2
        Err(e) => {
159
2
            let e = format!("execute user request error: {}", e);
160
2
            Err(ErrResp::ErrIntMsg(Some(e)))
161
        }
162
712
        Ok(resp) => match resp.status() {
163
710
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
164
            _ => {
165
2
                let e = format!("execute user request with status: {}", resp.status());
166
2
                Err(ErrResp::ErrIntMsg(Some(e)))
167
            }
168
        },
169
    }
170
714
}
171

            
172
/// To create a virtual host.
173
84
pub async fn put_vhost(
174
84
    client: &Client,
175
84
    opts: &ManagementOpts,
176
84
    hostname: &str,
177
84
    username: &str,
178
84
) -> Result<(), ErrResp> {
179
84
    let uri = format!("http://{}:15672/api/vhosts/{}", hostname, username);
180
84
    let req = match client
181
84
        .request(Method::PUT, uri)
182
84
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
183
84
        .build()
184
    {
185
        Err(e) => {
186
            let e = format!("generate vhost request error: {}", e);
187
            return Err(ErrResp::ErrRsc(Some(e)));
188
        }
189
84
        Ok(req) => req,
190
84
    };
191
84
    match client.execute(req).await {
192
2
        Err(e) => {
193
2
            let e = format!("execute vhost request error: {}", e);
194
2
            Err(ErrResp::ErrIntMsg(Some(e)))
195
        }
196
82
        Ok(resp) => match resp.status() {
197
80
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
198
            _ => {
199
2
                let e = format!("execute vhost request with status: {}", resp.status());
200
2
                Err(ErrResp::ErrIntMsg(Some(e)))
201
            }
202
        },
203
    }
204
84
}
205

            
206
/// To delete a virtual host.
207
314
pub async fn delete_vhost(
208
314
    client: &Client,
209
314
    opts: &ManagementOpts,
210
314
    hostname: &str,
211
314
    username: &str,
212
314
) -> Result<(), ErrResp> {
213
314
    let uri = format!("http://{}:15672/api/vhosts/{}", hostname, username);
214
314
    let req = match client
215
314
        .request(Method::DELETE, uri)
216
314
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
217
314
        .build()
218
    {
219
        Err(e) => {
220
            let e = format!("generate vhost request error: {}", e);
221
            return Err(ErrResp::ErrRsc(Some(e)));
222
        }
223
314
        Ok(req) => req,
224
314
    };
225
314
    match client.execute(req).await {
226
2
        Err(e) => {
227
2
            let e = format!("execute vhost request error: {}", e);
228
2
            Err(ErrResp::ErrIntMsg(Some(e)))
229
        }
230
312
        Ok(resp) => match resp.status() {
231
310
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
232
            _ => {
233
2
                let e = format!("execute vhost request with status: {}", resp.status());
234
2
                Err(ErrResp::ErrIntMsg(Some(e)))
235
            }
236
        },
237
    }
238
314
}
239

            
240
/// To set-up permissions of a group of application/network queues in a virtual host for the user.
241
70
pub async fn put_permissions(
242
70
    client: &Client,
243
70
    opts: &ManagementOpts,
244
70
    hostname: &str,
245
70
    q_type: QueueType,
246
70
    username: &str,
247
70
) -> Result<(), ErrResp> {
248
70
    let uri = format!(
249
70
        "http://{}:15672/api/permissions/{}/{}",
250
70
        hostname, username, username
251
70
    );
252
70
    let config_pattern = match q_type {
253
36
        QueueType::Application => format!(
254
36
            "^broker.{}.(uldata|dldata|dldata-resp|dldata-result)$",
255
36
            username
256
36
        )
257
36
        .replace(".", "\\."),
258
        QueueType::Network => {
259
34
            format!("^broker.{}.(uldata|dldata|dldata-result|ctrl)$", username).replace(".", "\\.")
260
        }
261
    };
262
70
    let read_pattern = match q_type {
263
        QueueType::Application => {
264
36
            format!("^broker.{}.(uldata|dldata-resp|dldata-result)$", username).replace(".", "\\.")
265
        }
266
34
        QueueType::Network => format!("^broker.{}.(dldata|ctrl)$", username).replace(".", "\\."),
267
    };
268
70
    let body = PutPermissionsBody {
269
70
        configure: config_pattern.to_string(),
270
70
        write: ".*".to_string(),
271
70
        read: read_pattern,
272
70
    };
273
70
    let req = match client
274
70
        .request(Method::PUT, uri)
275
70
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
276
70
        .json(&body)
277
70
        .build()
278
    {
279
        Err(e) => {
280
            let e = format!("generate permissions request error: {}", e);
281
            return Err(ErrResp::ErrRsc(Some(e)));
282
        }
283
70
        Ok(req) => req,
284
70
    };
285
70
    match client.execute(req).await {
286
2
        Err(e) => {
287
2
            let e = format!("execute permissions request error: {}", e);
288
2
            Err(ErrResp::ErrIntMsg(Some(e)))
289
        }
290
68
        Ok(resp) => match resp.status() {
291
66
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
292
            _ => {
293
2
                let e = format!("execute permissions request with status: {}", resp.status());
294
2
                Err(ErrResp::ErrIntMsg(Some(e)))
295
            }
296
        },
297
    }
298
70
}
299

            
300
/// To get TTL/length policies for the user.
301
40
pub async fn get_policies(
302
40
    client: &Client,
303
40
    opts: &ManagementOpts,
304
40
    hostname: &str,
305
40
    username: &str,
306
40
) -> Result<BrokerPolicies, ErrResp> {
307
40
    let uri = format!(
308
40
        "http://{}:15672/api/policies/{}/sylvia-iot-broker",
309
40
        hostname, username
310
40
    );
311
40
    let req = match client
312
40
        .request(Method::GET, uri)
313
40
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
314
40
        .build()
315
    {
316
        Err(e) => {
317
            let e = format!("generate policies request error: {}", e);
318
            return Err(ErrResp::ErrRsc(Some(e)));
319
        }
320
40
        Ok(req) => req,
321
    };
322
40
    let resp = match client.execute(req).await {
323
2
        Err(e) => {
324
2
            let e = format!("execute policies request error: {}", e);
325
2
            return Err(ErrResp::ErrIntMsg(Some(e)));
326
        }
327
38
        Ok(resp) => match resp.status() {
328
18
            StatusCode::OK => resp,
329
            StatusCode::NOT_FOUND => {
330
18
                return Ok(BrokerPolicies {
331
18
                    ttl: Some(0),
332
18
                    length: Some(0),
333
18
                })
334
            }
335
            _ => {
336
2
                let e = format!("execute request with status: {}", resp.status());
337
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
338
            }
339
        },
340
    };
341
18
    match resp.json::<Policies>().await {
342
        Err(e) => {
343
            let e = format!("not expected policies body: {}", e);
344
            Err(ErrResp::ErrUnknown(Some(e)))
345
        }
346
18
        Ok(body) => Ok(BrokerPolicies {
347
18
            ttl: match body.definition.message_ttl {
348
                None => Some(0),
349
18
                _ => body.definition.message_ttl,
350
            },
351
18
            length: match body.definition.max_length {
352
                None => Some(0),
353
18
                _ => body.definition.max_length,
354
            },
355
        }),
356
    }
357
40
}
358

            
359
/// To update TTL/length policies for the user.
360
54
pub async fn put_policies(
361
54
    client: &Client,
362
54
    opts: &ManagementOpts,
363
54
    hostname: &str,
364
54
    username: &str,
365
54
    policies: &BrokerPolicies,
366
54
) -> Result<(), ErrResp> {
367
54
    let uri = format!(
368
54
        "http://{}:15672/api/policies/{}/sylvia-iot-broker",
369
54
        hostname, username
370
54
    );
371
54
    let is_delete = match policies.ttl {
372
14
        None | Some(0) => match policies.length {
373
14
            None | Some(0) => true,
374
            _ => false,
375
        },
376
40
        _ => false,
377
    };
378
54
    let builder = if is_delete {
379
14
        client
380
14
            .request(Method::DELETE, uri)
381
14
            .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
382
    } else {
383
40
        let definition = PoliciesDefinition {
384
40
            message_ttl: match policies.ttl {
385
                Some(0) => None,
386
40
                _ => policies.ttl,
387
            },
388
40
            max_length: match policies.length {
389
4
                Some(0) => None,
390
36
                _ => policies.length,
391
            },
392
        };
393
40
        let body = Policies {
394
40
            pattern: "^broker.".to_string(),
395
40
            definition,
396
40
            apply_to: "queues".to_string(),
397
40
        };
398
40
        client
399
40
            .request(Method::PUT, uri)
400
40
            .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
401
40
            .json(&body)
402
    };
403
54
    let req = match builder.build() {
404
        Err(e) => {
405
            let e = format!("generate policies request error: {}", e);
406
            return Err(ErrResp::ErrRsc(Some(e)));
407
        }
408
54
        Ok(req) => req,
409
54
    };
410
54
    match client.execute(req).await {
411
2
        Err(e) => {
412
2
            let e = format!("execute policies request error: {}", e);
413
2
            Err(ErrResp::ErrIntMsg(Some(e)))
414
        }
415
52
        Ok(resp) => match resp.status() {
416
48
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
417
2
            StatusCode::NOT_FOUND => match is_delete {
418
                false => Err(ErrResp::ErrNotFound(None)),
419
2
                true => Ok(()),
420
            },
421
            _ => {
422
2
                let e = format!("execute request with status: {}", resp.status());
423
2
                Err(ErrResp::ErrIntMsg(Some(e)))
424
            }
425
        },
426
    }
427
54
}
428

            
429
/// To publish a message to the specified queue (such as `uldata` and `dldata`).
430
///
431
/// The `payload` MUST be Base64 encoded string.
432
42
pub async fn publish_message(
433
42
    client: &Client,
434
42
    opts: &ManagementOpts,
435
42
    hostname: &str,
436
42
    username: &str,
437
42
    queue: &str,     // uldata,dldata
438
42
    payload: String, // Base64
439
42
) -> Result<(), ErrResp> {
440
42
    let uri = format!(
441
42
        "http://{}:15672/api/exchanges/{}/amq.default/publish",
442
42
        hostname, username
443
42
    );
444
42
    let body = PostExchangesBody {
445
42
        properties: Map::<String, Value>::new(),
446
42
        routing_key: format!("broker.{}.{}", username, queue),
447
42
        payload,
448
42
        payload_encoding: "base64",
449
42
    };
450
42
    let req = match client
451
42
        .request(Method::POST, uri)
452
42
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
453
42
        .json(&body)
454
42
        .build()
455
    {
456
        Err(e) => {
457
            let e = format!("generate publish request error: {}", e);
458
            return Err(ErrResp::ErrRsc(Some(e)));
459
        }
460
42
        Ok(req) => req,
461
42
    };
462
42
    match client.execute(req).await {
463
2
        Err(e) => {
464
2
            let e = format!("execute publish request error: {}", e);
465
2
            Err(ErrResp::ErrIntMsg(Some(e)))
466
        }
467
40
        Ok(resp) => match resp.status() {
468
38
            StatusCode::OK => Ok(()),
469
            _ => {
470
2
                let e = format!("execute publish request with status: {}", resp.status());
471
2
                Err(ErrResp::ErrIntMsg(Some(e)))
472
            }
473
        },
474
    }
475
42
}
476

            
477
/// Get statistics of a queue.
478
5526
pub async fn stats(
479
5526
    client: &Client,
480
5526
    opts: &ManagementOpts,
481
5526
    hostname: &str,
482
5526
    username: &str,
483
5526
    queue: &str, // uldata,dldata,dldata-resp,dldata-result,ctrl
484
5526
) -> Result<Stats, ErrResp> {
485
5526
    let uri = format!(
486
5526
        "http://{}:15672/api/queues/{}/broker.{}.{}?msg_rates_age=60&msg_rates_incr=5",
487
5526
        hostname, username, username, queue
488
5526
    );
489
5526
    let req = match client
490
5526
        .request(Method::GET, uri)
491
5526
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
492
5526
        .build()
493
    {
494
        Err(e) => {
495
            let e = format!("generate stats request error: {}", e);
496
            return Err(ErrResp::ErrRsc(Some(e)));
497
        }
498
5526
        Ok(req) => req,
499
    };
500
5526
    let resp = match client.execute(req).await {
501
2
        Err(e) => {
502
2
            let e = format!("execute stats request error: {}", e);
503
2
            return Err(ErrResp::ErrIntMsg(Some(e)));
504
        }
505
5524
        Ok(resp) => match resp.status() {
506
5520
            StatusCode::OK => resp,
507
2
            StatusCode::NOT_FOUND => return Err(ErrResp::ErrNotFound(None)),
508
            _ => {
509
2
                let e = format!("execute stats request with status: {}", resp.status());
510
2
                return Err(ErrResp::ErrIntMsg(Some(e)));
511
            }
512
        },
513
    };
514
5520
    let resp_stats = match resp.json::<GetQueuesResBody>().await {
515
        Err(e) => {
516
            let e = format!("read stats body error: {}", e);
517
            return Err(ErrResp::ErrIntMsg(Some(e)));
518
        }
519
5520
        Ok(stats) => stats,
520
5520
    };
521
5520
    let mut ret_stats = Stats {
522
5520
        ..Default::default()
523
5520
    };
524
5520
    if let Some(consumers) = resp_stats.consumers {
525
3762
        ret_stats.consumers = consumers;
526
3762
    }
527
5520
    if let Some(messages) = resp_stats.messages {
528
3998
        ret_stats.messages = messages;
529
3998
    }
530
5520
    if let Some(stats) = resp_stats.message_stats {
531
3374
        if let Some(details) = stats.publish_details.as_ref() {
532
3374
            ret_stats.publish_rate = details.rate;
533
3374
        }
534
3374
        if let Some(details) = stats.deliver_details.as_ref() {
535
102
            ret_stats.deliver_rate = details.rate;
536
3272
        }
537
2146
    }
538
5520
    Ok(ret_stats)
539
5526
}