mqtt.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package bus
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "sync"
  6. "time"
  7. "github.com/ammeter/config"
  8. MQTT "github.com/eclipse/paho.mqtt.golang"
  9. "github.com/yuguorong/go/log"
  10. )
  11. type QMqtt struct {
  12. baseBus
  13. User string `json:"accessToken"`
  14. MqttUrl string `json:"mqttserver"`
  15. Uid string `json:"id"`
  16. Name string `json:"name"`
  17. Pswd string `json:-`
  18. Qos byte `json:-`
  19. cnn MQTT.Client `json:-`
  20. cliLock *sync.Mutex `json:-`
  21. }
// CbRegistMqttSubs is a callback that receives the bus instance. ConnServer
// invokes it after a successful connect and ignores the returned int.
type CbRegistMqttSubs func(cnn *QMqtt) int
  23. var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  24. log.Infof("R_TOPIC: %s\n", msg.Topic())
  25. log.Infof("R_MSG: %s\n", msg.Payload())
  26. }
  27. func (mqtt *QMqtt) subTopic(subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
  28. log.Info("MQTT sub :[" + subtopic + "]")
  29. if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
  30. log.Info(token.Error())
  31. }
  32. }
  33. func (mqtt *QMqtt) PublishObj(topic string, qos byte, in interface{}) {
  34. jsonBytes, _ := json.Marshal(in)
  35. sjson := string(jsonBytes)
  36. log.Info(topic)
  37. log.Info(sjson)
  38. mqtt.cnn.Publish(topic, qos, true, sjson)
  39. }
  40. //MqttDisocnnect mqtt disconnect
  41. func (mqtt *QMqtt) Disocnnect() {
  42. mqtt.cliLock.Lock()
  43. mqtt.cnn.Disconnect(0)
  44. mqtt.cnn = nil
  45. mqtt.cliLock.Unlock()
  46. }
  47. //MqttConnServer mqtt init and connect server
  48. func (mqtt *QMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) error {
  49. suid := mqtt.Uid //"8df8de45-efa6-419a-a46c-acf30f017da5"
  50. if mqtt.cnn != nil {
  51. return errors.New("Mqtt already connected")
  52. }
  53. opts := MQTT.NewClientOptions().AddBroker(mqtt.MqttUrl)
  54. opts.SetClientID(suid)
  55. opts.SetUsername(mqtt.User)
  56. opts.SetPassword(mqtt.Pswd)
  57. opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
  58. mqtt.cliLock.Lock()
  59. defer mqtt.cliLock.Unlock()
  60. //create and start a client using the above ClientOptions
  61. mqtt.cnn = MQTT.NewClient(opts)
  62. for ; retry != 0; retry-- {
  63. if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
  64. if cbSubs != nil {
  65. cbSubs(mqtt)
  66. }
  67. log.Info("MQTT connect OK!")
  68. return nil
  69. } else {
  70. log.Error("Retry to connect the MQTT server!! ", token.Error())
  71. }
  72. time.Sleep(time.Duration(3) * time.Second)
  73. }
  74. return errors.New("Fault Error! can not connect MQTT server!!!")
  75. }
  76. func (mqtt *QMqtt) StartServer() error {
  77. return mqtt.ConnServer(-1, AppMqttSubs)
  78. }
// StopServer disconnects the MQTT client via Disocnnect and then logs that the
// server has been stopped.
func (mqtt *QMqtt) StopServer() {
	mqtt.Disocnnect()
	log.Info("Stop MQTT server")
}
  83. func (mqtt *QMqtt) Init() error {
  84. mqtt.baseBus.Init()
  85. go mqtt.StartServer()
  86. return nil
  87. }
// Uninit tears the bus down: it first uninitialises the embedded baseBus and
// then stops the MQTT connection via StopServer.
func (mqtt *QMqtt) Uninit() {
	mqtt.baseBus.Uninit()
	mqtt.StopServer()
}
  92. func (mqtt *QMqtt) Send(chn IChannel, buff interface{}) (int, error) {
  93. mqtt.PublishObj(chn.ID(), mqtt.Qos, buff)
  94. return 0, nil
  95. }
// Recive is a stub: it ignores both arguments and always returns (0, nil).
func (mqtt *QMqtt) Recive(chn IChannel, buff interface{}) (int, error) {
	return 0, nil
}
  99. func AppMqttSubs(mqtt *QMqtt) int {
  100. mqtt.subTopic("/sub/default", 1, cbMqttEdgeConfig)
  101. return 0
  102. }
  103. func NewMqtt(param []interface{}) IBus {
  104. mq := &QMqtt{
  105. User: "gZdomIS9Hz3d7HxvcoNx",
  106. Pswd: "",
  107. MqttUrl: "test-sbuilding.pacom.cn:1885",
  108. Name: "mqtt",
  109. cnn: nil,
  110. cliLock: new(sync.Mutex),
  111. }
  112. if len(param) >= 1 {
  113. var cfgmq *QMqtt = nil
  114. switch param[0].(type) {
  115. case string:
  116. cfg := config.GetSysConfig().GetProfile(param[0].(string), mq)
  117. if cfg != nil {
  118. cfgmq = cfg.(*QMqtt)
  119. }
  120. case *QMqtt:
  121. cfgmq = param[0].(*QMqtt)
  122. }
  123. if cfgmq != nil {
  124. mq.User = cfgmq.User
  125. mq.MqttUrl = cfgmq.MqttUrl
  126. mq.Pswd = cfgmq.Pswd
  127. mq.Name = cfgmq.Name
  128. }
  129. }
  130. return mq
  131. }
// init registers NewMqtt in BusReg under the key "mqtt".
func init() {
	BusReg["mqtt"] = NewMqtt
}