1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
/// Name given to the queue on which data messages are sent.
const QUEUE_NAME: &str = "broker.data";
16

            
17
/// To create a reliable unicast queue to send data messages.
18
12
pub fn new(
19
12
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20
12
    host_uri: &Url,
21
12
    persistent: bool,
22
12
    handler: Arc<dyn EventHandler>,
23
12
) -> Result<Queue, String> {
24
12
    let conn = get_connection(&conn_pool, host_uri)?;
25
12
    let mut queue = match conn {
26
8
        Connection::Amqp(conn, counter) => {
27
8
            let opts = QueueOptions::Amqp(
28
8
                AmqpQueueOptions {
29
8
                    name: QUEUE_NAME.to_string(),
30
8
                    is_recv: false,
31
8
                    reliable: true,
32
8
                    persistent,
33
8
                    broadcast: false,
34
8
                    ..Default::default()
35
8
                },
36
8
                &conn,
37
8
            );
38
8
            {
39
8
                *counter.lock().unwrap() += 1;
40
8
            }
41
8
            Queue::new(opts)?
42
        }
43
4
        Connection::Mqtt(conn, counter) => {
44
4
            let opts = QueueOptions::Mqtt(
45
4
                MqttQueueOptions {
46
4
                    name: QUEUE_NAME.to_string(),
47
4
                    is_recv: false,
48
4
                    reliable: true,
49
4
                    broadcast: false,
50
4
                    ..Default::default()
51
4
                },
52
4
                &conn,
53
4
            );
54
4
            {
55
4
                *counter.lock().unwrap() += 1;
56
4
            }
57
4
            Queue::new(opts)?
58
        }
59
    };
60
12
    queue.set_handler(handler);
61
12
    if let Err(e) = queue.connect() {
62
        return Err(e.to_string());
63
12
    }
64
12
    Ok(queue)
65
12
}