Line data Source code
1 : /*
2 : Package mq manages queues for applications and networks.
3 :
4 : For applications, the `ApplicationMgr` manages the following kind of queues:
5 : - uldata: uplink data from the broker to the application.
6 : - dldata: downlink data from the application to the broker.
7 : - dldata-resp: the response of downlink data.
8 : - dldata-result: the data process result from the network.
9 :
10 : For networks, the `NetworkMgr` manages the following kind of queues:
11 : - uldata: device uplink data from the network to the broker.
12 : - dldata: downlink data from the broker to the network.
13 : - dldata-result: the data process result from the network.
14 : - ctrl: the control messages from the broker to the network
15 : */
16 : package mq
17 :
18 : import (
19 : "errors"
20 : "fmt"
21 : "net/url"
22 : "sync"
23 :
24 : gmq "github.com/woofdogtw/sylvia-iot-go/general-mq"
25 : )
26 :
27 : // Connection pool. The key is `host` of the message broker.
28 : type ConnectionPool struct {
29 : connections map[string]*counterConnection
30 : mutex sync.Mutex
31 : }
32 :
33 : // Detail queue connection status.
34 : type DataMqStatus struct {
35 : // For `uldata`.
36 : UlData gmq.Status
37 : // For `dldata`.
38 : DlData gmq.Status
39 : // For `dldata-resp`.
40 : DlDataResp gmq.Status
41 : // For `dldata-result`.
42 : DlDataResult gmq.Status
43 : // For `ctrl`.
44 : Ctrl gmq.Status
45 : }
46 :
47 : // The options of the application/network manager.
48 : type Options struct {
49 : // The associated unit ID of the application/network. Empty for public network.
50 : UnitID string `json:"unitId"`
51 : // The associated unit code of the application/network. Empty for public network.
52 : UnitCode string `json:"unitCode"`
53 : // The associated application/network ID.
54 : ID string `json:"id"`
55 : // The associated application/network code.
56 : Name string `json:"name"`
57 : // AMQP prefetch option.
58 : Prefetch uint16 `json:"prefetch"`
59 : // AMQP persistent option.
60 : Persistent bool `json:"persistent"`
61 : // MQTT shared queue prefix option.
62 : SharedPrefix string `json:"sharedPrefix"`
63 : }
64 :
65 : // The connection with queue counter.
66 : type counterConnection struct {
67 : connection gmq.GmqConnection
68 : counter int
69 : mutex sync.Mutex
70 : connType connectionType
71 : }
72 :
73 : // The created data queues.
74 : type dataQueues struct {
75 : // `[prefix].[unit].[code].uldata`.
76 : uldata gmq.GmqQueue
77 : // `[prefix].[unit].[code].dldata`.
78 : dldata gmq.GmqQueue
79 : // `[prefix].[unit].[code].dldata-resp`: for applications only. `nil` for networks.
80 : dldataResp gmq.GmqQueue
81 : // `[prefix].[unit].[code].dldata-result`.
82 : dldataResult gmq.GmqQueue
83 : // `[prefix].[unit].[code].ctrl`.
84 : ctrl gmq.GmqQueue
85 : }
86 :
87 : // Application/Network Manager status.
88 : type MgrStatus int
89 :
90 : // To identify shared connection type.
91 : type connectionType int
92 :
93 : // Manager status.
94 : const (
95 : // One or more queues are not connected.
96 : NotReady MgrStatus = iota
97 : // All queues are connected.
98 : Ready
99 : )
100 :
101 : // Constants.
102 : const (
103 : // Default AMQP prefetch.
104 : defPrefetch = 100
105 : )
106 :
107 : // Connection type.
108 : const (
109 : connTypeAmqp connectionType = iota
110 : connTypeMqtt
111 : )
112 :
113 : // Error response.
114 : const (
115 : errParamCorrID = "the `CorrelationID` must be a non-empty string"
116 : errParamData = "the `Data` must be a hex string"
117 : errParamDataID = "the `DataID` must be a non-empty string"
118 : errParamAppDev = "one of `DeviceID` or [`NetworkCode`, `NetworkAddr`] pair must be provided with non-empty string"
119 : errParamNetDev = "`NetworkAddr` must be a non-empty string"
120 : )
121 :
122 : // Constants.
123 : var (
124 : // Support application/network host schemes.
125 : SupportSchemes = []string{"amqp", "amqps", "mqtt", "mqtts"}
126 : )
127 :
128 0 : func (s MgrStatus) String() string {
129 0 : switch s {
130 0 : case NotReady:
131 0 : return "not ready"
132 0 : case Ready:
133 0 : return "ready"
134 : }
135 0 : return "unknown"
136 : }
137 :
138 65 : func NewConnectionPool() *ConnectionPool {
139 65 : return &ConnectionPool{
140 65 : connections: map[string]*counterConnection{},
141 65 : }
142 65 : }
143 :
144 48 : func (p *ConnectionPool) ForceClear() {
145 48 : p.mutex.Lock()
146 48 : for _, conn := range p.connections {
147 0 : c := conn
148 0 : go func() { _ = c.connection.Close() }()
149 : }
150 48 : p.connections = map[string]*counterConnection{}
151 48 : p.mutex.Unlock()
152 : }
153 :
154 46 : func (c *counterConnection) add(count int) {
155 46 : c.mutex.Lock()
156 46 : c.counter += count
157 46 : c.mutex.Unlock()
158 46 : }
159 :
160 : // Utility function to get the message queue connection instance. A new connection will be created
161 : // if the host does not exist.
162 64 : func getConnection(pool *ConnectionPool, hostUri url.URL) (*counterConnection, error) {
163 66 : if pool == nil {
164 2 : return nil, errors.New("connection pool is nil")
165 2 : }
166 62 : pool.mutex.Lock()
167 62 : defer pool.mutex.Unlock()
168 62 :
169 62 : conn := pool.connections[hostUri.String()]
170 76 : if conn != nil {
171 14 : return conn, nil
172 14 : }
173 :
174 48 : switch hostUri.Scheme {
175 24 : case "amqp", "amqps":
176 24 : opts := gmq.AmqpConnectionOptions{
177 24 : URI: hostUri.String(),
178 24 : }
179 24 : conn, err := gmq.NewAmqpConnection(opts)
180 24 : if err != nil {
181 0 : return nil, err
182 0 : }
183 24 : _ = conn.Connect()
184 24 : retConn := &counterConnection{
185 24 : connection: conn,
186 24 : counter: 0,
187 24 : connType: connTypeAmqp,
188 24 : }
189 24 : pool.connections[hostUri.String()] = retConn
190 24 : return retConn, nil
191 24 : case "mqtt", "mqtts":
192 24 : opts := gmq.MqttConnectionOptions{
193 24 : URI: hostUri.String(),
194 24 : }
195 24 : conn, err := gmq.NewMqttConnection(opts)
196 24 : if err != nil {
197 0 : return nil, err
198 0 : }
199 24 : _ = conn.Connect()
200 24 : retConn := &counterConnection{
201 24 : connection: conn,
202 24 : counter: 0,
203 24 : connType: connTypeMqtt,
204 24 : }
205 24 : pool.connections[hostUri.String()] = retConn
206 24 : return retConn, nil
207 0 : default:
208 0 : return nil, errors.New("unsupport scheme" + hostUri.Scheme)
209 : }
210 : }
211 :
212 : // Utility function to remove connection from the pool if the reference count meet zero.
213 50 : func removeConnection(pool *ConnectionPool, hostUri string, count int) error {
214 50 : pool.mutex.Lock()
215 50 :
216 50 : conn, ok := pool.connections[hostUri]
217 54 : if !ok {
218 4 : pool.mutex.Unlock()
219 4 : return nil
220 4 : }
221 46 : conn.counter -= count
222 48 : if conn.counter > 0 {
223 2 : pool.mutex.Unlock()
224 2 : return nil
225 2 : }
226 44 : delete(pool.connections, hostUri)
227 44 : pool.mutex.Unlock()
228 44 :
229 44 : return conn.connection.Close()
230 : }
231 :
232 : // The utility function for creating application/network queue.
233 : func newDataQueues(
234 : conn *counterConnection,
235 : opts Options,
236 : prefix string,
237 : isNetwork bool,
238 62 : ) (*dataQueues, error) {
239 62 : var uldata gmq.GmqQueue
240 62 : var dldata gmq.GmqQueue
241 62 : var dldataResp gmq.GmqQueue
242 62 : var dldataResult gmq.GmqQueue
243 62 : var ctrl gmq.GmqQueue
244 62 :
245 68 : if opts.UnitID == "" {
246 8 : if opts.UnitCode != "" {
247 2 : return nil, errors.New("UnitID and UnitCode must both empty or non-empty")
248 2 : }
249 56 : } else {
250 60 : if opts.UnitCode == "" {
251 4 : return nil, errors.New("UnitID and UnitCode must both empty or non-empty")
252 4 : }
253 : }
254 62 : if opts.ID == "" {
255 6 : return nil, errors.New("`ID` cannot be empty")
256 6 : }
257 54 : if opts.Name == "" {
258 4 : return nil, errors.New("`Name` cannot be empty")
259 4 : }
260 :
261 46 : unit := opts.UnitCode
262 48 : if unit == "" {
263 2 : unit = "_"
264 2 : }
265 :
266 46 : var err error
267 69 : if conn.connType == connTypeAmqp {
268 23 : prefetch := opts.Prefetch
269 43 : if prefetch == 0 {
270 20 : prefetch = defPrefetch
271 20 : }
272 :
273 23 : uldataOpts := gmq.AmqpQueueOptions{
274 23 : Name: fmt.Sprintf("%s.%s.%s.uldata", prefix, unit, opts.Name),
275 23 : IsRecv: !isNetwork,
276 23 : Reliable: true,
277 23 : Persistent: opts.Persistent,
278 23 : Prefetch: prefetch,
279 23 : }
280 23 : dldataOpts := gmq.AmqpQueueOptions{
281 23 : Name: fmt.Sprintf("%s.%s.%s.dldata", prefix, unit, opts.Name),
282 23 : IsRecv: isNetwork,
283 23 : Reliable: true,
284 23 : Persistent: opts.Persistent,
285 23 : Prefetch: prefetch,
286 23 : }
287 23 : dldataRespOpts := gmq.AmqpQueueOptions{
288 23 : Name: fmt.Sprintf("%s.%s.%s.dldata-resp", prefix, unit, opts.Name),
289 23 : IsRecv: !isNetwork,
290 23 : Reliable: true,
291 23 : Persistent: opts.Persistent,
292 23 : Prefetch: prefetch,
293 23 : }
294 23 : dldataResultOpts := gmq.AmqpQueueOptions{
295 23 : Name: fmt.Sprintf("%s.%s.%s.dldata-result", prefix, unit, opts.Name),
296 23 : IsRecv: !isNetwork,
297 23 : Reliable: true,
298 23 : Persistent: opts.Persistent,
299 23 : Prefetch: prefetch,
300 23 : }
301 23 : ctrlOpts := gmq.AmqpQueueOptions{
302 23 : Name: fmt.Sprintf("%s.%s.%s.ctrl", prefix, unit, opts.Name),
303 23 : IsRecv: true,
304 23 : Reliable: true,
305 23 : Prefetch: prefetch,
306 23 : }
307 23 : _conn := conn.connection.(*gmq.AmqpConnection)
308 23 : if uldata, err = gmq.NewAmqpQueue(uldataOpts, _conn); err != nil {
309 0 : return nil, err
310 23 : } else if dldata, err = gmq.NewAmqpQueue(dldataOpts, _conn); err != nil {
311 0 : return nil, err
312 23 : } else if dldataResp, err = gmq.NewAmqpQueue(dldataRespOpts, _conn); err != nil {
313 0 : return nil, err
314 23 : } else if dldataResult, err = gmq.NewAmqpQueue(dldataResultOpts, _conn); err != nil {
315 0 : return nil, err
316 23 : } else if ctrl, err = gmq.NewAmqpQueue(ctrlOpts, _conn); err != nil {
317 0 : return nil, err
318 0 : }
319 46 : } else if conn.connType == connTypeMqtt {
320 23 : uldataOpts := gmq.MqttQueueOptions{
321 23 : Name: fmt.Sprintf("%s.%s.%s.uldata", prefix, unit, opts.Name),
322 23 : IsRecv: !isNetwork,
323 23 : Reliable: true,
324 23 : SharedPrefix: opts.SharedPrefix,
325 23 : }
326 23 : dldataOpts := gmq.MqttQueueOptions{
327 23 : Name: fmt.Sprintf("%s.%s.%s.dldata", prefix, unit, opts.Name),
328 23 : IsRecv: isNetwork,
329 23 : Reliable: true,
330 23 : SharedPrefix: opts.SharedPrefix,
331 23 : }
332 23 : dldataRespOpts := gmq.MqttQueueOptions{
333 23 : Name: fmt.Sprintf("%s.%s.%s.dldata-resp", prefix, unit, opts.Name),
334 23 : IsRecv: !isNetwork,
335 23 : Reliable: true,
336 23 : SharedPrefix: opts.SharedPrefix,
337 23 : }
338 23 : dldataResultOpts := gmq.MqttQueueOptions{
339 23 : Name: fmt.Sprintf("%s.%s.%s.dldata-result", prefix, unit, opts.Name),
340 23 : IsRecv: !isNetwork,
341 23 : Reliable: true,
342 23 : SharedPrefix: opts.SharedPrefix,
343 23 : }
344 23 : ctrlOpts := gmq.MqttQueueOptions{
345 23 : Name: fmt.Sprintf("%s.%s.%s.ctrl", prefix, unit, opts.Name),
346 23 : IsRecv: true,
347 23 : Reliable: true,
348 23 : SharedPrefix: opts.SharedPrefix,
349 23 : }
350 23 : _conn := conn.connection.(*gmq.MqttConnection)
351 23 : if uldata, err = gmq.NewMqttQueue(uldataOpts, _conn); err != nil {
352 0 : return nil, err
353 23 : } else if dldata, err = gmq.NewMqttQueue(dldataOpts, _conn); err != nil {
354 0 : return nil, err
355 23 : } else if dldataResp, err = gmq.NewMqttQueue(dldataRespOpts, _conn); err != nil {
356 0 : return nil, err
357 23 : } else if dldataResult, err = gmq.NewMqttQueue(dldataResultOpts, _conn); err != nil {
358 0 : return nil, err
359 23 : } else if ctrl, err = gmq.NewMqttQueue(ctrlOpts, _conn); err != nil {
360 0 : return nil, err
361 0 : }
362 0 : } else {
363 0 : return nil, errors.New("unknown shared connection type")
364 0 : }
365 :
366 46 : return &dataQueues{
367 46 : uldata: uldata,
368 46 : dldata: dldata,
369 46 : dldataResp: dldataResp,
370 46 : dldataResult: dldataResult,
371 46 : ctrl: ctrl,
372 46 : }, nil
373 : }
|