       1             : /*
       2             : Package mq manages queues for applications and networks.
       3             : 
       4             : For applications, the `ApplicationMgr` manages the following kind of queues:
       5             :   - uldata: uplink data from the broker to the application.
       6             :   - dldata: downlink data from the application to the broker.
       7             :   - dldata-resp: the response of downlink data.
       8             :   - dldata-result: the data process result from the network.
       9             : 
      10             : For networks, the `NetworkMgr` manages the following kind of queues:
      11             :   - uldata: device uplink data from the network to the broker.
      12             :   - dldata: downlink data from the broker to the network.
      13             :   - dldata-result: the data process result from the network.
      14             :   - ctrl: the control messages from the broker to the network
      15             : */
      16             : package mq
      17             : 
      18             : import (
      19             :         "errors"
      20             :         "fmt"
      21             :         "net/url"
      22             :         "sync"
      23             : 
      24             :         gmq ""
      25             : )
      26             : 
      27             : // Connection pool. The key is `host` of the message broker.
      28             : type ConnectionPool struct {
      29             :         connections map[string]*counterConnection
      30             :         mutex       sync.Mutex
      31             : }
      32             : 
      33             : // Detail queue connection status.
      34             : type DataMqStatus struct {
      35             :         // For `uldata`.
      36             :         UlData gmq.Status
      37             :         // For `dldata`.
      38             :         DlData gmq.Status
      39             :         // For `dldata-resp`.
      40             :         DlDataResp gmq.Status
      41             :         // For `dldata-result`.
      42             :         DlDataResult gmq.Status
      43             :         // For `ctrl`.
      44             :         Ctrl gmq.Status
      45             : }
      46             : 
      47             : // The options of the application/network manager.
      48             : type Options struct {
      49             :         // The associated unit ID of the application/network. Empty for public network.
      50             :         UnitID string `json:"unitId"`
      51             :         // The associated unit code of the application/network. Empty for public network.
      52             :         UnitCode string `json:"unitCode"`
      53             :         // The associated application/network ID.
      54             :         ID string `json:"id"`
      55             :         // The associated application/network code.
      56             :         Name string `json:"name"`
      57             :         // AMQP prefetch option.
      58             :         Prefetch uint16 `json:"prefetch"`
      59             :         // AMQP persistent option.
      60             :         Persistent bool `json:"persistent"`
      61             :         // MQTT shared queue prefix option.
      62             :         SharedPrefix string `json:"sharedPrefix"`
      63             : }
      64             : 
      65             : // The connection with queue counter.
      66             : type counterConnection struct {
      67             :         connection gmq.GmqConnection
      68             :         counter    int
      69             :         mutex      sync.Mutex
      70             :         connType   connectionType
      71             : }
      72             : 
      73             : // The created data queues.
      74             : type dataQueues struct {
      75             :         // `[prefix].[unit].[code].uldata`.
      76             :         uldata gmq.GmqQueue
      77             :         // `[prefix].[unit].[code].dldata`.
      78             :         dldata gmq.GmqQueue
      79             :         // `[prefix].[unit].[code].dldata-resp`: for applications only. `nil` for networks.
      80             :         dldataResp gmq.GmqQueue
      81             :         // `[prefix].[unit].[code].dldata-result`.
      82             :         dldataResult gmq.GmqQueue
      83             :         // `[prefix].[unit].[code].ctrl`.
      84             :         ctrl gmq.GmqQueue
      85             : }
      86             : 
      87             : // Application/Network Manager status.
      88             : type MgrStatus int
      89             : 
      90             : // To identify shared connection type.
      91             : type connectionType int
      92             : 
      93             : // Manager status.
      94             : const (
      95             :         // One or more queues are not connected.
      96             :         NotReady MgrStatus = iota
      97             :         // All queues are connected.
      98             :         Ready
      99             : )
     100             : 
     101             : // Constants.
     102             : const (
     103             :         // Default AMQP prefetch.
     104             :         defPrefetch = 100
     105             : )
     106             : 
     107             : // Connection type.
     108             : const (
     109             :         connTypeAmqp connectionType = iota
     110             :         connTypeMqtt
     111             : )
     112             : 
     113             : // Error response.
     114             : const (
     115             :         errParamCorrID = "the `CorrelationID` must be a non-empty string"
     116             :         errParamData   = "the `Data` must be a hex string"
     117             :         errParamDataID = "the `DataID` must be a non-empty string"
     118             :         errParamAppDev = "one of `DeviceID` or [`NetworkCode`, `NetworkAddr`] pair must be provided with non-empty string"
     119             :         errParamNetDev = "`NetworkAddr` must be a non-empty string"
     120             : )
     121             : 
     122             : // Constants.
     123             : var (
     124             :         // Support application/network host schemes.
     125             :         SupportSchemes = []string{"amqp", "amqps", "mqtt", "mqtts"}
     126             : )
     127             : 
     128           0 : func (s MgrStatus) String() string {
     129           0 :         switch s {
     130           0 :         case NotReady:
     131           0 :                 return "not ready"
     132           0 :         case Ready:
     133           0 :                 return "ready"
     134             :         }
     135           0 :         return "unknown"
     136             : }
     137             : 
     138          65 : func NewConnectionPool() *ConnectionPool {
     139          65 :         return &ConnectionPool{
     140          65 :                 connections: map[string]*counterConnection{},
     141          65 :         }
     142          65 : }
     143             : 
     144          48 : func (p *ConnectionPool) ForceClear() {
     145          48 :         p.mutex.Lock()
     146          48 :         for _, conn := range p.connections {
     147           0 :                 c := conn
     148           0 :                 go func() { _ = c.connection.Close() }()
     149             :         }
     150          48 :         p.connections = map[string]*counterConnection{}
     151          48 :         p.mutex.Unlock()
     152             : }
     153             : 
     154          46 : func (c *counterConnection) add(count int) {
     155          46 :         c.mutex.Lock()
     156          46 :         c.counter += count
     157          46 :         c.mutex.Unlock()
     158          46 : }
     159             : 
     160             : // Utility function to get the message queue connection instance. A new connection will be created
     161             : // if the host does not exist.
     162          64 : func getConnection(pool *ConnectionPool, hostUri url.URL) (*counterConnection, error) {
     163          66 :         if pool == nil {
     164           2 :                 return nil, errors.New("connection pool is nil")
     165           2 :         }
     166          62 :         pool.mutex.Lock()
     167          62 :         defer pool.mutex.Unlock()
     168          62 : 
     169          62 :         conn := pool.connections[hostUri.String()]
     170          76 :         if conn != nil {
     171          14 :                 return conn, nil
     172          14 :         }
     173             : 
     174          48 :         switch hostUri.Scheme {
     175          24 :         case "amqp", "amqps":
     176          24 :                 opts := gmq.AmqpConnectionOptions{
     177          24 :                         URI: hostUri.String(),
     178          24 :                 }
     179          24 :                 conn, err := gmq.NewAmqpConnection(opts)
     180          24 :                 if err != nil {
     181           0 :                         return nil, err
     182           0 :                 }
     183          24 :                 _ = conn.Connect()
     184          24 :                 retConn := &counterConnection{
     185          24 :                         connection: conn,
     186          24 :                         counter:    0,
     187          24 :                         connType:   connTypeAmqp,
     188          24 :                 }
     189          24 :                 pool.connections[hostUri.String()] = retConn
     190          24 :                 return retConn, nil
     191          24 :         case "mqtt", "mqtts":
     192          24 :                 opts := gmq.MqttConnectionOptions{
     193          24 :                         URI: hostUri.String(),
     194          24 :                 }
     195          24 :                 conn, err := gmq.NewMqttConnection(opts)
     196          24 :                 if err != nil {
     197           0 :                         return nil, err
     198           0 :                 }
     199          24 :                 _ = conn.Connect()
     200          24 :                 retConn := &counterConnection{
     201          24 :                         connection: conn,
     202          24 :                         counter:    0,
     203          24 :                         connType:   connTypeMqtt,
     204          24 :                 }
     205          24 :                 pool.connections[hostUri.String()] = retConn
     206          24 :                 return retConn, nil
     207           0 :         default:
     208           0 :                 return nil, errors.New("unsupport scheme" + hostUri.Scheme)
     209             :         }
     210             : }
     211             : 
     212             : // Utility function to remove connection from the pool if the reference count meet zero.
     213          50 : func removeConnection(pool *ConnectionPool, hostUri string, count int) error {
     214          50 :         pool.mutex.Lock()
     215          50 : 
     216          50 :         conn, ok := pool.connections[hostUri]
     217          54 :         if !ok {
     218           4 :                 pool.mutex.Unlock()
     219           4 :                 return nil
     220           4 :         }
     221          46 :         conn.counter -= count
     222          48 :         if conn.counter > 0 {
     223           2 :                 pool.mutex.Unlock()
     224           2 :                 return nil
     225           2 :         }
     226          44 :         delete(pool.connections, hostUri)
     227          44 :         pool.mutex.Unlock()
     228          44 : 
     229          44 :         return conn.connection.Close()
     230             : }
     231             : 
     232             : // The utility function for creating application/network queue.
     233             : func newDataQueues(
     234             :         conn *counterConnection,
     235             :         opts Options,
     236             :         prefix string,
     237             :         isNetwork bool,
     238          62 : ) (*dataQueues, error) {
     239          62 :         var uldata gmq.GmqQueue
     240          62 :         var dldata gmq.GmqQueue
     241          62 :         var dldataResp gmq.GmqQueue
     242          62 :         var dldataResult gmq.GmqQueue
     243          62 :         var ctrl gmq.GmqQueue
     244          62 : 
     245          68 :         if opts.UnitID == "" {
     246           8 :                 if opts.UnitCode != "" {
     247           2 :                         return nil, errors.New("UnitID and UnitCode must both empty or non-empty")
     248           2 :                 }
     249          56 :         } else {
     250          60 :                 if opts.UnitCode == "" {
     251           4 :                         return nil, errors.New("UnitID and UnitCode must both empty or non-empty")
     252           4 :                 }
     253             :         }
     254          62 :         if opts.ID == "" {
     255           6 :                 return nil, errors.New("`ID` cannot be empty")
     256           6 :         }
     257          54 :         if opts.Name == "" {
     258           4 :                 return nil, errors.New("`Name` cannot be empty")
     259           4 :         }
     260             : 
     261          46 :         unit := opts.UnitCode
     262          48 :         if unit == "" {
     263           2 :                 unit = "_"
     264           2 :         }
     265             : 
     266          46 :         var err error
     267          69 :         if conn.connType == connTypeAmqp {
     268          23 :                 prefetch := opts.Prefetch
     269          43 :                 if prefetch == 0 {
     270          20 :                         prefetch = defPrefetch
     271          20 :                 }
     272             : 
     273          23 :                 uldataOpts := gmq.AmqpQueueOptions{
     274          23 :                         Name:       fmt.Sprintf("%s.%s.%s.uldata", prefix, unit, opts.Name),
     275          23 :                         IsRecv:     !isNetwork,
     276          23 :                         Reliable:   true,
     277          23 :                         Persistent: opts.Persistent,
     278          23 :                         Prefetch:   prefetch,
     279          23 :                 }
     280          23 :                 dldataOpts := gmq.AmqpQueueOptions{
     281          23 :                         Name:       fmt.Sprintf("%s.%s.%s.dldata", prefix, unit, opts.Name),
     282          23 :                         IsRecv:     isNetwork,
     283          23 :                         Reliable:   true,
     284          23 :                         Persistent: opts.Persistent,
     285          23 :                         Prefetch:   prefetch,
     286          23 :                 }
     287          23 :                 dldataRespOpts := gmq.AmqpQueueOptions{
     288          23 :                         Name:       fmt.Sprintf("%s.%s.%s.dldata-resp", prefix, unit, opts.Name),
     289          23 :                         IsRecv:     !isNetwork,
     290          23 :                         Reliable:   true,
     291          23 :                         Persistent: opts.Persistent,
     292          23 :                         Prefetch:   prefetch,
     293          23 :                 }
     294          23 :                 dldataResultOpts := gmq.AmqpQueueOptions{
     295          23 :                         Name:       fmt.Sprintf("%s.%s.%s.dldata-result", prefix, unit, opts.Name),
     296          23 :                         IsRecv:     !isNetwork,
     297          23 :                         Reliable:   true,
     298          23 :                         Persistent: opts.Persistent,
     299          23 :                         Prefetch:   prefetch,
     300          23 :                 }
     301          23 :                 ctrlOpts := gmq.AmqpQueueOptions{
     302          23 :                         Name:     fmt.Sprintf("%s.%s.%s.ctrl", prefix, unit, opts.Name),
     303          23 :                         IsRecv:   true,
     304          23 :                         Reliable: true,
     305          23 :                         Prefetch: prefetch,
     306          23 :                 }
     307          23 :                 _conn := conn.connection.(*gmq.AmqpConnection)
     308          23 :                 if uldata, err = gmq.NewAmqpQueue(uldataOpts, _conn); err != nil {
     309           0 :                         return nil, err
     310          23 :                 } else if dldata, err = gmq.NewAmqpQueue(dldataOpts, _conn); err != nil {
     311           0 :                         return nil, err
     312          23 :                 } else if dldataResp, err = gmq.NewAmqpQueue(dldataRespOpts, _conn); err != nil {
     313           0 :                         return nil, err
     314          23 :                 } else if dldataResult, err = gmq.NewAmqpQueue(dldataResultOpts, _conn); err != nil {
     315           0 :                         return nil, err
     316          23 :                 } else if ctrl, err = gmq.NewAmqpQueue(ctrlOpts, _conn); err != nil {
     317           0 :                         return nil, err
     318           0 :                 }
     319          46 :         } else if conn.connType == connTypeMqtt {
     320          23 :                 uldataOpts := gmq.MqttQueueOptions{
     321          23 :                         Name:         fmt.Sprintf("%s.%s.%s.uldata", prefix, unit, opts.Name),
     322          23 :                         IsRecv:       !isNetwork,
     323          23 :                         Reliable:     true,
     324          23 :                         SharedPrefix: opts.SharedPrefix,
     325          23 :                 }
     326          23 :                 dldataOpts := gmq.MqttQueueOptions{
     327          23 :                         Name:         fmt.Sprintf("%s.%s.%s.dldata", prefix, unit, opts.Name),
     328          23 :                         IsRecv:       isNetwork,
     329          23 :                         Reliable:     true,
     330          23 :                         SharedPrefix: opts.SharedPrefix,
     331          23 :                 }
     332          23 :                 dldataRespOpts := gmq.MqttQueueOptions{
     333          23 :                         Name:         fmt.Sprintf("%s.%s.%s.dldata-resp", prefix, unit, opts.Name),
     334          23 :                         IsRecv:       !isNetwork,
     335          23 :                         Reliable:     true,
     336          23 :                         SharedPrefix: opts.SharedPrefix,
     337          23 :                 }
     338          23 :                 dldataResultOpts := gmq.MqttQueueOptions{
     339          23 :                         Name:         fmt.Sprintf("%s.%s.%s.dldata-result", prefix, unit, opts.Name),
     340          23 :                         IsRecv:       !isNetwork,
     341          23 :                         Reliable:     true,
     342          23 :                         SharedPrefix: opts.SharedPrefix,
     343          23 :                 }
     344          23 :                 ctrlOpts := gmq.MqttQueueOptions{
     345          23 :                         Name:         fmt.Sprintf("%s.%s.%s.ctrl", prefix, unit, opts.Name),
     346          23 :                         IsRecv:       true,
     347          23 :                         Reliable:     true,
     348          23 :                         SharedPrefix: opts.SharedPrefix,
     349          23 :                 }
     350          23 :                 _conn := conn.connection.(*gmq.MqttConnection)
     351          23 :                 if uldata, err = gmq.NewMqttQueue(uldataOpts, _conn); err != nil {
     352           0 :                         return nil, err
     353          23 :                 } else if dldata, err = gmq.NewMqttQueue(dldataOpts, _conn); err != nil {
     354           0 :                         return nil, err
     355          23 :                 } else if dldataResp, err = gmq.NewMqttQueue(dldataRespOpts, _conn); err != nil {
     356           0 :                         return nil, err
     357          23 :                 } else if dldataResult, err = gmq.NewMqttQueue(dldataResultOpts, _conn); err != nil {
     358           0 :                         return nil, err
     359          23 :                 } else if ctrl, err = gmq.NewMqttQueue(ctrlOpts, _conn); err != nil {
     360           0 :                         return nil, err
     361           0 :                 }
     362           0 :         } else {
     363           0 :                 return nil, errors.New("unknown shared connection type")
     364           0 :         }
     365             : 
     366          46 :         return &dataQueues{
     367          46 :                 uldata:       uldata,
     368          46 :                 dldata:       dldata,
     369          46 :                 dldataResp:   dldataResp,
     370          46 :                 dldataResult: dldataResult,
     371          46 :                 ctrl:         ctrl,
     372          46 :         }, nil
     373             : }

