LCOV - code coverage report
Current view: top level - mq - application.go (source / functions) Hit Total Coverage
Test: sdk.lcov Lines: 183 204 89.7 %
Date: 2024-12-20 13:58:22 Functions: 0 0 -

          Line data    Source code
       1             : package mq
       2             : 
       3             : import (
       4             :         "encoding/hex"
       5             :         "encoding/json"
       6             :         "errors"
       7             :         "net/url"
       8             :         "sync"
       9             :         "time"
      10             : 
      11             :         gmq "github.com/woofdogtw/sylvia-iot-go/general-mq"
      12             :         "github.com/woofdogtw/sylvia-iot-go/sdk/constants"
      13             : )
      14             : 
      15             : // Uplink data from broker to application.
      16             : type AppUlData struct {
      17             :         DataID      string
      18             :         Time        time.Time
      19             :         Pub         time.Time
      20             :         DeviceID    string
      21             :         NetworkID   string
      22             :         NetworkCode string
      23             :         NetworkAddr string
      24             :         IsPublic    bool
      25             :         Data        []byte
      26             :         Extension   map[string]interface{}
      27             : }
      28             : 
      29             : // Downlink data from application to broker.
      30             : type AppDlData struct {
      31             :         CorrelationID string
      32             :         DeviceID      string
      33             :         NetworkCode   string
      34             :         NetworkAddr   string
      35             :         Data          []byte
      36             :         Extension     map[string]interface{}
      37             : }
      38             : 
      39             : // Downlink data response for `DlData`.
      40             : type AppDlDataResp struct {
      41             :         CorrelationID string `json:"correlationId"`
      42             :         DataID        string `json:"dataId"`
      43             :         Error         string `json:"error"`
      44             :         Message       string `json:"message"`
      45             : }
      46             : 
      47             : // Downlink data result when processing or completing data transfer to the device.
      48             : type AppDlDataResult struct {
      49             :         DataID  string `json:"dataId"`
      50             :         Status  int    `json:"status"`
      51             :         Message string `json:"message"`
      52             : }
      53             : 
      54             : // The manager for application queues.
      55             : type ApplicationMgr struct {
      56             :         opts Options
      57             : 
      58             :         // Information for delete connection automatically.
      59             :         connPool *ConnectionPool
      60             :         hostUri  string
      61             : 
      62             :         uldata       gmq.GmqQueue
      63             :         dldata       gmq.GmqQueue
      64             :         dldataResp   gmq.GmqQueue
      65             :         dldataResult gmq.GmqQueue
      66             : 
      67             :         status      MgrStatus
      68             :         statusMutex sync.Mutex
      69             :         handler     AppMgrEventHandler
      70             : }
      71             : 
      72             : // Event handler interface for the `ApplicationMgr`.
      73             : type AppMgrEventHandler interface {
      74             :         // Fired when one of the manager's queues encounters a state change.
      75             :         OnStatusChange(mgr *ApplicationMgr, status MgrStatus)
      76             : 
      77             :         // Fired when a `UlData` data is received.
      78             :         //
      79             :         // Return error will NACK the data.
      80             :         // The data may will be received again depending on the protocol (such as AMQP).
      81             :         OnUlData(mgr *ApplicationMgr, data *AppUlData) error
      82             : 
      83             :         // Fired when a `DlDataResult` data is received.
      84             :         //
      85             :         // Return error will NACK the data.
      86             :         // The data may will be received again depending on the protocol (such as AMQP).
      87             :         OnDlDataResp(mgr *ApplicationMgr, data *AppDlDataResp) error
      88             : 
      89             :         // Fired when a `DlDataResp` data is received.
      90             :         //
      91             :         // Return error will NACK the data.
      92             :         // The data may will be received again depending on the protocol (such as AMQP).
      93             :         OnDlDataResult(mgr *ApplicationMgr, data *AppDlDataResult) error
      94             : }
      95             : 
      96             : // The event handler for `gmq.GmqQueue`.
      97             : type appMgrMqEventHandler struct {
      98             :         mgr *ApplicationMgr
      99             : }
     100             : 
     101             : type appUlDataInner struct {
     102             :         DataID      string                 `json:"dataId"`
     103             :         Time        string                 `json:"time"`
     104             :         Pub         string                 `json:"pub"`
     105             :         DeviceID    string                 `json:"deviceId"`
     106             :         NetworkID   string                 `json:"networkId"`
     107             :         NetworkCode string                 `json:"networkCode"`
     108             :         NetworkAddr string                 `json:"networkAddr"`
     109             :         IsPublic    bool                   `json:"isPublic"`
     110             :         Data        string                 `json:"data"`
     111             :         Extension   map[string]interface{} `json:"extension"`
     112             : }
     113             : 
     114             : type appDlDataInner struct {
     115             :         CorrelationID string                 `json:"correlationId"`
     116             :         DeviceID      string                 `json:"deviceId,omitempty"`
     117             :         NetworkCode   string                 `json:"networkCode,omitempty"`
     118             :         NetworkAddr   string                 `json:"networkAddr,omitempty"`
     119             :         Data          string                 `json:"data"`
     120             :         Extension     map[string]interface{} `json:"extension,omitempty"`
     121             : }
     122             : 
     123             : // Constants.
     124             : const (
     125             :         appQueuePrefix = "broker.application"
     126             : )
     127             : 
     128             : var _ gmq.QueueEventHandler = (*appMgrMqEventHandler)(nil)
     129             : var _ gmq.QueueMessageHandler = (*appMgrMqEventHandler)(nil)
     130             : 
     131             : func NewApplicationMgr(
     132             :         connPool *ConnectionPool,
     133             :         hostUri url.URL,
     134             :         opts Options,
     135             :         handler AppMgrEventHandler,
     136          34 : ) (*ApplicationMgr, error) {
     137          40 :         if opts.UnitID == "" {
     138           6 :                 return nil, errors.New("`UnitID` cannot be empty for application")
     139           6 :         }
     140             : 
     141          28 :         conn, err := getConnection(connPool, hostUri)
     142          28 :         if err != nil {
     143           0 :                 return nil, err
     144           0 :         }
     145             : 
     146          28 :         queues, err := newDataQueues(conn, opts, appQueuePrefix, false)
     147          34 :         if err != nil {
     148           6 :                 return nil, err
     149           6 :         }
     150             : 
     151          22 :         mgr := &ApplicationMgr{
     152          22 :                 opts:         opts,
     153          22 :                 connPool:     connPool,
     154          22 :                 hostUri:      hostUri.String(),
     155          22 :                 uldata:       queues.uldata,
     156          22 :                 dldata:       queues.dldata,
     157          22 :                 dldataResp:   queues.dldataResp,
     158          22 :                 dldataResult: queues.dldataResult,
     159          22 :                 status:       NotReady,
     160          22 :                 handler:      handler,
     161          22 :         }
     162          22 :         mqHandler := &appMgrMqEventHandler{mgr: mgr}
     163          22 :         mgr.uldata.SetHandler(mqHandler)
     164          22 :         mgr.uldata.SetMsgHandler(mqHandler)
     165          22 :         if err := mgr.uldata.Connect(); err != nil {
     166           0 :                 return nil, err
     167           0 :         }
     168          22 :         mgr.dldata.SetHandler(mqHandler)
     169          22 :         if err := mgr.dldata.Connect(); err != nil {
     170           0 :                 return nil, err
     171           0 :         }
     172          22 :         mgr.dldataResp.SetHandler(mqHandler)
     173          22 :         mgr.dldataResp.SetMsgHandler(mqHandler)
     174          22 :         if err := mgr.dldataResp.Connect(); err != nil {
     175           0 :                 return nil, err
     176           0 :         }
     177          22 :         mgr.dldataResult.SetHandler(mqHandler)
     178          22 :         mgr.dldataResult.SetMsgHandler(mqHandler)
     179          22 :         if err := mgr.dldataResult.Connect(); err != nil {
     180           0 :                 return nil, err
     181           0 :         }
     182          22 :         conn.add(4)
     183          22 :         return mgr, nil
     184             : }
     185             : 
     186             : // The associated unit ID of the application.
     187           2 : func (mgr *ApplicationMgr) UnitID() string {
     188           2 :         return mgr.opts.UnitID
     189           2 : }
     190             : 
     191             : // The associated unit code of the application.
     192           2 : func (mgr *ApplicationMgr) UnitCode() string {
     193           2 :         return mgr.opts.UnitCode
     194           2 : }
     195             : 
     196             : // The application ID.
     197           2 : func (mgr *ApplicationMgr) ID() string {
     198           2 :         return mgr.opts.ID
     199           2 : }
     200             : 
     201             : // The application code.
     202           2 : func (mgr *ApplicationMgr) Name() string {
     203           2 :         return mgr.opts.Name
     204           2 : }
     205             : 
     206             : // Manager status.
     207        1589 : func (mgr *ApplicationMgr) Status() MgrStatus {
     208        1589 :         return mgr.status
     209        1589 : }
     210             : 
     211             : // Detail status of each message queue. Please ignore `Ctrl`.
     212           2 : func (mgr *ApplicationMgr) MqStatus() DataMqStatus {
     213           2 :         return DataMqStatus{
     214           2 :                 UlData:       mgr.uldata.Status(),
     215           2 :                 DlData:       mgr.dldata.Status(),
     216           2 :                 DlDataResp:   mgr.dldataResp.Status(),
     217           2 :                 DlDataResult: mgr.dldataResult.Status(),
     218           2 :                 Ctrl:         gmq.Closed,
     219           2 :         }
     220           2 : }
     221             : 
     222             : // To close the manager queues.
     223             : // The underlying connection will be closed when there are no queues use it.
     224          24 : func (mgr *ApplicationMgr) Close() error {
     225          24 :         if err := mgr.uldata.Close(); err != nil {
     226           0 :                 return err
     227          24 :         } else if err = mgr.dldata.Close(); err != nil {
     228           0 :                 return err
     229          24 :         } else if err = mgr.dldataResp.Close(); err != nil {
     230           0 :                 return err
     231          24 :         } else if err = mgr.dldataResult.Close(); err != nil {
     232           0 :                 return err
     233           0 :         }
     234             : 
     235          24 :         return removeConnection(mgr.connPool, mgr.hostUri, 4)
     236             : }
     237             : 
     238             : // Send downlink data `DlData` to the broker.
     239          10 : func (mgr *ApplicationMgr) SendDlData(data AppDlData) error {
     240          12 :         if data.CorrelationID == "" {
     241           2 :                 return errors.New(errParamCorrID)
     242           2 :         }
     243          12 :         if data.DeviceID == "" && (data.NetworkCode == "" || data.NetworkAddr == "") {
     244           4 :                 return errors.New(errParamAppDev)
     245           4 :         }
     246             : 
     247           4 :         msg := appDlDataInner{
     248           4 :                 CorrelationID: data.CorrelationID,
     249           4 :                 DeviceID:      data.DeviceID,
     250           4 :                 NetworkCode:   data.NetworkCode,
     251           4 :                 NetworkAddr:   data.NetworkAddr,
     252           4 :                 Data:          hex.EncodeToString(data.Data),
     253           4 :                 Extension:     data.Extension,
     254           4 :         }
     255           4 :         payload, err := json.Marshal(msg)
     256           4 :         if err != nil {
     257           0 :                 return err
     258           0 :         }
     259             : 
     260           8 :         go func() {
     261           4 :                 _ = mgr.dldata.SendMsg(payload)
     262           4 :         }()
     263           4 :         return nil
     264             : }
     265             : 
     266         232 : func (h *appMgrMqEventHandler) OnStatus(queue gmq.GmqQueue, status gmq.Status) {
     267         232 :         h.onEvent(queue)
     268         232 : }
     269             : 
     270           0 : func (h *appMgrMqEventHandler) OnError(queue gmq.GmqQueue, err error) {
     271           0 :         h.onEvent(queue)
     272           0 : }
     273             : 
     274          33 : func (h *appMgrMqEventHandler) OnMessage(queue gmq.GmqQueue, message gmq.Message) {
     275          33 :         queueName := queue.Name()
     276          48 :         if queueName == h.mgr.uldata.Name() {
     277          15 :                 var data appUlDataInner
     278          17 :                 if err := json.Unmarshal(message.Payload(), &data); err != nil {
     279           2 :                         _ = message.Ack()
     280           2 :                         return
     281           2 :                 }
     282          13 :                 dataBytes, err := hex.DecodeString(data.Data)
     283          15 :                 if err != nil {
     284           2 :                         _ = message.Ack()
     285           2 :                         return
     286           2 :                 }
     287          11 :                 dTime, err := time.Parse(constants.TimeFormat, data.Time)
     288          13 :                 if err != nil {
     289           2 :                         _ = message.Ack()
     290           2 :                         return
     291           2 :                 }
     292           9 :                 dPub, err := time.Parse(constants.TimeFormat, data.Pub)
     293          11 :                 if err != nil {
     294           2 :                         _ = message.Ack()
     295           2 :                         return
     296           2 :                 }
     297           7 :                 uldata := &AppUlData{
     298           7 :                         DataID:      data.DataID,
     299           7 :                         Time:        dTime.UTC(),
     300           7 :                         Pub:         dPub.UTC(),
     301           7 :                         DeviceID:    data.DeviceID,
     302           7 :                         NetworkID:   data.NetworkID,
     303           7 :                         NetworkCode: data.NetworkCode,
     304           7 :                         NetworkAddr: data.NetworkAddr,
     305           7 :                         IsPublic:    data.IsPublic,
     306           7 :                         Data:        dataBytes,
     307           7 :                         Extension:   data.Extension,
     308           7 :                 }
     309           7 :                 handler := h.mgr.handler
     310          14 :                 if handler != nil {
     311           9 :                         if err := handler.OnUlData(h.mgr, uldata); err != nil {
     312           2 :                                 _ = message.Nack()
     313           7 :                         } else {
     314           5 :                                 _ = message.Ack()
     315           5 :                         }
     316           7 :                         return
     317             :                 }
     318          27 :         } else if queueName == h.mgr.dldataResp.Name() {
     319           9 :                 var data AppDlDataResp
     320          11 :                 if err := json.Unmarshal(message.Payload(), &data); err != nil {
     321           2 :                         _ = message.Ack()
     322           2 :                         return
     323           2 :                 }
     324           7 :                 handler := h.mgr.handler
     325          14 :                 if handler != nil {
     326           9 :                         if err := handler.OnDlDataResp(h.mgr, &data); err != nil {
     327           2 :                                 _ = message.Nack()
     328           7 :                         } else {
     329           5 :                                 _ = message.Ack()
     330           5 :                         }
     331           7 :                         return
     332             :                 }
     333          18 :         } else if queueName == h.mgr.dldataResult.Name() {
     334           9 :                 var data AppDlDataResult
     335          11 :                 if err := json.Unmarshal(message.Payload(), &data); err != nil {
     336           2 :                         _ = message.Ack()
     337           2 :                         return
     338           2 :                 }
     339           7 :                 handler := h.mgr.handler
     340          14 :                 if handler != nil {
     341           9 :                         if err := handler.OnDlDataResult(h.mgr, &data); err != nil {
     342           2 :                                 _ = message.Nack()
     343           7 :                         } else {
     344           5 :                                 _ = message.Ack()
     345           5 :                         }
     346           7 :                         return
     347             :                 }
     348             :         }
     349           0 :         _ = message.Ack()
     350             : }
     351             : 
     352         232 : func (h *appMgrMqEventHandler) onEvent(queue gmq.GmqQueue) {
     353         232 :         h.mgr.statusMutex.Lock()
     354         232 :         var mgrStatus MgrStatus
     355         232 :         if h.mgr.uldata.Status() == gmq.Connected &&
     356         232 :                 h.mgr.dldata.Status() == gmq.Connected &&
     357         232 :                 h.mgr.dldataResp.Status() == gmq.Connected &&
     358         250 :                 h.mgr.dldataResult.Status() == gmq.Connected {
     359          18 :                 mgrStatus = Ready
     360         232 :         } else {
     361         214 :                 mgrStatus = NotReady
     362         214 :         }
     363             : 
     364         232 :         changed := false
     365         268 :         if h.mgr.status != mgrStatus {
     366          36 :                 h.mgr.status = mgrStatus
     367          36 :                 changed = true
     368          36 :         }
     369         232 :         h.mgr.statusMutex.Unlock()
     370         232 : 
     371         268 :         if changed {
     372          36 :                 handler := h.mgr.handler
     373          72 :                 if handler != nil {
     374          36 :                         handler.OnStatusChange(h.mgr, mgrStatus)
     375          36 :                 }
     376             :         }
     377             : }

Generated by: LCOV version 1.14