1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue, MessageHandler},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
const QUEUE_PREFIX: &'static str = "broker.ctrl";
16

            
17
/// The default prefetch value for AMQP.
18
const DEF_PREFETCH: u16 = 100;
19

            
20
/// To create a broadcast queue for a function to send or receive control messages.
21
206
pub fn new(
22
206
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
23
206
    host_uri: &Url,
24
206
    prefetch: Option<u16>,
25
206
    func_name: &str,
26
206
    is_recv: bool,
27
206
    handler: Arc<dyn EventHandler>,
28
206
    msg_handler: Arc<dyn MessageHandler>,
29
206
) -> Result<Queue, String> {
30
206
    if func_name.len() == 0 {
31
2
        return Err("`func_name` cannot be empty for control queue".to_string());
32
204
    }
33

            
34
204
    let conn = get_connection(&conn_pool, host_uri)?;
35
204
    let mut queue = match conn {
36
201
        Connection::Amqp(conn, counter) => {
37
201
            let prefetch = match prefetch {
38
19
                None => DEF_PREFETCH,
39
182
                Some(prefetch) => match prefetch {
40
1
                    0 => DEF_PREFETCH,
41
181
                    _ => prefetch,
42
                },
43
            };
44
201
            let opts = QueueOptions::Amqp(
45
201
                AmqpQueueOptions {
46
201
                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
47
201
                    is_recv,
48
201
                    reliable: true,
49
201
                    broadcast: true,
50
201
                    prefetch,
51
201
                    ..Default::default()
52
201
                },
53
201
                &conn,
54
201
            );
55
201
            {
56
201
                *counter.lock().unwrap() += 1;
57
201
            }
58
201
            Queue::new(opts)?
59
        }
60
3
        Connection::Mqtt(conn, counter) => {
61
3
            let opts = QueueOptions::Mqtt(
62
3
                MqttQueueOptions {
63
3
                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
64
3
                    is_recv,
65
3
                    reliable: true,
66
3
                    broadcast: true,
67
3
                    ..Default::default()
68
3
                },
69
3
                &conn,
70
3
            );
71
3
            {
72
3
                *counter.lock().unwrap() += 1;
73
3
            }
74
3
            Queue::new(opts)?
75
        }
76
    };
77
204
    queue.set_handler(handler);
78
204
    if is_recv {
79
90
        queue.set_msg_handler(msg_handler);
80
114
    }
81
204
    if let Err(e) = queue.connect() {
82
        return Err(e.to_string());
83
204
    }
84
204
    Ok(queue)
85
206
}