Преглед изворни кода

basic without rpc architecture

yuguorong пре 3 година
комит
dbad5dffc5

+ 21 - 0
.vscode/launch.json

@@ -0,0 +1,21 @@
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+
+        {
+            "name": "Launch",
+            "type": "go",
+            "request": "launch",
+            "mode": "auto",
+            "program": "${workspaceRoot}/cmd",
+            "dlvFlags": [
+                "--check-go-version=false"
+            ],
+            "env": {},
+            "args": []
+        }        
+    ]
+}

+ 40 - 0
Bus/api.go

@@ -0,0 +1,40 @@
+package bus
+
+import "time"
+
+//Channel Dispatch not in event because event and dispatch handler not same one.
+
+type ChnDispResult int
+
+const (
+	DispatchSingle ChnDispResult = -1 + iota
+	DispatchNone
+	DispatchMulti
+)
+
+//Dispatch maybe a protocol object
+type IBusEvent interface {
+	OnAttach(chn IChannel)
+	OnDetach(chn IChannel)
+	ChannelDispatch(stream []byte, argEvt interface{}) ChnDispResult
+}
+
+type IChannel interface {
+	ID() string
+	GetChan(id int) chan interface{}
+	SetTimeout(time.Duration)
+	SetEvent(evt IBusEvent, evtArgs interface{})
+	GetEvent() IBusEvent
+	GetBus() IBus
+}
+
+type IBus interface {
+	Init() error
+	Uninit()
+	OpenChannel(chnID interface{}, router []chan interface{}) IChannel
+	CloseChannel(chn IChannel) error
+	ResetChannel(chn IChannel)
+	ScanChannel(stream []byte, conn int) IChannel
+	Send(chn IChannel, buff interface{}) (int, error)
+	Recive(chn IChannel, buff interface{}) (int, error)
+}

+ 8 - 0
Bus/backup.txt

@@ -0,0 +1,8 @@
+
+func (bus *baseBus) SetChannelDispatch(FxchanDisp interface{}, param interface{}) {
+	f := reflect.ValueOf(FxchanDisp)
+	var fnxdisp CbBusChanDisp = nil
+	pfnx := reflect.ValueOf(&fnxdisp).Elem()
+	pfnx.Set(f)
+
+}

+ 183 - 0
Bus/bus.go

@@ -0,0 +1,183 @@
+package bus
+
+import (
+	"reflect"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	CHAN_INPUT  = 0
+	CHAN_OUTPUT = 1
+)
+
+type CbBusChanDisp func(stream []byte, param interface{}) interface{}
+type BusChannel struct {
+	chnID     string
+	mountCnt  int
+	event     IBusEvent
+	bus       IBus
+	timeout   time.Duration
+	conIdList []int
+	evtArg    interface{}
+	chin      chan interface{}
+	chout     chan interface{}
+}
+
+func (chn *BusChannel) ID() string {
+	return chn.chnID
+}
+
+func (chn *BusChannel) GetChan(id int) chan interface{} {
+	switch id {
+	case 0:
+		return chn.chin
+	case 1:
+		return chn.chout
+	}
+	return nil
+}
+
+func (chn *BusChannel) SetTimeout(timeout time.Duration) {
+
+}
+
+func (chn *BusChannel) SetEvent(evt IBusEvent, evtArgs interface{}) {
+	chn.event = evt
+	chn.evtArg = evtArgs
+}
+
+func (chn *BusChannel) GetEvent() IBusEvent {
+	return chn.event
+}
+
+func (chn *BusChannel) GetBus() IBus {
+	return chn.bus
+}
+
+type baseBus struct {
+	BusId   string
+	ChnList map[string]IChannel
+	mutex   *sync.Mutex
+}
+
+func (bus *baseBus) Init() error {
+	if bus.ChnList == nil {
+		bus.ChnList = make(map[string]IChannel)
+		bus.mutex = new(sync.Mutex)
+	}
+
+	return nil
+}
+
+func (bus *baseBus) Uninit() {
+
+}
+
+func (bus *baseBus) stringId(chnID interface{}) string {
+	switch reflect.TypeOf(chnID).Kind() {
+	case reflect.String:
+		return chnID.(string)
+	case reflect.Int:
+		return strconv.Itoa(chnID.(int))
+	case reflect.Int64:
+		return strconv.FormatInt(chnID.(int64), 10)
+	}
+	return ""
+}
+
+func (bus *baseBus) OpenChannel(chnID interface{}, router []chan interface{}) IChannel {
+	schnid := bus.stringId(chnID)
+	if chn, has := bus.ChnList[schnid]; has {
+		return chn
+	}
+	c := &BusChannel{
+		chnID:     schnid,
+		event:     nil,
+		mountCnt:  0,
+		bus:       bus,
+		conIdList: make([]int, 0),
+		timeout:   0,
+	}
+	if len(router) > 1 {
+		c.chout = router[1]
+	}
+	if len(router) > 0 {
+		c.chin = router[0]
+	}
+
+	bus.mutex.Lock()
+	bus.ChnList[schnid] = c
+	bus.mutex.Unlock()
+
+	return c
+}
+func (dtu *baseBus) ResetChannel(chn IChannel) {
+}
+
+func (bus *baseBus) CloseChannel(chn IChannel) error {
+	id := chn.ID()
+	bus.mutex.Lock()
+	delete(bus.ChnList, id)
+	bus.mutex.Unlock()
+	basechn := chn.(*BusChannel)
+	basechn.mountCnt = 0
+	if basechn.event != nil {
+		basechn.event.OnDetach(chn)
+	}
+	return nil
+}
+
+func (bus *baseBus) ScanChannel(stream []byte, connID int) IChannel {
+	for _, ichn := range bus.ChnList {
+		chn := ichn.(*BusChannel)
+		if chn.event == nil || chn.mountCnt < 0 {
+			continue
+		}
+		if ret := chn.event.ChannelDispatch(stream, chn.evtArg); ret != DispatchNone {
+			chn.mountCnt += int(ret)
+			chn.conIdList = append(chn.conIdList, connID)
+			chn.event.OnAttach(ichn)
+			return ichn
+		}
+	}
+	return nil
+}
+
+func (bus *baseBus) Send(chn IChannel, buff interface{}) (int, error) {
+	return 0, nil
+}
+
+func (bus *baseBus) Recive(chn IChannel, buff interface{}) (int, error) {
+	return 0, nil
+}
+
+type funcRegBus func(param []interface{}) IBus
+type funcGetID func(string, []interface{}) string
+
+var BusList map[string]IBus
+var BusGetID map[string]funcGetID
+var BusReg map[string]funcRegBus
+
+func MountBus(name string, param []interface{}) IBus {
+	busid := name
+	if f, has := BusGetID[name]; has {
+		busid = f(name, param)
+	}
+	if p, has := BusList[busid]; has && p != nil {
+		return p
+	}
+	if f, has := BusReg[name]; has && f != nil {
+		b := f(param)
+		BusList[busid] = b
+		return b
+	}
+	return nil
+}
+
+func init() {
+	BusList = make(map[string]IBus)
+	BusReg = make(map[string]funcRegBus)
+	BusGetID = make(map[string]funcGetID)
+}

+ 91 - 0
Bus/bus_test.go

@@ -0,0 +1,91 @@
+package bus
+
+import (
+	"encoding/hex"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/yuguorong/go/log"
+)
+
+var dbgChan IChannel = nil
+var dbgBus IBus = nil
+var dbgtx = []byte{
+	0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x04, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x11, 0x04, 0x33, 0x33, 0x33, 0x33, 0x25, 0x16,
+}
+
+//fefefefe68040042050821681104333333332516
+func testrecive(chnExit chan bool, r []chan interface{}) {
+	tmr := time.NewTimer(time.Second * 10)
+	tmr.Stop()
+	loop := true
+	for loop {
+		select {
+		case msg := <-r[0]:
+			switch reflect.TypeOf(msg).Kind() {
+			case reflect.Slice:
+				b := msg.([]byte)
+				log.Info(hex.EncodeToString(b))
+				if b[0] == 0x23 && b[1] == 0x23 {
+					log.Info(string(b))
+				}
+			case reflect.String:
+				log.Info(msg.(string))
+				tmr.Reset(time.Second * 10)
+			}
+		case <-chnExit:
+			loop = false
+		case <-tmr.C:
+			if dbgChan != nil && dbgBus != nil {
+				log.Info(dbgChan.ID(), " in timer command send")
+				dbgBus.Send(dbgChan, dbgtx)
+				tmr.Reset(time.Second * 20)
+			}
+
+		}
+	}
+}
+
+func mountbus(name string, param ...interface{}) IBus {
+	return MountBus(name, param)
+}
+
+func testDispath(buf []byte, param interface{}) interface{} {
+
+	log.Info(string(buf))
+	log.Info(hex.EncodeToString(buf))
+	if hex.EncodeToString(buf) == "9521003697" {
+		return hex.EncodeToString(buf)
+	}
+
+	return nil
+}
+
+var deviceList map[string][]string
+
+func TestDtuServer(T *testing.T) {
+	deviceList = map[string][]string{
+		"9521003872": {"2108", "05420005", "05420003", "05420002"}, //6f
+		"9521003712": {"2108", "05420001"},                         //1F
+		"9521003534": {"2108", "05420006"},                         //-1F
+		"9521003697": {"2108", "05420004"},                         //5c
+	}
+
+	router := make([]chan interface{}, 2)
+	router[0] = make(chan interface{}, 16)
+	router[1] = make(chan interface{}, 4)
+	chExit := make(chan bool, 2)
+	go testrecive(chExit, router)
+
+	dbgBus = mountbus("dtuFtpServer", 10010)
+	dbgBus.Init()
+
+	dbgChan = dbgBus.OpenChannel("9521003697", router)
+
+	time.Sleep(time.Second * 40)
+	chExit <- false
+	dbgBus.CloseChannel(dbgChan)
+	time.Sleep(time.Second * 10)
+	dbgBus.Uninit()
+}

+ 252 - 0
Bus/ftpDtu.go

