Просмотр исходного кода

bugfix: reset channel when connection was broken

yuguorong 2 лет назад
Родитель
Сommit
02d593f8d8
4 измененных файлов с 34 добавлено и 9 удалено
  1. 5 2
      Bus/bus.go
  2. 11 4
      Bus/ftpDtu.go
  3. 16 2
      drivers/ameter.go
  4. 2 1
      drivers/device.go

+ 5 - 2
Bus/bus.go

@@ -40,7 +40,7 @@ func (chn *BusChannel) GetChan(id int) chan interface{} {
 }
 
 func (chn *BusChannel) SetTimeout(timeout time.Duration) {
-
+	chn.timeout = timeout
 }
 
 func (chn *BusChannel) SetEvent(evt IBusEvent, evtArgs interface{}) {
@@ -115,6 +115,9 @@ func (bus *baseBus) MountChannel(chnID interface{}, router []chan interface{}) I
 	return c
 }
 func (dtu *baseBus) ResetChannel(chn IChannel) {
+	basechn := chn.(*BusChannel)
+	basechn.mountCnt = 0
+
 }
 
 func (bus *baseBus) FreeChannel(chn IChannel) error {
@@ -122,8 +125,8 @@ func (bus *baseBus) FreeChannel(chn IChannel) error {
 	bus.mutex.Lock()
 	delete(bus.ChnList, id)
 	bus.mutex.Unlock()
+	bus.ResetChannel(chn)
 	basechn := chn.(*BusChannel)
-	basechn.mountCnt = 0
 	if basechn.event != nil {
 		basechn.event.OnDetach(chn)
 	}

+ 11 - 4
Bus/ftpDtu.go

@@ -2,6 +2,7 @@ package bus
 
 import (
 	"encoding/hex"
+	"errors"
 	"fmt"
 	"net"
 	"reflect"
@@ -37,7 +38,7 @@ type DtuServer struct {
 }
 
 func (dtu *DtuServer) ResetChannel(chn IChannel) {
-
+	dtu.baseBus.ResetChannel(chn)
 }
 
 func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
@@ -93,6 +94,7 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 			log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
 			return
 		}
+
 		log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
 		ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
 		if ic != nil {
@@ -124,6 +126,7 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 			case msg := <-chnout:
 				conn.Write(msg.([]byte))
 			case <-after:
+				log.Info("timeafter for write message")
 				break
 			}
 		}
@@ -132,12 +135,14 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 
 		if ic.(*BusChannel).timeout != 0 {
 			conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
+		} else {
+			conn.SetReadDeadline(time.Now().Add(30 * time.Minute))
 		}
 		n, err = conn.Read(buf[:])
 		if err != nil {
 			log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
-			dtu.closeConn(conn, connectID, ic)
-			break
+			chnin <- "closed"
+			break //return and close connection
 		}
 	}
 }
@@ -241,11 +246,13 @@ func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
 			if connID >= 0 {
 				if conn, has := dtu.connlist[connID]; has {
 					conn.Write(buff.([]byte))
+					return len(buff.([]byte)), nil
 				}
 			}
 		}
 	}
-	return dtu.baseBus.Send(ichn, buff)
+
+	return 0, errors.New("no such connection")
 }
 
 func GetFtpServerConfig(param []interface{}) int {

+ 16 - 2
drivers/ameter.go

@@ -94,6 +94,12 @@ func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
 
 func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
 	if len(stream) > 8 {
+		for i := 0; i < len(stream); i++ {
+			if stream[i] != 0xFE {
+				stream = stream[i:]
+				break
+			}
+		}
 		if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
 			devname := ""
 			mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
@@ -120,8 +126,11 @@ func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnD
 func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 	pam = dev.amList[dev.amIndex[dev.queryIdx]]
 	pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
-	dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
-	tmrTx.Reset(1 * time.Second)
+	if _, err := dev.Route[1].ibus.Send(dev.Route[1].iChn, pack); err != nil {
+		dev.OnDetach(dev.Route[1].iChn)
+	} else {
+		tmrTx.Reset(1 * time.Second)
+	}
 	return pam
 }
 
@@ -197,6 +206,11 @@ func (dev *AmMeterHub) Run() error {
 					}
 					dev.SchedulNextSample(tmrTxTimout)
 				}
+			} else if reflect.TypeOf(msg).Kind() == reflect.String {
+				if msg.(string) == "closed" {
+
+					dev.OnDetach(dev.Route[1].iChn)
+				}
 			}
 		case <-tmrTxTimout.C:
 			dev.SchedulNextSample(tmrTxTimout)

+ 2 - 1
drivers/device.go

@@ -55,6 +55,7 @@ func (dev *baseDevice) OnAttach(chn bus.IChannel) {
 }
 
 func (dev *baseDevice) OnDetach(chn bus.IChannel) {
+	chn.GetBus().ResetChannel(chn)
 	dev.Status = "Detached"
 }
 
@@ -108,7 +109,7 @@ func (dev *baseDevice) Resume() error {
 
 func (dev *baseDevice) SetRoute(evt bus.IBusEvent, prot string, busName string, chn string, param ...interface{}) int {
 	r := new(routePath)
-	r.router = make([]chan interface{}, 1)
+	r.router = make([]chan interface{}, 2)
 	r.router[0] = make(chan interface{}, 16)
 
 	r.iproto = protocol.LoadProtocol(prot)