ftpDtu.go 6.3 KB


  1. package bus
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/yuguorong/go/log"
  12. "github.com/ammeter/config"
  13. )
  14. const (
  15. maxConnCount = 100
  16. moduleName = "dtuFtpServer"
  17. DEF_FTP_PORT = 10010
  18. )
  19. //var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  20. type DtuServer struct {
  21. baseBus
  22. Port int
  23. name string
  24. lis net.Listener
  25. listPassive net.Listener
  26. connlist map[int]net.Conn
  27. clientNum int
  28. loop bool
  29. patchs map[string][]byte
  30. }
  31. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  32. }
  33. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  34. dtu.mutex.Lock()
  35. if _, has := dtu.connlist[connid]; has {
  36. delete(dtu.connlist, connid)
  37. dtu.clientNum--
  38. }
  39. dtu.mutex.Unlock()
  40. conn.Close()
  41. if chn != nil {
  42. basechn := chn.(*BusChannel)
  43. log.Infof("client [%s] close\n", basechn.chnID)
  44. basechn.mountCnt = 0
  45. if basechn.event != nil {
  46. basechn.event.OnDetach(chn)
  47. }
  48. for i, id := range basechn.conIdList {
  49. if id == connid {
  50. basechn.conIdList[i] = -1
  51. break
  52. }
  53. }
  54. }
  55. }
  56. func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) {
  57. if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice {
  58. dtu.patchs[patchName] = param[0].([]byte)
  59. }
  60. }
  61. func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) {
  62. var ic IChannel = nil
  63. remoteAddr := conn.RemoteAddr().String()
  64. //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
  65. defer dtu.closeConn(conn, connectID, ic)
  66. var err error
  67. var buf [1024]byte
  68. n := 0
  69. for i := 0; i < 10; i++ {
  70. if PassiveFlag {
  71. for key, patchCmd := range dtu.patchs {
  72. if strings.Contains(key, "Passive") {
  73. conn.Write(patchCmd)
  74. }
  75. }
  76. }
  77. n, err = conn.Read(buf[:])
  78. if err != nil {
  79. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  80. return
  81. }
  82. log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
  83. ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
  84. if ic != nil {
  85. break
  86. }
  87. time.Sleep(time.Second * 5)
  88. }
  89. if ic == nil {
  90. return
  91. }
  92. chnin := ic.GetChan(0)
  93. if chnin == nil {
  94. panic("no chan for read message")
  95. }
  96. chnout := ic.GetChan(1)
  97. dtu.name = hex.EncodeToString(buf[:n])
  98. for {
  99. smeter := hex.EncodeToString(buf[:n])
  100. dtu.mutex.Lock()
  101. chnin <- buf[:n]
  102. dtu.mutex.Unlock()
  103. if chnout != nil {
  104. after := time.After(time.Second * 60)
  105. select {
  106. case msg := <-chnout:
  107. conn.Write(msg.([]byte))
  108. case <-after:
  109. break
  110. }
  111. }
  112. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  113. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  114. if ic.(*BusChannel).timeout != 0 {
  115. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  116. }
  117. n, err = conn.Read(buf[:])
  118. if err != nil {
  119. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  120. dtu.closeConn(conn, connectID, ic)
  121. break
  122. }
  123. }
  124. }
  125. func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
  126. defer func() {
  127. (*lister).Close()
  128. }()
  129. connectID := 1000
  130. for dtu.loop {
  131. if dtu.clientNum >= maxConnCount {
  132. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  133. time.Sleep(5 * time.Second)
  134. continue
  135. }
  136. conn, err := (*lister).Accept()
  137. if err != nil {
  138. log.Errorf("listen err:[%v]\n", err)
  139. }
  140. dtu.mutex.Lock()
  141. connectID++
  142. dtu.connlist[connectID] = conn
  143. dtu.clientNum++
  144. dtu.mutex.Unlock()
  145. go dtu.ClientConnect(conn, connectID, PassiveFlag)
  146. }
  147. }
  148. func (dtu *DtuServer) Init() error {
  149. dtu.baseBus.Init()
  150. if !dtu.loop {
  151. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  152. log.Info("start ", addr)
  153. var err error = nil
  154. dtu.lis, err = net.Listen("tcp", addr)
  155. if err != nil {
  156. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  157. return err
  158. }
  159. dtu.mutex.Lock()
  160. defer dtu.mutex.Unlock()
  161. dtu.loop = true
  162. go dtu.StartServer(&dtu.lis, false)
  163. addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
  164. log.Info("Passive port listen start ", addr)
  165. dtu.listPassive, err = net.Listen("tcp", addr)
  166. if err != nil {
  167. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  168. return err
  169. }
  170. go dtu.StartServer(&dtu.listPassive, true)
  171. }
  172. return nil
  173. }
  174. func (dtu *DtuServer) Uninit() {
  175. dtu.mutex.Lock()
  176. defer dtu.mutex.Unlock()
  177. if dtu.loop {
  178. dtu.loop = false
  179. for _, c := range dtu.connlist {
  180. c.Close()
  181. }
  182. dtu.lis.Close()
  183. dtu.connlist = make(map[int]net.Conn)
  184. }
  185. }
  186. func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel {
  187. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  188. panic("Open channel should be a unique string")
  189. }
  190. ic := dtu.baseBus.MountChannel(chn, router)
  191. return ic
  192. }
  193. func (dtu *DtuServer) FreeChannel(chn IChannel) error {
  194. dtu.baseBus.FreeChannel(chn)
  195. if dtu.loop {
  196. //idx := chn.(*BusChannel).conn.(int)
  197. for _, connID := range chn.(*BusChannel).conIdList {
  198. if conn, has := dtu.connlist[connID]; has {
  199. dtu.mutex.Lock()
  200. delete(dtu.connlist, connID)
  201. dtu.mutex.Unlock()
  202. conn.Close()
  203. }
  204. }
  205. }
  206. return nil
  207. }
  208. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  209. if ichn != nil {
  210. chn := ichn.(*BusChannel)
  211. for _, connID := range chn.conIdList {
  212. if connID >= 0 {
  213. if conn, has := dtu.connlist[connID]; has {
  214. conn.Write(buff.([]byte))
  215. }
  216. }
  217. }
  218. }
  219. return dtu.baseBus.Send(ichn, buff)
  220. }
  221. func GetFtpServerConfig(param []interface{}) int {
  222. Port := 0
  223. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  224. Port = param[0].(int)
  225. }
  226. if Port == 0 {
  227. Port = config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT).(int)
  228. }
  229. return Port
  230. }
  231. func NewDtuServer(param []interface{}) IBus {
  232. Port := GetFtpServerConfig(param)
  233. busid := GenDtuServerId(moduleName, param)
  234. b := &DtuServer{
  235. baseBus: baseBus{
  236. BusId: busid,
  237. mutex: &sync.Mutex{},
  238. },
  239. connlist: make(map[int]net.Conn),
  240. patchs: make(map[string][]byte),
  241. clientNum: 0,
  242. Port: Port,
  243. loop: false,
  244. }
  245. return b
  246. }
  247. func GenDtuServerId(name string, param []interface{}) string {
  248. Port := GetFtpServerConfig(param)
  249. return name + ":" + strconv.Itoa(Port)
  250. }
  251. func init() {
  252. BusReg[moduleName] = NewDtuServer
  253. BusGetID[moduleName] = GenDtuServerId
  254. }