1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    queue::{EventHandler, GmqQueue},
10
    AmqpQueueOptions, MqttQueueOptions, Queue, QueueOptions,
11
};
12

            
13
use super::{get_connection, Connection};
14

            
15
/// Name of the queue that data messages are sent to.
const QUEUE_NAME: &str = "coremgr.data";
16

            
17
/// To create a reliable unicast queue to send data messages.
18
3
pub fn new(
19
3
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
20
3
    host_uri: &Url,
21
3
    persistent: bool,
22
3
    handler: Arc<dyn EventHandler>,
23
3
) -> Result<Queue, String> {
24
3
    let conn = get_connection(&conn_pool, host_uri)?;
25
3
    let mut queue = match conn {
26
1
        Connection::Amqp(conn, counter) => {
27
1
            let opts = QueueOptions::Amqp(
28
1
                AmqpQueueOptions {
29
1
                    name: QUEUE_NAME.to_string(),
30
1
                    is_recv: false,
31
1
                    reliable: true,
32
1
                    persistent,
33
1
                    broadcast: false,
34
1
                    ..Default::default()
35
1
                },
36
1
                &conn,
37
1
            );
38
1
            {
39
1
                *counter.lock().unwrap() += 1;
40
1
            }
41
1
            Queue::new(opts)?
42
        }
43
2
        Connection::Mqtt(conn, counter) => {
44
2
            let opts = QueueOptions::Mqtt(
45
2
                MqttQueueOptions {
46
2
                    name: QUEUE_NAME.to_string(),
47
2
                    is_recv: false,
48
2
                    reliable: true,
49
2
                    broadcast: false,
50
2
                    ..Default::default()
51
2
                },
52
2
                &conn,
53
2
            );
54
2
            {
55
2
                *counter.lock().unwrap() += 1;
56
2
            }
57
2
            Queue::new(opts)?
58
        }
59
    };
60
3
    queue.set_handler(handler);
61
3
    if let Err(e) = queue.connect() {
62
        return Err(e.to_string());
63
3
    }
64
3
    Ok(queue)
65
3
}