@@ -0,0 +1,252 @@
+package bus
+
+import (
+	"encoding/hex"
+	"fmt"
+	"net"
+	"reflect"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/yuguorong/go/log"
+
+	"github.com/ammeter/config"
+)
+
+const (
+	maxConnCount = 100
+	moduleName   = "dtuFtpServer"
+	DEF_FTP_PORT = 10010
+)
+
+type DtuServer struct {
+	baseBus
+	Port      int
+	name      string
+	lis       net.Listener
+	connlist  map[int]net.Conn
+	clientNum int
+	loop      bool
+}
+
+func (dtu *DtuServer) ResetChannel(chn IChannel) {
+
+}
+
+func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
+	dtu.mutex.Lock()
+	if _, has := dtu.connlist[connid]; has {
+		delete(dtu.connlist, connid)
+		dtu.clientNum--
+	}
+	dtu.mutex.Unlock()
+
+	conn.Close()
+	if chn != nil {
+		basechn := chn.(*BusChannel)
+		log.Infof("client [%s] close\n", basechn.chnID)
+		basechn.mountCnt = 0
+		if basechn.event != nil {
+			basechn.event.OnDetach(chn)
+		}
+		for i, id := range basechn.conIdList {
+			if id == connid {
+				basechn.conIdList[i] = -1
+				break
+			}
+		}
+	}
+}
+
+func (dtu *DtuServer) LookupChannel(buf []byte, idx int, conn net.Conn) IChannel {
+	return dtu.baseBus.ScanChannel(buf, idx)
+}
+
+func (dtu *DtuServer) ClientConnect(conn net.Conn, idx int) {
+	var ic IChannel = nil
+	remoteAddr := conn.RemoteAddr().String()
+	defer dtu.closeConn(conn, idx, ic)
+
+	var err error
+	var buf [1024]byte
+	n := 0
+	for ic == nil {
+		n, err = conn.Read(buf[:])
+		if err != nil {
+			log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
+			return
+		}
+		ic = dtu.LookupChannel(buf[:n], idx, conn)
+	}
+
+	chnin := ic.GetChan(0)
+	if chnin == nil {
+		panic("no chan for read message")
+	}
+	chnout := ic.GetChan(1)
+	dtu.name = hex.EncodeToString(buf[:n])
+
+	for {
+		smeter := hex.EncodeToString(buf[:n])
+		dtu.mutex.Lock()
+		chnin <- buf[:n]
+		dtu.mutex.Unlock()
+
+		if chnout != nil {
+			after := time.After(time.Second * 60)
+			select {
+			case msg := <-chnout:
+				conn.Write(msg.([]byte))
+			case <-after:
+				break
+			}
+		}
+		// log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
+		log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
+
+		if ic.(*BusChannel).timeout != 0 {
+			conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
+		}
+		n, err = conn.Read(buf[:])
+		if err != nil {
+			log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
+			dtu.closeConn(conn, idx, ic)
+			break
+		}
+	}
+}
+
+func (dtu *DtuServer) StartServer() {
+	defer func() {
+		dtu.lis.Close()
+	}()
+	idx := 0
+
+	for dtu.loop {
+		if dtu.clientNum >= maxConnCount {
+			log.Infof("there is %d clients,is max num\n", dtu.clientNum)
+			time.Sleep(5 * time.Second)
+			continue
+		}
+		conn, err := dtu.lis.Accept()
+		if err != nil {
+			log.Errorf("listen err:[%v]\n", err)
+		}
+		dtu.mutex.Lock()
+		dtu.connlist[idx] = conn
+		idx++
+		dtu.clientNum++
+		dtu.mutex.Unlock()
+		go dtu.ClientConnect(conn, idx-1)
+	}
+}
+
+func (dtu *DtuServer) Init() error {
+	dtu.baseBus.Init()
+	if !dtu.loop {
+		addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
+		log.Info("start ", addr)
+		var err error = nil
+		dtu.lis, err = net.Listen("tcp", addr)
+		if err != nil {
+			log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
+			return err
+		}
+		dtu.mutex.Lock()
+		defer dtu.mutex.Unlock()
+
+		dtu.loop = true
+		go dtu.StartServer()
+	}
+	return nil
+}
+
+func (dtu *DtuServer) Uninit() {
+	dtu.mutex.Lock()
+	defer dtu.mutex.Unlock()
+	if dtu.loop {
+		dtu.loop = false
+		for _, c := range dtu.connlist {
+			c.Close()
+		}
+		dtu.lis.Close()
+		dtu.connlist = make(map[int]net.Conn)
+	}
+}
+
+func (dtu *DtuServer) OpenChannel(chn interface{}, router []chan interface{}) IChannel {
+	if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
+		panic("Open channel should be a uniq string")
+	}
+	ic := dtu.baseBus.OpenChannel(chn, router)
+	return ic
+}
+
+func (dtu *DtuServer) CloseChannel(chn IChannel) error {
+	dtu.baseBus.CloseChannel(chn)
+	if dtu.loop {
+		//idx := chn.(*BusChannel).conn.(int)
+		for _, connID := range chn.(*BusChannel).conIdList {
+			if conn, has := dtu.connlist[connID]; has {
+				dtu.mutex.Lock()
+				delete(dtu.connlist, connID)
+				dtu.mutex.Unlock()
+				conn.Close()
+			}
+		}
+	}
+	return nil
+}
+
+func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
+	if ichn != nil {
+		chn := ichn.(*BusChannel)
+		for _, connID := range chn.conIdList {
+			if connID >= 0 {
+				if conn, has := dtu.connlist[connID]; has {
+					conn.Write(buff.([]byte))
+				}
+			}
+		}
+	}
+	return 0, nil
+}
+
+func GetFtpServerConfig(param []interface{}) int {
+	Port := 0
+	if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
+		Port = param[0].(int)
+	}
+
+	if Port == 0 {
+		Port = config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT).(int)
+	}
+	return Port
+}
+
+func NewDtuServer(param []interface{}) IBus {
+	Port := GetFtpServerConfig(param)
+	busid := GenDtuServerId(moduleName, param)
+	b := &DtuServer{
+		baseBus: baseBus{
+			BusId: busid,
+			mutex: &sync.Mutex{},
+		},
+		connlist:  make(map[int]net.Conn),
+		clientNum: 0,
+		Port:      Port,
+		loop:      false,
+	}
+	return b
+}
+
+func GenDtuServerId(name string, param []interface{}) string {
+	Port := GetFtpServerConfig(param)
+	return name + ":" + strconv.Itoa(Port)
+}
+
+func init() {
+	BusReg[moduleName] = NewDtuServer
+	BusGetID[moduleName] = GenDtuServerId
+}

+ 14 - 0
Bus/go.mod

@@ -0,0 +1,14 @@
+module github.com/ammeter/Bus
+
+go 1.14
+
+replace github.com/ammeter/platform => ../platform
+
+replace github.com/ammeter/config => ../config
+
+require (
+	github.com/ammeter/config v0.0.0-00010101000000-000000000000
+	github.com/eclipse/paho.mqtt.golang v1.3.5
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 14 - 0
Bus/go.sum

@@ -0,0 +1,14 @@
+github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

+ 155 - 0
Bus/mqtt.go

@@ -0,0 +1,155 @@
+package bus
+
+import (
+	"encoding/json"
+	"errors"
+	"sync"
+	"time"
+
+	"github.com/ammeter/config"
+	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/yuguorong/go/log"
+)
+
+type QMqtt struct {
+	baseBus
+	User    string      `json:"accessToken"`
+	MqttUrl string      `json:"mqttserver"`
+	Uid     string      `json:"id"`
+	Name    string      `json:"name"`
+	Pswd    string      `json:-`
+	Qos     byte        `json:-`
+	cnn     MQTT.Client `json:-`
+	cliLock *sync.Mutex `json:-`
+}
+
+type CbRegistMqttSubs func(cnn *QMqtt) int
+
+var cbMqttEdgeConfig MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
+	log.Infof("R_TOPIC: %s\n", msg.Topic())
+	log.Infof("R_MSG: %s\n", msg.Payload())
+}
+
+func (mqtt *QMqtt) subTopic(subtopic string, Qos byte, hdlr MQTT.MessageHandler) {
+	log.Info("MQTT sub :[" + subtopic + "]")
+	if token := mqtt.cnn.Subscribe(subtopic, Qos, hdlr); token.Wait() && token.Error() != nil {
+		log.Info(token.Error())
+	}
+}
+
+func (mqtt *QMqtt) PublishObj(topic string, qos byte, in interface{}) {
+	jsonBytes, _ := json.Marshal(in)
+	sjson := string(jsonBytes)
+	log.Info(topic)
+	log.Info(sjson)
+	mqtt.cnn.Publish(topic, qos, true, sjson)
+}
+
+//MqttDisocnnect mqtt disconnect
+func (mqtt *QMqtt) Disocnnect() {
+	mqtt.cliLock.Lock()
+	mqtt.cnn.Disconnect(0)
+	mqtt.cnn = nil
+	mqtt.cliLock.Unlock()
+}
+
+//MqttConnServer mqtt init and connect server
+func (mqtt *QMqtt) ConnServer(retry int, cbSubs CbRegistMqttSubs) error {
+	suid := mqtt.Uid //"8df8de45-efa6-419a-a46c-acf30f017da5"
+	if mqtt.cnn != nil {
+		return errors.New("Mqtt already connected")
+	}
+
+	opts := MQTT.NewClientOptions().AddBroker(mqtt.MqttUrl)
+	opts.SetClientID(suid)
+	opts.SetUsername(mqtt.User)
+	opts.SetPassword(mqtt.Pswd)
+	opts.SetDefaultPublishHandler(cbMqttEdgeConfig)
+
+	mqtt.cliLock.Lock()
+	defer mqtt.cliLock.Unlock()
+	//create and start a client using the above ClientOptions
+	mqtt.cnn = MQTT.NewClient(opts)
+	for ; retry != 0; retry-- {
+		if token := mqtt.cnn.Connect(); token.Wait() && token.Error() == nil {
+			if cbSubs != nil {
+				cbSubs(mqtt)
+			}
+			log.Info("MQTT connect OK!")
+			return nil
+		} else {
+			log.Error("Retry to connect the MQTT server!! ", token.Error())
+		}
+		time.Sleep(time.Duration(3) * time.Second)
+	}
+	return errors.New("Fault Error! can not connect MQTT server!!!")
+}
+
+func (mqtt *QMqtt) StartServer() error {
+	return mqtt.ConnServer(-1, AppMqttSubs)
+}
+
+func (mqtt *QMqtt) StopServer() {
+	mqtt.Disocnnect()
+	log.Info("Stop MQTT server")
+}
+
+func (mqtt *QMqtt) Init() error {
+	mqtt.baseBus.Init()
+	go mqtt.StartServer()
+	return nil
+}
+
+func (mqtt *QMqtt) Uninit() {
+	mqtt.baseBus.Uninit()
+	mqtt.StopServer()
+}
+
+func (mqtt *QMqtt) Send(chn IChannel, buff interface{}) (int, error) {
+	mqtt.PublishObj(chn.ID(), mqtt.Qos, buff)
+	return 0, nil
+}
+
+func (mqtt *QMqtt) Recive(chn IChannel, buff interface{}) (int, error) {
+	return 0, nil
+}
+
+func AppMqttSubs(mqtt *QMqtt) int {
+	mqtt.subTopic("/sub/default", 1, cbMqttEdgeConfig)
+	return 0
+}
+
+func NewMqtt(param []interface{}) IBus {
+	mq := &QMqtt{
+		User:    "gZdomIS9Hz3d7HxvcoNx",
+		Pswd:    "",
+		MqttUrl: "test-sbuilding.pacom.cn:1885",
+		Name:    "mqtt",
+		cnn:     nil,
+		cliLock: new(sync.Mutex),
+	}
+
+	if len(param) >= 1 {
+		var cfgmq *QMqtt = nil
+		switch param[0].(type) {
+		case string:
+			cfg := config.GetSysConfig().GetProfile(param[0].(string), mq)
+			if cfg != nil {
+				cfgmq = cfg.(*QMqtt)
+			}
+		case *QMqtt:
+			cfgmq = param[0].(*QMqtt)
+		}
+		if cfgmq != nil {
+			mq.User = cfgmq.User
+			mq.MqttUrl = cfgmq.MqttUrl
+			mq.Pswd = cfgmq.Pswd
+			mq.Name = cfgmq.Name
+		}
+	}
+	return mq
+}
+
+func init() {
+	BusReg["mqtt"] = NewMqtt
+}

