1
//! General-purpose interfaces for message queues. The following implementations are provided:
2
//!
3
//! - AMQP 0-9-1
4
//! - MQTT
5
//!
6
//! By using these classes, you can configure queues with the following properties:
7
//!
8
//! - Unicast or broadcast.
9
//! - Reliable or best-effort.
10
//!
11
//! **Notes**
12
//!
13
//! - MQTT uses **shared queues** to implement unicast.
14
//! - AMQP uses **confirm channels** to implement reliable publish, and MQTT uses **QoS 1** to
15
//!   implement reliable publish/subscribe.
16
//!
17
//! # Relationships of Connections and Queues
18
//!
19
//! The term **connection** describes a TCP/TLS connection to the message broker.
20
//! The term **queue** describes a message queue or a topic within a connection.
21
//! You can use one connection to manage multiple queues, or one connection to manage one queue.
22
//!
23
//! A queue is either a receiver or a sender; it cannot be both at the same time.
24
//!
25
//! ### Connections for sender/receiver queues with the same name
26
//!
27
//! The sender and the receiver are usually different programs, there are two connections to hold
28
//! two queues.
29
//!
30
//! For the special case that a program acts as both the sender and the receiver using the same queue:
31
//!
32
//! - The AMQP implementation uses one **Channel** for one queue, so the program can manage all
33
//!   queues with one connection.
34
//! - The MQTT implementation **MUST** use one connection for one queue, or both sender and
35
//!   receiver will receive packets.
36
//!
37
//! # Test
38
//!
39
//! Please prepare a [RabbitMQ](https://www.rabbitmq.com/) broker and an [EMQX](https://emqx.io/)
40
//! broker at **localhost** for testing.
41
//!
42
//! - To install using Docker:
43
//!
44
//!       $ docker run --rm --name rabbitmq -d -p 5672:5672 rabbitmq:management-alpine
45
//!       $ docker run --rm --name emqx -d -p 1883:1883 emqx/emqx
46
//!
47
//! Then run the test:
48
//!
49
//!     $ cargo test --test integration_test -- --nocapture
50
//!
51
//! # Example
52
//!
53
//! Run RabbitMQ and then run AMQP example:
54
//!
55
//!     $ cargo run --example simple
56
//!
57
//! Run EMQX and then run MQTT example:
58
//!
59
//!     $ RUN_MQTT= cargo run --example simple
60

            
61
use std::{error::Error as StdError, fmt, sync::Arc};
62

            
63
use async_trait::async_trait;
64
use rand::{distributions::Alphanumeric, thread_rng, Rng};
65

            
66
pub mod connection;
67
pub mod queue;
68

            
69
mod amqp;
70
mod mqtt;
71

            
72
pub use amqp::{AmqpConnection, AmqpConnectionOptions, AmqpQueue, AmqpQueueOptions};
73
pub use mqtt::{MqttConnection, MqttConnectionOptions, MqttQueue, MqttQueueOptions};
74
use queue::{EventHandler, GmqQueue, MessageHandler, Status};
75

            
76
/// Errors reported by general-mq queues.
///
/// The human-readable text for each variant is produced by the `Display` implementation.
#[derive(Clone, Debug)]
pub enum Error {
    /// The queue does not have a [`MessageHandler`].
    NoMsgHandler,
    /// The connection is not connected, or the queue (topic) is not
    /// connected (declared/subscribed).
    NotConnected,
    /// The queue is a receiver, so it cannot send messages.
    QueueIsReceiver,
}
87

            
88
#[derive(Clone)]
89
pub enum Queue {
90
    Amqp(AmqpQueue),
91
    Mqtt(MqttQueue),
92
}
93

            
94
#[derive(Clone)]
95
pub enum QueueOptions<'a> {
96
    Amqp(AmqpQueueOptions, &'a AmqpConnection),
97
    Mqtt(MqttQueueOptions, &'a MqttConnection),
98
}
99

            
100
/// Identifier length of inner handlers.
///
/// NOTE(review): presumably the length passed to `randomstring` when generating handler
/// identifiers in the AMQP/MQTT modules — confirm against those modules.
pub(crate) const ID_SIZE: usize = 24;
102

            
103
impl fmt::Display for Error {
104
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
105
        match *self {
106
            Error::NoMsgHandler => write!(f, "no message handler"),
107
            Error::NotConnected => write!(f, "not connected"),
108
            Error::QueueIsReceiver => write!(f, "this queue is a receiver"),
109
        }
110
    }
111
}
112

            
113
// All variants are unit variants with no wrapped source error, so the default
// `std::error::Error` methods are sufficient.
impl StdError for Error {}
114

            
115
impl Queue {
116
27
    pub fn new(opts: QueueOptions) -> Result<Self, String> {
117
27
        match opts {
118
14
            QueueOptions::Amqp(opts, conn) => Ok(Queue::Amqp(AmqpQueue::new(opts, conn)?)),
119
13
            QueueOptions::Mqtt(opts, conn) => Ok(Queue::Mqtt(MqttQueue::new(opts, conn)?)),
120
        }
121
27
    }
122
}
123

            
124
#[async_trait]
125
impl GmqQueue for Queue {
126
4
    fn name(&self) -> &str {
127
4
        match self {
128
2
            Queue::Amqp(q) => q.name(),
129
2
            Queue::Mqtt(q) => q.name(),
130
        }
131
4
    }
132

            
133
4
    fn is_recv(&self) -> bool {
134
4
        match self {
135
2
            Queue::Amqp(q) => q.is_recv(),
136
2
            Queue::Mqtt(q) => q.is_recv(),
137
        }
138
4
    }
139

            
140
912
    fn status(&self) -> Status {
141
912
        match self {
142
458
            Queue::Amqp(q) => q.status(),
143
454
            Queue::Mqtt(q) => q.status(),
144
        }
145
912
    }
146

            
147
6
    fn set_handler(&mut self, handler: Arc<dyn EventHandler>) {
148
6
        match self {
149
3
            Queue::Amqp(q) => q.set_handler(handler),
150
3
            Queue::Mqtt(q) => q.set_handler(handler),
151
        }
152
6
    }
153

            
154
2
    fn clear_handler(&mut self) {
155
2
        match self {
156
1
            Queue::Amqp(q) => q.clear_handler(),
157
1
            Queue::Mqtt(q) => q.clear_handler(),
158
        }
159
2
    }
160

            
161
12
    fn set_msg_handler(&mut self, handler: Arc<dyn MessageHandler>) {
162
12
        match self {
163
6
            Queue::Amqp(q) => q.set_msg_handler(handler),
164
6
            Queue::Mqtt(q) => q.set_msg_handler(handler),
165
        }
166
12
    }
167

            
168
16
    fn connect(&mut self) -> Result<(), Box<dyn StdError>> {
169
16
        match self {
170
8
            Queue::Amqp(q) => q.connect(),
171
8
            Queue::Mqtt(q) => q.connect(),
172
        }
173
16
    }
174

            
175
18
    async fn close(&mut self) -> Result<(), Box<dyn StdError + Send + Sync>> {
176
18
        match self {
177
18
            Queue::Amqp(q) => q.close().await,
178
18
            Queue::Mqtt(q) => q.close().await,
179
18
        }
180
18
    }
181

            
182
4
    async fn send_msg(&self, payload: Vec<u8>) -> Result<(), Box<dyn StdError + Send + Sync>> {
183
4
        match self {
184
4
            Queue::Amqp(q) => q.send_msg(payload).await,
185
4
            Queue::Mqtt(q) => q.send_msg(payload).await,
186
4
        }
187
4
    }
188
}
189

            
190
/// Generate random alphanumeric with the specified length.
191
53
pub fn randomstring(len: usize) -> String {
192
53
    let mut rng = thread_rng();
193
53
    std::iter::repeat(())
194
676
        .map(|()| rng.sample(Alphanumeric))
195
53
        .map(char::from)
196
53
        .take(len)
197
53
        .collect()
198
53
}