1
use std::{
2
    collections::HashMap,
3
    fmt,
4
    sync::{Arc, Mutex},
5
};
6

            
7
use url::Url;
8

            
9
use general_mq::{
10
    connection::GmqConnection, AmqpConnection, AmqpConnectionOptions, MqttConnection,
11
    MqttConnectionOptions,
12
};
13

            
14
pub mod data;
15
pub mod emqx;
16
pub mod rabbitmq;
17
pub mod rumqttd;
18

            
19
/// The general connection type with reference counter for upper layer maintenance.
20
#[derive(Clone)]
21
pub enum Connection {
22
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
23
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
24
}
25

            
26
/// Broker message queue type.
///
/// The type is a plain tag and is freely copyable. Its [`fmt::Display`] output is the lowercase
/// word `application` or `network`, which `to_username` uses as the first segment of the
/// generated name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueueType {
    /// Queue that belongs to an application.
    Application,
    /// Queue that belongs to a network.
    Network,
}

impl fmt::Display for QueueType {
    /// Writes `application` or `network` to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            QueueType::Application => "application",
            QueueType::Network => "network",
        })
    }
}
48

            
49
/// Transfer queue type, unit code, application/network code to AMQP virtual host name and queue
50
/// name.
51
1018
pub fn to_username(q_type: QueueType, unit: &str, code: &str) -> String {
52
1018
    format!("{}.{}.{}", q_type, unit_code(unit), code)
53
1018
}
54

            
55
/// Unit code part for queue name.
///
/// Returns `_` as a placeholder when `code` is empty so the unit segment of a generated name is
/// never blank; any other value is returned unchanged.
fn unit_code(code: &str) -> &str {
    if code.is_empty() {
        "_"
    } else {
        code
    }
}
62

            
63
/// Utility function to get the message queue connection instance. A new connection will be created
64
/// if the host does not exist.
65
3
fn get_connection(
66
3
    conn_pool: &Arc<Mutex<HashMap<String, Connection>>>,
67
3
    host_uri: &Url,
68
3
) -> Result<Connection, String> {
69
3
    let uri = host_uri.to_string();
70
3
    {
71
3
        let mutex = conn_pool.lock().unwrap();
72
3
        if let Some(conn) = mutex.get(&uri) {
73
            return Ok(conn.clone());
74
3
        }
75
3
    }
76
3

            
77
3
    match host_uri.scheme() {
78
3
        "amqp" | "amqps" => {
79
1
            let opts = AmqpConnectionOptions {
80
1
                uri: host_uri.to_string(),
81
1
                ..Default::default()
82
1
            };
83
1
            let mut conn = AmqpConnection::new(opts)?;
84
1
            let _ = conn.connect();
85
1
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
86
1
            {
87
1
                conn_pool.lock().unwrap().insert(uri, conn.clone());
88
1
            }
89
1
            Ok(conn)
90
        }
91
2
        "mqtt" | "mqtts" => {
92
2
            let opts = MqttConnectionOptions {
93
2
                uri: host_uri.to_string(),
94
2
                ..Default::default()
95
2
            };
96
2
            let mut conn = MqttConnection::new(opts)?;
97
2
            let _ = conn.connect();
98
2
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
99
2
            {
100
2
                conn_pool.lock().unwrap().insert(uri, conn.clone());
101
2
            }
102
2
            Ok(conn)
103
        }
104
        s => Err(format!("unsupport scheme {}", s)),
105
    }
106
3
}