1
//! Wrapper APIs for controlling RabbitMQ.
//!
//! - The `hostname` argument of every API is the host name or IP address of the broker.
4

            
5
use reqwest::{self, Client, Method, StatusCode};
6
use serde::{Deserialize, Serialize};
7
use serde_json::{Map, Value};
8

            
9
use sylvia_iot_corelib::err::ErrResp;
10

            
11
use super::QueueType;
12

            
13
/// RabbitMQ management information.
///
/// `username`/`password` are sent as HTTP basic-auth credentials by every management API
/// wrapper in this module.
#[derive(Clone)]
pub struct ManagementOpts {
    /// Management plugin administrator name.
    pub username: String,
    /// Management plugin administrator password.
    pub password: String,
    /// Default message TTL in milliseconds.
    pub ttl: Option<usize>,
    /// Default queue length.
    pub length: Option<usize>,
}
25

            
26
/// Policies for `broker.*` queues.
///
/// [`get_policies`] reports an absent limit as `Some(0)`; [`put_policies`] treats both `None`
/// and `Some(0)` as "no limit".
pub struct BrokerPolicies {
    /// Message TTL in milliseconds.
    pub ttl: Option<usize>,
    /// Queue length.
    pub length: Option<usize>,
}
33

            
34
/// Statistics of a queue.
///
/// All fields default to zero (see the derived [`Default`]).
#[derive(Default)]
pub struct Stats {
    /// Number of queue consumers.
    pub consumers: usize,
    /// Number of ready/unacked messages.
    pub messages: usize,
    /// Publish rate from the producer.
    pub publish_rate: f64,
    /// Deliver rate to the consumer.
    pub deliver_rate: f64,
}
46

            
47
#[derive(Serialize)]
48
struct PutUsersBody<'a> {
49
    password: &'a str,
50
    tags: &'a str,
51
}
52

            
53
#[derive(Serialize)]
54
struct PutPermissionsBody {
55
    configure: String,
56
    write: String,
57
    read: String,
58
}
59

            
60
#[derive(Deserialize, Serialize)]
61
struct Policies {
62
    pattern: String,
63
    definition: PoliciesDefinition,
64
    #[serde(rename = "apply-to")]
65
    apply_to: String,
66
}
67

            
68
#[derive(Deserialize, Serialize)]
69
struct PoliciesDefinition {
70
    #[serde(rename = "message-ttl", skip_serializing_if = "Option::is_none")]
71
    message_ttl: Option<usize>,
72
    #[serde(rename = "max-length", skip_serializing_if = "Option::is_none")]
73
    max_length: Option<usize>,
74
}
75

            
76
#[derive(Serialize)]
77
struct PostExchangesBody<'a> {
78
    properties: Map<String, Value>,
79
    routing_key: String,
80
    payload: String,
81
    payload_encoding: &'a str,
