1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
const QUEUE_NAME: &'static str = "coremgr.data";
16

            
17
/// To create a reliable unicast queue to send data messages.
18
6
pub fn new(
19
6
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20
6
    host_uri: &Url,
21
6
    persistent: bool,
22
6
    handler: Arc<dyn EventHandler>,
23
6
) -> Result<Queue, String> {
24
6
    let conn = get_connection(&conn_pool, host_uri)?;
25
6
    let mut queue = match conn {
26
2
        Connection::Amqp(conn, counter) => {
27
2
            let opts = QueueOptions::Amqp(
28
2
                AmqpQueueOptions {
29
2
                    name: QUEUE_NAME.to_string(),
30
2
                    is_recv: false,
31
2
                    reliable: true,
32
2
                    persistent,
33
2
                    broadcast: false,
34
2
                    ..Default::default()
35
2
                },
36
2
                &conn,
37
2
            );
38
2
            {
39
2
                *counter.lock().unwrap() += 1;
40
2
            }
41
2
            Queue::new(opts)?
42
        }
43
4
        Connection::Mqtt(conn, counter) => {
44
4
            let opts = QueueOptions::Mqtt(
45
4
                MqttQueueOptions {
46
4
                    name: QUEUE_NAME.to_string(),
47
4
                    is_recv: false,
48
4
                    reliable: true,
49
4
                    broadcast: false,
50
4
                    ..Default::default()
51
4
                },
52
4
                &conn,
53
4
            );
54
4
            {
55
4
                *counter.lock().unwrap() += 1;
56
4
            }
57
4
            Queue::new(opts)?
58
        }
59
    };
60
6
    queue.set_handler(handler);
61
6
    if let Err(e) = queue.connect() {
62
        return Err(e.to_string());
63
6
    }
64
6
    Ok(queue)
65
6
}