package main import ( "encoding/json" "fmt" "strconv" "strings" "time" MQTT "github.com/eclipse/paho.mqtt.golang" "github.com/yuguorong/go/log" ) type QdMqtt struct { User string Pswd string MqttUrl string MqttPort string cnn MQTT.Client } const ( mqttBrokerUser = "" mqttBrokerPswd = "" ) type CbRegistMqttSubs func(cnn *QdMqtt) int var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { log.Infof("R_TOPIC: %s\n", msg.Topic()) log.Infof("R_MSG: %s\n", msg.Payload()) } func Mqttsubs(mqtt *QdMqtt, subtopic string, Qos byte, hdlr MQTT.MessageHandler) { fmt.Println("MQTT sub :[" + subtopic + "]") if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil { log.Info(token.Error()) } } func (mqtt *QdMqtt) PublishObj(topic string, qos byte, in interface{}) { jsonBytes, _ := json.Marshal(in) sjson := string(jsonBytes) log.Info(topic) log.Info(sjson) mqtt.cnn.Publish(topic, qos, true, sjson) } //MqttDisocnnect mqtt disconnect func (mqtt *QdMqtt) Disocnnect() { mqtt.cnn.Disconnect(0) } //MqttConnServer mqtt init and connect server func (mqtt *QdMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) { //create a ClientOptions struct setting the broker address, clientid, turn //off trace output and set the default message handler suid := "8d18db45-2fa6-410a-a46d-acf30f011dac" //suid := "e6cda680-fc16-11ec-a3de-a55303223d4b" brokerurl := "tcp://" + strings.Trim(mqtt.MqttUrl, " ") + ":" + strings.Trim(mqtt.MqttPort, "") log.Info("Mqtt broker:", brokerurl) opts := MQTT.NewClientOptions().AddBroker(brokerurl) opts.SetClientID(suid) opts.SetUsername(mqtt.User) opts.SetPassword(mqtt.Pswd) opts.SetDefaultPublishHandler(cbMqttEdgeConfig) //create and start a client using the above ClientOptions mqtt.cnn = MQTT.NewClient(opts) for ; retry != 0; retry-- { if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil { if cbSubs != nil { cbSubs(mqtt) } log.Info("MQTT connect OK!") return } else { log.Error("Retry to connect the MQTT server!! ", token.Error()) } time.Sleep(time.Duration(3) * time.Second) } log.Info("Fault Error! can not connect MQTT server!!!") } func AppMqttSubs(mqtt *QdMqtt) int { Mqttsubs(mqtt, "/sub/default", 1, cbMqttEdgeConfig) return 0 } var mqServer *QdMqtt = nil func MqttConnServer(cbSubs CbRegistMqttSubs) *QdMqtt { mq := &QdMqtt{ User: "tD0geKmBqrxHk3QDnZ7u", //"5V1K3yCy5I5TRlVoCeeY", //"6XiAxAxtm0uTPnzwjtWd", Pswd: "", MqttUrl: "test-sbuilding.pacom.cn", MqttPort: "1885", } mq.ConnServer(-1, cbSubs) return mq } //{"ts":1632289574000, "values":{"temperature":"26.6", "pressure":"1000"}} func ReportTelemty(ts uint32, temp uint16, humidity uint16) { devVal := make(map[string]interface{}) devVal["ts"] = float64(ts) * 1000 devItems := make(map[string]string) devItems["temperature"] = strconv.Itoa(int(temp)/10) + "." + strconv.Itoa(int(temp)%10) devItems["humidity"] = strconv.Itoa(int(humidity)/10) + "." + strconv.Itoa(int(humidity)%10) devVal["values"] = devItems mqServer.PublishObj("v1/devices/me/telemetry", 1, devVal) } func StartMqttServer() { log.Info("Start MQTT server") mqServer = MqttConnServer(AppMqttSubs) } func StopMqttServer() { mqServer.Disocnnect() log.Info("Stop MQTT server") }