123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package bus
- import (
- "encoding/json"
- "errors"
- "sync"
- "time"
- "github.com/ammeter/config"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/yuguorong/go/log"
- )
- type QMqtt struct {
- baseBus `json:"-" gorm:"-"`
- User string `json:"accessToken" gorm:"not null"`
- MqttUrl string `json:"mqttserver"`
- Uid string `json:"id" gorm:"primary_key:unique"`
- Name string `json:"name" gorm:"not null"`
- Pswd string `json:"-" gorm:"-"`
- Qos byte `json:"-" gorm:"-"`
- cnn MQTT.Client `json:"-" gorm:"-"`
- cliLock *sync.Mutex `json:"-" gorm:"-"`
- }
- type CbRegistMqttSubs func(cnn *QMqtt) int
- var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
- log.Infof("R_TOPIC: %s\n", msg.Topic())
- log.Infof("R_MSG: %s\n", msg.Payload())
- }
- func (mqtt *QMqtt) subTopic(subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
- log.Info("MQTT sub :[" + subtopic + "]")
- if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
- log.Info(token.Error())
- }
- }
- func (mqtt *QMqtt) PublishObj(topic string, qos byte, in interface{}) {
- jsonBytes, _ := json.Marshal(in)
- sjson := string(jsonBytes)
- log.Info(mqtt.User, ",", topic)
- log.Info(sjson)
- mqtt.cnn.Publish(topic, qos, true, sjson)
- }
- //MqttDisocnnect mqtt disconnect
- func (mqtt *QMqtt) Disocnnect() {
- mqtt.cliLock.Lock()
- mqtt.cnn.Disconnect(0)
- mqtt.cnn = nil
- mqtt.cliLock.Unlock()
- }
- //MqttConnServer mqtt init and connect server
- func (mqtt *QMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) error {
- suid := mqtt.Uid //"8df8de45-efa6-419a-a46c-acf30f017da5"
- if mqtt.cnn != nil {
- return errors.New("Mqtt already connected")
- }
- opts := MQTT.NewClientOptions().AddBroker(mqtt.MqttUrl)
- opts.SetClientID(suid)
- opts.SetUsername(mqtt.User)
- opts.SetPassword(mqtt.Pswd)
- opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
- mqtt.cliLock.Lock()
- defer mqtt.cliLock.Unlock()
- //create and start a client using the above ClientOptions
- mqtt.cnn = MQTT.NewClient(opts)
- for ; retry != 0; retry-- {
- if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
- if cbSubs != nil {
- cbSubs(mqtt)
- }
- log.Info("MQTT connect " + mqtt.User + " OK!")
- return nil
- } else {
- log.Error("Retry to connect the MQTT server!! ", token.Error())
- }
- time.Sleep(time.Duration(3) * time.Second)
- }
- return errors.New("Fault Error! can not connect MQTT server!!!")
- }
- func (mqtt *QMqtt) StartServer() error {
- return mqtt.ConnServer(-1, AppMqttSubs)
- }
- func (mqtt *QMqtt) StopServer() {
- mqtt.Disocnnect()
- log.Info("Stop MQTT server")
- }
- func (mqtt *QMqtt) Init() error {
- mqtt.baseBus.Init()
- go mqtt.StartServer()
- return nil
- }
- func (mqtt *QMqtt) Uninit() {
- mqtt.baseBus.Uninit()
- mqtt.StopServer()
- }
- func (mqtt *QMqtt) Send(chn IChannel, buff interface{}) (int, error) {
- mqtt.PublishObj(chn.ID(), mqtt.Qos, buff)
- return mqtt.baseBus.Send(chn, buff)
- }
- func (mqtt *QMqtt) Recive(chn IChannel, buff interface{}) (int, error) {
- return mqtt.baseBus.Recive(chn, buff)
- }
- func AppMqttSubs(mqtt *QMqtt) int {
- mqtt.subTopic("/sub/default", 1, cbMqttEdgeConfig)
- return 0
- }
- func NewMqtt(param []interface{}) IBus {
- mq := &QMqtt{
- User: "gZdomIS9Hz3d7HxvcoNx",
- Pswd: "",
- MqttUrl: "test-sbuilding.pacom.cn:1885",
- Name: "mqtt",
- cnn: nil,
- cliLock: new(sync.Mutex),
- }
- mq.BusId = GenMqttId("mqtt", param)
- if len(param) >= 1 {
- var cfgmq *QMqtt = nil
- switch param[0].(type) {
- case string:
- cfg := config.GetSysConfig().GetProfile(param[0].(string), mq)
- if cfg != nil {
- cfgmq = cfg.(*QMqtt)
- }
- case *QMqtt:
- cfgmq = param[0].(*QMqtt)
- }
- if cfgmq != nil {
- mq.User = cfgmq.User
- mq.MqttUrl = cfgmq.MqttUrl
- mq.Pswd = cfgmq.Pswd
- mq.Name = cfgmq.Name
- }
- }
- return mq
- }
- func GenMqttId(name string, param []interface{}) string {
- if len(param) > 0 {
- switch param[0].(type) {
- case string:
- return "mqtt-" + param[0].(string)
- case *QMqtt:
- return "mqtt-" + param[0].(*QMqtt).User
- }
- }
- return "mqtt"
- }
- func init() {
- BusReg["mqtt"] = NewMqtt
- BusGetID["mqtt"] = GenMqttId
- }
|