Pārlūkot izejas kodu

implement multi-user ammter feature. bugfix: dlt645 decode change origin buffer. add/remove meters

yuguorong 3 gadi atpakaļ
vecāks
revīzija
6395b29309

+ 2 - 2
Bus/Rest.go

@@ -30,7 +30,7 @@ func (rest *RestApi) Init() error {
 func (rest *RestApi) Uninit() {
 
 }
-func (rest *RestApi) OpenChannel(chnID interface{}, router []chan interface{}) IChannel {
+func (rest *RestApi) MountChannel(chnID interface{}, router []chan interface{}) IChannel {
 	if reflect.TypeOf(chnID).Kind() != reflect.String {
 		log.Error("wrong channel type, REST URL need a string!")
 		return nil
@@ -40,7 +40,7 @@ func (rest *RestApi) OpenChannel(chnID interface{}, router []chan interface{}) I
 	log.Info("Rest channel:", url)
 	return nil
 }
-func (rest *RestApi) CloseChannel(chn IChannel) error {
+func (rest *RestApi) FreeChannel(chn IChannel) error {
 	return nil
 }
 func (rest *RestApi) ResetChannel(chn IChannel) {

+ 5 - 0
Bus/Untitled-1.sql

@@ -0,0 +1,5 @@
+SELECT * FROM ammeters
+
+DROP TABLE db_energes
+
+CREATE TABLE meter_persists AS SELECT db_energes.* FROM db_energes

+ 6 - 3
Bus/api.go

@@ -1,6 +1,8 @@
 package bus
 
-import "time"
+import (
+	"time"
+)
 
 //Channel Dispatch not in event because event and dispatch handler not same one.
 
@@ -31,11 +33,12 @@ type IChannel interface {
 type IBus interface {
 	Init() error
 	Uninit()
-	OpenChannel(chnID interface{}, router []chan interface{}) IChannel
-	CloseChannel(chn IChannel) error
+	MountChannel(chnID interface{}, router []chan interface{}) IChannel
+	FreeChannel(chn IChannel) error
 	ResetChannel(chn IChannel)
 	ScanChannel(stream []byte, conn int) IChannel
 	Send(chn IChannel, buff interface{}) (int, error)
 	Recive(chn IChannel, buff interface{}) (int, error)
+	ApplyPatch(ptchName string, param ...interface{})
 	TimeStamp() int64
 }

+ 5 - 2
Bus/bus.go

@@ -88,7 +88,7 @@ func (bus *baseBus) stringId(chnID interface{}) string {
 	return ""
 }
 
-func (bus *baseBus) OpenChannel(chnID interface{}, router []chan interface{}) IChannel {
+func (bus *baseBus) MountChannel(chnID interface{}, router []chan interface{}) IChannel {
 	schnid := bus.stringId(chnID)
 	if chn, has := bus.ChnList[schnid]; has {
 		return chn
@@ -117,7 +117,7 @@ func (bus *baseBus) OpenChannel(chnID interface{}, router []chan interface{}) IC
 func (dtu *baseBus) ResetChannel(chn IChannel) {
 }
 
-func (bus *baseBus) CloseChannel(chn IChannel) error {
+func (bus *baseBus) FreeChannel(chn IChannel) error {
 	id := chn.ID()
 	bus.mutex.Lock()
 	delete(bus.ChnList, id)
@@ -156,6 +156,9 @@ func (bus *baseBus) Recive(chn IChannel, buff interface{}) (int, error) {
 	return 0, nil
 }
 
+func (bus *baseBus) ApplyPatch(ptchName string, param ...interface{}) {
+}
+
 func (bus *baseBus) TimeStamp() int64 {
 	return int64(bus.TsLast)
 }

+ 6 - 2
Bus/bus_test.go

@@ -81,11 +81,15 @@ func TestDtuServer(T *testing.T) {
 	dbgBus = mountbus("dtuFtpServer", 10010)
 	dbgBus.Init()
 
-	dbgChan = dbgBus.OpenChannel("9521003697", router)
+	dbgChan = dbgBus.MountChannel("9521003697", router)
 
 	time.Sleep(time.Second * 40)
 	chExit <- false
-	dbgBus.CloseChannel(dbgChan)
+	dbgBus.FreeChannel(dbgChan)
 	time.Sleep(time.Second * 10)
 	dbgBus.Uninit()
 }
+
+func TestMqtt() {
+
+}

+ 48 - 31
Bus/ftpDtu.go

@@ -6,6 +6,7 @@ import (
 	"net"
 	"reflect"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
@@ -20,17 +21,18 @@ const (
 	DEF_FTP_PORT = 10010
 )
 
-var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
+//var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
 
 type DtuServer struct {
 	baseBus
-	Port      int
-	name      string
-	lis       net.Listener
-	lisAlt    net.Listener
-	connlist  map[int]net.Conn
-	clientNum int
-	loop      bool
+	Port        int
+	name        string
+	lis         net.Listener
+	listPassive net.Listener
+	connlist    map[int]net.Conn
+	clientNum   int
+	loop        bool
+	patchs      map[string][]byte
 }
 
 func (dtu *DtuServer) ResetChannel(chn IChannel) {
@@ -62,30 +64,44 @@ func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
 	}
 }
 
-func (dtu *DtuServer) LookupChannel(buf []byte, idx int, conn net.Conn) IChannel {
-	return dtu.baseBus.ScanChannel(buf, idx)
+func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) {
+	if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice {
+		dtu.patchs[patchName] = param[0].([]byte)
+	}
 }
 
-func (dtu *DtuServer) ClientConnect(conn net.Conn, idx int, altFlag bool) {
+func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) {
 	var ic IChannel = nil
 	remoteAddr := conn.RemoteAddr().String()
 	//SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
-	defer dtu.closeConn(conn, idx, ic)
+	defer dtu.closeConn(conn, connectID, ic)
 
 	var err error
 	var buf [1024]byte
 	n := 0
-	for ic == nil {
-		if altFlag {
-			conn.Write(cmdQueryAddr)
+	for i := 0; i < 10; i++ {
+		if PassiveFlag {
+			for key, patchCmd := range dtu.patchs {
+				if strings.Contains(key, "Passive") {
+					conn.Write(patchCmd)
+				}
+			}
 		}
 		n, err = conn.Read(buf[:])
 		if err != nil {
 			log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
 			return
 		}
-		log.Infof("Connected(alternat port:%t) with code %X", altFlag, buf[:n])
-		ic = dtu.LookupChannel(buf[:n], idx, conn)
+		log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
+		ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
+		if ic != nil {
+			break
+		}
+		time.Sleep(time.Second * 5)
+	}
+
+	if ic == nil {
+		return
 	}
 
 	chnin := ic.GetChan(0)
@@ -119,17 +135,17 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, idx int, altFlag bool) {
 		n, err = conn.Read(buf[:])
 		if err != nil {
 			log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
-			dtu.closeConn(conn, idx, ic)
+			dtu.closeConn(conn, connectID, ic)
 			break
 		}
 	}
 }
 
-func (dtu *DtuServer) StartServer(lister *net.Listener, altFlag bool) {
+func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
 	defer func() {
 		(*lister).Close()
 	}()
-	idx := 0
+	connectID := 1000
 
 	for dtu.loop {
 		if dtu.clientNum >= maxConnCount {
@@ -142,11 +158,11 @@ func (dtu *DtuServer) StartServer(lister *net.Listener, altFlag bool) {
 			log.Errorf("listen err:[%v]\n", err)
 		}
 		dtu.mutex.Lock()
-		dtu.connlist[idx] = conn
-		idx++
+		connectID++
+		dtu.connlist[connectID] = conn
 		dtu.clientNum++
 		dtu.mutex.Unlock()
-		go dtu.ClientConnect(conn, idx-1, altFlag)
+		go dtu.ClientConnect(conn, connectID, PassiveFlag)
 	}
 }
 
@@ -168,13 +184,13 @@ func (dtu *DtuServer) Init() error {
 		go dtu.StartServer(&dtu.lis, false)
 
 		addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
-		log.Info("Alternat port listen start ", addr)
-		dtu.lisAlt, err = net.Listen("tcp", addr)
+		log.Info("Passive port listen start ", addr)
+		dtu.listPassive, err = net.Listen("tcp", addr)
 		if err != nil {
 			log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
 			return err
 		}
-		go dtu.StartServer(&dtu.lisAlt, true)
+		go dtu.StartServer(&dtu.listPassive, true)
 
 	}
 	return nil
@@ -193,16 +209,16 @@ func (dtu *DtuServer) Uninit() {
 	}
 }
 
-func (dtu *DtuServer) OpenChannel(chn interface{}, router []chan interface{}) IChannel {
+func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel {
 	if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
-		panic("Open channel should be a uniq string")
+		panic("Open channel should be a unique string")
 	}
-	ic := dtu.baseBus.OpenChannel(chn, router)
+	ic := dtu.baseBus.MountChannel(chn, router)
 	return ic
 }
 
-func (dtu *DtuServer) CloseChannel(chn IChannel) error {
-	dtu.baseBus.CloseChannel(chn)
+func (dtu *DtuServer) FreeChannel(chn IChannel) error {
+	dtu.baseBus.FreeChannel(chn)
 	if dtu.loop {
 		//idx := chn.(*BusChannel).conn.(int)
 		for _, connID := range chn.(*BusChannel).conIdList {
@@ -252,6 +268,7 @@ func NewDtuServer(param []interface{}) IBus {
 			mutex: &sync.Mutex{},
 		},
 		connlist:  make(map[int]net.Conn),
+		patchs:    make(map[string][]byte),
 		clientNum: 0,
 		Port:      Port,
 		loop:      false,

+ 3 - 0
Bus/go.mod

@@ -4,12 +4,15 @@ go 1.14
 
 replace github.com/ammeter/platform => ../platform
 
+replace github.com/ammeter/util => ../util
+
 replace github.com/ammeter/config => ../config
 
 require (
 	github.com/ammeter/config v0.0.0-00010101000000-000000000000
 	github.com/eclipse/paho.mqtt.golang v1.3.5
 	github.com/gin-gonic/gin v1.7.7
+	github.com/go-delve/delve v1.8.2
 	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
 	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
 )

+ 15 - 8
Bus/mqtt.go

@@ -7,20 +7,21 @@ import (
 	"time"
 
 	"github.com/ammeter/config"
+	"github.com/ammeter/util"
 	MQTT "github.com/eclipse/paho.mqtt.golang"
 	"github.com/yuguorong/go/log"
 )
 
 type QMqtt struct {
-	baseBus
-	User    string      `json:"accessToken"`
+	baseBus `json:"-" gorm:"-"`
+	User    string      `json:"accessToken" gorm:"not null"`
 	MqttUrl string      `json:"mqttserver"`
-	Uid     string      `json:"id"`
-	Name    string      `json:"name"`
-	Pswd    string      `json:-`
-	Qos     byte        `json:-`
-	cnn     MQTT.Client `json:-`
-	cliLock *sync.Mutex `json:-`
+	Uid     string      `json:"id" gorm:"primary_key:unique"`
+	Name    string      `json:"name" gorm:"not null"`
+	Pswd    string      `json:"-"  gorm:"-"`
+	Qos     byte        `json:"-"  gorm:"-"`
+	cnn     MQTT.Client `json:"-"  gorm:"-"`
+	cliLock *sync.Mutex `json:"-"  gorm:"-"`
 }
 
 type CbRegistMqttSubs func(cnn *QMqtt) int
@@ -147,6 +148,12 @@ func NewMqtt(param []interface{}) IBus {
 			mq.Pswd = cfgmq.Pswd
 			mq.Name = cfgmq.Name
 		}
+		if util.DebugLevel() > 0 {
+			mq.User = cfgmq.Name
+			mq.MqttUrl = "192.168.255.104:1883"
+			mq.Name = cfgmq.Name
+			mq.Pswd = "pacom"
+		}
 	}
 	return mq
 }

+ 59 - 0
api/ginmain.go

@@ -0,0 +1,59 @@
+package api
+
+import (
+	"net/http"
+
+	"github.com/ammeter/util"
+	"github.com/gin-gonic/gin"
+	"github.com/yuguorong/go/log"
+)
+
+type APP_INFO struct {
+	AppName   string
+	AppID     string
+	AppSecret string
+}
+
+var AppInfo = &APP_INFO{
+	AppName:   "PowerMeter",
+	AppID:     "6L1MQezb",
+	AppSecret: "90ebe5ecbe350923cd00a140ab88739ed2858009",
+}
+
+var router *gin.Engine
+
+func RestServerStart() {
+	// Engin
+	router = gin.Default()
+	router.GET("/Gateway4G/UpdateDevcie", UpdateDevcie) // hello函数处理"/hello"请求
+	router.GET("/Gateway4G/ListDevices", ListDevices)
+	// 指定地址和端口号
+	go router.Run(":10001")
+}
+func UpdateDevcie(c *gin.Context) {
+	AppID := c.Query("AppID")
+	AppSecret := c.Query("AppSecret")
+
+	log.Infof("REST on Update device message :%s, %s/n", AppID, AppSecret)
+	if AppID == AppInfo.AppID && AppSecret == AppInfo.AppSecret {
+		util.PostMessage("API", "UpdateDevice")
+		c.JSON(http.StatusOK, gin.H{
+			"code":    200,
+			"success": true,
+		})
+	} else {
+		c.JSON(http.StatusUnauthorized, gin.H{
+			"code":  401,
+			"Erorr": "AppID and AppSecret wrong",
+		})
+	}
+}
+
+func ListDevices(c *gin.Context) {
+	log.Info("REST on list devcies")
+	devlistchn := make(chan interface{})
+	util.PostMessage("API", "ListDevices")
+	util.PostMessage("API", devlistchn)
+	devlist := <-devlistchn
+	c.IndentedJSON(http.StatusOK, devlist)
+}

+ 8 - 0
api/go.mod

@@ -0,0 +1,8 @@
+module github.com/ammeter/api
+
+go 1.14
+replace "github.com/ammeter/util" => "../util"
+require (
+	github.com/gin-gonic/gin v1.7.7
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 1 - 1
cloudserver/CloudServer.go

@@ -83,7 +83,7 @@ func (c *CloudServer) requestPage(url string, data interface{}) (p *PageModel, e
 	if erro != nil {
 		return
 	}
-	log.Info("reqParam", string(payload), url, time.Now())
+	log.Info(url, "; paramter:", string(payload))
 	reqBody := bytes.NewReader(payload)
 	req, err := http.NewRequest("POST", url, reqBody)
 	if err != nil {

+ 5 - 4
cmd/go.mod

@@ -2,8 +2,6 @@ module github.com/ammeter/cmd
 
 go 1.14
 
-replace github.com/ammeter/platform => ../platform
-
 replace github.com/ammeter/cloudserver => ../cloudserver
 
 replace github.com/ammeter/config => ../config
@@ -14,11 +12,14 @@ replace github.com/ammeter/protocol => ../protocol
 
 replace github.com/ammeter/Bus => ../Bus
 
-replace github.com/ammeter/middleware => ../middleware
+replace github.com/ammeter/util => ../util
+
+replace github.com/ammeter/platform => ../platform
+
+replace github.com/ammeter/api => ../api
 
 require (
 	github.com/ammeter/config v0.0.0-00010101000000-000000000000
-	github.com/ammeter/middleware v0.0.0-00010101000000-000000000000
 	github.com/ammeter/platform v0.0.0-00010101000000-000000000000
 	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
 )

BIN
cmd/local.sqlite


+ 3 - 5
cmd/main.go

@@ -5,8 +5,8 @@ import (
 
 	"github.com/yuguorong/go/log"
 
+	"github.com/ammeter/api"
 	"github.com/ammeter/config"
-	"github.com/ammeter/middleware"
 	"github.com/ammeter/platform"
 )
 
@@ -47,8 +47,6 @@ func main() {
 	p.SetModel("SA10", REQ_AIR_URL)
 	p.SaveModel()
 
-	mainloop := make(chan interface{})
-
-	middleware.RestServerStart(mainloop)
-	platform.StartServer(mainloop)
+	api.RestServerStart()
+	platform.StartServer()
 }

+ 12 - 0
cmd/test.sql

@@ -0,0 +1,12 @@
+-- SQLite
+SELECT user, mqtt_url, uid, name, pswd, qos, tenants
+FROM gateway_configs;
+
+SELECT * FROM gateway_configs
+
+drop table ammeter_models
+drop table s_air_models
+
+CREATE TABLE db_energes AS SELECT dev_name,total_energy,ts_sample from ammeters
+
+DELETE FROM meter_persists WHERE dev_name = ""

+ 1 - 1
config/config_test.go

@@ -46,7 +46,7 @@ func TestLoadConf(T *testing.T) {
 
 type AmmeterModel struct {
 	Id         string `json:"id" gorm:"-"`
-	DevName    string `json:"devName" gorm:"primarykey"`
+	DevName    string `json:"devName" gorm:"primary_key"`
 	Code       string `json:"code" `
 	DutSn      string `json:"dutSn" gorm:"-"`
 	Address    string `json:"address"`

+ 5 - 1
config/db.go

@@ -13,7 +13,7 @@ type QdDB struct {
 func (db *QdDB) ConnectQuarkSQL() {
 	var err error
 
-	db.dbSQL, err = gorm.Open(sqlite.Open("local.db"), &gorm.Config{})
+	db.dbSQL, err = gorm.Open(sqlite.Open("local.sqlite"), &gorm.Config{})
 	if err != nil {
 		panic("failed to connect database")
 	}
@@ -57,6 +57,10 @@ func (db *QdDB) Find(tbl interface{}, condition ...interface{}) {
 	}
 }
 
+func (db *QdDB) Table(tbl interface{}) {
+	db.dbSQL.Find(tbl)
+}
+
 func (db *QdDB) List(tbl interface{}, field interface{}, name string) {
 	db.dbSQL.Model(tbl).Association(name).Find(field)
 }

+ 2 - 13
drivers/Api.go

@@ -3,7 +3,7 @@ package drviers
 type IDriver interface {
 	Name() string
 	Probe(name string, param interface{})
-	CreateDevice(model interface{}, filterGwId string) (int, []IDevice)
+	CreateDevice(model interface{}) (int, []IDevice)
 	GetModel() interface{}
 	GetDevice(id string) IDevice
 	GetDeviceList() map[string]IDevice
@@ -24,7 +24,7 @@ func (drv *baseDriver) Probe(name string, param interface{}) {
 	drv.DevList = make(map[string]IDevice)
 }
 
-func (drv *baseDriver) CreateDevice(model interface{}, filterGwId string) (int, []IDevice) {
+func (drv *baseDriver) CreateDevice(model interface{}) (int, []IDevice) {
 	dev := make([]IDevice, len(drv.DevList))
 	i := 0
 	for _, v := range drv.DevList {
@@ -59,18 +59,8 @@ func (drv *baseDriver) Uninstall() {
 type funcRegDriver func(interface{}) IDriver
 
 var driverReg map[string]funcRegDriver
-var driverList map[string]IDriver
-
-func Shutdown() {
-	for _, drv := range driverList {
-		drv.Uninstall()
-	}
-}
 
 func Install(name string, param interface{}) IDriver {
-	if p, has := driverList[name]; has && p != nil {
-		return p
-	}
 	if f, has := driverReg[name]; has && f != nil {
 		drv := f(param)
 		drv.Probe(name, param)
@@ -80,6 +70,5 @@ func Install(name string, param interface{}) IDriver {
 }
 
 func init() {
-	driverList = make(map[string]IDriver)
 	driverReg = make(map[string]funcRegDriver)
 }

+ 157 - 108
drivers/ameter.go

@@ -2,6 +2,7 @@ package drviers
 
 import (
 	"encoding/hex"
+	"errors"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -14,31 +15,43 @@ import (
 )
 
 const (
-	CMD_ENERGY_TOTAL         = "00000000"
-	DEF_SAMPLE_PERIOD        = 5 * time.Minute
-	DEF_SAMPLE_PEER_DURATION = 30 * time.Second
+	DEF_SAMPLE_PERIOD        = 30 * time.Second // time.Minute
+	DEF_SAMPLE_PEER_DURATION = 10 * time.Second
 	DEF_TCP_READ_TIMEOUT     = 1 * time.Minute
 	POSTPONE_PERSIST_TIME    = 1 * time.Minute
 )
 
+//amList  map[string]*Ammeter
+//amList : key=[code + address], each meter id must be unique, otherwise ammeter would not work correctly!
+//Careful! address from cloud may not real address. software varAddress functon would change it to real address
+
 type AmmeterModel struct {
-	Id         string `json:"id" gorm:"-"`
-	DevName    string `json:"devName" gorm:"primarykey"`
-	Code       string `json:"code" gorm:"-"`
-	DutSn      string `json:"dutSn" gorm:"-"`
-	Address    string `json:"address" gorm:"-"`
-	Protocol   string `json:"protocol" gorm:"-"`
-	GwDevId    string `json:"gwDevId" gorm:"-"`
-	TransRadio string `json:"transformerRatio" gorm:"-"`
+	Id         string `json:"id"`
+	DevName    string `json:"devName" gorm:"primary_key; unique"`
+	Code       string `json:"code" `
+	DutSn      string `json:"dutSn" `
+	Address    string `json:"address" `
+	Protocol   string `json:"protocol" `
+	GwDevId    string `json:"gwDevId" `
+	TransRadio string `json:"transformerRatio" `
+	Type       string `json:"-" `
 }
 
+type meterPersist struct {
+	DevName     string `json:"devName" gorm:"primary_key; unique"`
+	TsSample    int64  `gorm:"autoUpdateTime"`
+	TotalEnergy float64
+	DBStatus    int32
+}
 type Ammeter struct {
 	AmmeterModel
 	TsSample    int64
 	TotalEnergy float64
-	TransDeno   float64 `gorm:"-"`
-	TransDiv    float64 `gorm:"-"`
-	DBStatus    int     `json:"-"`
+	TransDeno   float64
+	TransDiv    float64
+	varAddr     string
+	DBStatus    int32
+	TsUpdate    int64
 }
 
 type AmMeterHub struct {
@@ -49,8 +62,8 @@ type AmMeterHub struct {
 	chnExit     chan bool
 	loopserver  bool
 	sampleTmr   *time.Timer
-	amList      map[string]*Ammeter
-	persitList  map[string]*Ammeter
+	amList      map[string]*Ammeter //key=[code + address], each meter id must be unique
+	persitList  map[string]*meterPersist
 	tmrPersist  *time.Timer
 
 	queryIdx int
@@ -82,16 +95,9 @@ func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnD
 		if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
 			devname := ""
 			mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
-			if mret != nil && devname == dev.DeviceName {
+			if mret != nil && devname == dev.DutSn {
 				return bus.DispatchSingle
 			}
-			// for i := 0; i < 6; i++ {
-			// 	if stream[i+1] != stream[i+10] {
-			// 		return bus.DispatchNone
-			// 	}
-			// }
-			// log.Infof("MOUNT meter:0000[%x]", stream[1:7])
-			// return bus.DispatchSingle
 		}
 		return bus.DispatchNone
 	}
@@ -112,6 +118,9 @@ func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnD
 func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 	i := 0
 	for _, am := range dev.amList {
+		if i == 0 {
+			pam = am
+		}
 		if i == dev.queryIdx {
 			pam = am
 			break
@@ -121,7 +130,7 @@ func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 	if pam == nil {
 		return nil
 	}
-	pack := dev.Route[1].iproto.PackageCmd(CMD_ENERGY_TOTAL, pam.Code+pam.Address)
+	pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
 	dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
 	tmrTx.Reset(1 * time.Second)
 	return pam
@@ -130,6 +139,7 @@ func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
 	tmrTx.Stop()
 	dev.queryIdx++
+	log.Info("Schedul next:(%d/%d)", dev.queryIdx, len(dev.amList))
 	if dev.queryIdx >= len(dev.amList) {
 		dev.queryIdx = 0
 		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
@@ -138,30 +148,33 @@ func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
 	}
 }
 
-func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, devname string) {
+func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
 	vlist := mval.(map[string]interface{})
-	if am, has := dev.amList[devname]; has {
-		for k, v := range vlist {
-			if reflect.TypeOf(v).Kind() == reflect.Float64 {
-				valItem := v.(float64)
-				vlist[k] = (valItem * am.TransDeno) / am.TransDiv
-			}
+	for k, v := range vlist {
+		if reflect.TypeOf(v).Kind() == reflect.Float64 {
+			valItem := v.(float64)
+			vlist[k] = (valItem * am.TransDeno) / am.TransDiv
 		}
+	}
 
-		if val, has := vlist["TotalActivePower"]; has {
-			diff := val.(float64) - am.TotalEnergy
-			if am.DBStatus == 0 { //数据库清0
-				diff = 0
-				am.DBStatus = 1
-			}
-			am.TotalEnergy = val.(float64)
-			vlist["ActivePowerIncrement"] = diff
-			if len(dev.persitList) == 0 {
-				dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
-			}
-
-			dev.persitList[am.DevName] = am
+	if val, has := vlist["TotalActivePower"]; has {
+		diff := val.(float64) - am.TotalEnergy
+		if am.DBStatus == 0 { //数据库清0
+			diff = 0
+			am.DBStatus = 1
 		}
+		am.TotalEnergy = val.(float64)
+		vlist["ActivePowerIncrement"] = diff
+		if len(dev.persitList) == 0 {
+			dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
+		}
+		energe := &meterPersist{
+			TotalEnergy: am.TotalEnergy,
+			TsSample:    am.TsSample,
+			DBStatus:    am.DBStatus,
+			DevName:     am.DevName,
+		}
+		dev.persitList[am.DevName] = energe
 	}
 }
 
@@ -176,17 +189,16 @@ func (dev *AmMeterHub) Run() error {
 		case msg := <-dev.Route[1].router[0]:
 			if reflect.TypeOf(msg).Kind() == reflect.Slice {
 				b := msg.([]byte)
-				log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b))
-				devname := "" //pam.DevName
-				mret := dev.Route[1].iproto.ParsePacket(b, &devname)
-				if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devname != "" {
-					log.Info(devname, mret)
-					devname = "AM10-" + devname + dev.DevCompCode[:4]
-					dev.AdjustTelemetry(mret, devname)
-					telemetry := dev.Route[0].iproto.PackageCmd(devname, mret)
-					dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
-					if meter, has := dev.amList[devname]; has {
-						meter.TsSample = time.Now().Unix()
+				log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
+				devid := ""
+				mret := dev.Route[1].iproto.ParsePacket(b, &devid)
+				if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
+					log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
+					if am, has := dev.amList[devid]; has {
+						dev.AdjustTelemetry(mret, am)
+						telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
+						dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
+						am.TsSample = time.Now().Unix()
 					}
 					dev.SchedulNextSample(tmrTxTimout)
 				}
@@ -198,11 +210,8 @@ func (dev *AmMeterHub) Run() error {
 		case <-dev.chnExit:
 			dev.loopserver = false
 		case <-dev.tmrPersist.C:
-			for k, am := range dev.persitList {
-				if am.DBStatus == 0 {
-
-				}
-				config.GetDB().Save(am)
+			for k, eng := range dev.persitList {
+				config.GetDB().Save(eng)
 				delete(dev.persitList, k)
 			}
 		}
@@ -221,8 +230,13 @@ func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
 
 func (dev *AmMeterHub) Open(param ...interface{}) error {
 	if dev.DutSn != "" {
-		dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0)
+		if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
+			return errors.New("can not set default rout!")
+		}
+		r := dev.Route[len(dev.Route)-1]
+		r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
 	}
+
 	dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
 	dev.sampleTmr.Stop()
 
@@ -231,8 +245,10 @@ func (dev *AmMeterHub) Open(param ...interface{}) error {
 }
 
 func (dev *AmMeterHub) GetDevice(devname string) interface{} {
-	if dev, has := dev.amList[devname]; has {
-		return dev
+	for _, am := range dev.amList {
+		if am.DevName == devname {
+			return dev
+		}
 	}
 	return nil
 }
@@ -241,14 +257,13 @@ func (dev *AmMeterHub) ListDevices() interface{} {
 	mdev := make(map[string]interface{})
 	mdev["State"] = dev.Status
 	dinfo := make(map[string]interface{})
-	for sname, subdev := range dev.amList {
-
+	for _, subdev := range dev.amList {
 		meterval := make(map[string]string)
 		meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
 		if subdev.TsSample != 0 {
 			meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
 		}
-		dinfo[sname] = meterval
+		dinfo[subdev.DevName] = meterval
 	}
 	mdev["Sub Meters"] = dinfo
 	return mdev
@@ -272,55 +287,88 @@ func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
 	}
 }
 
-func (drv *AmmeterDrv) CreateDevice(model interface{}, filterGwId string) (int, []IDevice) {
+func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
+	defer func() {
+		recover()
+	}()
+	iaddr, _ := strconv.Atoi(addr)
+	isub, _ := strconv.Atoi(subid)
+	if iaddr != 0 && isub > 0 {
+		iaddr = iaddr + (isub-1)*3
+		m.Address = strconv.Itoa(iaddr)
+	}
+}
+
+func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
+	devCompID := m.DutSn
+	if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
+		addr := m.Address
+		ss := strings.Split(addr, "-")
+		m.DutSn = m.Code + ss[0]
+		devCompID = "0000"
+		if len(ss) > 1 {
+			drv.varyAdress(m, ss[0], ss[1])
+		}
+	}
+	if hub, has := drv.DevList[m.DutSn]; has {
+		return hub
+	}
+	hub := &AmMeterHub{
+		DutSn:       m.DutSn,
+		Protocol:    m.Protocol,
+		chnExit:     make(chan bool),
+		loopserver:  false,
+		sampleTmr:   nil,
+		amList:      make(map[string]*Ammeter),
+		persitList:  make(map[string]*meterPersist),
+		queryIdx:    0,
+		DevCompCode: devCompID,
+	}
+	hub.Probe(m.DutSn, drv)
+	drv.DevList[m.DutSn] = hub
+	*list = append(*list, hub)
+	return hub
+}
+
+func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
+	ts := time.Now().UnixMilli()
+	devlist := make([]IDevice, 0)
 	if model != nil {
-		models := model.(*[]AmmeterModel)
-		for _, m := range *models {
-			var hub IDevice = nil
-			var has bool
-			if m.GwDevId != filterGwId {
-				continue
-			}
-			if len(m.Code) < 4 || len(m.Address) < 8 {
+		devModels := model.(*[]AmmeterModel)
+		for _, mod := range *devModels {
+			if len(mod.Code) < 4 || len(mod.Address) < 8 {
 				continue
 			}
-			devCompID := m.DutSn
-			if m.DutSn == "" {
-				m.DutSn = m.Code + m.Address
-				devCompID = "0000"
-			}
-			if hub, has = drv.DevList[m.DutSn]; !has {
-				hub = &AmMeterHub{
-					DutSn:       m.DutSn,
-					Protocol:    m.Protocol,
-					chnExit:     make(chan bool),
-					loopserver:  false,
-					sampleTmr:   nil,
-					amList:      make(map[string]*Ammeter),
-					persitList:  make(map[string]*Ammeter),
-					queryIdx:    0,
-					DevCompCode: devCompID,
-				}
-				hub.Probe(m.DutSn, drv)
-				drv.DevList[m.DutSn] = hub
-			}
-
-			if _, has := hub.(*AmMeterHub).amList[m.DevName]; !has {
-				dev := &Ammeter{
-					TotalEnergy: 0,
-					TsSample:    0,
+			hub := drv.mountHub(&mod, &devlist)
+			id := mod.Code + mod.Address
+			am, has := hub.(*AmMeterHub).amList[id]
+			if !has {
+				eng := &meterPersist{}
+				config.GetDB().Find(eng, "dev_name='"+mod.DevName+"'")
+				am = &Ammeter{
+					TotalEnergy: eng.TotalEnergy,
+					DBStatus:    eng.DBStatus,
+					TsSample:    eng.TsSample,
 					TransDiv:    1,
 					TransDeno:   1,
-					DBStatus:    0,
 				}
-				config.GetDB().Find(dev, "dev_name='"+m.DevName+"'")
-				dev.AmmeterModel = m
-				drv.ParseTransRatio(dev)
-				hub.(*AmMeterHub).amList[dev.DevName] = dev
+				am.AmmeterModel = mod
+				drv.ParseTransRatio(am)
+				hub.(*AmMeterHub).amList[id] = am
 			}
+			am.TsUpdate = ts
 		}
 	}
-	return drv.baseDriver.CreateDevice(model, filterGwId)
+
+	for _, hub := range drv.DevList {
+		for id, am := range hub.(*AmMeterHub).amList {
+			if am.TsUpdate != ts {
+				delete(hub.(*AmMeterHub).amList, id)
+			}
+		}
+	}
+
+	return int(ts), devlist
 }
 
 func (drv *AmmeterDrv) GetModel() interface{} {
@@ -334,5 +382,6 @@ func NewAmMeter(param interface{}) IDriver {
 
 func init() {
 	driverReg["ammeter"] = NewAmMeter
-	config.GetDB().CreateTbl(&Ammeter{})
+	config.GetDB().CreateTbl(&AmmeterModel{})
+	config.GetDB().CreateTbl(&meterPersist{})
 }

+ 5 - 1
drivers/device.go

@@ -28,6 +28,7 @@ type routePath struct {
 }
 
 type baseDevice struct {
+	tsUpdate   int64
 	DeviceName string
 	Status     string
 	drv        IDriver
@@ -106,11 +107,14 @@ func (dev *baseDevice) SetRoute(evt bus.IBusEvent, prot string, busName string,
 	r.router[0] = make(chan interface{}, 16)
 
 	r.iproto = protocol.LoadProtocol(prot)
+	if r.iproto == nil {
+		return 0
+	}
 	r.iproto.Init(prot)
 
 	r.ibus = bus.MountBus(busName, param)
 	r.ibus.Init()
-	r.iChn = r.ibus.OpenChannel(chn, r.router)
+	r.iChn = r.ibus.MountChannel(chn, r.router)
 	r.iChn.SetEvent(evt, r.iproto)
 
 	dev.Route = append(dev.Route, *r)

+ 31 - 0
drivers/models.go

@@ -0,0 +1,31 @@
+package drviers
+
+type DevModel struct {
+	Id      string `json:"id" gorm:"-"`
+	DevName string `json:"devName" gorm:"primary_key"`
+	Type    string `json:"-" gorm:"type"`
+	GwDevId string `json:"gwDevId" gorm:"-"`
+}
+
+func (m *DevModel) GetType() string {
+	return m.Type
+}
+
+func (m *DevModel) GetGatewayID() string {
+	return m.GwDevId
+}
+
+func (m *DevModel) GetName() string {
+	return m.DevName
+}
+
+func (m *DevModel) GetObject() interface{} {
+	return m
+}
+
+type IModel interface {
+	GetName() string
+	GetType() string
+	GetGatewayID() string
+	GetObject() interface{}
+}

+ 2 - 2
middleware/ginmain.go

@@ -22,9 +22,9 @@ var AppInfo = &APP_INFO{
 var msgInd chan interface{} = nil
 var router *gin.Engine
 
-func RestServerStart(mainloop chan interface{}) {
+func RestServerStart(msgHandler chan interface{}) {
 	// Engin
-	msgInd = mainloop
+	msgInd = msgHandler
 	router = gin.Default()
 	router.GET("/Gateway4G/UpdateDevcie", UpdateDevcie) // hello函数处理"/hello"请求
 	router.GET("/Gateway4G/ListDevices", ListDevices)

+ 38 - 28
platform/gateway.go

@@ -4,8 +4,6 @@ import (
 	"errors"
 	"strings"
 
-	"github.com/yuguorong/go/log"
-
 	bus "github.com/ammeter/Bus"
 	"github.com/ammeter/cloudserver"
 	"github.com/ammeter/config"
@@ -27,14 +25,18 @@ const (
 )
 
 type Gateway struct {
+	ID         int `gorm:"not null"`
 	Mqttcfg    bus.QMqtt
-	Name       string
-	Uid        string
+	Name       string `gorm:"not null"`
+	Uid        string `gorm:"not null"`
 	modelList  map[string]string
-	NorthRoute interface{}
-	DeviceList map[string]drviers.IDevice
+	tsUpdate   int64
+	NorthRoute interface{} `json:"-" gorm:"-"`
+	drvList    map[string]drviers.IDriver
 }
 
+//https://test-admin.pacom.cn/platform/dev/get-sair-list
+
 func (gw *Gateway) SyncCloudModelDevice(url string, model interface{}) error {
 	var err = errors.New("Nil model?")
 	if model != nil {
@@ -44,7 +46,7 @@ func (gw *Gateway) SyncCloudModelDevice(url string, model interface{}) error {
 			"gwDevId": gw.Uid,
 		}
 		err = cs.GetClientData(url, model, &param)
-		log.Info(model)
+		//log.Info(model)
 	}
 	return err
 }
@@ -68,30 +70,34 @@ func priotyInf(ni *netinfo, name string) bool {
 	return false
 }
 
-func (gw *Gateway) InitGateway(modelList map[string]string) {
+func (gw *Gateway) InitGateway() {
 	//gw.GetGatewayName()
-	gw.DeviceList = make(map[string]drviers.IDevice)
-	gw.modelList = modelList
 
 }
 
+func (gw *Gateway) Remove(uid string) {
+	for _, drv := range gw.drvList {
+		drv.Uninstall()
+	}
+	gw.drvList = nil
+	gw.modelList = nil
+	gw.Mqttcfg = bus.QMqtt{}
+}
+
 //
-func (gw *Gateway) ApplyProfile(uid string) {
-	//	name := "ammeter"
-	//	drvU := drviers.Install("SAIR10", nil)
-	for name, url := range gw.modelList { //model name + model cloud api url
-		drv := drviers.Install(name, nil) //If already installed ,return installed instance.
-		model := drv.GetModel()           //model: cloud information of device, create exact device absctrace from model
-		if url != "" {
-			if gw.SyncCloudModelDevice(url, model) != nil {
-				model = config.GetSysConfig().GetProfile("gateway/"+name+"/amlist", model) //Cloud failed ,using local saved model
-			} else {
-				config.GetSysConfig().SetProfile("gateway/"+name+"/amlist", model) //update local model
-			}
+func (gw *Gateway) InstallDrivers(uid string) {
+
+	//tsUpdate := time.Now().UnixMilli()
+	for name, _ := range gw.modelList { //model name + model cloud api url
+		drv, has := gw.drvList[name]
+		if !has || drv == nil {
+			drv = drviers.Install(name, nil) //If already installed ,return installed instance.
+			gw.drvList[name] = drv
 		}
-		if sz, devlist := drv.CreateDevice(model, gw.Mqttcfg.Uid); sz > 0 {
+		model := drv.GetModel() //model: cloud information of device, create exact device absctrace from model
+		config.GetDB().Find(model, `gw_dev_id="`+gw.Uid+`"`)
+		if sz, devlist := drv.CreateDevice(model); sz > 0 {
 			for _, dev := range devlist {
-				gw.DeviceList[dev.Name()] = dev
 				dev.SetRoute(nil, "ThingsBoards", "mqtt", "v1/gateway/telemetry", &gw.Mqttcfg)
 				dev.Open()
 			}
@@ -99,11 +105,15 @@ func (gw *Gateway) ApplyProfile(uid string) {
 	}
 }
 
-func InitGateway(mqttcfg *bus.QMqtt, modelList map[string]string) *Gateway {
+func InitGateway(mqttcfg *bus.QMqtt, extname string, models map[string]string) *Gateway {
 	gw := &Gateway{
-		Uid:     mqttcfg.Uid,
-		Mqttcfg: *mqttcfg,
+		Uid:       mqttcfg.Uid,
+		Mqttcfg:   *mqttcfg,
+		tsUpdate:  0,
+		drvList:   make(map[string]drviers.IDriver),
+		Name:      extname,
+		modelList: models,
 	}
-	gw.InitGateway(modelList)
+
 	return gw
 }

+ 2 - 0
platform/go.mod

@@ -12,6 +12,8 @@ replace github.com/ammeter/protocol => ../protocol
 
 replace github.com/ammeter/Bus => ../Bus
 
+replace github.com/ammeter/util => ../util
+
 require (
 	github.com/ammeter/Bus v0.0.0-00010101000000-000000000000
 	github.com/ammeter/cloudserver v0.0.0-00010101000000-000000000000

+ 93 - 22
platform/platform.go

@@ -1,6 +1,8 @@
 package platform
 
 import (
+	"encoding/json"
+	"errors"
 	"reflect"
 	"time"
 
@@ -9,6 +11,8 @@ import (
 	bus "github.com/ammeter/Bus"
 	"github.com/ammeter/cloudserver"
 	"github.com/ammeter/config"
+	drviers "github.com/ammeter/drivers"
+	"github.com/ammeter/util"
 )
 
 const (
@@ -18,42 +22,101 @@ const (
 	DEF_FTP_PORT     = 10010
 )
 
+type GatewayConfig struct {
+	bus.QMqtt
+	Tenants string `json:"userName"`
+}
+
 type PaPlatform struct {
 	gwUrl       string
+	jsGwList    string
+	jsDevList   map[string]string
 	GatewayList map[string]*Gateway
 	modelList   map[string]string
 }
 
-func (p *PaPlatform) SyncCloudGWConfig() *[]bus.QMqtt {
+func (p *PaPlatform) LoadSyncGWConfig() *[]GatewayConfig {
 	if p.gwUrl == "" {
 		p.gwUrl = DEF_REQ_AMGW_URL
 	}
-	mqttList := []bus.QMqtt{}
+	mqttList := []GatewayConfig{}
 	prof := config.GetSysConfig().GetProfile("remote_config", cloudserver.GetCloudConfig())
 	cs := cloudserver.GetCloudServer(prof)
-	err := cs.GetClientData(p.gwUrl, &mqttList, nil)
-	if err != nil {
-		return nil
+	if err := cs.GetClientData(p.gwUrl, &mqttList, nil); err == nil {
+		for i := 0; i < len(mqttList); {
+			if mqttList[i].QMqtt.User == "" || mqttList[i].QMqtt.MqttUrl == "" || mqttList[i].QMqtt.Uid == "" || mqttList[i].QMqtt.Name == "" {
+				mqttList = append(mqttList[:i], mqttList[i+1:]...)
+			} else {
+				i++
+			}
+		}
+		jmq, _ := json.Marshal(&mqttList)
+		if string(jmq) != p.jsGwList {
+			config.GetDB().Save(&mqttList)
+			p.jsGwList = string(jmq)
+			//need check different
+		}
+	} else {
+		config.GetDB().Table(&mqttList)
 	}
-	config.GetSysConfig().SetValue("mqtt_list", &mqttList)
-	config.GetSysConfig().Save()
 	return &mqttList
 }
 
-func (p *PaPlatform) LoadGatewayProfile() {
-	gwMqttList := &[]bus.QMqtt{}
-	config.GetSysConfig().GetValue("mqtt_list", gwMqttList)
-	if gwrmt := p.SyncCloudGWConfig(); gwrmt != nil {
-		gwMqttList = gwrmt
-		config.GetSysConfig().SetValue("mqtt_list", gwMqttList)
+func (p *PaPlatform) LoadCloudDevices(url string, devInfo interface{}) error {
+	var err = errors.New("Nil model?")
+	if devInfo != nil {
+
+		//log.Info(model)
 	}
+	return err
+}
+
+func (p *PaPlatform) LoadSyncAllDevices() {
+	for name, url := range p.modelList {
+		drv := drviers.Install(name, nil)
+		model := drv.GetModel()
+		if url != "" && model != nil {
+			prof := config.GetSysConfig().GetProfile("remote_config", cloudserver.GetCloudConfig())
+			cs := cloudserver.GetCloudServer(prof)
+			param := map[string]string{
+				"gwDevId": "",
+			}
+			bupdate := true
+			err := cs.GetClientData(url, model, &param)
+			if err == nil {
+				jdev, _ := json.Marshal(model)
+				if js, has := p.jsDevList[name]; has {
+					bupdate = (js != string(jdev))
+				}
+				p.jsDevList[name] = string(jdev)
+				if bupdate {
+					config.GetDB().CreateTbl(model)
+					config.GetDB().Save(model)
+				}
+			}
+		}
+	}
+}
+
+func (p *PaPlatform) LoadGatewayProfile() {
+	tscur := time.Now().UnixMilli()
+	p.LoadSyncAllDevices()
+	gwMqttList := p.LoadSyncGWConfig()
 	for _, mq := range *gwMqttList {
 		gw, has := p.GatewayList[mq.Uid]
 		if !has {
-			gw = InitGateway(&mq, p.modelList)
+			gw = InitGateway(&mq.QMqtt, mq.Tenants, p.modelList)
 			p.GatewayList[gw.Uid] = gw
 		}
-		gw.ApplyProfile(mq.Uid)
+		gw.InstallDrivers(mq.Uid)
+		gw.tsUpdate = tscur
+	}
+
+	for name, gw := range p.GatewayList {
+		if gw.tsUpdate != tscur {
+			gw.Remove(gw.Uid)
+			delete(p.GatewayList, name)
+		}
 	}
 }
 
@@ -63,19 +126,22 @@ func (p *PaPlatform) ListDevices() interface{} {
 
 	for _, gw := range p.GatewayList {
 		mgw := make(map[string]interface{})
-		for n, dev := range gw.DeviceList {
-			mgw[n] = dev.ListDevices()
+		for _, drv := range gw.drvList {
+			for n, dev := range drv.GetDeviceList() {
+				mgw[n] = dev.ListDevices()
+			}
 		}
-		mgw["ID"] = gw.Uid
+		//mgw["ID"] = gw.Uid
+		mgw["Tenants"] = gw.Name
 		mgw["URL"] = gw.Mqttcfg.MqttUrl
 		devmap[gw.Mqttcfg.Name] = mgw
 	}
-
 	return devmap
 }
 
 func (p *PaPlatform) Init() {
 	p.GatewayList = make(map[string]*Gateway)
+	p.jsDevList = make(map[string]string)
 	p.LoadModles()
 	p.LoadGatewayProfile()
 }
@@ -103,21 +169,22 @@ func (p *PaPlatform) SetGatewayUrl(url string) {
 	p.gwUrl = url
 }
 
-func StartServer(mainloop chan interface{}) {
+func StartServer() {
 	p := PaPlatform{}
 	p.Init()
+	msgAPI := util.GetMsgHandler("API")
 
 	ticksAutoFresh := time.NewTicker(time.Duration(time.Minute * 10))
 
 	for loop := true; loop; {
 		select {
-		case msg := <-mainloop:
+		case msg := <-msgAPI:
 			if reflect.TypeOf(msg).Kind() == reflect.String {
 				switch msg.(string) {
 				case "UpdateDevice":
 					p.LoadGatewayProfile()
 				case "ListDevices":
-					chn := <-mainloop
+					chn := <-msgAPI
 					chn.(chan interface{}) <- p.ListDevices()
 				case "exit":
 					loop = false
@@ -130,3 +197,7 @@ func StartServer(mainloop chan interface{}) {
 	log.Info("exit now")
 
 }
+
+func init() {
+	config.GetDB().CreateTbl(&GatewayConfig{})
+}

+ 52 - 24
protocol/dlt645.go

@@ -179,10 +179,20 @@ const (
 10	DATA	数据域
 11	CS	校验码
 */
+type dlt645Opcode struct {
+	ctrl   int32
+	dataID int32
+	dIDLen int32
+}
+
 type dlt645 struct {
 	name string
 }
 
+const (
+	DEFAULT_OPCODE = "TotalActivePower"
+)
+
 func bcd2int(bcd []byte) int64 {
 	vi := int64(0)
 	for i := 0; i < len(bcd); i++ {
@@ -228,25 +238,42 @@ func (p *dlt645) setDltCheckSum(buff []byte) {
 	buff[len] = sum
 }
 
-func (p *dlt645) PackageCmd(opcode string, param ...interface{}) interface{} {
-	amaddr := param[0].(string)
-	buff := []byte{0xFE, 0xFE, 0xFE, 0xFE, DLT_CHAR_FRAME_START, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, DLT_CHAR_FRAME_START, DLT_DEF_CHAR_OPCODE, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
-	pkPos := DLT_LEAD_LEN + DLT_ADDR_POS
-	err := p.setDltAddr(buff[pkPos:pkPos+DLT_ADDR_LEN], amaddr)
-	if err != nil {
-		log.Error("error in set meter addr ", amaddr)
+func (p *dlt645) PackageCmd(strcmd string, param ...interface{}) interface{} {
+	opcode, has := cmd2DataField[strcmd]
+	if !has {
 		return nil
 	}
+	amaddr := "BROADCAST"
+
+	buff := []byte{0xFE, 0xFE, 0xFE, 0xFE, DLT_CHAR_FRAME_START, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA,
+		DLT_CHAR_FRAME_START, byte(opcode.ctrl), 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, DLT_CHAR_TAIL}
+	pkPos := DLT_LEAD_LEN + DLT_ADDR_POS
+	if len(param) > 0 {
+		amaddr = param[0].(string)
+		err := p.setDltAddr(buff[pkPos:pkPos+DLT_ADDR_LEN], amaddr)
+		if err != nil {
+			log.Error("error in set meter addr ", amaddr)
+			return nil
+		}
+	}
+
 	parampos := pkPos + DLT_ADDR_LEN + 2
-	err = p.setDltOpcode(buff[parampos:parampos+5], opcode)
-	if err != nil {
-		log.Error("error in set op code ", opcode)
-		return nil
+	if opcode.dIDLen > 0 {
+		sDataID := fmt.Sprintf("%08d", int(opcode.dataID))
+		if err := p.setDltOpcode(buff[parampos:parampos+int(opcode.dIDLen)+1], sDataID); err != nil {
+			log.Error("error in set op code ", opcode)
+			return nil
+		}
+		parampos += int(opcode.dIDLen) + 1
+	} else {
+		buff[parampos] = 0
+		parampos++
 	}
-	p.setDltCheckSum(buff)
-	buff = append(buff, 0x16)
-	log.Infof("Send cmd [%s] to dtu[%s]\n", hex.EncodeToString(buff), amaddr)
-	return buff
+	p.setDltCheckSum(buff[:parampos+1])
+	buff[parampos+1] = DLT_CHAR_TAIL
+	//[fefefefe68020068361211681104333333337416] to dtu[111236680002]
+	log.Infof("Pack cmd [%s] to dtu[%s]\n", hex.EncodeToString(buff), amaddr) //
+	return buff[:parampos+2]
 }
 
 func Assert(est bool, infoi int) {
@@ -294,11 +321,11 @@ func (p *dlt645) ParsePacket(rxb []byte, params ...interface{}) (mapv interface{
 	Assert(rxb[lenb-1-1] == sum, int(sum))
 
 	//get addr
-	addr := rxb[pos+1 : pos+1+DLT_ADDR_LEN]
-	for i := 0; i < DLT_ADDR_LEN/2; i++ {
-		addr[i], addr[DLT_ADDR_LEN-i-1] = addr[DLT_ADDR_LEN-i-1], addr[i]
+	addr := make([]uint8, DLT_ADDR_LEN) // rxb[pos+1 : pos+1+DLT_ADDR_LEN]
+	for i := 0; i < DLT_ADDR_LEN; i++ {
+		addr[i] = rxb[pos+DLT_ADDR_LEN-i] //, addr[DLT_ADDR_LEN-i-1] = addr[DLT_ADDR_LEN-i-1], addr[i]
 	}
-	saddr := hex.EncodeToString(addr)
+	saddr := hex.EncodeToString(addr) //"111236680002"
 	if len(params) > 0 && reflect.TypeOf(params[0]).Kind() == reflect.Ptr {
 		*params[0].(*string) = saddr
 	}
@@ -329,8 +356,8 @@ func (p *dlt645) ParsePacket(rxb []byte, params ...interface{}) (mapv interface{
 
 	if cmd == (C_2007_CODE_RD | DLT_CMD_DIR_REVERT) {
 		val := float64(bcd2int(param[:lenParam-DLT_DICCODE_LEN])) / 100.0
-		for k, v := range Cmd2Code {
-			if int(v) == dicCode {
+		for k, v := range cmd2DataField {
+			if int(v.ctrl) == dicCode {
 				mapv := make(map[string]interface{})
 				mapv[k] = val
 				return mapv
@@ -369,11 +396,12 @@ func NewDlt645() IProtocol {
 	return p
 }
 
-var Cmd2Code map[string]uint32
+var cmd2DataField map[string]dlt645Opcode
 
 func init() {
 	ProtReg["DLT645-2007"] = NewDlt645
-	Cmd2Code = map[string]uint32{
-		"TotalActivePower": DLT_GROUP_ACTIVE_POWER_TOTAL,
+	cmd2DataField = map[string]dlt645Opcode{
+		"TotalActivePower": {C_2007_CODE_RD, DLT_GROUP_ACTIVE_POWER_TOTAL, 4},
+		"ReadAddress":      {C_2007_CODE_RDA, 0, 0},
 	}
 }

+ 9 - 2
protocol/dlt645_test.go

@@ -9,11 +9,15 @@ import (
 
 //fefefefe68040042050821681104333333332516
 //fefefefe68040042050821681104333333332516
-
 func TestPacket(t *testing.T) {
 	dlt645 := LoadProtocol("DLT645-2007")
+	//FEFEFEFE68AAAAAAAAAAAA681300DF16
+	readcmd := dlt645.PackageCmd("ReadAddress")
+	log.Infof("%X\n", readcmd)
 
-	cmd := dlt645.PackageCmd("00000000", "210805420004")
+	//fefefefe68010018251120681104333333332016
+	cmd := dlt645.PackageCmd("TotalActivePower", "201125180001")
+	log.Infof("cmd %X", cmd)
 	if cmd == nil {
 		t.Error("failed in PackageCmd")
 	}
@@ -26,6 +30,9 @@ func TestPacket(t *testing.T) {
 func TestParse(t *testing.T) {
 	dlt645 := LoadProtocol("DLT645-2007")
 
+	v := dlt645.PackageCmd("TotalActivePower", "111236680002")
+	log.Infof("%X\n", v)
+
 	cap := []byte{104, 2, 0, 104, 54, 18, 17, 104, 145, 8, 51, 51, 51, 51, 51, 51, 51, 51, 196, 22}
 	//104, 17, 18, 54, 104, 0, 2, 104, 147, 6, 53, 51, 155, 105, 69, 68, 33, 22}
 	//t1 := []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x04, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x11, 0x04, 0x33, 0x33, 0x33, 0x33, 0x25, 0x16}

+ 3 - 0
util/go.mod

@@ -0,0 +1,3 @@
+module github.com/ammeter/util
+
+go 1.17

+ 29 - 0
util/util.go

@@ -0,0 +1,29 @@
+package util
+
+var msgHandler map[string](chan interface{})
+
+func GetMsgHandler(msgPipe string) chan interface{} {
+	hdr, has := msgHandler[msgPipe]
+	if !has {
+		hdr = make(chan interface{})
+		msgHandler[msgPipe] = hdr
+	}
+	return hdr
+}
+
+func PostMessage(msgPipe string, msg interface{}) {
+	hdr, has := msgHandler[msgPipe]
+	if !has {
+		hdr = make(chan interface{})
+		msgHandler[msgPipe] = hdr
+	}
+	hdr <- msg
+}
+
+func init() {
+	msgHandler = make(map[string]chan interface{})
+}
+
+func DebugLevel() int {
+	return 1
+}