Просмотр исходного кода

fix air AS10 device architecture issue and release

yuguorong 3 лет назад
Родитель
Сommit
f02306cd86
8 измененных файлов с 53 добавлено и 53 удалено
  1. 0 7
      Bus/mqtt.go
  2. 4 2
      cmd/test.sql
  3. 18 20
      drivers/ameter.go
  4. 5 0
      drivers/device.go
  5. 21 21
      drivers/uemis.go
  6. 1 1
      platform/gateway.go
  7. 3 1
      platform/platform.go
  8. 1 1
      protocol/dlt645.go

+ 0 - 7
Bus/mqtt.go

@@ -7,7 +7,6 @@ import (
 	"time"
 
 	"github.com/ammeter/config"
-	"github.com/ammeter/util"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/yuguorong/go/log"
 )
@@ -148,12 +147,6 @@ func NewMqtt(param []interface{}) IBus {
 			mq.Pswd = cfgmq.Pswd
 			mq.Name = cfgmq.Name
 		}
-		if util.DebugLevel() > 0 {
-			mq.User = cfgmq.Name
-			mq.MqttUrl = "192.168.255.104:1883"
-			mq.Name = cfgmq.Name
-			mq.Pswd = "pacom"
-		}
 	}
 	return mq
 }

+ 4 - 2
cmd/test.sql

@@ -7,6 +7,8 @@ SELECT * FROM gateway_configs
 drop table ammeter_models
 drop table s_air_models
 
-CREATE TABLE db_energes AS SELECT dev_name,total_energy,ts_sample from ammeters
+CREATE TABLE meter_persists AS SELECT dev_name,ts_sample,total_energy,db_status from ammeters
 
-DELETE FROM meter_persists WHERE dev_name = ""
+DELETE FROM meter_persists WHERE dev_name = ""
+
+DROP TABLE db_energes

+ 18 - 20
drivers/ameter.go

@@ -15,10 +15,10 @@ import (
 )
 
 const (
-	DEF_SAMPLE_PERIOD        = 30 * time.Second // time.Minute
-	DEF_SAMPLE_PEER_DURATION = 10 * time.Second
-	DEF_TCP_READ_TIMEOUT     = 1 * time.Minute
-	POSTPONE_PERSIST_TIME    = 1 * time.Minute
+	DEF_SAMPLE_PERIOD        = 10 * time.Minute
+	DEF_SAMPLE_PEER_DURATION = 60 * time.Second
+	DEF_TCP_READ_TIMEOUT     = 30 * time.Second
+	POSTPONE_PERSIST_TIME    = 5 * time.Minute
 )
 
 //amList  map[string]*Ammeter
@@ -62,6 +62,7 @@ type AmMeterHub struct {
 	chnExit     chan bool
 	loopserver  bool
 	sampleTmr   *time.Timer
+	amIndex     []string            //avoid map chaos order,even map read only
 	amList      map[string]*Ammeter //key=[code + address], each meter id must be unique
 	persitList  map[string]*meterPersist
 	tmrPersist  *time.Timer
@@ -116,20 +117,7 @@ func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnD
 }
 
 func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
-	i := 0
-	for _, am := range dev.amList {
-		if i == 0 {
-			pam = am
-		}
-		if i == dev.queryIdx {
-			pam = am
-			break
-		}
-		i++
-	}
-	if pam == nil {
-		return nil
-	}
+	pam = dev.amList[dev.amIndex[dev.queryIdx]]
 	pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
 	dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
 	tmrTx.Reset(1 * time.Second)
@@ -139,7 +127,7 @@ func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
 	tmrTx.Stop()
 	dev.queryIdx++