+ 133 - 0
cloudserver/CloudServer.go

@@ -0,0 +1,133 @@
+package cloudserver
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"reflect"
+	"time"
+
+	"github.com/yuguorong/go/log"
+)
+
+type CloudServer struct {
+	conf       *CloudServerConf
+	httpClient *http.Client
+	Name       string
+}
+
+func GetCloudServer(profile interface{}) *CloudServer {
+	pfofileCS := profile.(*CloudServerConf)
+	res := &CloudServer{conf: pfofileCS}
+	res.httpClient = &http.Client{Timeout: 5 * time.Second}
+	return res
+}
+
+func (c *CloudServer) GetClientData(infourl string, pData interface{}, param *map[string]string) error {
+	pNum, psize := 1, 100
+
+	reqParams := make(map[string]interface{})
+	reqParams["pageSize"] = psize
+	if param != nil {
+		for k, v := range *param {
+			reqParams[k] = v
+		}
+	}
+
+	pin := reflect.ValueOf(pData).Elem()
+	tin := reflect.TypeOf(pData).Elem()
+	sliceret := reflect.MakeSlice(tin, 0, 0)
+
+	itemCount := 0
+	for {
+		slicetmp := reflect.MakeSlice(tin, 0, 0)
+		pin.Set(slicetmp)
+		reqParams["pageNum"] = pNum
+		p, e := c.requestPage(infourl, reqParams)
+		pNum += 1
+		if e != nil {
+			log.Info("query ammeter error", e)
+			break
+		}
+
+		if len(p.Rows) == 0 {
+			break
+		}
+
+		e = p.ChangeData(pData)
+		if e != nil {
+			log.Info("change data error", e)
+			continue
+		}
+
+		vin := reflect.ValueOf(pData).Elem()
+		sliceret = reflect.AppendSlice(sliceret, vin)
+
+		itemCount += vin.Len()
+		if itemCount >= p.Total {
+			vin.Set(sliceret)
+			break
+		}
+	}
+
+	return nil
+}
+
+func (c *CloudServer) requestPage(url string, data interface{}) (p *PageModel, erro error) {
+	url = c.conf.ServerUrl + url
+	httpClient := c.httpClient
+
+	payload, erro := json.Marshal(data)
+	if erro != nil {
+		return
+	}
+	log.Info("reqParam", string(payload), url, time.Now())
+	reqBody := bytes.NewReader(payload)
+	req, err := http.NewRequest("POST", url, reqBody)
+	if err != nil {
+		erro = err
+		return
+	}
+
+	req.Header.Add("appId", c.conf.AppId)
+	req.Header.Add("signature", c.conf.GenerateSignature())
+	req.Header.Add("Content-Type", "application/json")
+
+	resp, err := httpClient.Do(req)
+
+	if err != nil {
+		erro = err
+		return
+	}
+
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		erro = fmt.Errorf("respose statuscode is not equal 200")
+		return
+	}
+
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		erro = err
+		return
+	}
+
+	var jbody CommonResp
+	err = json.Unmarshal(body, &jbody)
+	if err != nil {
+		erro = err
+		return
+	}
+	if jbody.Code != 0 {
+		erro = fmt.Errorf("response error %s", jbody.Msg)
+		return
+	}
+
+	pageModel := &PageModel{}
+	erro = jbody.ChangeData(pageModel)
+	p = pageModel
+	return
+}

+ 108 - 0
cloudserver/CloudServer_test.go

