Line data Source code
1 : package mq
2 :
3 : import (
4 : "encoding/hex"
5 : "encoding/json"
6 : "errors"
7 : "net/url"
8 : "sync"
9 : "time"
10 :
11 : "github.com/tidwall/gjson"
12 :
13 : gmq "github.com/woofdogtw/sylvia-iot-go/general-mq"
14 : "github.com/woofdogtw/sylvia-iot-go/sdk/constants"
15 : )
16 :
17 : // Uplink data from network to broker.
18 : type NetUlData struct {
19 : Time time.Time
20 : NetworkAddr string
21 : Data []byte
22 : Extension map[string]interface{}
23 : }
24 :
25 : // Downlink data from broker to network.
26 : type NetDlData struct {
27 : DataID string
28 : Pub time.Time
29 : ExpiresIn int64
30 : NetworkAddr string
31 : Data []byte
32 : Extension map[string]interface{}
33 : }
34 :
35 : // Downlink data result when processing or completing data transfer to the device.
36 : type NetDlDataResult struct {
37 : DataID string `json:"dataId"`
38 : Status int `json:"status"`
39 : Message string `json:"message,omitempty"`
40 : }
41 :
42 : // `add-device` control data.
43 : type NetCtrlAddDevice struct {
44 : NetworkAddr string `json:"networkAddr"`
45 : }
46 :
47 : // `add-device-bulk` control data.
48 : type NetCtrlAddDeviceBulk struct {
49 : NetworkAddrs []string `json:"networkAddrs"`
50 : }
51 :
52 : // `add-device-range` control data.
53 : type NetCtrlAddDeviceRange struct {
54 : StartAddr string `json:"startAddr"`
55 : EndAddr string `json:"endAddr"`
56 : }
57 :
58 : // `del-device` control data.
59 : type NetCtrlDelDevice struct {
60 : NetworkAddr string `json:"networkAddr"`
61 : }
62 :
63 : // `del-device-bulk` control data.
64 : type NetCtrlDelDeviceBulk struct {
65 : NetworkAddrs []string `json:"networkAddrs"`
66 : }
67 :
68 : // `del-device-range` control data.
69 : type NetCtrlDelDeviceRange struct {
70 : StartAddr string `json:"startAddr"`
71 : EndAddr string `json:"endAddr"`
72 : }
73 :
74 : // The manager for network queues.
75 : type NetworkMgr struct {
76 : opts Options
77 :
78 : // Information for delete connection automatically.
79 : connPool *ConnectionPool
80 : hostUri string
81 :
82 : uldata gmq.GmqQueue
83 : dldata gmq.GmqQueue
84 : dldataResult gmq.GmqQueue
85 : ctrl gmq.GmqQueue
86 :
87 : status MgrStatus
88 : statusMutex sync.Mutex
89 : handler NetMgrEventHandler
90 : }
91 :
92 : // Event handler interface for the `NetworkMgr`.
93 : type NetMgrEventHandler interface {
94 : // Fired when one of the manager's queues encounters a state change.
95 : OnStatusChange(mgr *NetworkMgr, status MgrStatus)
96 :
97 : // Fired when a `DlData` data is received.
98 : //
99 : // Return error will NACK the data.
100 : // The data may will be received again depending on the protocol (such as AMQP).
101 : OnDlData(mgr *NetworkMgr, data *NetDlData) error
102 :
103 : // Fired when a `add-device` control data is received.
104 : //
105 : // Return error will NACK the data.
106 : // The data may will be received again depending on the protocol (such as AMQP).
107 : OnCtrlAddDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDevice) error
108 :
109 : // Fired when a `add-device-bulk` control data is received.
110 : //
111 : // Return error will NACK the data.
112 : // The data may will be received again depending on the protocol (such as AMQP).
113 : OnCtrlAddDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceBulk) error
114 :
115 : // Fired when a `add-device-range` control data is received.
116 : //
117 : // Return error will NACK the data.
118 : // The data may will be received again depending on the protocol (such as AMQP).
119 : OnCtrlAddDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceRange) error
120 :
121 : // Fired when a `del-device` control data is received.
122 : //
123 : // Return error will NACK the data.
124 : // The data may will be received again depending on the protocol (such as AMQP).
125 : OnCtrlDelDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDevice) error
126 :
127 : // Fired when a `del-device-bulk` control data is received.
128 : //
129 : // Return error will NACK the data.
130 : // The data may will be received again depending on the protocol (such as AMQP).
131 : OnCtrlDelDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceBulk) error
132 :
133 : // Fired when a `del-device-range` control data is received.
134 : //
135 : // Return error will NACK the data.
136 : // The data may will be received again depending on the protocol (such as AMQP).
137 : OnCtrlDelDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceRange) error
138 : }
139 :
140 : // The event handler for `gmq.GmqQueue`.
141 : type netMgrMqEventHandler struct {
142 : mgr *NetworkMgr
143 : }
144 :
145 : type netUlDataInner struct {
146 : Time string `json:"time"`
147 : NetworkAddr string `json:"networkAddr"`
148 : Data string `json:"data"`
149 : Extension map[string]interface{} `json:"extension,omitempty"`
150 : }
151 :
152 : type netDlDataInner struct {
153 : DataID string `json:"dataId"`
154 : Pub string `json:"pub"`
155 : ExpiresIn int64 `json:"expiresIn"`
156 : NetworkAddr string `json:"networkAddr"`
157 : Data string `json:"data"`
158 : Extension map[string]interface{} `json:"extension,omitempty"`
159 : }
160 :
161 : // Constants.
162 : const (
163 : netQueuePrefix = "broker.network"
164 : )
165 :
166 : var _ gmq.QueueEventHandler = (*netMgrMqEventHandler)(nil)
167 : var _ gmq.QueueMessageHandler = (*netMgrMqEventHandler)(nil)
168 :
169 : func NewNetworkMgr(
170 : connPool *ConnectionPool,
171 : hostUri url.URL,
172 : opts Options,
173 : handler NetMgrEventHandler,
174 36 : ) (*NetworkMgr, error) {
175 36 : conn, err := getConnection(connPool, hostUri)
176 38 : if err != nil {
177 2 : return nil, err
178 2 : }
179 :
180 34 : queues, err := newDataQueues(conn, opts, netQueuePrefix, true)
181 44 : if err != nil {
182 10 : return nil, err
183 10 : }
184 :
185 24 : mgr := &NetworkMgr{
186 24 : opts: opts,
187 24 : connPool: connPool,
188 24 : hostUri: hostUri.String(),
189 24 : uldata: queues.uldata,
190 24 : dldata: queues.dldata,
191 24 : dldataResult: queues.dldataResult,
192 24 : ctrl: queues.ctrl,
193 24 : status: NotReady,
194 24 : handler: handler,
195 24 : }
196 24 : mqHandler := &netMgrMqEventHandler{mgr: mgr}
197 24 : mgr.uldata.SetHandler(mqHandler)
198 24 : if err := mgr.uldata.Connect(); err != nil {
199 0 : return nil, err
200 0 : }
201 24 : mgr.dldata.SetHandler(mqHandler)
202 24 : mgr.dldata.SetMsgHandler(mqHandler)
203 24 : if err := mgr.dldata.Connect(); err != nil {
204 0 : return nil, err
205 0 : }
206 24 : mgr.dldataResult.SetHandler(mqHandler)
207 24 : if err := mgr.dldataResult.Connect(); err != nil {
208 0 : return nil, err
209 0 : }
210 24 : mgr.ctrl.SetHandler(mqHandler)
211 24 : mgr.ctrl.SetMsgHandler(mqHandler)
212 24 : if err := mgr.ctrl.Connect(); err != nil {
213 0 : return nil, err
214 0 : }
215 24 : conn.add(4)
216 24 : return mgr, nil
217 : }
218 :
219 : // The associated unit ID of the network.
220 2 : func (mgr *NetworkMgr) UnitID() string {
221 2 : return mgr.opts.UnitID
222 2 : }
223 :
224 : // The associated unit code of the network.
225 2 : func (mgr *NetworkMgr) UnitCode() string {
226 2 : return mgr.opts.UnitCode
227 2 : }
228 :
229 : // The network ID.
230 2 : func (mgr *NetworkMgr) ID() string {
231 2 : return mgr.opts.ID
232 2 : }
233 :
234 : // The network code.
235 2 : func (mgr *NetworkMgr) Name() string {
236 2 : return mgr.opts.Name
237 2 : }
238 :
239 : // Manager status.
240 1590 : func (mgr *NetworkMgr) Status() MgrStatus {
241 1590 : return mgr.status
242 1590 : }
243 :
244 : // Detail status of each message queue. Please ignore `DlDataResp`.
245 2 : func (mgr *NetworkMgr) MqStatus() DataMqStatus {
246 2 : return DataMqStatus{
247 2 : UlData: mgr.uldata.Status(),
248 2 : DlData: mgr.dldata.Status(),
249 2 : DlDataResp: gmq.Closed,
250 2 : DlDataResult: mgr.dldataResult.Status(),
251 2 : Ctrl: mgr.ctrl.Status(),
252 2 : }
253 2 : }
254 :
255 : // To close the manager queues.
256 : // The underlying connection will be closed when there are no queues use it.
257 26 : func (mgr *NetworkMgr) Close() error {
258 26 : if err := mgr.uldata.Close(); err != nil {
259 0 : return err
260 26 : } else if err = mgr.dldata.Close(); err != nil {
261 0 : return err
262 26 : } else if err = mgr.dldataResult.Close(); err != nil {
263 0 : return err
264 26 : } else if err = mgr.ctrl.Close(); err != nil {
265 0 : return err
266 0 : }
267 :
268 26 : return removeConnection(mgr.connPool, mgr.hostUri, 4)
269 : }
270 :
271 : // Send uplink data `UlData` to the broker.
272 6 : func (mgr *NetworkMgr) SendUlData(data NetUlData) error {
273 8 : if data.NetworkAddr == "" {
274 2 : return errors.New(errParamNetDev)
275 2 : }
276 :
277 4 : msg := netUlDataInner{
278 4 : Time: data.Time.Format(constants.TimeFormat),
279 4 : NetworkAddr: data.NetworkAddr,
280 4 : Data: hex.EncodeToString(data.Data),
281 4 : Extension: data.Extension,
282 4 : }
283 4 : payload, err := json.Marshal(msg)
284 4 : if err != nil {
285 0 : return err
286 0 : }
287 :
288 8 : go func() {
289 4 : _ = mgr.uldata.SendMsg(payload)
290 4 : }()
291 4 : return nil
292 : }
293 :
294 : // Send uplink data `DlDataResult` to the broker.
295 6 : func (mgr *NetworkMgr) SendDlDataResult(data NetDlDataResult) error {
296 8 : if data.DataID == "" {
297 2 : return errors.New(errParamDataID)
298 2 : }
299 :
300 4 : payload, err := json.Marshal(data)
301 4 : if err != nil {
302 0 : return err
303 0 : }
304 :
305 8 : go func() {
306 4 : _ = mgr.dldataResult.SendMsg(payload)
307 4 : }()
308 4 : return nil
309 : }
310 :
311 240 : func (h *netMgrMqEventHandler) OnStatus(queue gmq.GmqQueue, status gmq.Status) {
312 240 : h.onEvent(queue)
313 240 : }
314 :
315 0 : func (h *netMgrMqEventHandler) OnError(queue gmq.GmqQueue, err error) {
316 0 : h.onEvent(queue)
317 0 : }
318 :
319 57 : func (h *netMgrMqEventHandler) OnMessage(queue gmq.GmqQueue, message gmq.Message) {
320 57 : queueName := queue.Name()
321 68 : if queueName == h.mgr.dldata.Name() {
322 11 : var data netDlDataInner
323 13 : if err := json.Unmarshal(message.Payload(), &data); err != nil {
324 2 : _ = message.Ack()
325 2 : return
326 2 : }
327 9 : dataBytes, err := hex.DecodeString(data.Data)
328 9 : if err != nil {
329 0 : _ = message.Ack()
330 0 : return
331 0 : }
332 9 : dPub, err := time.Parse(constants.TimeFormat, data.Pub)
333 11 : if err != nil {
334 2 : _ = message.Ack()
335 2 : return
336 2 : }
337 7 : dldata := &NetDlData{
338 7 : DataID: data.DataID,
339 7 : Pub: dPub.UTC(),
340 7 : ExpiresIn: data.ExpiresIn,
341 7 : NetworkAddr: data.NetworkAddr,
342 7 : Data: dataBytes,
343 7 : Extension: data.Extension,
344 7 : }
345 7 : handler := h.mgr.handler
346 14 : if handler != nil {
347 9 : if err := handler.OnDlData(h.mgr, dldata); err != nil {
348 2 : _ = message.Nack()
349 7 : } else {
350 5 : _ = message.Ack()
351 5 : }
352 7 : return
353 : }
354 92 : } else if queueName == h.mgr.ctrl.Name() {
355 46 : payload := message.Payload()
356 46 : dTime, err := time.Parse(constants.TimeFormat, gjson.GetBytes(payload, "time").String())
357 48 : if err != nil {
358 2 : _ = message.Ack()
359 2 : return
360 2 : }
361 44 : new := gjson.GetBytes(payload, "new").String()
362 44 :
363 44 : switch gjson.GetBytes(payload, "operation").String() {
364 7 : case "add-device":
365 7 : var data NetCtrlAddDevice
366 7 : err := json.Unmarshal([]byte(new), &data)
367 9 : if err != nil {
368 2 : _ = message.Ack()
369 2 : return
370 2 : }
371 5 : handler := h.mgr.handler
372 10 : if handler != nil {
373 7 : if err := handler.OnCtrlAddDevice(h.mgr, dTime.UTC(), &data); err != nil {
374 2 : _ = message.Nack()
375 5 : } else {
376 3 : _ = message.Ack()
377 3 : }
378 5 : return
379 : }
380 7 : case "add-device-bulk":
381 7 : var data NetCtrlAddDeviceBulk
382 7 : err := json.Unmarshal([]byte(new), &data)
383 9 : if err != nil {
384 2 : _ = message.Ack()
385 2 : return
386 2 : }
387 5 : handler := h.mgr.handler
388 10 : if handler != nil {
389 7 : if err := handler.OnCtrlAddDeviceBulk(h.mgr, dTime.UTC(), &data); err != nil {
390 2 : _ = message.Nack()
391 5 : } else {
392 3 : _ = message.Ack()
393 3 : }
394 5 : return
395 : }
396 7 : case "add-device-range":
397 7 : var data NetCtrlAddDeviceRange
398 7 : err := json.Unmarshal([]byte(new), &data)
399 9 : if err != nil {
400 2 : _ = message.Ack()
401 2 : return
402 2 : }
403 5 : handler := h.mgr.handler
404 10 : if handler != nil {
405 7 : if err := handler.OnCtrlAddDeviceRange(h.mgr, dTime.UTC(), &data); err != nil {
406 2 : _ = message.Nack()
407 5 : } else {
408 3 : _ = message.Ack()
409 3 : }
410 5 : return
411 : }
412 7 : case "del-device":
413 7 : var data NetCtrlDelDevice
414 7 : err := json.Unmarshal([]byte(new), &data)
415 9 : if err != nil {
416 2 : _ = message.Ack()
417 2 : return
418 2 : }
419 5 : handler := h.mgr.handler
420 10 : if handler != nil {
421 7 : if err := handler.OnCtrlDelDevice(h.mgr, dTime.UTC(), &data); err != nil {
422 2 : _ = message.Nack()
423 5 : } else {
424 3 : _ = message.Ack()
425 3 : }
426 5 : return
427 : }
428 7 : case "del-device-bulk":
429 7 : var data NetCtrlDelDeviceBulk
430 7 : err := json.Unmarshal([]byte(new), &data)
431 9 : if err != nil {
432 2 : _ = message.Ack()
433 2 : return
434 2 : }
435 5 : handler := h.mgr.handler
436 10 : if handler != nil {
437 7 : if err := handler.OnCtrlDelDeviceBulk(h.mgr, dTime.UTC(), &data); err != nil {
438 2 : _ = message.Nack()
439 5 : } else {
440 3 : _ = message.Ack()
441 3 : }
442 5 : return
443 : }
444 7 : case "del-device-range":
445 7 : var data NetCtrlDelDeviceRange
446 7 : err := json.Unmarshal([]byte(new), &data)
447 9 : if err != nil {
448 2 : _ = message.Ack()
449 2 : return
450 2 : }
451 5 : handler := h.mgr.handler
452 10 : if handler != nil {
453 7 : if err := handler.OnCtrlDelDeviceRange(h.mgr, dTime.UTC(), &data); err != nil {
454 2 : _ = message.Nack()
455 5 : } else {
456 3 : _ = message.Ack()
457 3 : }
458 5 : return
459 : }
460 2 : default:
461 2 : _ = message.Ack()
462 2 : return
463 : }
464 : }
465 0 : _ = message.Ack()
466 : }
467 :
468 240 : func (h *netMgrMqEventHandler) onEvent(queue gmq.GmqQueue) {
469 240 : h.mgr.statusMutex.Lock()
470 240 : var mgrStatus MgrStatus
471 240 : if h.mgr.uldata.Status() == gmq.Connected &&
472 240 : h.mgr.dldata.Status() == gmq.Connected &&
473 240 : h.mgr.dldataResult.Status() == gmq.Connected &&
474 258 : h.mgr.ctrl.Status() == gmq.Connected {
475 18 : mgrStatus = Ready
476 240 : } else {
477 222 : mgrStatus = NotReady
478 222 : }
479 :
480 240 : changed := false
481 276 : if h.mgr.status != mgrStatus {
482 36 : h.mgr.status = mgrStatus
483 36 : changed = true
484 36 : }
485 240 : h.mgr.statusMutex.Unlock()
486 240 :
487 276 : if changed {
488 36 : handler := h.mgr.handler
489 72 : if handler != nil {
490 36 : handler.OnStatusChange(h.mgr, mgrStatus)
491 36 : }
492 : }
493 : }
|