瀏覽代碼

add error count of timeout times;change sample timing

yuguorong 2 年之前
父節點
當前提交
4bd5835041
共有 4 個文件被更改,包括 49 次插入11 次删除
  1. 1 0
      Bus/api.go
  2. 5 1
      Bus/bus.go
  3. 17 1
      Bus/ftpDtu.go
  4. 26 9
      drivers/ameter.go

+ 1 - 0
Bus/api.go

@@ -41,4 +41,5 @@ type IBus interface {
 	Recive(chn IChannel, buff interface{}) (int, error)
 	ApplyPatch(ptchName string, param ...interface{})
 	TimeStamp() int64
+	Disconnect(chn IChannel) int
 }

+ 5 - 1
Bus/bus.go

@@ -14,7 +14,7 @@ const (
 
 type CbBusChanDisp func(stream []byte, param interface{}) interface{}
 type BusChannel struct {
-	chnID     string
+	chnID     string //name of channel, set when setroute
 	mountCnt  int
 	event     IBusEvent
 	bus       IBus
@@ -166,6 +166,10 @@ func (bus *baseBus) TimeStamp() int64 {
 	return int64(bus.TsLast)
 }
 
+func (bus *baseBus) Disconnect(chn IChannel) int {
+	return 0
+}
+
 type funcRegBus func(param []interface{}) IBus
 type funcGetID func(string, []interface{}) string
 

+ 17 - 1
Bus/ftpDtu.go

@@ -39,6 +39,7 @@ type DtuServer struct {
 
 func (dtu *DtuServer) ResetChannel(chn IChannel) {
 	dtu.baseBus.ResetChannel(chn)
+
 }
 
 func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
@@ -117,7 +118,9 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 	for {
 		smeter := hex.EncodeToString(buf[:n])
 		dtu.mutex.Lock()
-		chnin <- buf[:n]
+		if n > 0 {
+			chnin <- buf[:n]
+		}
 		dtu.mutex.Unlock()
 
 		if chnout != nil {
@@ -140,6 +143,10 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 		}
 		n, err = conn.Read(buf[:])
 		if err != nil {
+			if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), " timed out") {
+				log.Infof("read Next:[%s]\n", ic.ID())
+				continue
+			}
 			log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
 			chnin <- "closed"
 			break //return and close connection
@@ -147,6 +154,15 @@ func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bo
 	}
 }
 
+func (dtu *DtuServer) Disconnect(chn IChannel) int {
+	for _, conid := range chn.(*BusChannel).conIdList {
+		if conn, has := dtu.connlist[conid]; has {
+			dtu.closeConn(conn, conid, chn)
+		}
+	}
+	return dtu.baseBus.Disconnect(chn)
+}
+
 func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
 	defer func() {
 		(*lister).Close()

+ 26 - 9
drivers/ameter.go

@@ -17,9 +17,10 @@ import (
 
 const (
 	DEF_SAMPLE_PERIOD        = 10 * time.Minute
-	DEF_SAMPLE_PEER_DURATION = 60 * time.Second
-	DEF_TCP_READ_TIMEOUT     = (DEF_SAMPLE_PERIOD + 10*time.Minute)
+	DEF_SAMPLE_PEER_DURATION = 5 * time.Second
+	DEF_TCP_READ_TIMEOUT     = (1 * time.Minute)
 	POSTPONE_PERSIST_TIME    = 5 * time.Minute
+	MIN_TIMEOUT_TIMES        = 2
 )
 
 //amList  map[string]*Ammeter
@@ -63,6 +64,7 @@ type AmMeterHub struct {
 	chnExit     chan bool
 	loopserver  bool
 	sampleTmr   *time.Timer
+	errCount    []int
 	amIndex     []string            //avoid map chaos order,even map read only
 	amList      map[string]*Ammeter //key=[code + address], each meter id must be unique
 	persitList  map[string]*meterPersist
@@ -81,6 +83,9 @@ func (dev *AmMeterHub) Close() error {
 
 func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
 	log.Info(dev.DutSn, " Attached!")
+	for i := 0; i < len(dev.errCount); i++ {
+		dev.errCount[i] = 0
+	}
 	chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
 	dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
 	dev.baseDevice.OnAttach(chn)
@@ -129,20 +134,20 @@ func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
 	if _, err := dev.Route[1].ibus.Send(dev.Route[1].iChn, pack); err != nil {
 		dev.OnDetach(dev.Route[1].iChn)
 	} else {
-		tmrTx.Reset(1 * time.Second)
+		tmrTx.Reset(DEF_TCP_READ_TIMEOUT)
 	}
 	return pam
 }
 
-func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
+func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer, timoutdiff time.Duration) {
 	tmrTx.Stop()
 	dev.queryIdx++
 
 	if dev.queryIdx >= len(dev.amList) {
 		dev.queryIdx = 0
-		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
+		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD - timoutdiff)
 	} else {
-		dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
+		dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION - timoutdiff)
 	}
 }
 
@@ -191,6 +196,7 @@ func (dev *AmMeterHub) Run() error {
 	for dev.loopserver {
 		select {
 		case msg := <-dev.Route[1].router[0]:
+			dev.errCount[dev.queryIdx] = 0
 			if reflect.TypeOf(msg).Kind() == reflect.Slice {
 				b := msg.([]byte)
 				log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
@@ -204,16 +210,26 @@ func (dev *AmMeterHub) Run() error {
 						dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
 						am.TsSample = time.Now().Unix()
 					}
-					dev.SchedulNextSample(tmrTxTimout)
+					dev.SchedulNextSample(tmrTxTimout, 0)
 				}
 			} else if reflect.TypeOf(msg).Kind() == reflect.String {
 				if msg.(string) == "closed" {
-
 					dev.OnDetach(dev.Route[1].iChn)
 				}
 			}
 		case <-tmrTxTimout.C:
-			dev.SchedulNextSample(tmrTxTimout)
+			dev.errCount[dev.queryIdx]++
+			keepconnect := false
+			for errcnt := range dev.errCount {
+				if errcnt < MIN_TIMEOUT_TIMES {
+					dev.SchedulNextSample(tmrTxTimout, DEF_TCP_READ_TIMEOUT)
+					keepconnect = true
+					break
+				}
+			}
+			if !keepconnect {
+				dev.Route[1].ibus.Disconnect(dev.Route[1].iChn)
+			}
 		case <-dev.sampleTmr.C:
 			dev.IssueSampleCmd(tmrTxTimout)
 		case <-dev.chnExit:
@@ -381,6 +397,7 @@ func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
 				hub.(*AmMeterHub).amIndex = append(hub.(*AmMeterHub).amIndex, id)
 			}
 		}
+		hub.(*AmMeterHub).errCount = make([]int, len(hub.(*AmMeterHub).amList))
 	}
 
 	if len(drv.DevList) == 0 {