1
use std::{
2
    collections::HashMap,
3
    fmt,
4
    sync::{Arc, Mutex},
5
};
6

            
7
use url::Url;
8

            
9
use general_mq::{
10
    AmqpConnection, AmqpConnectionOptions, MqttConnection, MqttConnectionOptions,
11
    connection::GmqConnection,
12
};
13

            
14
pub mod data;
15
pub mod emqx;
16
pub mod rabbitmq;
17
pub mod rumqttd;
18

            
19
/// The general connection type with reference counter for upper layer maintenance.
20
#[derive(Clone)]
21
pub enum Connection {
22
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
23
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
24
}
25

            
26
/// Broker message queue type.
27
#[derive(Clone, Copy)]
28
pub enum QueueType {
29
    Application,
30
    Network,
31
}
32

            
33
impl fmt::Display for QueueType {
34
1944
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
35
1944
        match *self {
36
740
            QueueType::Application => fmt.write_str("application"),
37
1204
            QueueType::Network => fmt.write_str("network"),
38
        }
39
1944
    }
40
}
41

            
42
/// Transfer queue type, unit code, application/network code to AMQP virtual host name and queue
43
/// name.
44
1944
pub fn to_username(q_type: QueueType, unit: &str, code: &str) -> String {
45
1944
    format!("{}.{}.{}", q_type, unit_code(unit), code)
46
1944
}
47

            
48
/// Unit code part for queue name.
49
1944
fn unit_code(code: &str) -> &str {
50
1944
    match code {
51
1944
        "" => "_",
52
1642
        _ => code,
53
    }
54
1944
}
55

            
56
/// Utility function to get the message queue connection instance. A new connection will be created
57
/// if the host does not exist.
58
6
fn get_connection(
59
6
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
60
6
    host_uri: &Url,
61
6
) -> Result<Connection, String> {
62
6
    let uri = host_uri.to_string();
63
    {
64
6
        let mutex = conn_pool.lock().unwrap();
65
6
        if let Some(conn) = mutex.get(&uri) {
66
            return Ok(conn.clone());
67
6
        }
68
    }
69

            
70
6
    match host_uri.scheme() {
71
6
        "amqp" | "amqps" => {
72
2
            let opts = AmqpConnectionOptions {
73
2
                uri: host_uri.to_string(),
74
2
                ..Default::default()
75
2
            };
76
2
            let mut conn = AmqpConnection::new(opts)?;
77
2
            let _ = conn.connect();
78
2
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
79
2
            {
80
2
                conn_pool.lock().unwrap().insert(uri, conn.clone());
81
2
            }
82
2
            Ok(conn)
83
        }
84
4
        "mqtt" | "mqtts" => {
85
4
            let opts = MqttConnectionOptions {
86
4
                uri: host_uri.to_string(),
87
4
                ..Default::default()
88
4
            };
89
4
            let mut conn = MqttConnection::new(opts)?;
90
4
            let _ = conn.connect();
91
4
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
92
4
            {
93
4
                conn_pool.lock().unwrap().insert(uri, conn.clone());
94
4
            }
95
4
            Ok(conn)
96
        }
97
        s => Err(format!("unsupport scheme {}", s)),
98
    }
99
6
}