mqtt.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package bus
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/ammeter/config"
  8. "github.com/ammeter/util"
  9. MQTT "github.com/eclipse/paho.mqtt.golang"
  10. "github.com/yuguorong/go/log"
  11. )
  12. type QMqtt struct {
  13. baseBus `json:"-" gorm:"-"`
  14. User string `json:"accessToken" gorm:"not null"`
  15. MqttUrl string `json:"mqttserver"`
  16. Uid string `json:"id" gorm:"primary_key:unique"`
  17. Name string `json:"name" gorm:"not null"`
  18. Pswd string `json:"-" gorm:"-"`
  19. Qos byte `json:"-" gorm:"-"`
  20. cnn MQTT.Client `json:"-" gorm:"-"`
  21. cliLock *sync.Mutex `json:"-" gorm:"-"`
  22. }
  23. type CbRegistMqttSubs func(cnn *QMqtt) int
  24. var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  25. log.Infof("R_TOPIC: %s\n", msg.Topic())
  26. log.Infof("R_MSG: %s\n", msg.Payload())
  27. }
  28. func (mqtt *QMqtt) subTopic(subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
  29. log.Info("MQTT sub :[" + subtopic + "]")
  30. if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
  31. log.Info(token.Error())
  32. }
  33. }
  34. func (mqtt *QMqtt) PublishObj(topic string, qos byte, in interface{}) {
  35. jsonBytes, _ := json.Marshal(in)
  36. sjson := string(jsonBytes)
  37. log.Info(mqtt.User, ",", topic)
  38. log.Info(sjson)
  39. mqtt.cnn.Publish(topic, qos, true, sjson)
  40. }
  41. //MqttDisocnnect mqtt disconnect
  42. func (mqtt *QMqtt) Disocnnect() {
  43. mqtt.cliLock.Lock()
  44. mqtt.cnn.Disconnect(0)
  45. mqtt.cnn = nil
  46. mqtt.cliLock.Unlock()
  47. }
  48. //MqttConnServer mqtt init and connect server
  49. func (mqtt *QMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) error {
  50. suid := mqtt.Uid //"8df8de45-efa6-419a-a46c-acf30f017da5"
  51. if mqtt.cnn != nil {
  52. return errors.New("Mqtt already connected")
  53. }
  54. opts := MQTT.NewClientOptions().AddBroker(mqtt.MqttUrl)
  55. opts.SetClientID(suid)
  56. opts.SetUsername(mqtt.User)
  57. opts.SetPassword(mqtt.Pswd)
  58. opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
  59. mqtt.cliLock.Lock()
  60. defer mqtt.cliLock.Unlock()
  61. //create and start a client using the above ClientOptions
  62. mqtt.cnn = MQTT.NewClient(opts)
  63. for ; retry != 0; retry-- {
  64. if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
  65. if cbSubs != nil {
  66. cbSubs(mqtt)
  67. }
  68. log.Info("MQTT connect " + mqtt.User + " OK!")
  69. return nil
  70. } else {
  71. log.Error("Retry to connect the MQTT server!! ", token.Error())
  72. }
  73. time.Sleep(time.Duration(3) * time.Second)
  74. }
  75. return errors.New("Fault Error! can not connect MQTT server!!!")
  76. }
  77. func (mqtt *QMqtt) StartServer() error {
  78. return mqtt.ConnServer(-1, AppMqttSubs)
  79. }
  80. func (mqtt *QMqtt) StopServer() {
  81. mqtt.Disocnnect()
  82. log.Info("Stop MQTT server")
  83. }
  84. func (mqtt *QMqtt) Init() error {
  85. mqtt.baseBus.Init()
  86. go mqtt.StartServer()
  87. return nil
  88. }
  89. func (mqtt *QMqtt) Uninit() {
  90. mqtt.baseBus.Uninit()
  91. mqtt.StopServer()
  92. }
  93. func (mqtt *QMqtt) Send(chn IChannel, buff interface{}) (int, error) {
  94. mqtt.PublishObj(chn.ID(), mqtt.Qos, buff)
  95. return mqtt.baseBus.Send(chn, buff)
  96. }
  97. func (mqtt *QMqtt) Recive(chn IChannel, buff interface{}) (int, error) {
  98. return mqtt.baseBus.Recive(chn, buff)
  99. }
  100. func AppMqttSubs(mqtt *QMqtt) int {
  101. mqtt.subTopic("/sub/default", 1, cbMqttEdgeConfig)
  102. return 0
  103. }
  104. func NewMqtt(param []interface{}) IBus {
  105. mq := &QMqtt{
  106. User: "gZdomIS9Hz3d7HxvcoNx",
  107. Pswd: "",
  108. MqttUrl: "test-sbuilding.pacom.cn:1885",
  109. Name: "mqtt",
  110. cnn: nil,
  111. cliLock: new(sync.Mutex),
  112. }
  113. mq.BusId = GenMqttId("mqtt", param)
  114. if len(param) >= 1 {
  115. var cfgmq *QMqtt = nil
  116. switch param[0].(type) {
  117. case string:
  118. cfg := config.GetSysConfig().GetProfile(param[0].(string), mq)
  119. if cfg != nil {
  120. cfgmq = cfg.(*QMqtt)
  121. }
  122. case *QMqtt:
  123. cfgmq = param[0].(*QMqtt)
  124. }
  125. if cfgmq != nil {
  126. mq.User = cfgmq.User
  127. mq.MqttUrl = cfgmq.MqttUrl
  128. mq.Pswd = cfgmq.Pswd
  129. mq.Name = cfgmq.Name
  130. }
  131. if util.DebugLevel() > 0 {
  132. mq.User = cfgmq.Name
  133. mq.MqttUrl = "192.168.255.104:1883"
  134. mq.Name = cfgmq.Name
  135. mq.Pswd = "pacom"
  136. }
  137. }
  138. return mq
  139. }
  140. func GenMqttId(name string, param []interface{}) string {
  141. if len(param) > 0 {
  142. switch param[0].(type) {
  143. case string:
  144. return "mqtt-" + param[0].(string)
  145. case *QMqtt:
  146. return "mqtt-" + param[0].(*QMqtt).User
  147. }
  148. }
  149. return "mqtt"
  150. }
  151. func init() {
  152. BusReg["mqtt"] = NewMqtt
  153. BusGetID["mqtt"] = GenMqttId
  154. }