1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
/// Name given to the queue that carries data messages.
const QUEUE_NAME: &str = "broker.data";
16

            
17
/// To create a reliable unicast queue to send data messages.
18
24
pub fn new(
19
24
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20
24
    host_uri: &Url,
21
24
    persistent: bool,
22
24
    handler: Arc<dyn EventHandler>,
23
24
) -> Result<Queue, String> {
24
24
    let conn = get_connection(&conn_pool, host_uri)?;
25
24
    let mut queue = match conn {
26
16
        Connection::Amqp(conn, counter) => {
27
16
            let opts = QueueOptions::Amqp(
28
16
                AmqpQueueOptions {
29
16
                    name: QUEUE_NAME.to_string(),
30
16
                    is_recv: false,
31
16
                    reliable: true,
32
16
                    persistent,
33
16
                    broadcast: false,
34
16
                    ..Default::default()
35
16
                },
36
16
                &conn,
37
16
            );
38
16
            {
39
16
                *counter.lock().unwrap() += 1;
40
16
            }
41
16
            Queue::new(opts)?
42
        }
43
8
        Connection::Mqtt(conn, counter) => {
44
8
            let opts = QueueOptions::Mqtt(
45
8
                MqttQueueOptions {
46
8
                    name: QUEUE_NAME.to_string(),
47
8
                    is_recv: false,
48
8
                    reliable: true,
49
8
                    broadcast: false,
50
8
                    ..Default::default()
51
8
                },
52
8
                &conn,
53
8
            );
54
8
            {
55
8
                *counter.lock().unwrap() += 1;
56
8
            }
57
8
            Queue::new(opts)?
58
        }
59
    };
60
24
    queue.set_handler(handler);
61
24
    if let Err(e) = queue.connect() {
62
        return Err(e.to_string());
63
24
    }
64
24
    Ok(queue)
65
24
}