@@ -0,0 +1,108 @@
+package cloudserver
+
+import (
+	"encoding/json"
+	"os"
+	"testing"
+
+	"github.com/yuguorong/go/log"
+)
+
+const (
+	REQ_AMGW_URL    = "/platform/dev/get-4G-gateway-list"
+	REQ_AMMETER_URL = "/platform/dev/get-ammeter-list"
+)
+
+//4G 网关
+type GateWay4GModel struct {
+	AccessToken string `json:"accessToken"`
+	MqttServer  string `json:"mqttserver"`
+	Id          string `json:"id"`
+	Name        string `json:"name"`
+}
+
+type AmmeterModel struct {
+	Id       string `json:"id"`
+	DevName  string `json:"devName"`
+	Code     string `json:"code"`
+	DutSn    string `json:"dutSn"`
+	Address  string `json:"address"`
+	Protocol string `json:"protocol"`
+	GwDevId  string `json:"gwDevId"`
+}
+
+func TestForGetAmmeterGwList(t *testing.T) {
+	// t.Error(1)
+	conf := getTestConf()
+	cloudServer := GetCloudServer(conf)
+	gwList := []GateWay4GModel{}
+	err := cloudServer.GetClientData(REQ_AMGW_URL, &gwList, nil)
+	if err != nil || len(gwList) == 0 {
+		t.Error("test for getAmmeter error,please check the gw data  in server")
+	}
+	// for i := 0; i < len(gwList); i++ {
+	// 	d := gwList[i]
+	// 	log.Println("query ammeter by gw", d)
+	// 	ammeters := cloudServer.GetAmmeters(d.Id)
+	// 	log.Println("gw ammeters:", ammeters)
+	// }
+}
+
+func TestForGetAmmeterList(t *testing.T) {
+	// t.Error(1)
+	conf := getTestConf()
+	cloudServer := GetCloudServer(conf)
+	gwList := []GateWay4GModel{}
+	err := cloudServer.GetClientData(REQ_AMGW_URL, &gwList, nil)
+	if err != nil || len(gwList) == 0 {
+		t.Error("test for getAmmeter error,please check the gw data  in server")
+	}
+	checkRes := false
+	for i := 0; i < len(gwList); i++ {
+		d := gwList[i]
+		param := map[string]string{
+			"gwDevId": d.Id,
+		}
+		listMeters := []AmmeterModel{}
+		err := cloudServer.GetClientData(REQ_AMMETER_URL, &listMeters, &param)
+		if err == nil && !checkRes && len(listMeters) > 1 {
+			log.Info(listMeters)
+			checkRes = true
+			break
+		}
+	}
+	if !checkRes {
+		t.Error("test for getAmmeter error,please check the ammeter in your server ")
+	}
+}
+
+func getTestConf() *CloudServerConf {
+	res := CloudServerConf{
+		AppId:     "fvWmjGCU",
+		AppSecret: "054e7df0881eff8328858092f9e8ac0b0f356676",
+		ServerUrl: "https://test-admin.pacom.cn",
+	}
+	return &res
+}
+
+func TestForPageChangeData(t *testing.T) {
+	t.Log(1)
+	p := PageModel{
+		Total: 1,
+		Rows: []interface{}{
+			GateWay4GModel{AccessToken: "xx"}, GateWay4GModel{MqttServer: "xx123"},
+		},
+	}
+
+	var dd []GateWay4GModel
+	p.ChangeData(&dd)
+	t.Log(dd)
+}
+
+func TestConf(t *testing.T) {
+	f, _ := os.OpenFile("cloudconf.json", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+	defer f.Close()
+	jsonStu, _ := json.Marshal(GetCloudConfig())
+	f.WriteString(string(jsonStu))
+	log.Info("SysConfig saved!")
+}

+ 48 - 0
cloudserver/Conf.go

@@ -0,0 +1,48 @@
+package cloudserver
+
+import (
+	"crypto/md5"
+	"encoding/hex"
+	"encoding/json"
+	"os"
+
+	"github.com/yuguorong/go/log"
+)
+
+type CloudServerConf struct {
+	ServerUrl string
+	AppId     string
+	AppSecret string
+	Route     string
+}
+
+var defConf = &CloudServerConf{}
+
+func (c *CloudServerConf) GenerateSignature() string {
+	res := md5V(c.AppId + "-pacom-" + c.AppSecret)
+	return res
+}
+
+func md5V(str string) string {
+	h := md5.New()
+	h.Write([]byte(str))
+	return hex.EncodeToString(h.Sum(nil))
+}
+
+func GetCloudConfig() interface{} {
+	return defConf
+}
+
+func init() {
+	if file, err := os.Open("cloudconf.json"); err == nil {
+		defer file.Close()
+		var tmp = make([]byte, 1024)
+		n, err := file.Read(tmp)
+		if err == nil {
+			err = json.Unmarshal(tmp[:n], &defConf)
+			if err != nil {
+				log.Error(err)
+			}
+		}
+	}
+}

+ 9 - 0
cloudserver/FourGGatewayModel.go

@@ -0,0 +1,9 @@
+package cloudserver
+
+//4G 网关
+type FourGGatewayModel struct {
+	AccessToken string `json:"accessToken"`
+	MqttServer  string `json:"mqttserver"`
+	Id          string `json:"id"`
+	Name        string `json:"name"`
+}

+ 45 - 0
cloudserver/PageModel.go

@@ -0,0 +1,45 @@
+package cloudserver
+
+import (
+	"encoding/json"
+	"errors"
+
+	"github.com/yuguorong/go/log"
+)
+
+type PageModel struct {
+	Rows  []interface{} `json:"rows"`
+	Total int           `json:"total"`
+}
+
+func (p *PageModel) ChangeData(resPoint interface{}) error {
+	if len(p.Rows) == 0 {
+		return nil
+	}
+	b, e := json.Marshal(p.Rows)
+	log.Info("data", string(b))
+	if e != nil {
+		return e
+	}
+	e = json.Unmarshal(b, resPoint)
+	return e
+}
+
+type CommonResp struct {
+	Code int         `json:"code"`
+	Data interface{} `json:"data"`
+	Msg  string      `json:"msg"`
+}
+
+func (c *CommonResp) ChangeData(resPoint interface{}) error {
+	if c.Data == nil {
+		return errors.New("data is empty")
+	}
+
+	b, e := json.Marshal(c.Data)
+	if e != nil {
+		return e
+	}
+	e = json.Unmarshal(b, resPoint)
+	return e
+}

+ 9 - 0
cloudserver/go.mod

@@ -0,0 +1,9 @@
+module github.com/ammeter/cloudserver
+
+go 1.14
+
+require (
+	github.com/YuGuorong/go v0.0.0-20180604090527-bdc77568d726
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 6 - 0
cloudserver/go.sum

@@ -0,0 +1,6 @@
+github.com/YuGuorong/go v0.0.0-20180604090527-bdc77568d726 h1:hUngjIpd9EPORkayfhtwxqMM7LvBvxtRaAMPEDXdciw=
+github.com/YuGuorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:GbYXqsEgfdVZCQqV2sbQBsKePf1x+0A1bkVreGmCIE8=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=



Разлика између датотеке није приказан због своје велике величине
+ 1 - 0
cmd/conf.json


+ 23 - 0
cmd/go.mod

@@ -0,0 +1,23 @@
+module github.com/ammeter/cmd
+
+go 1.14
+
+replace github.com/ammeter/platform => ../platform
+
+replace github.com/ammeter/cloudserver => ../cloudserver
+
+replace github.com/ammeter/config => ../config
+
+replace github.com/ammeter/drivers => ../drivers
+
+replace github.com/ammeter/protocol => ../protocol
+
+replace github.com/ammeter/Bus => ../Bus
+
+require (
+	github.com/ammeter/cloudserver v0.0.0-00010101000000-000000000000
+	github.com/ammeter/config v0.0.0-00010101000000-000000000000
+	github.com/ammeter/platform v0.0.0-00010101000000-000000000000
+	github.com/eclipse/paho.mqtt.golang v1.3.5
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 34 - 0
cmd/go.sum

@@ -0,0 +1,34 @@
+github.com/YuGuorong/go v0.0.0-20180604090527-bdc77568d726 h1:hUngjIpd9EPORkayfhtwxqMM7LvBvxtRaAMPEDXdciw=
+github.com/YuGuorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:GbYXqsEgfdVZCQqV2sbQBsKePf1x+0A1bkVreGmCIE8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
+github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/mattn/go-sqlite3 v1.14.8 h1:gDp86IdQsN/xWjIEmr9MF6o9mpksUgh0fu+9ByFxzIU=
+github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/sqlite v1.1.6 h1:p3U8WXkVFTOLPED4JjrZExfndjOtya3db8w9/vEMNyI=
+gorm.io/driver/sqlite v1.1.6/go.mod h1:W8LmC/6UvVbHKah0+QOC7Ja66EaZXHwUTjgXY8YNWX8=
+gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
+gorm.io/gorm v1.21.16 h1:YBIQLtP5PLfZQz59qfrq7xbrK7KWQ+JsXXCH/THlMqs=
+gorm.io/gorm v1.21.16/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=


+ 50 - 0
cmd/main.go

@@ -0,0 +1,50 @@
+package main
+
+import (
+	"net"
+
+	"github.com/yuguorong/go/log"
+
+	"github.com/ammeter/config"
+	"github.com/ammeter/platform"
+)
+
+//Port 端口
+const (
+	REQ_AMGW_URL    = "/platform/dev/get-4G-gateway-list"
+	REQ_AMMETER_URL = "/platform/dev/get-ammeter-list"
+	REQ_AIR_URL     = "/platform/dev/get-sair-list"
+	DEF_FTP_PORT    = 10010
+)
+
+var Port string
+
+var maxConnCount = 100
+
+var connclientSize = 0
+var connlist map[string]net.Conn
+
+var deviceList map[string][]string
+
+func init() {
+	connlist = make(map[string]net.Conn)
+	deviceList = map[string][]string{
+		"9521003872": {"2108", "05420005", "05420003", "05420002"}, //6f
+		"9521003712": {"2108", "05420001"},                         //1F
+		"9521003534": {"2108", "05420006"},                         //-1F
+		"9521003697": {"2108", "05420004"},                         //5c
+	}
+}
+
+func main() {
+
+	config.GetSysConfig().SetValue("Bus/DtuServer/Port", DEF_FTP_PORT)
+	log.Info("process start...")
+	p := platform.PaPlatform{}
+	p.SetGatewayUrl("/platform/dev/get-4G-gateway-list")
+	p.SetModel("ammeter", REQ_AMMETER_URL)
+	p.SetModel("SAIR10", REQ_AIR_URL)
+	p.SaveModel()
+
+	platform.StartServer()
+}

+ 0 - 0
cmd/nohup.out


+ 1 - 0
config/conf.json

@@ -0,0 +1 @@
+{"cloudserver":{"config":"{\"ServerUrl\":\"https://test-admin.pacom.cn\",\"AppId\":\"fvWmjGCU\",\"AppSecret\":\"054e7df0881eff8328858092f9e8ac0b0f356676\"}"},"gateway":{"MacID":"00163E0618F5","port":10010,"name":"E90_00163E0618F5_10010"}}

+ 187 - 0
config/config.go

@@ -0,0 +1,187 @@
+package config
+
+import (
+	"crypto/md5"
+	"encoding/json"
+	"fmt"
+	"os"
+	"reflect"
+	"strings"
+	"sync"
+
+	"github.com/yuguorong/go/log"
+)
+
+type Configs struct {
+	fileds map[string]interface{}
+	mutex  sync.Locker
+}
+
+func (c *Configs) LoadConfig() {
+	c.LoadFromFS()
+}
+
+func (c *Configs) Password(passwd string) string {
+	data := []byte(passwd)
+	has := md5.Sum(data)
+	md5str1 := fmt.Sprintf("%x", has) //将[]byte转成16进制
+	//log.Info(md5str1)
+	return md5str1
+}
+
+func (c *Configs) genUUID() {
+	f, _ := os.OpenFile("/dev/urandom", os.O_RDONLY, 0)
+	b := make([]byte, 16)
+	f.Read(b)
+	f.Close()
+	uuid := fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
+	log.Info(uuid)
+}
+
+//Load Get system debug configuration
+func (c *Configs) LoadFromFS() {
+	if file, err := os.Open("conf.json"); err == nil {
+		defer file.Close()
+		var tmp = make([]byte, 1024)
+		n, err := file.Read(tmp)
+		if err == nil {
+			confv := make(map[string]interface{})
+			err = json.Unmarshal(tmp[:n], &confv)
+			if err == nil {
+				c.fileds = confv
+			} else {
+				log.Error(err)
+			}
+		}
+	}
+}
+
+//Update Save sys config
+func (c *Configs) Save() {
+	f, _ := os.OpenFile("conf.json", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+	defer f.Close()
+	jsonStu, _ := json.Marshal(c.fileds)
+	f.WriteString(string(jsonStu))
+	log.Info("SysConfig saved!")
+}
+
+func (c *Configs) LoadFromCloud() {
+
+}
+
+func (c *Configs) GetValue(fieldPath string, defVal interface{}) interface{} {
+	spath := strings.Split(fieldPath, "/")
+	isSimpleT := true
+	if defVal != nil {
+		vt := reflect.TypeOf(defVal)
+		if vt.Kind() >= reflect.Array && vt.Kind() != reflect.String {
+			isSimpleT = false
+		}
+	}
+
+	lens := len(spath)
+	if lens > 0 {
+		lpath := c.fileds
+		c.mutex.Lock()
+		for i := 0; i < lens-1; i++ {
+			gwc, has := (lpath)[spath[i]]
+			if !has {
+				newp := make(map[string]interface{})
+				lpath[spath[i]] = newp
+				lpath = newp
+			} else {
+				lpath = gwc.(map[string]interface{})
+			}
+		}
+		c.mutex.Unlock()
+
+		value, has := lpath[spath[lens-1]]
+		if has {
+			if isSimpleT {
+				return value
+			} else {
+				json.Unmarshal([]byte(value.(string)), defVal)
+				return defVal
+			}
+		}
+
+	}
+	if isSimpleT {
+		return defVal
+	} else {
+		return nil
+	}
+}
+
+func (c *Configs) SetValue(fieldPath string, value interface{}) bool {
+	spath := strings.Split(fieldPath, "/")
+	lens := len(spath)
+	if lens > 0 {
+		c.mutex.Lock()
+		defer c.mutex.Unlock()
+		lpath := &c.fileds
+		for i := 0; i < lens-1; i++ {
+			gwc, has := (*lpath)[spath[i]]
+			if !has {
+				newp := make(map[string]interface{})
+				(*lpath)[spath[i]] = newp
+				lpath = &newp
+			} else {
+				v := gwc.(map[string]interface{})
+				lpath = &v
+			}
+		}
+		vt := reflect.TypeOf(value)
+		if vt.Kind() < reflect.Array || vt.Kind() == reflect.String {
+			if v, ok := (*lpath)[spath[lens-1]]; ok {
+				if reflect.TypeOf(value) == reflect.TypeOf(v) && reflect.DeepEqual(v, value) {
+					return false
+				}
+			}
+			(*lpath)[spath[lens-1]] = value
+		} else {
+			sval, _ := json.Marshal(value)
+			if v, ok := (*lpath)[spath[lens-1]]; ok {
+				if v.(string) == string(sval) {
+					return false
+				}
+			}
+			(*lpath)[spath[lens-1]] = string(sval)
+		}
+	}
+	return true
+}
+
+func (c *Configs) SetProfile(name string, prof interface{}) bool {
+	path := "profile/" + name
+	if c.SetValue(path, prof) {
+		c.Save()
+		return true
+	}
+	return false
+}
+
+func (c *Configs) GetProfile(name string, defV interface{}) interface{} {
+	path := "profile/" + name
+	prof := c.GetValue(path, defV)
+	if prof == nil {
+		prof = defV
+		c.SetValue(path, defV)
+		c.Save()
+	}
+
+	return prof
+}
+
+var sysConfig = &Configs{
+	fileds: make(map[string]interface{}),
+	mutex:  &sync.Mutex{},
+}
+
+func GetSysConfig() *Configs {
+	return sysConfig
+}
+
+func init() {
+	sysConfig.LoadConfig()
+}

+ 97 - 0
config/config_test.go

@@ -0,0 +1,97 @@
+package config
+
+import (
+	"testing"
+	"time"
+
+	"github.com/yuguorong/go/log"
+)
+
+func TestConfigSet(T *testing.T) {
+	cfg := GetSysConfig()
+	cfg.LoadConfig()
+	vi := cfg.GetValue("test", 1)
+	cfg.SetValue("test", 2)
+	cfg.SetValue("test", 2)
+	cfg.SetValue("test", "2")
+	log.Info(vi)
+
+	cfg.Save()
+
+}
+
+type CloudServerConf struct {
+	ServerUrl string
+	AppId     string
+	AppSecret string
+}
+
+func TestLoadConf(T *testing.T) {
+	sys := GetSysConfig()
+	sys.LoadConfig()
+	conf := sys.GetValue("cloudserver/config", &CloudServerConf{})
+	log.Info(conf)
+
+	res := &CloudServerConf{
+		AppId:     "fvWmjGCU",
+		AppSecret: "054e7df0881eff8328858092f9e8ac0b0f356676",
+		ServerUrl: "https://test-admin.pacom.cn",
+	}
+	sys.SetValue("cloudserver/config", res)
+	conf = sys.GetValue("cloudserver/config", &CloudServerConf{})
+	log.Info(conf)
+	sys.Save()
+
+}
+
+type AmmeterModel struct {
+	Id         string `json:"id" gorm:"-"`
+	DevName    string `json:"devName" gorm:"primarykey"`
+	Code       string `json:"code" `
+	DutSn      string `json:"dutSn" gorm:"-"`
+	Address    string `json:"address"`
+	Protocol   string `json:"protocol" gorm:"-"`
+	GwDevId    string `json:"gwDevId" gorm:"-"`
+	TransRadio string `json:"transformerRatio" gorm:"-"`
+}
+
+type Ammeter struct {
+	AmmeterModel
+	timestamp   int32
+	TotalEnergy float64
+	TransDeno   float64 ` gorm:"-"`
+	TransDiv    float64 ` gorm:"-"`
+}
+
+func TestDBConf(T *testing.T) {
+	GetDB().CreateTbl(&Ammeter{})
+
+	v0 := &Ammeter{
+		TransDeno: 3,
+		TransDiv:  2,
+	}
+	GetDB().Find(v0)
+	log.Info(v0)
+
+	v0.DevName = ""
+	GetDB().Find(v0, "dev_name='test2'")
+	log.Info(v0)
+
+	if v0.DevName != "" {
+		v1 := &Ammeter{
+			TotalEnergy: 1373.78,
+			timestamp:   int32(time.Now().Unix()),
+			AmmeterModel: AmmeterModel{
+				DevName: "test3",
+				Code:    "2108",
+				Address: "123456",
+			},
+		}
+
+		GetDB().Save(v1)
+	} else {
+		v0.TotalEnergy += 100
+		GetDB().Save(v0)
+	}
+
+}

+ 82 - 0
config/db.go

@@ -0,0 +1,82 @@
+package config
+
+import (
+	"github.com/yuguorong/go/log"
+	"gorm.io/driver/sqlite"
+	"gorm.io/gorm"
+)
+
+type QdDB struct {
+	dbSQL *gorm.DB
+}
+
+func (db *QdDB) ConnectQuarkSQL() {
+	var err error
+
+	db.dbSQL, err = gorm.Open(sqlite.Open("local.db"), &gorm.Config{})
+	if err != nil {
+		panic("failed to connect database")
+	}
+}
+
+func (db *QdDB) CloseQuarkSQL() {
+
+}
+
+func (db *QdDB) CreateTbl(cls interface{}) {
+	err := db.dbSQL.AutoMigrate(cls)
+	if err != nil {
+		log.Error(err)
+	}
+}
+
+func (db *QdDB) Save(tbl interface{}) {
+	t := db.dbSQL.Save(tbl)
+	if t.Error != nil {
+		log.Info(t.Error)
+	}
+}
+
+func (db *QdDB) Delete(tbl interface{}) {
+	db.dbSQL.Where("1=1").Unscoped().Delete(tbl)
+}
+
+func (db *QdDB) First(tbl interface{}) {
+	db.dbSQL.First(tbl)
+}
+
+func (db *QdDB) Last(tbl interface{}) {
+	db.dbSQL.Last(tbl)
+}
+
+func (db *QdDB) Find(tbl interface{}, condition ...interface{}) {
+	if len(condition) > 0 {
+		db.dbSQL.Find(tbl, condition...)
+	} else {
+		db.dbSQL.Find(tbl)
+	}
+}
+
+func (db *QdDB) List(tbl interface{}, field interface{}, name string) {
+	db.dbSQL.Model(tbl).Association(name).Find(field)
+}
+
+func (db *QdDB) Gorm() *gorm.DB {
+	return db.dbSQL
+}
+
+var Qdb *QdDB = nil
+
+func GetDB() *QdDB {
+	return Qdb
+}
+
+func initDB() (db *QdDB) {
+	db = new(QdDB)
+	db.ConnectQuarkSQL()
+	return db
+}
+
+func init() {
+	Qdb = initDB()
+}

+ 10 - 0
config/go.mod

@@ -0,0 +1,10 @@
+module github.com/ammeter/config
+
+go 1.14
+
+require (
+	github.com/gorilla/websocket v1.4.2
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+	gorm.io/driver/sqlite v1.1.6
+	gorm.io/gorm v1.21.16
+)

+ 24 - 0
config/go.sum

@@ -0,0 +1,24 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
+github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/mattn/go-sqlite3 v1.14.8 h1:gDp86IdQsN/xWjIEmr9MF6o9mpksUgh0fu+9ByFxzIU=
+github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/sqlite v1.1.6 h1:p3U8WXkVFTOLPED4JjrZExfndjOtya3db8w9/vEMNyI=
+gorm.io/driver/sqlite v1.1.6/go.mod h1:W8LmC/6UvVbHKah0+QOC7Ja66EaZXHwUTjgXY8YNWX8=
+gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
+gorm.io/gorm v1.21.16 h1:YBIQLtP5PLfZQz59qfrq7xbrK7KWQ+JsXXCH/THlMqs=
+gorm.io/gorm v1.21.16/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=


+ 48 - 0
config/ws.go

@@ -0,0 +1,48 @@
+package config
+
+import (
+	"flag"
+	"net/http"
+
+	"github.com/yuguorong/go/log"
+
+	"github.com/gorilla/websocket"
+)
+
+var addr = flag.String("addr", "localhost:8990", "http config address")
+
+var upgrader = websocket.Upgrader{
+	// 解决跨域问题
+	CheckOrigin: func(r *http.Request) bool {
+		return true
+	},
+} // use default options
+
+func ws(w http.ResponseWriter, r *http.Request) {
+	c, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		log.Error("upgrade:", err)
+		return
+	}
+	defer c.Close()
+	for {
+		mt, message, err := c.ReadMessage()
+		if err != nil {
+			log.Error("read:", err)
+			break
+		}
+		log.Infof("recv: %s", message)
+		err = c.WriteMessage(mt, message)
+		if err != nil {
+			log.Info("write:", err)
+			break
+		}
+	}
+}
+
+func WebsocketEntry() {
+	flag.Parse()
+	http.HandleFunc("/ws", ws)
+	log.Info(*addr)
+	log.Fatal(http.ListenAndServe(*addr, nil))
+}

+ 85 - 0
drivers/Api.go

@@ -0,0 +1,85 @@
+package drviers
+
+type IDriver interface {
+	Name() string
+	Probe(name string, param interface{})
+	CreateDevice(model interface{}) (int, []IDevice)
+	GetModel() interface{}
+	GetDevice(id string) IDevice
+	GetDeviceList() map[string]IDevice
+	Uninstall()
+}
+
+type baseDriver struct {
+	DrvName string
+	DevList map[string]IDevice
+}
+
+func (drv *baseDriver) Name() string {
+	return drv.DrvName
+}
+
+func (drv *baseDriver) Probe(name string, param interface{}) {
+	drv.DrvName = name
+	drv.DevList = make(map[string]IDevice)
+}
+
+func (drv *baseDriver) CreateDevice(model interface{}) (int, []IDevice) {
+	dev := make([]IDevice, len(drv.DevList))
+	i := 0
+	for _, v := range drv.DevList {
+		dev[i] = v
+		i++
+	}
+	return i, dev
+}
+
+func (drv *baseDriver) GetModel() interface{} {
+	return nil
+}
+
+func (drv *baseDriver) GetDevice(id string) IDevice {
+	if dev, has := drv.DevList[id]; has {
+		return dev
+	}
+	return nil
+}
+
+func (drv *baseDriver) GetDeviceList() map[string]IDevice {
+	return drv.DevList
+}
+
+func (drv *baseDriver) Uninstall() {
+	for _, dev := range drv.DevList {
+		dev.Close()
+	}
+	drv.DevList = nil
+}
+
+type funcRegDriver func(interface{}) IDriver
+
+var driverReg map[string]funcRegDriver
+var driverList map[string]IDriver
+
+func Shutdown() {
+	for _, drv := range driverList {
+		drv.Uninstall()
+	}
+}
+
+func Install(name string, param interface{}) IDriver {
+	if p, has := driverList[name]; has && p != nil {
+		return p
+	}
+	if f, has := driverReg[name]; has && f != nil {
+		drv := f(param)
+		drv.Probe(name, param)
+		return drv
+	}
+	return nil
+}
+
+func init() {
+	driverList = make(map[string]IDriver)
+	driverReg = make(map[string]funcRegDriver)
+}

+ 276 - 0
drivers/ameter.go

@@ -0,0 +1,276 @@
+package drviers
+
+import (
+	"encoding/hex"
+	"reflect"
+	"strconv"
+	"strings"
+	"time"
+
+	bus "github.com/ammeter/Bus"
+	"github.com/ammeter/config"
+	"github.com/yuguorong/go/log"
+)
+
+const (
+	CMD_ENERGY_TOTAL         = "00000000"
+	DEF_SAMPLE_PERIOD        = 5 * time.Minute
+	DEF_SAMPLE_PEER_DURATION = 30 * time.Second
+	DEF_TCP_READ_TIMEOUT     = 1 * time.Minute
+	POSTPONE_PERSIST_TIME    = 1 * time.Minute
+)
+
+type AmmeterModel struct {
+	Id         string `json:"id" gorm:"-"`
+	DevName    string `json:"devName" gorm:"primarykey"`
+	Code       string `json:"code" gorm:"-"`
+	DutSn      string `json:"dutSn" gorm:"-"`
+	Address    string `json:"address" gorm:"-"`
+	Protocol   string `json:"protocol" gorm:"-"`
+	GwDevId    string `json:"gwDevId" gorm:"-"`
+	TransRadio string `json:"transformerRatio" gorm:"-"`
+}
+
+type Ammeter struct {
+	AmmeterModel
+	timestamp   int32
+	TotalEnergy float64
+	TransDeno   float64 ` gorm:"-"`
+	TransDiv    float64 ` gorm:"-"`
+}
+
+type AmMeterHub struct {
+	baseDevice
+	DutSn      string
+	Protocol   string
+	chnExit    chan bool
+	loopserver bool
+	sampleTmr  *time.Timer
+	amList     map[string]*Ammeter
+	persitList map[string]*Ammeter
+	tmrPersist *time.Timer
+	queryIdx   int
+}
+
+func (dev *AmMeterHub) Close() error {
+	dev.sampleTmr.Stop()
+	dev.loopserver = false
+	dev.chnExit <- true
+	return nil
+}
+
+func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
+	log.Info(dev.DutSn, " Attached!")
+	chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
+	dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
+}
+
+func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
+	log.Info(dev.DutSn, " Detached!")
+	dev.sampleTmr.Stop()
+}
+
+func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
+	if len(stream) != 5 {
+		return bus.DispatchNone
+	}
+	if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
+		return ret
+	}
+
+	sid := hex.EncodeToString(stream)
+	for _, am := range dev.amList {
+		if am.DutSn == sid {
+			log.Info("MOUNT meter: ", am.DutSn, ",", sid)
+			return bus.DispatchSingle
+		}
+	}
+	return bus.DispatchNone
+}
+
+func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
+	i := 0
+	for _, am := range dev.amList {
+		if i == dev.queryIdx {
+			pam = am
+			break
+		}
+		i++
+	}
+	if pam == nil {
+		return nil
+	}
+	pack := dev.Route[1].iproto.PackageCmd(CMD_ENERGY_TOTAL, pam.Code+pam.Address)
+	dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
+	tmrTx.Reset(1 * time.Second)
+	return pam
+}
+
+func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
+	tmrTx.Stop()
+	dev.queryIdx++
+	if dev.queryIdx >= len(dev.amList) {
+		dev.queryIdx = 0
+		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
+	} else {
+		dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
+	}
+}
+
+func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, devname string) {
+	vlist := mval.(map[string]interface{})
+	if am, has := dev.amList[devname]; has {
+		for k, v := range vlist {
+			if reflect.TypeOf(v).Kind() == reflect.Float64 {
+				valItem := v.(float64)
+				vlist[k] = (valItem * am.TransDeno) / am.TransDiv
+			}
+		}
+
+		if val, has := vlist["TotalActivePower"]; has {
+			diff := val.(float64) - am.TotalEnergy
+			am.TotalEnergy = val.(float64)
+			vlist["ActivePowerIncrement"] = diff
+			if len(dev.persitList) == 0 {
+				dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
+			}
+			dev.persitList[am.DevName] = am
+		}
+	}
+}
+
+func (dev *AmMeterHub) Run() error {
+	dev.loopserver = true
+	tmrTxTimout := time.NewTimer(5 * time.Second)
+	dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
+	tmrTxTimout.Stop()
+	//var pam *Ammeter = nil
+	for dev.loopserver {
+		select {
+		case msg := <-dev.Route[1].router[0]:
+			if reflect.TypeOf(msg).Kind() == reflect.Slice {
+				b := msg.([]byte)
+				log.Info(hex.EncodeToString(b))
+				devname := "" //pam.DevName
+				mret := dev.Route[1].iproto.ParsePacket(b, &devname)
+				if mret != nil && devname != "" {
+					log.Info(devname, mret)
+					devname = "AM10-" + devname + dev.DutSn[:4]
+					dev.AdjustTelemetry(mret, devname)
+					telemetry := dev.Route[0].iproto.PackageCmd(devname, mret)
+					dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
+					dev.SchedulNextSample(tmrTxTimout)
+				}
+			}
+		case <-tmrTxTimout.C:
+			dev.SchedulNextSample(tmrTxTimout)
+		case <-dev.sampleTmr.C:
+			dev.IssueSampleCmd(tmrTxTimout)
+		case <-dev.chnExit:
+			dev.loopserver = false
+		case <-dev.tmrPersist.C:
+			for k, am := range dev.persitList {
+				config.GetDB().Save(am)
+				delete(dev.persitList, k)
+			}
+		}
+	}
+	return nil
+
+}
+
+func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
+	log.Info("DutchanDispatch")
+	b := rxin.([]byte)
+	if len(b) == 8 {
+		return hex.EncodeToString(b)
+	}
+	return nil
+}
+
+func (dev *AmMeterHub) Open(param ...interface{}) error {
+
+	if dev.DutSn != "" {
+		dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0)
+	}
+	dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
+	dev.sampleTmr.Stop()
+
+	go dev.Run()
+	return nil
+}
+
+func (dev *AmMeterHub) GetDevice(devname string) interface{} {
+	if dev, has := dev.amList[devname]; has {
+		return dev
+	}
+	return nil
+}
+
+/*
+[{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
+ {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
+ {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
+*/
+
+type AmmeterDrv struct {
+	baseDriver
+}
+
+func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
+	Trans := strings.Split(dev.TransRadio, "/")
+	dev.TransDeno, _ = strconv.ParseFloat(Trans[0], 64)
+	if len(Trans) > 1 {
+		dev.TransDiv, _ = strconv.ParseFloat(Trans[1], 64)
+	}
+}
+
+func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
+	if model != nil {
+		models := model.(*[]AmmeterModel)
+		for _, m := range *models {
+			var hub IDevice = nil
+			var has bool
+			if hub, has = drv.DevList[m.DutSn]; !has {
+				hub = &AmMeterHub{
+					DutSn:      m.DutSn,
+					Protocol:   m.Protocol,
+					chnExit:    make(chan bool),
+					loopserver: false,
+					sampleTmr:  nil,
+					amList:     make(map[string]*Ammeter),
+					persitList: make(map[string]*Ammeter),
+					queryIdx:   0,
+				}
+				hub.Probe(m.DutSn, drv)
+				drv.DevList[m.DutSn] = hub
+			}
+
+			dev := &Ammeter{
+				TotalEnergy: 0,
+				timestamp:   0,
+				TransDiv:    1,
+				TransDeno:   1,
+			}
+			config.GetDB().Find(dev, "dev_name='"+m.DevName+"'")
+			dev.AmmeterModel = m
+			drv.ParseTransRatio(dev)
+			hub.(*AmMeterHub).amList[dev.DevName] = dev
+		}
+	}
+	return drv.baseDriver.CreateDevice(model)
+}
+
+func (drv *AmmeterDrv) GetModel() interface{} {
+	return &[]AmmeterModel{}
+}
+
+func NewAmMeter(param interface{}) IDriver {
+	am := new(AmmeterDrv)
+	return am
+}
+
+func init() {
+	driverReg["ammeter"] = NewAmMeter
+	config.GetDB().CreateTbl(&Ammeter{})
+}

