Line data Source code
1 : package mq
2 :
3 : import (
4 : "encoding/hex"
5 : "encoding/json"
6 : "errors"
7 : "net/url"
8 : "sync"
9 : "time"
10 :
11 : gmq "github.com/woofdogtw/sylvia-iot-go/general-mq"
12 : "github.com/woofdogtw/sylvia-iot-go/sdk/constants"
13 : )
14 :
15 : // Uplink data from broker to application.
16 : type AppUlData struct {
17 : DataID string
18 : Time time.Time
19 : Pub time.Time
20 : DeviceID string
21 : NetworkID string
22 : NetworkCode string
23 : NetworkAddr string
24 : IsPublic bool
25 : Data []byte
26 : Extension map[string]interface{}
27 : }
28 :
29 : // Downlink data from application to broker.
30 : type AppDlData struct {
31 : CorrelationID string
32 : DeviceID string
33 : NetworkCode string
34 : NetworkAddr string
35 : Data []byte
36 : Extension map[string]interface{}
37 : }
38 :
39 : // Downlink data response for `DlData`.
40 : type AppDlDataResp struct {
41 : CorrelationID string `json:"correlationId"`
42 : DataID string `json:"dataId"`
43 : Error string `json:"error"`
44 : Message string `json:"message"`
45 : }
46 :
47 : // Downlink data result when processing or completing data transfer to the device.
48 : type AppDlDataResult struct {
49 : DataID string `json:"dataId"`
50 : Status int `json:"status"`
51 : Message string `json:"message"`
52 : }
53 :
54 : // The manager for application queues.
55 : type ApplicationMgr struct {
56 : opts Options
57 :
58 : // Information for delete connection automatically.
59 : connPool *ConnectionPool
60 : hostUri string
61 :
62 : uldata gmq.GmqQueue
63 : dldata gmq.GmqQueue
64 : dldataResp gmq.GmqQueue
65 : dldataResult gmq.GmqQueue
66 :
67 : status MgrStatus
68 : statusMutex sync.Mutex
69 : handler AppMgrEventHandler
70 : }
71 :
72 : // Event handler interface for the `ApplicationMgr`.
73 : type AppMgrEventHandler interface {
74 : // Fired when one of the manager's queues encounters a state change.
75 : OnStatusChange(mgr *ApplicationMgr, status MgrStatus)
76 :
77 : // Fired when a `UlData` data is received.
78 : //
79 : // Return error will NACK the data.
80 : // The data may will be received again depending on the protocol (such as AMQP).
81 : OnUlData(mgr *ApplicationMgr, data *AppUlData) error
82 :
83 : // Fired when a `DlDataResult` data is received.
84 : //
85 : // Return error will NACK the data.
86 : // The data may will be received again depending on the protocol (such as AMQP).
87 : OnDlDataResp(mgr *ApplicationMgr, data *AppDlDataResp) error
88 :
89 : // Fired when a `DlDataResp` data is received.
90 : //
91 : // Return error will NACK the data.
92 : // The data may will be received again depending on the protocol (such as AMQP).
93 : OnDlDataResult(mgr *ApplicationMgr, data *AppDlDataResult) error
94 : }
95 :
96 : // The event handler for `gmq.GmqQueue`.
97 : type appMgrMqEventHandler struct {
98 : mgr *ApplicationMgr
99 : }
100 :
101 : type appUlDataInner struct {
102 : DataID string `json:"dataId"`
103 : Time string `json:"time"`
104 : Pub string `json:"pub"`
105 : DeviceID string `json:"deviceId"`
106 : NetworkID string `json:"networkId"`
107 : NetworkCode string `json:"networkCode"`
108 : NetworkAddr string `json:"networkAddr"`
109 : IsPublic bool `json:"isPublic"`
110 : Data string `json:"data"`
111 : Extension map[string]interface{} `json:"extension"`
112 : }
113 :
114 : type appDlDataInner struct {
115 : CorrelationID string `json:"correlationId"`
116 : DeviceID string `json:"deviceId,omitempty"`
117 : NetworkCode string `json:"networkCode,omitempty"`
118 : NetworkAddr string `json:"networkAddr,omitempty"`
119 : Data string `json:"data"`
120 : Extension map[string]interface{} `json:"extension,omitempty"`
121 : }
122 :
123 : // Constants.
124 : const (
125 : appQueuePrefix = "broker.application"
126 : )
127 :
128 : var _ gmq.QueueEventHandler = (*appMgrMqEventHandler)(nil)
129 : var _ gmq.QueueMessageHandler = (*appMgrMqEventHandler)(nil)
130 :
131 : func NewApplicationMgr(
132 : connPool *ConnectionPool,
133 : hostUri url.URL,
134 : opts Options,
135 : handler AppMgrEventHandler,
136 34 : ) (*ApplicationMgr, error) {
137 40 : if opts.UnitID == "" {
138 6 : return nil, errors.New("`UnitID` cannot be empty for application")
139 6 : }
140 :
141 28 : conn, err := getConnection(connPool, hostUri)
142 28 : if err != nil {
143 0 : return nil, err
144 0 : }
145 :
146 28 : queues, err := newDataQueues(conn, opts, appQueuePrefix, false)
147 34 : if err != nil {
148 6 : return nil, err
149 6 : }
150 :
151 22 : mgr := &ApplicationMgr{
152 22 : opts: opts,
153 22 : connPool: connPool,
154 22 : hostUri: hostUri.String(),
155 22 : uldata: queues.uldata,
156 22 : dldata: queues.dldata,
157 22 : dldataResp: queues.dldataResp,
158 22 : dldataResult: queues.dldataResult,
159 22 : status: NotReady,
160 22 : handler: handler,
161 22 : }
162 22 : mqHandler := &appMgrMqEventHandler{mgr: mgr}
163 22 : mgr.uldata.SetHandler(mqHandler)
164 22 : mgr.uldata.SetMsgHandler(mqHandler)
165 22 : if err := mgr.uldata.Connect(); err != nil {
166 0 : return nil, err
167 0 : }
168 22 : mgr.dldata.SetHandler(mqHandler)
169 22 : if err := mgr.dldata.Connect(); err != nil {
170 0 : return nil, err
171 0 : }
172 22 : mgr.dldataResp.SetHandler(mqHandler)
173 22 : mgr.dldataResp.SetMsgHandler(mqHandler)
174 22 : if err := mgr.dldataResp.Connect(); err != nil {
175 0 : return nil, err
176 0 : }
177 22 : mgr.dldataResult.SetHandler(mqHandler)
178 22 : mgr.dldataResult.SetMsgHandler(mqHandler)
179 22 : if err := mgr.dldataResult.Connect(); err != nil {
180 0 : return nil, err
181 0 : }
182 22 : conn.add(4)
183 22 : return mgr, nil
184 : }
185 :
186 : // The associated unit ID of the application.
187 2 : func (mgr *ApplicationMgr) UnitID() string {
188 2 : return mgr.opts.UnitID
189 2 : }
190 :
191 : // The associated unit code of the application.
192 2 : func (mgr *ApplicationMgr) UnitCode() string {
193 2 : return mgr.opts.UnitCode
194 2 : }
195 :
196 : // The application ID.
197 2 : func (mgr *ApplicationMgr) ID() string {
198 2 : return mgr.opts.ID
199 2 : }
200 :
201 : // The application code.
202 2 : func (mgr *ApplicationMgr) Name() string {
203 2 : return mgr.opts.Name
204 2 : }
205 :
206 : // Manager status.
207 1589 : func (mgr *ApplicationMgr) Status() MgrStatus {
208 1589 : return mgr.status
209 1589 : }
210 :
211 : // Detail status of each message queue. Please ignore `Ctrl`.
212 2 : func (mgr *ApplicationMgr) MqStatus() DataMqStatus {
213 2 : return DataMqStatus{
214 2 : UlData: mgr.uldata.Status(),
215 2 : DlData: mgr.dldata.Status(),
216 2 : DlDataResp: mgr.dldataResp.Status(),
217 2 : DlDataResult: mgr.dldataResult.Status(),
218 2 : Ctrl: gmq.Closed,
219 2 : }
220 2 : }
221 :
222 : // To close the manager queues.
223 : // The underlying connection will be closed when there are no queues use it.
224 24 : func (mgr *ApplicationMgr) Close() error {
225 24 : if err := mgr.uldata.Close(); err != nil {
226 0 : return err
227 24 : } else if err = mgr.dldata.Close(); err != nil {
228 0 : return err
229 24 : } else if err = mgr.dldataResp.Close(); err != nil {
230 0 : return err
231 24 : } else if err = mgr.dldataResult.Close(); err != nil {
232 0 : return err
233 0 : }
234 :
235 24 : return removeConnection(mgr.connPool, mgr.hostUri, 4)
236 : }
237 :
238 : // Send downlink data `DlData` to the broker.
239 10 : func (mgr *ApplicationMgr) SendDlData(data AppDlData) error {
240 12 : if data.CorrelationID == "" {
241 2 : return errors.New(errParamCorrID)
242 2 : }
243 12 : if data.DeviceID == "" && (data.NetworkCode == "" || data.NetworkAddr == "") {
244 4 : return errors.New(errParamAppDev)
245 4 : }
246 :
247 4 : msg := appDlDataInner{
248 4 : CorrelationID: data.CorrelationID,
249 4 : DeviceID: data.DeviceID,
250 4 : NetworkCode: data.NetworkCode,
251 4 : NetworkAddr: data.NetworkAddr,
252 4 : Data: hex.EncodeToString(data.Data),
253 4 : Extension: data.Extension,
254 4 : }
255 4 : payload, err := json.Marshal(msg)
256 4 : if err != nil {
257 0 : return err
258 0 : }
259 :
260 8 : go func() {
261 4 : _ = mgr.dldata.SendMsg(payload)
262 4 : }()
263 4 : return nil
264 : }
265 :
266 232 : func (h *appMgrMqEventHandler) OnStatus(queue gmq.GmqQueue, status gmq.Status) {
267 232 : h.onEvent(queue)
268 232 : }
269 :
270 0 : func (h *appMgrMqEventHandler) OnError(queue gmq.GmqQueue, err error) {
271 0 : h.onEvent(queue)
272 0 : }
273 :
274 33 : func (h *appMgrMqEventHandler) OnMessage(queue gmq.GmqQueue, message gmq.Message) {
275 33 : queueName := queue.Name()
276 48 : if queueName == h.mgr.uldata.Name() {
277 15 : var data appUlDataInner
278 17 : if err := json.Unmarshal(message.Payload(), &data); err != nil {
279 2 : _ = message.Ack()
280 2 : return
281 2 : }
282 13 : dataBytes, err := hex.DecodeString(data.Data)
283 15 : if err != nil {
284 2 : _ = message.Ack()
285 2 : return
286 2 : }
287 11 : dTime, err := time.Parse(constants.TimeFormat, data.Time)
288 13 : if err != nil {
289 2 : _ = message.Ack()
290 2 : return
291 2 : }
292 9 : dPub, err := time.Parse(constants.TimeFormat, data.Pub)
293 11 : if err != nil {
294 2 : _ = message.Ack()
295 2 : return
296 2 : }
297 7 : uldata := &AppUlData{
298 7 : DataID: data.DataID,
299 7 : Time: dTime.UTC(),
300 7 : Pub: dPub.UTC(),
301 7 : DeviceID: data.DeviceID,
302 7 : NetworkID: data.NetworkID,
303 7 : NetworkCode: data.NetworkCode,
304 7 : NetworkAddr: data.NetworkAddr,
305 7 : IsPublic: data.IsPublic,
306 7 : Data: dataBytes,
307 7 : Extension: data.Extension,
308 7 : }
309 7 : handler := h.mgr.handler
310 14 : if handler != nil {
311 9 : if err := handler.OnUlData(h.mgr, uldata); err != nil {
312 2 : _ = message.Nack()
313 7 : } else {
314 5 : _ = message.Ack()
315 5 : }
316 7 : return
317 : }
318 27 : } else if queueName == h.mgr.dldataResp.Name() {
319 9 : var data AppDlDataResp
320 11 : if err := json.Unmarshal(message.Payload(), &data); err != nil {
321 2 : _ = message.Ack()
322 2 : return
323 2 : }
324 7 : handler := h.mgr.handler
325 14 : if handler != nil {
326 9 : if err := handler.OnDlDataResp(h.mgr, &data); err != nil {
327 2 : _ = message.Nack()
328 7 : } else {
329 5 : _ = message.Ack()
330 5 : }
331 7 : return
332 : }
333 18 : } else if queueName == h.mgr.dldataResult.Name() {
334 9 : var data AppDlDataResult
335 11 : if err := json.Unmarshal(message.Payload(), &data); err != nil {
336 2 : _ = message.Ack()
337 2 : return
338 2 : }
339 7 : handler := h.mgr.handler
340 14 : if handler != nil {
341 9 : if err := handler.OnDlDataResult(h.mgr, &data); err != nil {
342 2 : _ = message.Nack()
343 7 : } else {
344 5 : _ = message.Ack()
345 5 : }
346 7 : return
347 : }
348 : }
349 0 : _ = message.Ack()
350 : }
351 :
352 232 : func (h *appMgrMqEventHandler) onEvent(queue gmq.GmqQueue) {
353 232 : h.mgr.statusMutex.Lock()
354 232 : var mgrStatus MgrStatus
355 232 : if h.mgr.uldata.Status() == gmq.Connected &&
356 232 : h.mgr.dldata.Status() == gmq.Connected &&
357 232 : h.mgr.dldataResp.Status() == gmq.Connected &&
358 250 : h.mgr.dldataResult.Status() == gmq.Connected {
359 18 : mgrStatus = Ready
360 232 : } else {
361 214 : mgrStatus = NotReady
362 214 : }
363 :
364 232 : changed := false
365 268 : if h.mgr.status != mgrStatus {
366 36 : h.mgr.status = mgrStatus
367 36 : changed = true
368 36 : }
369 232 : h.mgr.statusMutex.Unlock()
370 232 :
371 268 : if changed {
372 36 : handler := h.mgr.handler
373 72 : if handler != nil {
374 36 : handler.OnStatusChange(h.mgr, mgrStatus)
375 36 : }
376 : }
377 : }
|