82
}
83

            
84
#[derive(Deserialize)]
85
struct GetQueuesResBody {
86
    consumers: Option<usize>,
87
    messages: Option<usize>,
88
    message_stats: Option<MessageStats>,
89
}
90

            
91
#[derive(Deserialize)]
92
struct MessageStats {
93
    deliver_details: Option<Details>,
94
    publish_details: Option<Details>,
95
}
96

            
97
#[derive(Deserialize)]
98
struct Details {
99
    rate: f64,
100
}
101

            
102
/// To create or update user account and its password.
103
42
pub async fn put_user(
104
42
    client: &Client,
105
42
    opts: &ManagementOpts,
106
42
    hostname: &str,
107
42
    username: &str,
108
42
    password: &str,
109
42
) -> Result<(), ErrResp> {
110
42
    let uri = format!("http://{}:15672/api/users/{}", hostname, username);
111
42
    let req = match client
112
42
        .request(Method::PUT, uri)
113
42
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
114
42
        .json(&PutUsersBody { password, tags: "" })
115
42
        .build()
116
    {
117
        Err(e) => {
118
            let e = format!("generate user request error: {}", e);
119
            return Err(ErrResp::ErrRsc(Some(e)));
120
        }
121
42
        Ok(req) => req,
122
42
    };
123
42
    match client.execute(req).await {
124
1
        Err(e) => {
125
1
            let e = format!("execute user request error: {}", e);
126
1
            Err(ErrResp::ErrIntMsg(Some(e)))
127
        }
128
41
        Ok(resp) => match resp.status() {
129
40
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
130
            _ => {
131
1
                let e = format!("execute user request with status: {}", resp.status());
132
1
                Err(ErrResp::ErrIntMsg(Some(e)))
133
            }
134
        },
135
    }
136
42
}
137

            
138
/// To delete a user.
139
357
pub async fn delete_user(
140
357
    client: &Client,
141
357
    opts: &ManagementOpts,
142
357
    hostname: &str,
143
357
    username: &str,
144
357
) -> Result<(), ErrResp> {
145
357
    let uri = format!("http://{}:15672/api/users/{}", hostname, username);
146
357
    let req = match client
147
357
        .request(Method::DELETE, uri)
148
357
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
149
357
        .build()
150
    {
151
        Err(e) => {
152
            let e = format!("generate user request error: {}", e);
153
            return Err(ErrResp::ErrRsc(Some(e)));
154
        }
155
357
        Ok(req) => req,
156
357
    };
157
357
    match client.execute(req).await {
158
1
        Err(e) => {
159
1
            let e = format!("execute user request error: {}", e);
160
1
            Err(ErrResp::ErrIntMsg(Some(e)))
161
        }
162
356
        Ok(resp) => match resp.status() {
163
355
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
164
            _ => {
165
1
                let e = format!("execute user request with status: {}", resp.status());
166
1
                Err(ErrResp::ErrIntMsg(Some(e)))
167
            }
168
        },
169
    }
170
357
}
171

            
172
/// To create a virtual host.
173
42
pub async fn put_vhost(
174
42
    client: &Client,
175
42
    opts: &ManagementOpts,
176
42
    hostname: &str,
177
42
    username: &str,
178
42
) -> Result<(), ErrResp> {
179
42
    let uri = format!("http://{}:15672/api/vhosts/{}", hostname, username);
180
42
    let req = match client
181
42
        .request(Method::PUT, uri)
182
42
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
183
42
        .build()
184
    {
185
        Err(e) => {
186
            let e = format!("generate vhost request error: {}", e);
187
            return Err(ErrResp::ErrRsc(Some(e)));
188
        }
189
42
        Ok(req) => req,
190
42
    };
191
42
    match client.execute(req).await {
192
1
        Err(e) => {
193
1
            let e = format!("execute vhost request error: {}", e);
194
1
            Err(ErrResp::ErrIntMsg(Some(e)))
195
        }
196
41
        Ok(resp) => match resp.status() {
197
40
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
198
            _ => {
199
1
                let e = format!("execute vhost request with status: {}", resp.status());
200
1
                Err(ErrResp::ErrIntMsg(Some(e)))
201
            }
202
        },
203
    }
204
42
}
205

            
206
/// To delete a virtual host.
207
157
pub async fn delete_vhost(
208
157
    client: &Client,
209
157
    opts: &ManagementOpts,
210
157
    hostname: &str,
211
157
    username: &str,
212
157
) -> Result<(), ErrResp> {
213
157
    let uri = format!("http://{}:15672/api/vhosts/{}", hostname, username);
214
157
    let req = match client
215
157
        .request(Method::DELETE, uri)
216
157
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
217
157
        .build()
218
    {
219
        Err(e) => {
220
            let e = format!("generate vhost request error: {}", e);
221
            return Err(ErrResp::ErrRsc(Some(e)));
222
        }
223
157
        Ok(req) => req,
224
157
    };
225
157
    match client.execute(req).await {
226
1
        Err(e) => {
227
1
            let e = format!("execute vhost request error: {}", e);
228
1
            Err(ErrResp::ErrIntMsg(Some(e)))
229
        }
230
156
        Ok(resp) => match resp.status() {
231
155
            StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
232
            _ => {
233
1
                let e = format!("execute vhost request with status: {}", resp.status());
234
1
                Err(ErrResp::ErrIntMsg(Some(e)))
235
            }
236
        },
237
    }
238
157
}
239

            
240
/// To set-up permissions of a group of application/network queues in a virtual host for the user.
241
35
pub async fn put_permissions(
242
35
    client: &Client,
243
35
    opts: &ManagementOpts,
244
35
    hostname: &str,
245
35
    q_type: QueueType,
246
35
    username: &str,
247
35
) -> Result<(), ErrResp> {
248
35
    let uri = format!(
249
35
        "http://{}:15672/api/permissions/{}/{}",
250
35
        hostname, username, username
251
35
    );
252
35
    let config_pattern = match q_type {
253
18
        QueueType::Application => format!(
254
18
            "^broker.{}.(uldata|dldata|dldata-resp|dldata-result)$",
255
18
            username
256
18
        )
257
18
        .replace(".", "\\."),
258
        QueueType::Network => {
259
17
            format!("^broker.{}.(uldata|dldata|dldata-result|ctrl)$", username).replace(".", "\\.")
260
        }
261
    };
262
35
    let read_pattern = match q_type {
263
        QueueType::Application => {
264
18
            format!("^broker.{}.(uldata|dldata-resp|dldata-result)$", username).replace(".", "\\.")
265
        }
266
17
        QueueType::Network => format!("^broker.{}.(dldata|ctrl)$", username).replace(".", "\\."),
267
    };
268
35
    let body = PutPermissionsBody {
269
35
        configure: config_pattern.to_string(),
270
35
        write: ".*".to_string(),
271
35
        read: read_pattern,
272
35
    };
273
35
    let req = match client
274
35
        .request(Method::PUT, uri)
275
35
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
276
35
        .json(&body)
277
35
        .build()
278
    {
279
        Err(e) => {
280
            let e = format!("generate permissions request error: {}", e);
281
            return Err(ErrResp::ErrRsc(Some(e)));
282
        }
283
35
        Ok(req) => req,
284
35
    };
285
35
    match client.execute(req).await {
286
1
        Err(e) => {
287
1
            let e = format!("execute permissions request error: {}", e);
288
1
            Err(ErrResp::ErrIntMsg(Some(e)))
289
        }
290
34
        Ok(resp) => match resp.status() {
291
33
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
292
            _ => {
293
1
                let e = format!("execute permissions request with status: {}", resp.status());
294
1
                Err(ErrResp::ErrIntMsg(Some(e)))
295
            }
296
        },
297
    }
298
35
}
299

            
300
/// To get TTL/length policies for the user.
301
20
pub async fn get_policies(
302
20
    client: &Client,
303
20
    opts: &ManagementOpts,
304
20
    hostname: &str,
305
20
    username: &str,
306
20
) -> Result<BrokerPolicies, ErrResp> {
307
20
    let uri = format!(
308
20
        "http://{}:15672/api/policies/{}/sylvia-iot-broker",
309
20
        hostname, username
310
20
    );
311
20
    let req = match client
312
20
        .request(Method::GET, uri)
313
20
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
314
20
        .build()
315
    {
316
        Err(e) => {
317
            let e = format!("generate policies request error: {}", e);
318
            return Err(ErrResp::ErrRsc(Some(e)));
319
        }
320
20
        Ok(req) => req,
321
    };
322
20
    let resp = match client.execute(req).await {
323
1
        Err(e) => {
324
1
            let e = format!("execute policies request error: {}", e);
325
1
            return Err(ErrResp::ErrIntMsg(Some(e)));
326
        }
327
19
        Ok(resp) => match resp.status() {
328
9
            StatusCode::OK => resp,
329
            StatusCode::NOT_FOUND => {
330
9
                return Ok(BrokerPolicies {
331
9
                    ttl: Some(0),
332
9
                    length: Some(0),
333
9
                })
334
            }
335
            _ => {
336
1
                let e = format!("execute request with status: {}", resp.status());
337
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
338
            }
339
        },
340
    };
341
9
    match resp.json::<Policies>().await {
342
        Err(e) => {
343
            let e = format!("not expected policies body: {}", e);
344
            Err(ErrResp::ErrUnknown(Some(e)))
345
        }
346
9
        Ok(body) => Ok(BrokerPolicies {
347
9
            ttl: match body.definition.message_ttl {
348
                None => Some(0),
349
9
                _ => body.definition.message_ttl,
350
            },
351
9
            length: match body.definition.max_length {
352
                None => Some(0),
353
9
                _ => body.definition.max_length,
354
            },
355
        }),
356
    }
357
20
}
358

            
359
/// To update TTL/length policies for the user.
360
27
pub async fn put_policies(
361
27
    client: &Client,
362
27
    opts: &ManagementOpts,
363
27
    hostname: &str,
364
27
    username: &str,
365
27
    policies: &BrokerPolicies,
366
27
) -> Result<(), ErrResp> {
367
27
    let uri = format!(
368
27
        "http://{}:15672/api/policies/{}/sylvia-iot-broker",
369
27
        hostname, username
370
27
    );
371
27
    let is_delete = match policies.ttl {
372
7
        None | Some(0) => match policies.length {
373
7
            None | Some(0) => true,
374
            _ => false,
375
        },
376
20
        _ => false,
377
    };
378
27
    let builder = if is_delete {
379
7
        client
380
7
            .request(Method::DELETE, uri)
381
7
            .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
382
    } else {
383
20
        let definition = PoliciesDefinition {
384
20
            message_ttl: match policies.ttl {
385
                Some(0) => None,
386
20
                _ => policies.ttl,
387
            },
388
20
            max_length: match policies.length {
389
2
                Some(0) => None,
390
18
                _ => policies.length,
391
            },
392
        };
393
20
        let body = Policies {
394
20
            pattern: "^broker.".to_string(),
395
20
            definition,
396
20
            apply_to: "queues".to_string(),
397
20
        };
398
20
        client
399
20
            .request(Method::PUT, uri)
400
20
            .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
401
20
            .json(&body)
402
    };
403
27
    let req = match builder.build() {
404
        Err(e) => {
405
            let e = format!("generate policies request error: {}", e);
406
            return Err(ErrResp::ErrRsc(Some(e)));
407
        }
408
27
        Ok(req) => req,
409
27
    };
410
27
    match client.execute(req).await {
411
1
        Err(e) => {
412
1
            let e = format!("execute policies request error: {}", e);
413
1
            Err(ErrResp::ErrIntMsg(Some(e)))
414
        }
415
26
        Ok(resp) => match resp.status() {
416
24
            StatusCode::CREATED | StatusCode::NO_CONTENT => Ok(()),
417
1
            StatusCode::NOT_FOUND => match is_delete {
418
                false => Err(ErrResp::ErrNotFound(None)),
419
1
                true => Ok(()),
420
            },
421
            _ => {
422
1
                let e = format!("execute request with status: {}", resp.status());
423
1
                Err(ErrResp::ErrIntMsg(Some(e)))
424
            }
425
        },
426
    }
427
27
}
428

            
429
/// To publish a message to the specified queue (such as `uldata` and `dldata`).
430
///
431
/// The `payload` MUST be Base64 encoded string.
432
21
pub async fn publish_message(
433
21
    client: &Client,
434
21
    opts: &ManagementOpts,
435
21
    hostname: &str,
436
21
    username: &str,
437
21
    queue: &str,     // uldata,dldata
438
21
    payload: String, // Base64
439
21
) -> Result<(), ErrResp> {
440
21
    let uri = format!(
441
21
        "http://{}:15672/api/exchanges/{}/amq.default/publish",
442
21
        hostname, username
443
21
    );
444
21
    let body = PostExchangesBody {
445
21
        properties: Map::<String, Value>::new(),
446
21
        routing_key: format!("broker.{}.{}", username, queue),
447
21
        payload,
448
21
        payload_encoding: "base64",
449
21
    };
450
21
    let req = match client
451
21
        .request(Method::POST, uri)
452
21
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
453
21
        .json(&body)
454
21
        .build()
455
    {
456
        Err(e) => {
457
            let e = format!("generate publish request error: {}", e);
458
            return Err(ErrResp::ErrRsc(Some(e)));
459
        }
460
21
        Ok(req) => req,
461
21
    };
462
21
    match client.execute(req).await {
463
1
        Err(e) => {
464
1
            let e = format!("execute publish request error: {}", e);
465
1
            Err(ErrResp::ErrIntMsg(Some(e)))
466
        }
467
20
        Ok(resp) => match resp.status() {
468
19
            StatusCode::OK => Ok(()),
469
            _ => {
470
1
                let e = format!("execute publish request with status: {}", resp.status());
471
1
                Err(ErrResp::ErrIntMsg(Some(e)))
472
            }
473
        },
474
    }
475
21
}
476

            
477
/// Get statistics of a queue.
478
2932
pub async fn stats(
479
2932
    client: &Client,
480
2932
    opts: &ManagementOpts,
481
2932
    hostname: &str,
482
2932
    username: &str,
483
2932
    queue: &str, // uldata,dldata,dldata-resp,dldata-result,ctrl
484
2932
) -> Result<Stats, ErrResp> {
485
2932
    let uri = format!(
486
2932
        "http://{}:15672/api/queues/{}/broker.{}.{}?msg_rates_age=60&msg_rates_incr=5",
487
2932
        hostname, username, username, queue
488
2932
    );
489
2932
    let req = match client
490
2932
        .request(Method::GET, uri)
491
2932
        .basic_auth(opts.username.as_str(), Some(opts.password.as_str()))
492
2932
        .build()
493
    {
494
        Err(e) => {
495
            let e = format!("generate stats request error: {}", e);
496
            return Err(ErrResp::ErrRsc(Some(e)));
497
        }
498
2932
        Ok(req) => req,
499
    };
500
2932
    let resp = match client.execute(req).await {
501
1
        Err(e) => {
502
1
            let e = format!("execute stats request error: {}", e);
503
1
            return Err(ErrResp::ErrIntMsg(Some(e)));
504
        }
505
2931
        Ok(resp) => match resp.status() {
506
2929
            StatusCode::OK => resp,
507
1
            StatusCode::NOT_FOUND => return Err(ErrResp::ErrNotFound(None)),
508
            _ => {
509
1
                let e = format!("execute stats request with status: {}", resp.status());
510
1
                return Err(ErrResp::ErrIntMsg(Some(e)));
511
            }
512
        },
513
    };
514
2929
    let resp_stats = match resp.json::<GetQueuesResBody>().await {
515
        Err(e) => {
516
            let e = format!("read stats body error: {}", e);
517
            return Err(ErrResp::ErrIntMsg(Some(e)));
518
        }
519
2929
        Ok(stats) => stats,
520
2929
    };
521
2929
    let mut ret_stats = Stats {
522
2929
        ..Default::default()
523
2929
    };
524
2929
    if let Some(consumers) = resp_stats.consumers {
525
1941
        ret_stats.consumers = consumers;
526
1941
    }
527
2929
    if let Some(messages) = resp_stats.messages {
528
2057
        ret_stats.messages = messages;
529
2057
    }
530
2929
    if let Some(stats) = resp_stats.message_stats {
531
1751
        if let Some(details) = stats.publish_details.as_ref() {
532
1751
            ret_stats.publish_rate = details.rate;
533
1751
        }
534
1751
        if let Some(details) = stats.deliver_details.as_ref() {
535
98
            ret_stats.deliver_rate = details.rate;
536
1653
        }
537
1178
    }
538
2929
    Ok(ret_stats)
539
2932
}