+ 103 - 0
drivers/device.go

@@ -0,0 +1,103 @@
+package drviers
+
+import (
+	"encoding/hex"
+	"reflect"
+
+	bus "github.com/ammeter/Bus"
+	"github.com/ammeter/protocol"
+	"github.com/yuguorong/go/log"
+)
+
+type IDevice interface {
+	Probe(name string, drv IDriver) error
+	Open(...interface{}) error
+	Close() error
+	Ctrl(...interface{}) error
+	Suspend() error
+	Resume() error
+	GetDevice(string) interface{}
+	SetRoute(evt bus.IBusEvent, prot string, busName string, chn string, param ...interface{}) int
+}
+
+type routePath struct {
+	iChn   bus.IChannel
+	ibus   bus.IBus
+	iproto protocol.IProtocol
+	router []chan interface{}
+}
+
+type baseDevice struct {
+	DeviceName string
+	drv        IDriver
+	Route      []routePath
+}
+
+func (dev *baseDevice) Probe(name string, driver IDriver) error {
+	dev.Route = make([]routePath, 0)
+	dev.DeviceName = name
+	dev.drv = driver
+	return nil
+}
+
+//bus.IBusEvent
+func (dev *baseDevice) OnAttach(chn bus.IChannel) {
+}
+
+func (dev *baseDevice) OnDetach(chn bus.IChannel) {
+}
+
+func (dev *baseDevice) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
+	log.Info(dev.drv.Name(), "-", dev.DeviceName, " try Dispatch: ", hex.EncodeToString(stream))
+	k := reflect.TypeOf(args).Kind()
+	if k == reflect.Ptr {
+		iprot := args.(protocol.IProtocol)
+		return bus.ChnDispResult(iprot.ChannelDispatch(stream))
+	}
+	return bus.DispatchNone
+}
+
+func (dev *baseDevice) Create(model interface{}) interface{} {
+	return nil
+}
+
+func (dev *baseDevice) Open(...interface{}) error {
+	return nil
+}
+
+func (dev *baseDevice) Close() error {
+	return nil
+}
+
+func (dev *baseDevice) Ctrl(...interface{}) error {
+	return nil
+}
+
+func (dev *baseDevice) GetDevice(string) interface{} {
+	return dev
+}
+
+func (dev *baseDevice) Suspend() error {
+	return nil
+}
+
+func (dev *baseDevice) Resume() error {
+	return nil
+}
+
+func (dev *baseDevice) SetRoute(evt bus.IBusEvent, prot string, busName string, chn string, param ...interface{}) int {
+	r := new(routePath)
+	r.router = make([]chan interface{}, 1)
+	r.router[0] = make(chan interface{}, 16)
+
+	r.iproto = protocol.LoadProtocol(prot)
+	r.iproto.Init(prot)
+
+	r.ibus = bus.MountBus(busName, param)
+	r.ibus.Init()
+	r.iChn = r.ibus.OpenChannel(chn, r.router)
+	r.iChn.SetEvent(evt, r.iproto)
+
+	dev.Route = append(dev.Route, *r)
+	return len(dev.Route)
+}

