package bus import ( "encoding/hex" "errors" "fmt" "net" "reflect" "strconv" "strings" "sync" "time" "github.com/yuguorong/go/log" "github.com/ammeter/config" "github.com/ammeter/util" ) const ( maxConnCount = 100 moduleName = "dtuFtpServer" DEF_FTP_PORT = 10010 ) //var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16} type DtuServer struct { baseBus Port int name string lis net.Listener listPassive net.Listener connlist map[int]net.Conn clientNum int loop bool patchs map[string][]byte } func (dtu *DtuServer) ResetChannel(chn IChannel) { dtu.baseBus.ResetChannel(chn) } func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) { dtu.mutex.Lock() if _, has := dtu.connlist[connid]; has { delete(dtu.connlist, connid) dtu.clientNum-- } dtu.mutex.Unlock() conn.Close() if chn != nil { basechn := chn.(*BusChannel) log.Infof("client [%s] close\n", basechn.chnID) basechn.mountCnt = 0 if basechn.event != nil { basechn.event.OnDetach(chn) } for i, id := range basechn.conIdList { if id == connid { basechn.conIdList[i] = -1 break } } } } func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) { if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice { dtu.patchs[patchName] = param[0].([]byte) } } func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) { var ic IChannel = nil remoteAddr := conn.RemoteAddr().String() //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port) defer dtu.closeConn(conn, connectID, ic) var err error var buf [1024]byte n := 0 for i := 0; i < 10; i++ { if PassiveFlag { for key, patchCmd := range dtu.patchs { if strings.Contains(key, "Passive") { conn.Write(patchCmd) } } } n, err = conn.Read(buf[:]) if err != nil { log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err) return } log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n]) ic = dtu.baseBus.ScanChannel(buf[:n], connectID) if ic != nil { break } time.Sleep(time.Second * 5) } if ic == nil { return } chnin := ic.GetChan(0) if chnin == nil { panic("no chan for read message") } chnout := ic.GetChan(1) dtu.name = hex.EncodeToString(buf[:n]) for { smeter := hex.EncodeToString(buf[:n]) dtu.mutex.Lock() chnin <- buf[:n] dtu.mutex.Unlock() if chnout != nil { after := time.After(time.Second * 60) select { case msg := <-chnout: conn.Write(msg.([]byte)) case <-after: log.Info("timeafter for write message") break } } // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n])) log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter) if ic.(*BusChannel).timeout != 0 { conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout)) } else { conn.SetReadDeadline(time.Now().Add(30 * time.Minute)) } n, err = conn.Read(buf[:]) if err != nil { log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err) chnin <- "closed" break //return and close connection } } } func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) { defer func() { (*lister).Close() }() connectID := 1000 for dtu.loop { if dtu.clientNum >= maxConnCount { log.Infof("there is %d clients,is max num\n", dtu.clientNum) time.Sleep(5 * time.Second) continue } conn, err := (*lister).Accept() if err != nil { log.Errorf("listen err:[%v]\n", err) } dtu.mutex.Lock() connectID++ dtu.connlist[connectID] = conn dtu.clientNum++ dtu.mutex.Unlock() go dtu.ClientConnect(conn, connectID, PassiveFlag) } } func (dtu *DtuServer) Init() error { dtu.baseBus.Init() if !dtu.loop { addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port) log.Info("start ", addr) var err error = nil dtu.lis, err = net.Listen("tcp", addr) if err != nil { log.Errorf("err! addr %s open faild err:[%v]\n", addr, err) return err } dtu.mutex.Lock() defer dtu.mutex.Unlock() dtu.loop = true go dtu.StartServer(&dtu.lis, false) addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10) log.Info("Passive port listen start ", addr) dtu.listPassive, err = net.Listen("tcp", addr) if err != nil { log.Errorf("err! addr %s open faild err:[%v]\n", addr, err) return err } go dtu.StartServer(&dtu.listPassive, true) } return nil } func (dtu *DtuServer) Uninit() { dtu.mutex.Lock() defer dtu.mutex.Unlock() if dtu.loop { dtu.loop = false for _, c := range dtu.connlist { c.Close() } dtu.lis.Close() dtu.connlist = make(map[int]net.Conn) } } func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel { if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String { panic("Open channel should be a unique string") } ic := dtu.baseBus.MountChannel(chn, router) return ic } func (dtu *DtuServer) FreeChannel(chn IChannel) error { dtu.baseBus.FreeChannel(chn) if dtu.loop { //idx := chn.(*BusChannel).conn.(int) for _, connID := range chn.(*BusChannel).conIdList { if conn, has := dtu.connlist[connID]; has { dtu.mutex.Lock() delete(dtu.connlist, connID) dtu.mutex.Unlock() conn.Close() } } } return nil } func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) { if ichn != nil { chn := ichn.(*BusChannel) for _, connID := range chn.conIdList { if connID >= 0 { if conn, has := dtu.connlist[connID]; has { conn.Write(buff.([]byte)) return len(buff.([]byte)), nil } } } } return 0, errors.New("no such connection") } func GetFtpServerConfig(param []interface{}) int { Port := 0 if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int { Port = param[0].(int) } if Port == 0 { Port = util.ToInt(config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT)) } return Port } func NewDtuServer(param []interface{}) IBus { Port := GetFtpServerConfig(param) busid := GenDtuServerId(moduleName, param) b := &DtuServer{ baseBus: baseBus{ BusId: busid, mutex: &sync.Mutex{}, }, connlist: make(map[int]net.Conn), patchs: make(map[string][]byte), clientNum: 0, Port: Port, loop: false, } return b } func GenDtuServerId(name string, param []interface{}) string { Port := GetFtpServerConfig(param) return name + ":" + strconv.Itoa(Port) } func init() { BusReg[moduleName] = NewDtuServer BusGetID[moduleName] = GenDtuServerId }