mqtt.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "time"
  8. MQTT "github.com/eclipse/paho.mqtt.golang"
  9. "github.com/yuguorong/go/log"
  10. )
  11. type QdMqtt struct {
  12. User string
  13. Pswd string
  14. MqttUrl string
  15. MqttPort string
  16. cnn MQTT.Client
  17. }
  // mqttBrokerUser is an empty placeholder broker user name; nothing in the
  // visible part of this file reads it.
  const mqttBrokerUser = ""

  // mqttBrokerPswd is an empty placeholder broker password; nothing in the
  // visible part of this file reads it.
  const mqttBrokerPswd = ""
  22. type CbRegistMqttSubs func(cnn *QdMqtt) int
  23. var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
  24. log.Infof("R_TOPIC: %s\n", msg.Topic())
  25. log.Infof("R_MSG: %s\n", msg.Payload())
  26. }
  27. func Mqttsubs(mqtt *QdMqtt, subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
  28. fmt.Println("MQTT sub :[" + subtopic + "]")
  29. if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
  30. log.Info(token.Error())
  31. }
  32. }
  33. func (mqtt *QdMqtt) PublishObj(topic string, qos byte, in interface{}) {
  34. jsonBytes, _ := json.Marshal(in)
  35. sjson := string(jsonBytes)
  36. log.Info(topic)
  37. log.Info(sjson)
  38. mqtt.cnn.Publish(topic, qos, true, sjson)
  39. }
  40. //MqttDisocnnect mqtt disconnect
  41. func (mqtt *QdMqtt) Disocnnect() {
  42. mqtt.cnn.Disconnect(0)
  43. }
  44. //MqttConnServer mqtt init and connect server
  45. func (mqtt *QdMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) {
  46. //create a ClientOptions struct setting the broker address, clientid, turn
  47. //off trace output and set the default message handler
  48. suid := "8d18db45-2fa6-410a-a46d-acf30f011dac"
  49. //suid := "e6cda680-fc16-11ec-a3de-a55303223d4b"
  50. brokerurl := "tcp://" + strings.Trim(mqtt.MqttUrl, " ") + ":" + strings.Trim(mqtt.MqttPort, "")
  51. log.Info("Mqtt broker:", brokerurl)
  52. opts := MQTT.NewClientOptions().AddBroker(brokerurl)
  53. opts.SetClientID(suid)
  54. opts.SetUsername(mqtt.User)
  55. opts.SetPassword(mqtt.Pswd)
  56. opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
  57. //create and start a client using the above ClientOptions
  58. mqtt.cnn = MQTT.NewClient(opts)
  59. for ; retry != 0; retry-- {
  60. if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
  61. if cbSubs != nil {
  62. cbSubs(mqtt)
  63. }
  64. log.Info("MQTT connect OK!")
  65. return
  66. } else {
  67. log.Error("Retry to connect the MQTT server!! ", token.Error())
  68. }
  69. time.Sleep(time.Duration(3) * time.Second)
  70. }
  71. log.Info("Fault Error! can not connect MQTT server!!!")
  72. }
  73. func AppMqttSubs(mqtt *QdMqtt) int {
  74. Mqttsubs(mqtt, "/sub/default", 1, cbMqttEdgeConfig)
  75. return 0
  76. }
  77. var mqServer *QdMqtt = nil
  78. func MqttConnServer(cbSubs CbRegistMqttSubs) *QdMqtt {
  79. mq := &QdMqtt{
  80. User: "tD0geKmBqrxHk3QDnZ7u", //"5V1K3yCy5I5TRlVoCeeY", //"6XiAxAxtm0uTPnzwjtWd",
  81. Pswd: "",
  82. MqttUrl: "test-sbuilding.pacom.cn",
  83. MqttPort: "1885",
  84. }
  85. mq.ConnServer(-1, cbSubs)
  86. return mq
  87. }
  88. //{"ts":1632289574000, "values":{"temperature":"26.6", "pressure":"1000"}}
  89. func ReportTelemty(ts uint32, temp uint16, humidity uint16) {
  90. devVal := make(map[string]interface{})
  91. devVal["ts"] = float64(ts) * 1000
  92. devItems := make(map[string]string)
  93. devItems["temperature"] = strconv.Itoa(int(temp)/10) + "." + strconv.Itoa(int(temp)%10)
  94. devItems["humidity"] = strconv.Itoa(int(humidity)/10) + "." + strconv.Itoa(int(humidity)%10)
  95. devVal["values"] = devItems
  96. mqServer.PublishObj("v1/devices/me/telemetry", 1, devVal)
  97. }
  98. func StartMqttServer() {
  99. log.Info("Start MQTT server")
  100. mqServer = MqttConnServer(AppMqttSubs)
  101. }
  102. func StopMqttServer() {
  103. mqServer.Disocnnect()
  104. log.Info("Stop MQTT server")
  105. }