1
use std::{
2
    collections::HashMap,
3
    fmt,
4
    sync::{Arc, Mutex},
5
};
6

            
7
use url::Url;
8

            
9
use general_mq::{
10
    AmqpConnection, AmqpConnectionOptions, MqttConnection, MqttConnectionOptions,
11
    connection::GmqConnection,
12
};
13

            
14
pub mod data;
15
pub mod emqx;
16
pub mod rabbitmq;
17
pub mod rumqttd;
18

            
19
/// The general connection type with reference counter for upper layer maintenance.
20
#[derive(Clone)]
21
pub enum Connection {
22
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
23
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
24
}
25

            
26
/// Broker message queue type.
///
/// The [`fmt::Display`] implementation yields the lowercase name (`application` or `network`)
/// that is used as the first segment of generated broker names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueType {
    /// Queue that belongs to an application.
    Application,
    /// Queue that belongs to a network.
    Network,
}

impl fmt::Display for QueueType {
    /// Writes the lowercase queue type name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            QueueType::Application => "application",
            QueueType::Network => "network",
        })
    }
}
48

            
49
/// Transfer queue type, unit code, application/network code to AMQP virtual host name and queue
50
/// name.
51
2056
pub fn to_username(q_type: QueueType, unit: &str, code: &str) -> String {
52
2056
    format!("{}.{}.{}", q_type, unit_code(unit), code)
53
2056
}
54

            
55
/// Returns the unit segment used in generated names: the given code, or `_` as a placeholder
/// when the code is empty.
fn unit_code(code: &str) -> &str {
    if code.is_empty() {
        "_"
    } else {
        code
    }
}
62

            
63
/// Utility function to get the message queue connection instance. A new connection will be created
64
/// if the host does not exist.
65
6
fn get_connection(
66
6
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
67
6
    host_uri: &Url,
68
6
) -> Result<Connection, String> {
69
6
    let uri = host_uri.to_string();
70
6
    {
71
6
        let mutex = conn_pool.lock().unwrap();
72
6
        if let Some(conn) = mutex.get(&uri) {
73
            return Ok(conn.clone());
74
6
        }
75
6
    }
76
6

            
77
6
    match host_uri.scheme() {
78
6
        "amqp" | "amqps" => {
79
2
            let opts = AmqpConnectionOptions {
80
2
                uri: host_uri.to_string(),
81
2
                ..Default::default()
82
2
            };
83
2
            let mut conn = AmqpConnection::new(opts)?;
84
2
            let _ = conn.connect();
85
2
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
86
2
            {
87
2
                conn_pool.lock().unwrap().insert(uri, conn.clone());
88
2
            }
89
2
            Ok(conn)
90
        }
91
4
        "mqtt" | "mqtts" => {
92
4
            let opts = MqttConnectionOptions {
93
4
                uri: host_uri.to_string(),
94
4
                ..Default::default()
95
4
            };
96
4
            let mut conn = MqttConnection::new(opts)?;
97
4
            let _ = conn.connect();
98
4
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
99
4
            {
100
4
                conn_pool.lock().unwrap().insert(uri, conn.clone());
101
4
            }
102
4
            Ok(conn)
103
        }
104
        s => Err(format!("unsupport scheme {}", s)),
105
    }
106
6
}