+ 16 - 0
drivers/go.mod

@@ -0,0 +1,16 @@
+module github.com/ammeter/drviers
+
+go 1.14
+
+replace github.com/ammeter/Bus => ../Bus
+
+replace github.com/ammeter/config => ../config
+
+replace github.com/ammeter/protocol => ../protocol
+
+require (
+	github.com/ammeter/Bus v0.0.0-00010101000000-000000000000
+	github.com/ammeter/config v0.0.0-00010101000000-000000000000
+	github.com/ammeter/protocol v0.0.0-00010101000000-000000000000
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 31 - 0
drivers/go.sum

@@ -0,0 +1,31 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
+github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
+github.com/jinzhu/now v1.1.2 h1:eVKgfIdy9b6zbWBMgFpfDPoAMifwSZagU9HmEU6zgiI=
+github.com/jinzhu/now v1.1.2/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
+github.com/mattn/go-sqlite3 v1.14.8 h1:gDp86IdQsN/xWjIEmr9MF6o9mpksUgh0fu+9ByFxzIU=
+github.com/mattn/go-sqlite3 v1.14.8/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gorm.io/driver/sqlite v1.1.6 h1:p3U8WXkVFTOLPED4JjrZExfndjOtya3db8w9/vEMNyI=
+gorm.io/driver/sqlite v1.1.6/go.mod h1:W8LmC/6UvVbHKah0+QOC7Ja66EaZXHwUTjgXY8YNWX8=
+gorm.io/gorm v1.21.15/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=
+gorm.io/gorm v1.21.16 h1:YBIQLtP5PLfZQz59qfrq7xbrK7KWQ+JsXXCH/THlMqs=
+gorm.io/gorm v1.21.16/go.mod h1:F+OptMscr0P2F2qU97WT1WimdH9GaQPoDW7AYd5i2Y0=

+ 94 - 0
drivers/uemis.go

@@ -0,0 +1,94 @@
+package drviers
+
+import (
+	"errors"
+
+	"github.com/yuguorong/go/log"
+)
+
+type Uemis struct {
+	baseDevice
+	chnExit    chan bool
+	loopserver bool
+}
+
+func (u *Uemis) Run() error {
+	u.loopserver = true
+	for u.loopserver {
+		select {
+		case msg := <-u.Route[1].router[0]:
+			b := msg.([]byte)
+			devName := ""
+			mret := u.Route[1].iproto.ParsePacket(b, &devName)
+			if mret != nil && devName != "" {
+				v := mret.(map[string]interface{})
+				dev := devName[8:]
+				dev = u.DeviceName + "-00000000" + dev
+				log.Info(dev, v)
+				telemetry := u.Route[0].iproto.PackageCmd(dev, mret)
+				u.Route[0].ibus.Send(u.Route[0].iChn, telemetry)
+			}
+		case <-u.chnExit:
+			u.loopserver = false
+
+		}
+	}
+	return nil
+}
+
+func (u *Uemis) Open(...interface{}) error {
+	lenr := len(u.Route)
+	if lenr < 1 {
+		return errors.New("wrong route config")
+	} else if lenr < 2 {
+		u.SetRoute(u, "HJ212", "dtuFtpServer", "HJ212", 0)
+	}
+	if !u.loopserver {
+		go u.Run()
+	}
+	return nil
+}
+
+func (u *Uemis) Close() error {
+	u.loopserver = false
+	u.chnExit <- true
+	return nil
+}
+
+func (u *Uemis) GetDevice(s string) interface{} {
+	return u
+}
+
+type SAirModel struct {
+	Id       string `json:"id"`
+	DevName  string `json:"devName"`
+	Code     string `json:"code"`
+	Protocol string `json:"protocol"`
+	GwDevId  string `json:"gwDevId"`
+}
+
+type UemisDrv struct {
+	baseDriver
+}
+
+func (drv *UemisDrv) GetModel() interface{} {
+	return &[]SAirModel{}
+}
+
+func (drv *UemisDrv) CreateDevice(model interface{}) (int, []IDevice) {
+	drv.baseDriver.CreateDevice(model)
+	dev := new(Uemis)
+	dev.loopserver = false
+	dev.Probe(drv.Name(), drv)
+	drv.baseDriver.DevList[drv.Name()] = dev
+	return drv.baseDriver.CreateDevice(model)
+}
+
+func NewUemis(param interface{}) IDriver {
+	drv := new(UemisDrv)
+	return drv
+}
+
+func init() {
+	driverReg["SAIR10"] = NewUemis
+}

+ 0 - 0
middleware/gmqtt.go


+ 5 - 0
platform/api.go

@@ -0,0 +1,5 @@
+package platform
+
+type platform interface {
+	Create(name string)
+}

Разлика између датотеке није приказан због своје велике величине
+ 1 - 0
platform/conf.json


+ 107 - 0
platform/gateway.go

@@ -0,0 +1,107 @@
+package platform
+
+import (
+	"errors"
+	"strings"
+
+	"github.com/yuguorong/go/log"
+
+	bus "github.com/ammeter/Bus"
+	"github.com/ammeter/cloudserver"
+	"github.com/ammeter/config"
+	drviers "github.com/ammeter/drivers"
+)
+
+/*
+gZdomIS9Hz3d7HxvcoNx
+data [{"accessToken":"AZM7QSPRqsV4CUxIGldn",
+"id":"1ec23004b88a020bf52b38d8579115b",
+"mqttServer":"test-sbuilding.pacom.cn:1885",
+"name":"FGGW10-202110020001"}]
+*/
+var debugName = true
+
+const (
+	DEF_VERDOR   = "GW4G"
+	DEF_INF_NAME = "eth0"
+)
+
+type Gateway struct {
+	Mqttcfg    bus.QMqtt
+	Name       string
+	Uid        string
+	modelList  map[string]string
+	NorthRoute interface{}
+	DeviceList []drviers.IDriver
+}
+
+func (gw *Gateway) SyncCloudModelDevice(url string, model interface{}) error {
+	var err = errors.New("Nil model?")
+	if model != nil {
+		prof := config.GetSysConfig().GetProfile("remote_config", cloudserver.GetCloudConfig())
+		cs := cloudserver.GetCloudServer(prof)
+		param := map[string]string{
+			"gwDevId": gw.Uid,
+		}
+		err = cs.GetClientData(url, model, &param)
+		log.Info(model)
+	}
+	return err
+}
+
+type netinfo struct {
+	name    string
+	uuid    string
+	prority int
+}
+
+func priotyInf(ni *netinfo, name string) bool {
+	name = strings.ToUpper(name)
+	ifnames := []string{"WLAN", "ETH", "ENS", "ENO", "本地连接"}
+	for i := 0; i < ni.prority && i < len(ifnames); i++ {
+		if strings.Contains(name, ifnames[i]) {
+			ni.name = name
+			ni.prority = i
+			return true
+		}
+	}
+	return false
+}
+
+func (gw *Gateway) InitGateway(modelList map[string]string) {
+	//gw.GetGatewayName()
+	gw.DeviceList = make([]drviers.IDriver, 0)
+	gw.modelList = modelList
+
+}
+
+func (gw *Gateway) StartServer(uid string) {
+	//	name := "ammeter"
+	//	drvU := drviers.Install("SAIR10", nil)
+	for name, url := range gw.modelList {
+		drv := drviers.Install(name, nil)
+		model := drv.GetModel()
+		if url != "" {
+			if gw.SyncCloudModelDevice(url, model) != nil {
+				model = config.GetSysConfig().GetProfile("gateway/"+name+"/amlist", model)
+			} else {
+				config.GetSysConfig().SetProfile("gateway/"+name+"/amlist", model)
+			}
+		}
+		if sz, devlist := drv.CreateDevice(model); sz > 0 {
+			for _, dev := range devlist {
+				dev.SetRoute(nil, "ThingsBoards", "mqtt", "v1/gateway/telemetry", &gw.Mqttcfg)
+				dev.Open()
+			}
+		}
+	}
+}
+
+func InitGateway(mqttcfg *bus.QMqtt, modelList map[string]string) *Gateway {
+	gw := &Gateway{
+		Uid:     mqttcfg.Uid,
+		Mqttcfg: *mqttcfg,
+	}
+	gw.InitGateway(modelList)
+	return gw
+}

+ 25 - 0
platform/gateway_test.go

@@ -0,0 +1,25 @@
+package platform
+
+import (
+	"testing"
+
+	"github.com/ammeter/config"
+	"github.com/yuguorong/go/log"
+)
+
+func TestGateway(T *testing.T) {
+	//gw := InitGateway("")
+	//gw.GetGatewayName()
+	//gw.StartServer()
+}
+
+func TestPlatform(T *testing.T) {
+	config.GetSysConfig().SetValue("Bus/DtuServer/Port", DEF_FTP_PORT)
+	log.Info("process start...")
+	p := PaPlatform{}
+	p.SetGatewayUrl("/platform/dev/get-4G-gateway-list")
+	p.SetModel("ammeter", REQ_AMMETER_URL)
+	p.SetModel("SAIR10", REQ_AIR_URL)
+	p.SaveModel()
+
+}

+ 23 - 0
platform/go.mod

@@ -0,0 +1,23 @@
+module github.com/ammeter/platform
+
+go 1.14
+
+replace github.com/ammeter/cloudserver => ../cloudserver
+
+replace github.com/ammeter/config => ../config
+
+replace github.com/ammeter/drivers => ../drivers
+
+replace github.com/ammeter/protocol => ../protocol
+
+replace github.com/ammeter/Bus => ../Bus
+
+require (
+	github.com/ammeter/Bus v0.0.0-00010101000000-000000000000
+	github.com/ammeter/cloudserver v0.0.0-00010101000000-000000000000
+	github.com/ammeter/config v0.0.0-00010101000000-000000000000
+	github.com/ammeter/drivers v0.0.0-00010101000000-000000000000
+	github.com/eclipse/paho.mqtt.golang v1.3.5
+	github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
+	github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726
+)

+ 14 - 0
platform/go.sum

@@ -0,0 +1,14 @@
+github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
+github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
+github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

+ 90 - 0
platform/platform.go

@@ -0,0 +1,90 @@
+package platform
+
+import (
+	"github.com/yuguorong/go/log"
+
+	bus "github.com/ammeter/Bus"
+	"github.com/ammeter/cloudserver"
+	"github.com/ammeter/config"
+)
+
+const (
+	DEF_REQ_AMGW_URL = "/platform/dev/get-4G-gateway-list"
+	REQ_AMMETER_URL  = "/platform/dev/get-ammeter-list"
+	REQ_AIR_URL      = "/platform/dev/get-sair-list"
+	DEF_FTP_PORT     = 10010
+)
+
+type PaPlatform struct {
+	gwUrl       string
+	GatewayList []*Gateway
+	modelList   map[string]string
+}
+
+func (p *PaPlatform) SyncCloudGWConfig() *[]bus.QMqtt {
+	if p.gwUrl == "" {
+		p.gwUrl = DEF_REQ_AMGW_URL
+	}
+	mqttList := []bus.QMqtt{}
+	prof := config.GetSysConfig().GetProfile("remote_config", cloudserver.GetCloudConfig())
+	cs := cloudserver.GetCloudServer(prof)
+	err := cs.GetClientData(p.gwUrl, &mqttList, nil)
+	if err != nil {
+		return nil
+	}
+	config.GetSysConfig().SetValue("mqtt_list", &mqttList)
+	config.GetSysConfig().Save()
+	return &mqttList
+}
+
+func (p *PaPlatform) LoadGatewayProfile() {
+	p.GatewayList = make([]*Gateway, 0)
+	mqttList := &[]bus.QMqtt{}
+	config.GetSysConfig().GetValue("mqtt_list", mqttList)
+	if mqrmt := p.SyncCloudGWConfig(); mqrmt != nil {
+		mqttList = mqrmt
+		config.GetSysConfig().SetValue("mqtt_list", mqttList)
+	}
+	for _, mq := range *mqttList {
+		gw := InitGateway(&mq, p.modelList)
+		p.GatewayList = append(p.GatewayList, gw)
+		gw.StartServer(mq.Uid)
+	}
+
+}
+
+func (p *PaPlatform) SetModel(sname string, surl string) {
+	if p.modelList == nil {
+		p.modelList = make(map[string]string)
+	}
+	p.modelList[sname] = surl
+}
+
+func (p *PaPlatform) SaveModel() {
+	config.GetSysConfig().SetProfile("model_list", &p.modelList)
+}
+
+func (p *PaPlatform) LoadModles() {
+	p.modelList = make(map[string]string)
+	list := config.GetSysConfig().GetProfile("model_list", &p.modelList)
+	if list != nil {
+		p.modelList = *list.(*map[string]string)
+	}
+}
+
+func (p *PaPlatform) SetGatewayUrl(url string) {
+	p.gwUrl = url
+}
+
+var mainloop chan interface{}
+
+func StartServer() {
+	p := &PaPlatform{}
+	p.LoadModles()
+	p.LoadGatewayProfile()
+	mainloop = make(chan interface{})
+
+	v := <-mainloop
+	log.Info("exit now", v)
+
+}

+ 29 - 0
protocol/api.go

@@ -0,0 +1,29 @@
+package protocol
+
+type IProtocol interface {
+	Init(name string) error
+	Uninit()
+	ChannelDispatch(rxb []byte) int
+	ParsePacket(buff []byte, param ...interface{}) interface{}
+	PackageCmd(cmd string, param ...interface{}) interface{}
+}
+
+type funcRegProt func() IProtocol
+
+var ProtList map[string]IProtocol
+var ProtReg map[string]funcRegProt
+
+func LoadProtocol(name string) IProtocol {
+	if p, has := ProtList[name]; has && p != nil {
+		return p
+	}
+	if f, has := ProtReg[name]; has && f != nil {
+		return f()
+	}
+	return nil
+}
+
+func init() {
+	ProtList = make(map[string]IProtocol)
+	ProtReg = make(map[string]funcRegProt)
+}

Разлика између датотеке није приказан због своје велике величине
+ 377 - 0
protocol/dlt645.go


+ 45 - 0
protocol/dlt645_test.go

@@ -0,0 +1,45 @@
+package protocol
+
+import (
+	"encoding/hex"
+	"testing"
+
+	"github.com/yuguorong/go/log"
+)
+
+//fefefefe68040042050821681104333333332516
+//fefefefe68040042050821681104333333332516
+func TestPacket(t *testing.T) {
+	dlt645 := LoadProtocol("DLT645-2007")
+
+	cmd := dlt645.PackageCmd("00000000", "210805420004")
+	if cmd == nil {
+		t.Error("failed in PackageCmd")
+	}
+	srecv := hex.EncodeToString(cmd.([]byte))
+	t.Logf("param: [%s]\n", srecv)
+
+}
+
+func TestParse(t *testing.T) {
+	dlt645 := LoadProtocol("DLT645-2007")
+	//t1 := []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x04, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x11, 0x04, 0x33, 0x33, 0x33, 0x33, 0x25, 0x16}
+	tcmd := []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x06, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x11, 0x04, 0x33, 0x33, 0x33, 0x33, 0x27, 0x16}
+	tack := []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x06, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x91, 0x08, 0x33, 0x33, 0x33, 0x33, 0x47, 0xc5, 0x33, 0x33, 0x1d, 0x16}
+	//tdec := []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0x04, 0x00, 0x42, 0x05, 0x08, 0x21, 0x68, 0x91, 0x08, 0x33, 0x33, 0x33, 0x33, 0xb4, 0xbc, 0x34, 0x33, 0x80, 0x16}
+	vret := dlt645.ParsePacket(tack, tcmd)
+	if vret == nil {
+		t.Error("Error in parse packet")
+	}
+	t.Logf("Get param: [%#v]\n", vret)
+}
+
+//map[CH2O-Rtd:0.008 CN:2011 CO2-Rtd:405.0 DataTime:20211006030527 HUMI-Rtd:43.5 MN:8888888826462285 PM25-Rtd:2.0 TEMP-Rtd:22.0 VOC-Rtd:0.052
+func TestHj212(t *testing.T) {
+	parten := "##0159ST=22;CN=2011;PW=123456;MN=8888888826462285;CP=&&DataTime=20211006030527;TEMP-Rtd=22.0;HUMI-Rtd=43.5;PM25-Rtd=2.0;CO2-Rtd=405.0;CH2O-Rtd=0.008;VOC-Rtd=0.052;&&8ef7"
+	ip := LoadProtocol("HJ212")
+	ip.Init("HJ212")
+	mret := ip.ParsePacket([]byte(parten))
+	log.Info(mret)
+
+}

