1
use std::{
2
    collections::HashMap,
3
    net::{IpAddr, Ipv4Addr, SocketAddr},
4
    thread::{self, JoinHandle as ThreadHandle},
5
};
6

            
7
use rumqttd::{
8
    Broker, Config, ConnectionSettings, ConsoleSettings, RouterConfig, ServerSettings, TlsConfig,
9
};
10

            
11
use sylvia_iot_corelib::server_config::Config as SylviaServerConfig;
12

            
13
use super::super::config::{
14
    Rumqttd, DEF_RUMQTTD_CONSOLE_PORT, DEF_RUMQTTD_MQTTS_PORT, DEF_RUMQTTD_MQTT_PORT,
15
};
16

            
17
/// To start a rumqttd broker.
18
3
pub fn start_rumqttd(
19
3
    server_conf: &SylviaServerConfig,
20
3
    rumqttd_conf: &Rumqttd,
21
3
) -> (ThreadHandle<()>, ThreadHandle<()>) {
22
3
    let mut console_setting = ConsoleSettings::default();
23
3
    console_setting.listen = match rumqttd_conf.console_port {
24
        None => format!("0.0.0.0:{}", DEF_RUMQTTD_CONSOLE_PORT),
25
3
        Some(port) => format!("0.0.0.0:{}", port),
26
    };
27
3
    let mut config = Config {
28
3
        router: RouterConfig {
29
3
            max_connections: 10000,
30
3
            max_outgoing_packet_count: 200,
31
3
            max_segment_size: 104857600,
32
3
            max_segment_count: 10,
33
3
            ..Default::default()
34
3
        },
35
3
        v4: Some(HashMap::new()),
36
3
        console: Some(console_setting),
37
3
        ..Default::default()
38
3
    };
39
    {
40
3
        if let Some(v4) = config.v4.as_mut() {
41
3
            v4.insert(
42
3
                "mqtt".to_string(),
43
3
                ServerSettings {
44
3
                    name: "mqtt".to_string(),
45
3
                    listen: match rumqttd_conf.mqtt_port {
46
                        None => SocketAddr::new(
47
                            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
48
                            DEF_RUMQTTD_MQTT_PORT,
49
                        ),
50
3
                        Some(port) => SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port),
51
                    },
52
3
                    tls: None,
53
3
                    next_connection_delay_ms: 1,
54
3
                    connections: ConnectionSettings {
55
3
                        connection_timeout_ms: 5000,
56
3
                        max_payload_size: 1 * 1024 * 1024,
57
3
                        max_inflight_count: 200,
58
3
                        auth: None,
59
3
                        external_auth: None,
60
3
                        dynamic_filters: true,
61
3
                    },
62
                },
63
            );
64
        }
65
    }
66
3
    if let Some(cacert_file) = server_conf.cacert_file.as_ref() {
67
        if let Some(cert_file) = server_conf.cert_file.as_ref() {
68
            if let Some(key_file) = server_conf.key_file.as_ref() {
69
                if let Some(v4) = config.v4.as_mut() {
70
                    v4.insert(
71
                        "mqtts".to_string(),
72
                        ServerSettings {
73
                            name: "mqtts".to_string(),
74
                            listen: match rumqttd_conf.mqtt_port {
75
                                None => SocketAddr::new(
76
                                    IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
77
                                    DEF_RUMQTTD_MQTTS_PORT,
78
                                ),
79
                                Some(port) => {
80
                                    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port)
81
                                }
82
                            },
83
                            tls: Some(TlsConfig::Rustls {
84
                                capath: Some(cacert_file.clone()),
85
                                certpath: cert_file.clone(),
86
                                keypath: key_file.clone(),
87
                            }),
88
                            next_connection_delay_ms: 1,
89
                            connections: ConnectionSettings {
90
                                connection_timeout_ms: 5000,
91
                                max_payload_size: 1 * 1024 * 1024,
92
                                max_inflight_count: 200,
93
                                auth: None,
94
                                external_auth: None,
95
                                dynamic_filters: true,
96
                            },
97
                        },
98
                    );
99
                }
100
            }
101
        }
102
3
    }
103

            
104
3
    let mut broker = Broker::new(config);
105
3
    let (mut link_tx, mut link_rx) = broker.link("sylvia-iot-core").unwrap();
106
3
    let router_handle = thread::spawn(move || {
107
3
        let _ = broker.start();
108
3
    });
109
3
    let _ = link_tx.subscribe("#");
110
8
    let rx_handle = thread::spawn(move || loop {
111
8
        let _ = link_rx.id(); // XXX: add this line to prevent not ACK notifications.
112
8
        let _ = link_rx.recv().unwrap();
113
8
    });
114
3

            
115
3
    (router_handle, rx_handle)
116
3
}