1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    connection::GmqConnection,
10
    queue::{EventHandler, GmqQueue, MessageHandler},
11
    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
12
    MqttQueueOptions, Queue, QueueOptions,
13
};
14

            
15
pub mod broker;
16
pub mod coremgr;
17

            
18
use super::config::DataData as DataMqConfig;
19

            
20
/// The general connection type with reference counter for upper layer maintenance.
21
#[derive(Clone)]
22
pub enum Connection {
23
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
24
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
25
}
26

            
27
/// The default prefetch value for AMQP.
///
/// Used for AMQP queues when the configuration does not provide a prefetch value.
const DEF_PREFETCH: u16 = 100;

/// To create a reliable unicast queue to receive data messages.
31
64
fn new_data_queue(
32
64
    conn_pool: &mut HashMap<String, Connection>,
33
64
    config: &DataMqConfig,
34
64
    queue_name: &str,
35
64
    handler: Arc<dyn EventHandler>,
36
64
    msg_handler: Arc<dyn MessageHandler>,
37
64
) -> Result<Queue, String> {
38
64
    let host_uri = match config.url.as_ref() {
39
4
        None => return Err("host_uri empty".to_string()),
40
60
        Some(host_uri) => match Url::parse(host_uri) {
41
4
            Err(e) => return Err(format!("host_uri error: {}", e)),
42
56
            Ok(uri) => uri,
43
        },
44
    };
45
56
    let conn = get_connection(conn_pool, &host_uri)?;
46
52
    let mut queue = match conn {
47
34
        Connection::Amqp(conn, counter) => {
48
34
            let opts = QueueOptions::Amqp(
49
                AmqpQueueOptions {
50
34
                    name: queue_name.to_string(),
51
34
                    is_recv: true,
52
34
                    reliable: true,
53
34
                    broadcast: false,
54
34
                    prefetch: match config.prefetch {
55
12
                        None => DEF_PREFETCH,
56
22
                        Some(prefetch) => prefetch,
57
                    },
58
34
                    ..Default::default()
59
34
                },
60
34
                &conn,
61
34
            );
62
34
            {
63
34
                *counter.lock().unwrap() += 1;
64
34
            }
65
34
            Queue::new(opts)?
66
        }
67
18
        Connection::Mqtt(conn, counter) => {
68
18
            let opts = QueueOptions::Mqtt(
69
18
                MqttQueueOptions {
70
18
                    name: queue_name.to_string(),
71
18
                    is_recv: true,
72
18
                    reliable: true,
73
18
                    broadcast: false,
74
18
                    shared_prefix: config.shared_prefix.clone(),
75
18
                    ..Default::default()
76
18
                },
77
18
                &conn,
78
18
            );
79
18
            {
80
18
                *counter.lock().unwrap() += 1;
81
18
            }
82
18
            Queue::new(opts)?
83
        }
84
    };
85
52
    queue.set_handler(handler);
86
52
    queue.set_msg_handler(msg_handler);
87
52
    if let Err(e) = queue.connect() {
88
        return Err(e.to_string());
89
52
    }
90
52
    Ok(queue)
91
64
}
92

            
93
/// Utility function to get the message queue connection instance. A new connection will be created
94
/// if the host does not exist.
95
56
fn get_connection(
96
56
    conn_pool: &mut HashMap<String, Connection>,
97
56
    host_uri: &Url,
98
56
) -> Result<Connection, String> {
99
56
    let uri = host_uri.to_string();
100
56
    if let Some(conn) = conn_pool.get(&uri) {
101
12
        return Ok(conn.clone());
102
44
    }
103
44

            
104
44
    match host_uri.scheme() {
105
44
        "amqp" | "amqps" => {
106
24
            let opts = AmqpConnectionOptions {
107
24
                uri: host_uri.to_string(),
108
24
                ..Default::default()
109
24
            };
110
24
            let mut conn = AmqpConnection::new(opts)?;
111
24
            let _ = conn.connect();
112
24
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
113
24
            conn_pool.insert(uri, conn.clone());
114
24
            Ok(conn)
115
        }
116
20
        "mqtt" | "mqtts" => {
117
16
            let opts = MqttConnectionOptions {
118
16
                uri: host_uri.to_string(),
119
16
                ..Default::default()
120
16
            };
121
16
            let mut conn = MqttConnection::new(opts)?;
122
16
            let _ = conn.connect();
123
16
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
124
16
            conn_pool.insert(uri, conn.clone());
125
16
            Ok(conn)
126
        }
127
4
        s => Err(format!("unsupport scheme {}", s)),
128
    }
129
56
}