LCOV - code coverage report
Current view: top level - mq - network.go (source / functions) Hit Total Coverage
Test: sdk.lcov Lines: 249 273 91.2 %
Date: 2024-12-20 13:58:22 Functions: 0 0 -

          Line data    Source code
       1             : package mq
       2             : 
       3             : import (
       4             :         "encoding/hex"
       5             :         "encoding/json"
       6             :         "errors"
       7             :         "net/url"
       8             :         "sync"
       9             :         "time"
      10             : 
      11             :         "github.com/tidwall/gjson"
      12             : 
      13             :         gmq "github.com/woofdogtw/sylvia-iot-go/general-mq"
      14             :         "github.com/woofdogtw/sylvia-iot-go/sdk/constants"
      15             : )
      16             : 
      17             : // Uplink data from network to broker.
      18             : type NetUlData struct {
      19             :         Time        time.Time
      20             :         NetworkAddr string
      21             :         Data        []byte
      22             :         Extension   map[string]interface{}
      23             : }
      24             : 
      25             : // Downlink data from broker to network.
      26             : type NetDlData struct {
      27             :         DataID      string
      28             :         Pub         time.Time
      29             :         ExpiresIn   int64
      30             :         NetworkAddr string
      31             :         Data        []byte
      32             :         Extension   map[string]interface{}
      33             : }
      34             : 
      35             : // Downlink data result when processing or completing data transfer to the device.
      36             : type NetDlDataResult struct {
      37             :         DataID  string `json:"dataId"`
      38             :         Status  int    `json:"status"`
      39             :         Message string `json:"message,omitempty"`
      40             : }
      41             : 
      42             : // `add-device` control data.
      43             : type NetCtrlAddDevice struct {
      44             :         NetworkAddr string `json:"networkAddr"`
      45             : }
      46             : 
      47             : // `add-device-bulk` control data.
      48             : type NetCtrlAddDeviceBulk struct {
      49             :         NetworkAddrs []string `json:"networkAddrs"`
      50             : }
      51             : 
      52             : // `add-device-range` control data.
      53             : type NetCtrlAddDeviceRange struct {
      54             :         StartAddr string `json:"startAddr"`
      55             :         EndAddr   string `json:"endAddr"`
      56             : }
      57             : 
      58             : // `del-device` control data.
      59             : type NetCtrlDelDevice struct {
      60             :         NetworkAddr string `json:"networkAddr"`
      61             : }
      62             : 
      63             : // `del-device-bulk` control data.
      64             : type NetCtrlDelDeviceBulk struct {
      65             :         NetworkAddrs []string `json:"networkAddrs"`
      66             : }
      67             : 
      68             : // `del-device-range` control data.
      69             : type NetCtrlDelDeviceRange struct {
      70             :         StartAddr string `json:"startAddr"`
      71             :         EndAddr   string `json:"endAddr"`
      72             : }
      73             : 
      74             : // The manager for network queues.
      75             : type NetworkMgr struct {
      76             :         opts Options
      77             : 
      78             :         // Information for delete connection automatically.
      79             :         connPool *ConnectionPool
      80             :         hostUri  string
      81             : 
      82             :         uldata       gmq.GmqQueue
      83             :         dldata       gmq.GmqQueue
      84             :         dldataResult gmq.GmqQueue
      85             :         ctrl         gmq.GmqQueue
      86             : 
      87             :         status      MgrStatus
      88             :         statusMutex sync.Mutex
      89             :         handler     NetMgrEventHandler
      90             : }
      91             : 
      92             : // Event handler interface for the `NetworkMgr`.
      93             : type NetMgrEventHandler interface {
      94             :         // Fired when one of the manager's queues encounters a state change.
      95             :         OnStatusChange(mgr *NetworkMgr, status MgrStatus)
      96             : 
      97             :         // Fired when a `DlData` data is received.
      98             :         //
      99             :         // Return error will NACK the data.
     100             :         // The data may will be received again depending on the protocol (such as AMQP).
     101             :         OnDlData(mgr *NetworkMgr, data *NetDlData) error
     102             : 
     103             :         // Fired when a `add-device` control data is received.
     104             :         //
     105             :         // Return error will NACK the data.
     106             :         // The data may will be received again depending on the protocol (such as AMQP).
     107             :         OnCtrlAddDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDevice) error
     108             : 
     109             :         // Fired when a `add-device-bulk` control data is received.
     110             :         //
     111             :         // Return error will NACK the data.
     112             :         // The data may will be received again depending on the protocol (such as AMQP).
     113             :         OnCtrlAddDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceBulk) error
     114             : 
     115             :         // Fired when a `add-device-range` control data is received.
     116             :         //
     117             :         // Return error will NACK the data.
     118             :         // The data may will be received again depending on the protocol (such as AMQP).
     119             :         OnCtrlAddDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlAddDeviceRange) error
     120             : 
     121             :         // Fired when a `del-device` control data is received.
     122             :         //
     123             :         // Return error will NACK the data.
     124             :         // The data may will be received again depending on the protocol (such as AMQP).
     125             :         OnCtrlDelDevice(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDevice) error
     126             : 
     127             :         // Fired when a `del-device-bulk` control data is received.
     128             :         //
     129             :         // Return error will NACK the data.
     130             :         // The data may will be received again depending on the protocol (such as AMQP).
     131             :         OnCtrlDelDeviceBulk(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceBulk) error
     132             : 
     133             :         // Fired when a `del-device-range` control data is received.
     134             :         //
     135             :         // Return error will NACK the data.
     136             :         // The data may will be received again depending on the protocol (such as AMQP).
     137             :         OnCtrlDelDeviceRange(mgr *NetworkMgr, time time.Time, new *NetCtrlDelDeviceRange) error
     138             : }
     139             : 
     140             : // The event handler for `gmq.GmqQueue`.
     141             : type netMgrMqEventHandler struct {
     142             :         mgr *NetworkMgr
     143             : }
     144             : 
     145             : type netUlDataInner struct {
     146             :         Time        string                 `json:"time"`
     147             :         NetworkAddr string                 `json:"networkAddr"`
     148             :         Data        string                 `json:"data"`
     149             :         Extension   map[string]interface{} `json:"extension,omitempty"`
     150             : }
     151             : 
     152             : type netDlDataInner struct {
     153             :         DataID      string                 `json:"dataId"`
     154             :         Pub         string                 `json:"pub"`
     155             :         ExpiresIn   int64                  `json:"expiresIn"`
     156             :         NetworkAddr string                 `json:"networkAddr"`
     157             :         Data        string                 `json:"data"`
     158             :         Extension   map[string]interface{} `json:"extension,omitempty"`
     159             : }
     160             : 
     161             : // Constants.
     162             : const (
     163             :         netQueuePrefix = "broker.network"
     164             : )
     165             : 
     166             : var _ gmq.QueueEventHandler = (*netMgrMqEventHandler)(nil)
     167             : var _ gmq.QueueMessageHandler = (*netMgrMqEventHandler)(nil)
     168             : 
     169             : func NewNetworkMgr(
     170             :         connPool *ConnectionPool,
     171             :         hostUri url.URL,
     172             :         opts Options,
     173             :         handler NetMgrEventHandler,
     174          36 : ) (*NetworkMgr, error) {
     175          36 :         conn, err := getConnection(connPool, hostUri)
     176          38 :         if err != nil {
     177           2 :                 return nil, err
     178           2 :         }
     179             : 
     180          34 :         queues, err := newDataQueues(conn, opts, netQueuePrefix, true)
     181          44 :         if err != nil {
     182          10 :                 return nil, err
     183          10 :         }
     184             : 
     185          24 :         mgr := &NetworkMgr{
     186          24 :                 opts:         opts,
     187          24 :                 connPool:     connPool,
     188          24 :                 hostUri:      hostUri.String(),
     189          24 :                 uldata:       queues.uldata,
     190          24 :                 dldata:       queues.dldata,
     191          24 :                 dldataResult: queues.dldataResult,
     192          24 :                 ctrl:         queues.ctrl,
     193          24 :                 status:       NotReady,
     194          24 :                 handler:      handler,
     195          24 :         }
     196          24 :         mqHandler := &netMgrMqEventHandler{mgr: mgr}
     197          24 :         mgr.uldata.SetHandler(mqHandler)
     198          24 :         if err := mgr.uldata.Connect(); err != nil {
     199           0 :                 return nil, err
     200           0 :         }
     201          24 :         mgr.dldata.SetHandler(mqHandler)
     202          24 :         mgr.dldata.SetMsgHandler(mqHandler)
     203          24 :         if err := mgr.dldata.Connect(); err != nil {
     204           0 :                 return nil, err
     205           0 :         }
     206          24 :         mgr.dldataResult.SetHandler(mqHandler)
     207          24 :         if err := mgr.dldataResult.Connect(); err != nil {
     208           0 :                 return nil, err
     209           0 :         }
     210          24 :         mgr.ctrl.SetHandler(mqHandler)
     211          24 :         mgr.ctrl.SetMsgHandler(mqHandler)
     212          24 :         if err := mgr.ctrl.Connect(); err != nil {
     213           0 :                 return nil, err
     214           0 :         }
     215          24 :         conn.add(4)
     216          24 :         return mgr, nil
     217             : }
     218             : 
     219             : // The associated unit ID of the network.
     220           2 : func (mgr *NetworkMgr) UnitID() string {
     221           2 :         return mgr.opts.UnitID
     222           2 : }
     223             : 
     224             : // The associated unit code of the network.
     225           2 : func (mgr *NetworkMgr) UnitCode() string {
     226           2 :         return mgr.opts.UnitCode
     227           2 : }
     228             : 
     229             : // The network ID.
     230           2 : func (mgr *NetworkMgr) ID() string {
     231           2 :         return mgr.opts.ID
     232           2 : }
     233             : 
     234             : // The network code.
     235           2 : func (mgr *NetworkMgr) Name() string {
     236           2 :         return mgr.opts.Name
     237           2 : }
     238             : 
     239             : // Manager status.
     240        1590 : func (mgr *NetworkMgr) Status() MgrStatus {
     241        1590 :         return mgr.status
     242        1590 : }
     243             : 
     244             : // Detail status of each message queue. Please ignore `DlDataResp`.
     245           2 : func (mgr *NetworkMgr) MqStatus() DataMqStatus {
     246           2 :         return DataMqStatus{
     247           2 :                 UlData:       mgr.uldata.Status(),
     248           2 :                 DlData:       mgr.dldata.Status(),
     249           2 :                 DlDataResp:   gmq.Closed,
     250           2 :                 DlDataResult: mgr.dldataResult.Status(),
     251           2 :                 Ctrl:         mgr.ctrl.Status(),
     252           2 :         }
     253           2 : }
     254             : 
     255             : // To close the manager queues.
     256             : // The underlying connection will be closed when there are no queues use it.
     257          26 : func (mgr *NetworkMgr) Close() error {
     258          26 :         if err := mgr.uldata.Close(); err != nil {
     259           0 :                 return err
     260          26 :         } else if err = mgr.dldata.Close(); err != nil {
     261           0 :                 return err
     262          26 :         } else if err = mgr.dldataResult.Close(); err != nil {
     263           0 :                 return err
     264          26 :         } else if err = mgr.ctrl.Close(); err != nil {
     265           0 :                 return err
     266           0 :         }
     267             : 
     268          26 :         return removeConnection(mgr.connPool, mgr.hostUri, 4)
     269             : }
     270             : 
     271             : // Send uplink data `UlData` to the broker.
     272           6 : func (mgr *NetworkMgr) SendUlData(data NetUlData) error {
     273           8 :         if data.NetworkAddr == "" {
     274           2 :                 return errors.New(errParamNetDev)
     275           2 :         }
     276             : 
     277           4 :         msg := netUlDataInner{
     278           4 :                 Time:        data.Time.Format(constants.TimeFormat),
     279           4 :                 NetworkAddr: data.NetworkAddr,
     280           4 :                 Data:        hex.EncodeToString(data.Data),
     281           4 :                 Extension:   data.Extension,
     282           4 :         }
     283           4 :         payload, err := json.Marshal(msg)
     284           4 :         if err != nil {
     285           0 :                 return err
     286           0 :         }
     287             : 
     288           8 :         go func() {
     289           4 :                 _ = mgr.uldata.SendMsg(payload)
     290           4 :         }()
     291           4 :         return nil
     292             : }
     293             : 
     294             : // Send uplink data `DlDataResult` to the broker.
     295           6 : func (mgr *NetworkMgr) SendDlDataResult(data NetDlDataResult) error {
     296           8 :         if data.DataID == "" {
     297           2 :                 return errors.New(errParamDataID)
     298           2 :         }
     299             : 
     300           4 :         payload, err := json.Marshal(data)
     301           4 :         if err != nil {
     302           0 :                 return err
     303           0 :         }
     304             : 
     305           8 :         go func() {
     306           4 :                 _ = mgr.dldataResult.SendMsg(payload)
     307           4 :         }()
     308           4 :         return nil
     309             : }
     310             : 
     311         240 : func (h *netMgrMqEventHandler) OnStatus(queue gmq.GmqQueue, status gmq.Status) {
     312         240 :         h.onEvent(queue)
     313         240 : }
     314             : 
     315           0 : func (h *netMgrMqEventHandler) OnError(queue gmq.GmqQueue, err error) {
     316           0 :         h.onEvent(queue)
     317           0 : }
     318             : 
     319          57 : func (h *netMgrMqEventHandler) OnMessage(queue gmq.GmqQueue, message gmq.Message) {
     320          57 :         queueName := queue.Name()
     321          68 :         if queueName == h.mgr.dldata.Name() {
     322          11 :                 var data netDlDataInner
     323          13 :                 if err := json.Unmarshal(message.Payload(), &data); err != nil {
     324           2 :                         _ = message.Ack()
     325           2 :                         return
     326           2 :                 }
     327           9 :                 dataBytes, err := hex.DecodeString(data.Data)
     328           9 :                 if err != nil {
     329           0 :                         _ = message.Ack()
     330           0 :                         return
     331           0 :                 }
     332           9 :                 dPub, err := time.Parse(constants.TimeFormat, data.Pub)
     333          11 :                 if err != nil {
     334           2 :                         _ = message.Ack()
     335           2 :                         return
     336           2 :                 }
     337           7 :                 dldata := &NetDlData{
     338           7 :                         DataID:      data.DataID,
     339           7 :                         Pub:         dPub.UTC(),
     340           7 :                         ExpiresIn:   data.ExpiresIn,
     341           7 :                         NetworkAddr: data.NetworkAddr,
     342           7 :                         Data:        dataBytes,
     343           7 :                         Extension:   data.Extension,
     344           7 :                 }
     345           7 :                 handler := h.mgr.handler
     346          14 :                 if handler != nil {
     347           9 :                         if err := handler.OnDlData(h.mgr, dldata); err != nil {
     348           2 :                                 _ = message.Nack()
     349           7 :                         } else {
     350           5 :                                 _ = message.Ack()
     351           5 :                         }
     352           7 :                         return
     353             :                 }
     354          92 :         } else if queueName == h.mgr.ctrl.Name() {
     355          46 :                 payload := message.Payload()
     356          46 :                 dTime, err := time.Parse(constants.TimeFormat, gjson.GetBytes(payload, "time").String())
     357          48 :                 if err != nil {
     358           2 :                         _ = message.Ack()
     359           2 :                         return
     360           2 :                 }
     361          44 :                 new := gjson.GetBytes(payload, "new").String()
     362          44 : 
     363          44 :                 switch gjson.GetBytes(payload, "operation").String() {
     364           7 :                 case "add-device":
     365           7 :                         var data NetCtrlAddDevice
     366           7 :                         err := json.Unmarshal([]byte(new), &data)
     367           9 :                         if err != nil {
     368           2 :                                 _ = message.Ack()
     369           2 :                                 return
     370           2 :                         }
     371           5 :                         handler := h.mgr.handler
     372          10 :                         if handler != nil {
     373           7 :                                 if err := handler.OnCtrlAddDevice(h.mgr, dTime.UTC(), &data); err != nil {
     374           2 :                                         _ = message.Nack()
     375           5 :                                 } else {
     376           3 :                                         _ = message.Ack()
     377           3 :                                 }
     378           5 :                                 return
     379             :                         }
     380           7 :                 case "add-device-bulk":
     381           7 :                         var data NetCtrlAddDeviceBulk
     382           7 :                         err := json.Unmarshal([]byte(new), &data)
     383           9 :                         if err != nil {
     384           2 :                                 _ = message.Ack()
     385           2 :                                 return
     386           2 :                         }
     387           5 :                         handler := h.mgr.handler
     388          10 :                         if handler != nil {
     389           7 :                                 if err := handler.OnCtrlAddDeviceBulk(h.mgr, dTime.UTC(), &data); err != nil {
     390           2 :                                         _ = message.Nack()
     391           5 :                                 } else {
     392           3 :                                         _ = message.Ack()
     393           3 :                                 }
     394           5 :                                 return
     395             :                         }
     396           7 :                 case "add-device-range":
     397           7 :                         var data NetCtrlAddDeviceRange
     398           7 :                         err := json.Unmarshal([]byte(new), &data)
     399           9 :                         if err != nil {
     400           2 :                                 _ = message.Ack()
     401           2 :                                 return
     402           2 :                         }
     403           5 :                         handler := h.mgr.handler
     404          10 :                         if handler != nil {
     405           7 :                                 if err := handler.OnCtrlAddDeviceRange(h.mgr, dTime.UTC(), &data); err != nil {
     406           2 :                                         _ = message.Nack()
     407           5 :                                 } else {
     408           3 :                                         _ = message.Ack()
     409           3 :                                 }
     410           5 :                                 return
     411             :                         }
     412           7 :                 case "del-device":
     413           7 :                         var data NetCtrlDelDevice
     414           7 :                         err := json.Unmarshal([]byte(new), &data)
     415           9 :                         if err != nil {
     416           2 :                                 _ = message.Ack()
     417           2 :                                 return
     418           2 :                         }
     419           5 :                         handler := h.mgr.handler
     420          10 :                         if handler != nil {
     421           7 :                                 if err := handler.OnCtrlDelDevice(h.mgr, dTime.UTC(), &data); err != nil {
     422           2 :                                         _ = message.Nack()
     423           5 :                                 } else {
     424           3 :                                         _ = message.Ack()
     425           3 :                                 }
     426           5 :                                 return
     427             :                         }
     428           7 :                 case "del-device-bulk":
     429           7 :                         var data NetCtrlDelDeviceBulk
     430           7 :                         err := json.Unmarshal([]byte(new), &data)
     431           9 :                         if err != nil {
     432           2 :                                 _ = message.Ack()
     433           2 :                                 return
     434           2 :                         }
     435           5 :                         handler := h.mgr.handler
     436          10 :                         if handler != nil {
     437           7 :                                 if err := handler.OnCtrlDelDeviceBulk(h.mgr, dTime.UTC(), &data); err != nil {
     438           2 :                                         _ = message.Nack()
     439           5 :                                 } else {
     440           3 :                                         _ = message.Ack()
     441           3 :                                 }
     442           5 :                                 return
     443             :                         }
     444           7 :                 case "del-device-range":
     445           7 :                         var data NetCtrlDelDeviceRange
     446           7 :                         err := json.Unmarshal([]byte(new), &data)
     447           9 :                         if err != nil {
     448           2 :                                 _ = message.Ack()
     449           2 :                                 return
     450           2 :                         }
     451           5 :                         handler := h.mgr.handler
     452          10 :                         if handler != nil {
     453           7 :                                 if err := handler.OnCtrlDelDeviceRange(h.mgr, dTime.UTC(), &data); err != nil {
     454           2 :                                         _ = message.Nack()
     455           5 :                                 } else {
     456           3 :                                         _ = message.Ack()
     457           3 :                                 }
     458           5 :                                 return
     459             :                         }
     460           2 :                 default:
     461           2 :                         _ = message.Ack()
     462           2 :                         return
     463             :                 }
     464             :         }
     465           0 :         _ = message.Ack()
     466             : }
     467             : 
     468         240 : func (h *netMgrMqEventHandler) onEvent(queue gmq.GmqQueue) {
     469         240 :         h.mgr.statusMutex.Lock()
     470         240 :         var mgrStatus MgrStatus
     471         240 :         if h.mgr.uldata.Status() == gmq.Connected &&
     472         240 :                 h.mgr.dldata.Status() == gmq.Connected &&
     473         240 :                 h.mgr.dldataResult.Status() == gmq.Connected &&
     474         258 :                 h.mgr.ctrl.Status() == gmq.Connected {
     475          18 :                 mgrStatus = Ready
     476         240 :         } else {
     477         222 :                 mgrStatus = NotReady
     478         222 :         }
     479             : 
     480         240 :         changed := false
     481         276 :         if h.mgr.status != mgrStatus {
     482          36 :                 h.mgr.status = mgrStatus
     483          36 :                 changed = true
     484          36 :         }
     485         240 :         h.mgr.statusMutex.Unlock()
     486         240 : 
     487         276 :         if changed {
     488          36 :                 handler := h.mgr.handler
     489          72 :                 if handler != nil {
     490          36 :                         handler.OnStatusChange(h.mgr, mgrStatus)
     491          36 :                 }
     492             :         }
     493             : }

Generated by: LCOV version 1.14