1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue, MessageHandler},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
/// Name prefix shared by every control queue; the function name is appended after a dot.
const QUEUE_PREFIX: &str = "broker.ctrl";

/// The default prefetch value for AMQP.
const DEF_PREFETCH: u16 = 100;
19

            
20
/// To create a broadcast queue for a function to send or receive control messages.
21
412
pub fn new(
22
412
    conn_pool: Arc<Mutex<HashMap<String, Connection>>>,
23
412
    host_uri: &Url,
24
412
    prefetch: Option<u16>,
25
412
    func_name: &str,
26
412
    is_recv: bool,
27
412
    handler: Arc<dyn EventHandler>,
28
412
    msg_handler: Arc<dyn MessageHandler>,
29
412
) -> Result<Queue, String> {
30
412
    if func_name.len() == 0 {
31
4
        return Err("`func_name` cannot be empty for control queue".to_string());
32
408
    }
33

            
34
408
    let conn = get_connection(&conn_pool, host_uri)?;
35
408
    let mut queue = match conn {
36
402
        Connection::Amqp(conn, counter) => {
37
402
            let prefetch = match prefetch {
38
38
                None => DEF_PREFETCH,
39
364
                Some(prefetch) => match prefetch {
40
2
                    0 => DEF_PREFETCH,
41
362
                    _ => prefetch,
42
                },
43
            };
44
402
            let opts = QueueOptions::Amqp(
45
402
                AmqpQueueOptions {
46
402
                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
47
402
                    is_recv,
48
402
                    reliable: true,
49
402
                    broadcast: true,
50
402
                    prefetch,
51
402
                    ..Default::default()
52
402
                },
53
402
                &conn,
54
402
            );
55
402
            {
56
402
                *counter.lock().unwrap() += 1;
57
402
            }
58
402
            Queue::new(opts)?
59
        }
60
6
        Connection::Mqtt(conn, counter) => {
61
6
            let opts = QueueOptions::Mqtt(
62
6
                MqttQueueOptions {
63
6
                    name: format!("{}.{}", QUEUE_PREFIX, func_name),
64
6
                    is_recv,
65
6
                    reliable: true,
66
6
                    broadcast: true,
67
6
                    ..Default::default()
68
6
                },
69
6
                &conn,
70
6
            );
71
6
            {
72
6
                *counter.lock().unwrap() += 1;
73
6
            }
74
6
            Queue::new(opts)?
75
        }
76
    };
77
408
    queue.set_handler(handler);
78
408
    if is_recv {
79
180
        queue.set_msg_handler(msg_handler);
80
228
    }
81
408
    if let Err(e) = queue.connect() {
82
        return Err(e.to_string());
83
408
    }
84
408
    Ok(queue)
85
412
}