+ 5 - 0
protocol/go.mod

@@ -0,0 +1,5 @@
+module github.com/ammeter/protocol
+
+go 1.14
+
+require github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726

+ 2 - 0
protocol/go.sum

@@ -0,0 +1,2 @@
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726 h1:3BSFfDVmj5yqpvj3H9m1EpZWhEKgJXW2C4MbTRv6rIM=
+github.com/yuguorong/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:i+SDEIi6IWgErb4Yq6H542yr5F8vxSoNbe9wFC+N5jc=

Разлика између датотеке није приказан због своје велике величине
+ 126 - 0
protocol/hj212.go


+ 97 - 0
protocol/thingsboards.go

@@ -0,0 +1,97 @@
+package protocol
+
+import (
+	"time"
+)
+
+type ThingsBoards struct {
+	Name string
+}
+
+/*
+{
+  "Device A": [
+    {
+      "ts": 1483228800000,
+      "values": {
+        "temperature": 42,
+        "humidity": 80
+      }
+    },
+    {
+      "ts": 1483228801000,
+      "values": {
+        "temperature": 43,
+        "humidity": 82
+      }
+    }
+  ],
+  "Device B": [
+    {
+      "ts": 1483228800000,
+      "values": {
+        "temperature": 42,
+        "humidity": 80
+      }
+    }
+  ]
+
+  {
+    "UEMIS61748803":[
+        {
+            "ts":1633471518102004,
+            "values":{
+                "CH2O-Rtd":"0.007",
+                "CO2-Rtd":"399.0",
+                "HUMI-Rtd":"38.8",
+                "PM25-Rtd":"1.0",
+                "TEMP-Rtd":"20.4",
+                "VOC-Rtd":"0.022"
+            }
+        }
+    ]
+}
+}
+*/
+func (tbs *ThingsBoards) Init(name string) error {
+	return nil
+}
+
+func (tbs *ThingsBoards) Uninit() {
+}
+
+func (tbs *ThingsBoards) ChannelDispatch(b []byte) int {
+	return 0
+}
+
+func (tbs *ThingsBoards) ParsePacket(buff []byte, param ...interface{}) interface{} {
+
+	return nil
+}
+
+func (tbs *ThingsBoards) rptDevTelemetry(devName string, telemetry interface{}) interface{} {
+	tbObj := make(map[string]interface{})
+	devVal := make(map[string]interface{})
+	devVal["ts"] = time.Now().UnixNano() / (1000 * 1000)
+	devVal["values"] = telemetry
+	devList := make([]interface{}, 1)
+	devList[0] = devVal
+	tbObj[devName] = devList
+	return tbObj
+
+}
+
+func (tbs *ThingsBoards) PackageCmd(dev string, param ...interface{}) interface{} {
+	v := param[0].(map[string]interface{})
+	return tbs.rptDevTelemetry(dev, v)
+}
+
+func NewThingsboard() IProtocol {
+	p := &ThingsBoards{}
+	p.Init("ThingsBoards")
+	return p
+}
+
+func init() {
+	ProtReg["ThingsBoards"] = NewThingsboard
+}