1
use std::{
2
    collections::HashMap,
3
    sync::{Arc, Mutex},
4
};
5

            
6
use url::Url;
7

            
8
use general_mq::{
9
    connection::GmqConnection,
10
    queue::{EventHandler, GmqQueue, MessageHandler},
11
    AmqpConnection, AmqpConnectionOptions, AmqpQueueOptions, MqttConnection, MqttConnectionOptions,
12
    MqttQueueOptions, Queue, QueueOptions,
13
};
14

            
15
pub mod broker;
16
pub mod coremgr;
17

            
18
use super::config::DataData as DataMqConfig;
19

            
20
/// The general connection type with reference counter for upper layer maintenance.
21
#[derive(Clone)]
22
pub enum Connection {
23
    Amqp(AmqpConnection, Arc<Mutex<isize>>),
24
    Mqtt(MqttConnection, Arc<Mutex<isize>>),
25
}
26

            
27
/// The default prefetch value for AMQP.
///
/// Used for AMQP data queues when the configuration does not specify a prefetch value.
const DEF_PREFETCH: u16 = 100;
29

            
30
/// To create a reliable unicast queue to receive data messages.
31
128
fn new_data_queue(
32
128
    conn_pool: &mut HashMap<String, Connection>,
33
128
    config: &DataMqConfig,
34
128
    queue_name: &str,
35
128
    handler: Arc<dyn EventHandler>,
36
128
    msg_handler: Arc<dyn MessageHandler>,
37
128
) -> Result<Queue, String> {
38
128
    let host_uri = match config.url.as_ref() {
39
8
        None => return Err("host_uri empty".to_string()),
40
120
        Some(host_uri) => match Url::parse(host_uri) {
41
8
            Err(e) => return Err(format!("host_uri error: {}", e)),
42
112
            Ok(uri) => uri,
43
        },
44
    };
45
112
    let conn = get_connection(conn_pool, &host_uri)?;
46
104
    let mut queue = match conn {
47
68
        Connection::Amqp(conn, counter) => {
48
68
            let opts = QueueOptions::Amqp(
49
                AmqpQueueOptions {
50
68
                    name: queue_name.to_string(),
51
68
                    is_recv: true,
52
68
                    reliable: true,
53
68
                    broadcast: false,
54
68
                    prefetch: match config.prefetch {
55
24
                        None => DEF_PREFETCH,
56
44
                        Some(prefetch) => prefetch,
57
                    },
58
68
                    ..Default::default()
59
68
                },
60
68
                &conn,
61
68
            );
62
68
            {
63
68
                *counter.lock().unwrap() += 1;
64
68
            }
65
68
            Queue::new(opts)?
66
        }
67
36
        Connection::Mqtt(conn, counter) => {
68
36
            let opts = QueueOptions::Mqtt(
69
36
                MqttQueueOptions {
70
36
                    name: queue_name.to_string(),
71
36
                    is_recv: true,
72
36
                    reliable: true,
73
36
                    broadcast: false,
74
36
                    shared_prefix: config.shared_prefix.clone(),
75
36
                    ..Default::default()
76
36
                },
77
36
                &conn,
78
36
            );
79
36
            {
80
36
                *counter.lock().unwrap() += 1;
81
36
            }
82
36
            Queue::new(opts)?
83
        }
84
    };
85
104
    queue.set_handler(handler);
86
104
    queue.set_msg_handler(msg_handler);
87
104
    if let Err(e) = queue.connect() {
88
        return Err(e.to_string());
89
104
    }
90
104
    Ok(queue)
91
128
}
92

            
93
/// Utility function to get the message queue connection instance. A new connection will be created
94
/// if the host does not exist.
95
112
fn get_connection(
96
112
    conn_pool: &mut HashMap<String, Connection>,
97
112
    host_uri: &Url,
98
112
) -> Result<Connection, String> {
99
112
    let uri = host_uri.to_string();
100
112
    if let Some(conn) = conn_pool.get(&uri) {
101
24
        return Ok(conn.clone());
102
88
    }
103
88

            
104
88
    match host_uri.scheme() {
105
88
        "amqp" | "amqps" => {
106
48
            let opts = AmqpConnectionOptions {
107
48
                uri: host_uri.to_string(),
108
48
                ..Default::default()
109
48
            };
110
48
            let mut conn = AmqpConnection::new(opts)?;
111
48
            let _ = conn.connect();
112
48
            let conn = Connection::Amqp(conn, Arc::new(Mutex::new(0)));
113
48
            conn_pool.insert(uri, conn.clone());
114
48
            Ok(conn)
115
        }
116
40
        "mqtt" | "mqtts" => {
117
32
            let opts = MqttConnectionOptions {
118
32
                uri: host_uri.to_string(),
119
32
                ..Default::default()
120
32
            };
121
32
            let mut conn = MqttConnection::new(opts)?;
122
32
            let _ = conn.connect();
123
32
            let conn = Connection::Mqtt(conn, Arc::new(Mutex::new(0)));
124
32
            conn_pool.insert(uri, conn.clone());
125
32
            Ok(conn)
126
        }
127
8
        s => Err(format!("unsupport scheme {}", s)),
128
    }
129
112
}