123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package main
- import (
- "encoding/json"
- "fmt"
- "strconv"
- "strings"
- "time"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/yuguorong/go/log"
- )
- type QdMqtt struct {
- User string
- Pswd string
- MqttUrl string
- MqttPort string
- cnn MQTT.Client
- }
- const (
- mqttBrokerUser = ""
- mqttBrokerPswd = ""
- )
- type CbRegistMqttSubs func(cnn *QdMqtt) int
- var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- log.Infof("R_TOPIC: %s\n", msg.Topic())
- log.Infof("R_MSG: %s\n", msg.Payload())
- }
- func Mqttsubs(mqtt *QdMqtt, subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
- fmt.Println("MQTT sub :[" + subtopic + "]")
- if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
- log.Info(token.Error())
- }
- }
- func (mqtt *QdMqtt) PublishObj(topic string, qos byte, in interface{}) {
- jsonBytes, _ := json.Marshal(in)
- sjson := string(jsonBytes)
- log.Info(topic)
- log.Info(sjson)
- mqtt.cnn.Publish(topic, qos, true, sjson)
- }
- //MqttDisocnnect mqtt disconnect
- func (mqtt *QdMqtt) Disocnnect() {
- mqtt.cnn.Disconnect(0)
- }
- //MqttConnServer mqtt init and connect server
- func (mqtt *QdMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) {
- //create a ClientOptions struct setting the broker address, clientid, turn
- //off trace output and set the default message handler
- suid := "8d18db45-2fa6-410a-a46d-acf30f011dac"
- //suid := "e6cda680-fc16-11ec-a3de-a55303223d4b"
- brokerurl := "tcp://" + strings.Trim(mqtt.MqttUrl, " ") + ":" + strings.Trim(mqtt.MqttPort, "")
- log.Info("Mqtt broker:", brokerurl)
- opts := MQTT.NewClientOptions().AddBroker(brokerurl)
- opts.SetClientID(suid)
- opts.SetUsername(mqtt.User)
- opts.SetPassword(mqtt.Pswd)
- opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
- //create and start a client using the above ClientOptions
- mqtt.cnn = MQTT.NewClient(opts)
- for ; retry != 0; retry-- {
- if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
- if cbSubs != nil {
- cbSubs(mqtt)
- }
- log.Info("MQTT connect OK!")
- return
- } else {
- log.Error("Retry to connect the MQTT server!! ", token.Error())
- }
- time.Sleep(time.Duration(3) * time.Second)
- }
- log.Info("Fault Error! can not connect MQTT server!!!")
- }
- func AppMqttSubs(mqtt *QdMqtt) int {
- Mqttsubs(mqtt, "/sub/default", 1, cbMqttEdgeConfig)
- return 0
- }
- var mqServer *QdMqtt = nil
- func MqttConnServer(cbSubs CbRegistMqttSubs) *QdMqtt {
- mq := &QdMqtt{
- User: "tD0geKmBqrxHk3QDnZ7u", //"5V1K3yCy5I5TRlVoCeeY", //"6XiAxAxtm0uTPnzwjtWd",
- Pswd: "",
- MqttUrl: "test-sbuilding.pacom.cn",
- MqttPort: "1885",
- }
- mq.ConnServer(-1, cbSubs)
- return mq
- }
- //{"ts":1632289574000, "values":{"temperature":"26.6", "pressure":"1000"}}
- func ReportTelemty(ts uint32, temp uint16, humidity uint16) {
- devVal := make(map[string]interface{})
- devVal["ts"] = float64(ts) * 1000
- devItems := make(map[string]string)
- devItems["temperature"] = strconv.Itoa(int(temp)/10) + "." + strconv.Itoa(int(temp)%10)
- devItems["humidity"] = strconv.Itoa(int(humidity)/10) + "." + strconv.Itoa(int(humidity)%10)
- devVal["values"] = devItems
- mqServer.PublishObj("v1/devices/me/telemetry", 1, devVal)
- }
- func StartMqttServer() {
- log.Info("Start MQTT server")
- mqServer = MqttConnServer(AppMqttSubs)
- }
- func StopMqttServer() {
- mqServer.Disocnnect()
- log.Info("Stop MQTT server")
- }
|