-	log.Info("Schedul next:(%d/%d)", dev.queryIdx, len(dev.amList))
+
 	if dev.queryIdx >= len(dev.amList) {
 		dev.queryIdx = 0
 		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
@@ -339,7 +327,9 @@ func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
 			if len(mod.Code) < 4 || len(mod.Address) < 8 {
 				continue
 			}
+			//AmMeterHub is device!!! communication target is the hub
 			hub := drv.mountHub(&mod, &devlist)
+
 			id := mod.Code + mod.Address
 			am, has := hub.(*AmMeterHub).amList[id]
 			if !has {
@@ -361,14 +351,22 @@ func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
 	}
 
 	for _, hub := range drv.DevList {
+		hub.(*AmMeterHub).amIndex = make([]string, 0)
 		for id, am := range hub.(*AmMeterHub).amList {
 			if am.TsUpdate != ts {
 				delete(hub.(*AmMeterHub).amList, id)
+				log.Info("Delete ammeter:", id)
+			} else {
+				hub.(*AmMeterHub).amIndex = append(hub.(*AmMeterHub).amIndex, id)
 			}
 		}
 	}
 
-	return int(ts), devlist
+	if len(drv.DevList) == 0 {
+		drv.Uninstall()
+	}
+
+	return len(devlist), devlist
 }
 
 func (drv *AmmeterDrv) GetModel() interface{} {

+ 5 - 0
drivers/device.go

@@ -2,6 +2,7 @@ package drviers
 
 import (
 	"reflect"
+	"time"
 
 	bus "github.com/ammeter/Bus"
 	"github.com/ammeter/protocol"
@@ -29,6 +30,7 @@ type routePath struct {
 
 type baseDevice struct {
 	tsUpdate   int64
+	tsSample   int64
 	DeviceName string
 	Status     string
 	drv        IDriver
@@ -90,6 +92,9 @@ func (dev *baseDevice) ListDevices() interface{} {
 	dinfo := make(map[string]string)
 	dinfo["Name"] = dev.DeviceName
 	dinfo["State"] = dev.Status
+	if dev.tsSample != 0 {
+		dinfo["TsSample"] = time.Unix(dev.tsSample, 0).Format("2006-01-02 15:04:05")
+	}
 	return dinfo
 }
 

+ 21 - 21
drivers/uemis.go

@@ -3,6 +3,7 @@ package drviers
 import (
 	"bytes"
 	"errors"
+	"time"
 
 	bus "github.com/ammeter/Bus"
 	"github.com/yuguorong/go/log"
@@ -14,11 +15,11 @@ type SAirModel struct {
 	Code     string `json:"code"`
 	Protocol string `json:"protocol"`
 	GwDevId  string `json:"gwDevId"`
+	Type     string `json:"-" `
 }
 
 type Uemis struct {
 	baseDevice
-	devList    map[string]SAirModel //name->Id
 	chnExit    chan bool
 	loopserver bool
 }
@@ -30,9 +31,9 @@ func (u *Uemis) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResu
 	if mret != nil && devName != "" {
 		devN := devName[8:]
 		devN = "SA10" + "-00000000" + devN
-		if _, has := u.devList[devN]; has {
+		if devN == u.DeviceName {
 			log.Info(u.DeviceName+" Dispatch ["+devN+"]: ", string(stream))
-			return bus.DispatchMulti
+			return bus.DispatchSingle
 		}
 	}
 	return bus.DispatchNone
@@ -53,6 +54,7 @@ func (u *Uemis) Run() error {
 				log.Info(u.baseDevice.Id+"/"+dev, v)
 				telemetry := u.Route[0].iproto.PackageCmd(dev, mret)
 				u.Route[0].ibus.Send(u.Route[0].iChn, telemetry)
+				u.tsSample = time.Now().Unix()
 			}
 		case <-u.chnExit:
 			u.loopserver = false
@@ -93,28 +95,26 @@ func (drv *UemisDrv) GetModel() interface{} {
 	return &[]SAirModel{}
 }
 
-func (drv *UemisDrv) CreateDevice(model interface{}, filterGwId string) (int, []IDevice) {
-	drv.baseDriver.CreateDevice(model, filterGwId)
+func (drv *UemisDrv) CreateDevice(model interface{}) (int, []IDevice) {
+	ts := time.Now().UnixMilli()
 	devlist := make([]IDevice, 0)
 	mlist := model.(*[]SAirModel)
-
 	var dev *Uemis = nil
-	for _, m := range *mlist {
-		if m.GwDevId == filterGwId {
-			if dev == nil {
-				dev = new(Uemis)
-				dev.loopserver = false
-				dev.devList = make(map[string]SAirModel)
-
-				dev.Id = m.Id
-				dev.DeviceName = "S5-" + dev.Id
-				dev.Probe(dev.DeviceName, drv)
-				drv.baseDriver.DevList[dev.DeviceName] = dev
-				devlist = append(devlist, dev)
-			}
-			//m.DevName = strings.ReplaceAll(m.DevName, "SAIR10", "S10")
-			dev.devList[m.DevName] = m
+	for _, mod := range *mlist {
+		mt, has := (drv.DevList[mod.Id])
+		if !has {
+			dev = &Uemis{}
+			dev.Id = mod.Id
+			dev.DeviceName = mod.DevName
+			dev.loopserver = false
+			dev.Probe(dev.DeviceName, drv)
+
+			devlist = append(devlist, dev)
+			drv.DevList[mod.Id] = dev
+		} else {
+			dev = mt.(*Uemis)
 		}
+		dev.tsUpdate = ts
 	}
 	return len(devlist), devlist
 }

+ 1 - 1
platform/gateway.go

@@ -99,7 +99,7 @@ func (gw *Gateway) InstallDrivers(uid string) {
 		if sz, devlist := drv.CreateDevice(model); sz > 0 {
 			for _, dev := range devlist {
 				dev.SetRoute(nil, "ThingsBoards", "mqtt", "v1/gateway/telemetry", &gw.Mqttcfg)
-				dev.Open()
+				dev.Open() //open specail Route here
 			}
 		}
 	}

+ 3 - 1
platform/platform.go

@@ -90,6 +90,7 @@ func (p *PaPlatform) LoadSyncAllDevices() {
 				}
 				p.jsDevList[name] = string(jdev)
 				if bupdate {
+					config.GetDB().Delete(drv.GetModel()) //clear first
 					config.GetDB().CreateTbl(model)
 					config.GetDB().Save(model)
 				}
@@ -174,7 +175,7 @@ func StartServer() {
 	p.Init()
 	msgAPI := util.GetMsgHandler("API")
 
-	ticksAutoFresh := time.NewTicker(time.Duration(time.Minute * 10))
+	ticksAutoFresh := time.NewTicker(time.Duration(time.Minute * 60))
 
 	for loop := true; loop; {
 		select {
@@ -191,6 +192,7 @@ func StartServer() {
 				}
 			}
 		case <-ticksAutoFresh.C:
+			log.Info("Auto refresh devices")
 			p.LoadGatewayProfile()
 		}
 	}

+ 1 - 1
protocol/dlt645.go

@@ -357,7 +357,7 @@ func (p *dlt645) ParsePacket(rxb []byte, params ...interface{}) (mapv interface{
 	if cmd == (C_2007_CODE_RD | DLT_CMD_DIR_REVERT) {
 		val := float64(bcd2int(param[:lenParam-DLT_DICCODE_LEN])) / 100.0
 		for k, v := range cmd2DataField {
-			if int(v.ctrl) == dicCode {
+			if (byte(v.ctrl) | DLT_CMD_DIR_REVERT) == cmd {
 				mapv := make(map[string]interface{})
 				mapv[k] = val
 				return mapv