mqtt.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package bus
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/ammeter/config"
  8. MQTT "github.com/eclipse/paho.mqtt.golang"
  9. "github.com/yuguorong/go/log"
  10. )
  11. type QMqtt struct {
  12. baseBus `json:"-" gorm:"-"`
  13. User string `json:"accessToken" gorm:"not null"`
  14. MqttUrl string `json:"mqttserver"`
  15. Uid string `json:"id" gorm:"primary_key:unique"`
  16. Name string `json:"name" gorm:"not null"`
  17. Pswd string `json:"-" gorm:"-"`
  18. Qos byte `json:"-" gorm:"-"`
  19. cnn MQTT.Client `json:"-" gorm:"-"`
  20. cliLock *sync.Mutex `json:"-" gorm:"-"`
  21. }
  22. type CbRegistMqttSubs func(cnn *QMqtt) int
  23. var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  24. log.Infof("R_TOPIC: %s\n", msg.Topic())
  25. log.Infof("R_MSG: %s\n", msg.Payload())
  26. }
  27. func (mqtt *QMqtt) subTopic(subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
  28. log.Info("MQTT sub :[" + subtopic + "]")
  29. if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
  30. log.Info(token.Error())
  31. }
  32. }
  33. func (mqtt *QMqtt) PublishObj(topic string, qos byte, in interface{}) {
  34. jsonBytes, _ := json.Marshal(in)
  35. sjson := string(jsonBytes)
  36. log.Info(mqtt.User, ",", topic)
  37. log.Info(sjson)
  38. mqtt.cnn.Publish(topic, qos, true, sjson)
  39. }
  40. //MqttDisocnnect mqtt disconnect
  41. func (mqtt *QMqtt) Disocnnect() {
  42. log.Infof("mqtt [%s] disconnected!\n", mqtt.Name)
  43. mqtt.cliLock.Lock()
  44. mqtt.cnn.Disconnect(0)
  45. mqtt.cnn = nil
  46. mqtt.cliLock.Unlock()
  47. }
  48. //MqttConnServer mqtt init and connect server
  49. func (mqtt *QMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) error {
  50. suid := mqtt.Uid //"8df8de45-efa6-419a-a46c-acf30f017da5"
  51. if mqtt.cnn != nil {
  52. return errors.New("Mqtt already connected")
  53. }
  54. opts := MQTT.NewClientOptions().AddBroker(mqtt.MqttUrl)
  55. opts.SetClientID(suid)
  56. opts.SetUsername(mqtt.User)
  57. opts.SetPassword(mqtt.Pswd)
  58. opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
  59. mqtt.cliLock.Lock()
  60. defer mqtt.cliLock.Unlock()
  61. //create and start a client using the above ClientOptions
  62. mqtt.cnn = MQTT.NewClient(opts)
  63. for ; retry != 0; retry-- {
  64. if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
  65. if cbSubs != nil {
  66. cbSubs(mqtt)
  67. }
  68. log.Info("MQTT connect " + mqtt.User + " OK!")
  69. return nil
  70. } else {
  71. log.Error("Retry to connect the MQTT server!! ", token.Error())
  72. }
  73. time.Sleep(time.Duration(3) * time.Second)
  74. }
  75. return errors.New("Fault Error! can not connect MQTT server!!!")
  76. }
  77. func (mqtt *QMqtt) StartServer() error {
  78. return mqtt.ConnServer(-1, AppMqttSubs)
  79. }
  80. func (mqtt *QMqtt) StopServer() {
  81. mqtt.Disocnnect()
  82. log.Info("Stop MQTT server")
  83. }
  84. func (mqtt *QMqtt) Init() error {
  85. mqtt.baseBus.Init()
  86. go mqtt.StartServer()
  87. return nil
  88. }
  89. func (mqtt *QMqtt) Uninit() {
  90. mqtt.baseBus.Uninit()
  91. mqtt.StopServer()
  92. }
  93. func (mqtt *QMqtt) Send(chn IChannel, buff interface{}) (int, error) {
  94. mqtt.PublishObj(chn.ID(), mqtt.Qos, buff)
  95. return mqtt.baseBus.Send(chn, buff)
  96. }
  97. func (mqtt *QMqtt) Recive(chn IChannel, buff interface{}) (int, error) {
  98. return mqtt.baseBus.Recive(chn, buff)
  99. }
  100. func AppMqttSubs(mqtt *QMqtt) int {
  101. mqtt.subTopic("/sub/default", 1, cbMqttEdgeConfig)
  102. return 0
  103. }
  104. func NewMqtt(param []interface{}) IBus {
  105. mq := &QMqtt{
  106. User: "gZdomIS9Hz3d7HxvcoNx",
  107. Pswd: "",
  108. MqttUrl: "test-sbuilding.pacom.cn:1885",
  109. Name: "mqtt",
  110. cnn: nil,
  111. cliLock: new(sync.Mutex),
  112. }
  113. mq.BusId = GenMqttId("mqtt", param)
  114. if len(param) >= 1 {
  115. var cfgmq *QMqtt = nil
  116. switch param[0].(type) {
  117. case string:
  118. cfg := config.GetSysConfig().GetProfile(param[0].(string), mq)
  119. if cfg != nil {
  120. cfgmq = cfg.(*QMqtt)
  121. }
  122. case *QMqtt:
  123. cfgmq = param[0].(*QMqtt)
  124. }
  125. if cfgmq != nil {
  126. mq.User = cfgmq.User
  127. mq.MqttUrl = cfgmq.MqttUrl
  128. mq.Pswd = cfgmq.Pswd
  129. mq.Name = cfgmq.Name
  130. }
  131. }
  132. return mq
  133. }
  134. func GenMqttId(name string, param []interface{}) string {
  135. if len(param) > 0 {
  136. switch param[0].(type) {
  137. case string:
  138. return "mqtt-" + param[0].(string)
  139. case *QMqtt:
  140. return "mqtt-" + param[0].(*QMqtt).User
  141. }
  142. }
  143. return "mqtt"
  144. }
  145. func init() {
  146. BusReg["mqtt"] = NewMqtt
  147. BusGetID["mqtt"] = GenMqttId
  148. }