ftpDtu.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package bus
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/yuguorong/go/log"
  11. "github.com/ammeter/config"
  12. )
  13. const (
  14. maxConnCount = 100
  15. moduleName = "dtuFtpServer"
  16. DEF_FTP_PORT = 10010
  17. )
  18. var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  19. type DtuServer struct {
  20. baseBus
  21. Port int
  22. name string
  23. lis net.Listener
  24. lisAlt net.Listener
  25. connlist map[int]net.Conn
  26. clientNum int
  27. loop bool
  28. }
  29. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  30. }
  31. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  32. dtu.mutex.Lock()
  33. if _, has := dtu.connlist[connid]; has {
  34. delete(dtu.connlist, connid)
  35. dtu.clientNum--
  36. }
  37. dtu.mutex.Unlock()
  38. conn.Close()
  39. if chn != nil {
  40. basechn := chn.(*BusChannel)
  41. log.Infof("client [%s] close\n", basechn.chnID)
  42. basechn.mountCnt = 0
  43. if basechn.event != nil {
  44. basechn.event.OnDetach(chn)
  45. }
  46. for i, id := range basechn.conIdList {
  47. if id == connid {
  48. basechn.conIdList[i] = -1
  49. break
  50. }
  51. }
  52. }
  53. }
  54. func (dtu *DtuServer) LookupChannel(buf []byte, idx int, conn net.Conn) IChannel {
  55. return dtu.baseBus.ScanChannel(buf, idx)
  56. }
  57. func (dtu *DtuServer) ClientConnect(conn net.Conn, idx int, altFlag bool) {
  58. var ic IChannel = nil
  59. remoteAddr := conn.RemoteAddr().String()
  60. //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
  61. defer dtu.closeConn(conn, idx, ic)
  62. var err error
  63. var buf [1024]byte
  64. n := 0
  65. for ic == nil {
  66. if altFlag {
  67. conn.Write(cmdQueryAddr)
  68. }
  69. n, err = conn.Read(buf[:])
  70. if err != nil {
  71. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  72. return
  73. }
  74. log.Infof("Connected(alternat port:%t) with code %X", altFlag, buf[:n])
  75. ic = dtu.LookupChannel(buf[:n], idx, conn)
  76. }
  77. chnin := ic.GetChan(0)
  78. if chnin == nil {
  79. panic("no chan for read message")
  80. }
  81. chnout := ic.GetChan(1)
  82. dtu.name = hex.EncodeToString(buf[:n])
  83. for {
  84. smeter := hex.EncodeToString(buf[:n])
  85. dtu.mutex.Lock()
  86. chnin <- buf[:n]
  87. dtu.mutex.Unlock()
  88. if chnout != nil {
  89. after := time.After(time.Second * 60)
  90. select {
  91. case msg := <-chnout:
  92. conn.Write(msg.([]byte))
  93. case <-after:
  94. break
  95. }
  96. }
  97. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  98. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  99. if ic.(*BusChannel).timeout != 0 {
  100. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  101. }
  102. n, err = conn.Read(buf[:])
  103. if err != nil {
  104. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  105. dtu.closeConn(conn, idx, ic)
  106. break
  107. }
  108. }
  109. }
  110. func (dtu *DtuServer) StartServer(lister *net.Listener, altFlag bool) {
  111. defer func() {
  112. (*lister).Close()
  113. }()
  114. idx := 0
  115. for dtu.loop {
  116. if dtu.clientNum >= maxConnCount {
  117. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  118. time.Sleep(5 * time.Second)
  119. continue
  120. }
  121. conn, err := (*lister).Accept()
  122. if err != nil {
  123. log.Errorf("listen err:[%v]\n", err)
  124. }
  125. dtu.mutex.Lock()
  126. dtu.connlist[idx] = conn
  127. idx++
  128. dtu.clientNum++
  129. dtu.mutex.Unlock()
  130. go dtu.ClientConnect(conn, idx-1, altFlag)
  131. }
  132. }
  133. func (dtu *DtuServer) Init() error {
  134. dtu.baseBus.Init()
  135. if !dtu.loop {
  136. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  137. log.Info("start ", addr)
  138. var err error = nil
  139. dtu.lis, err = net.Listen("tcp", addr)
  140. if err != nil {
  141. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  142. return err
  143. }
  144. dtu.mutex.Lock()
  145. defer dtu.mutex.Unlock()
  146. dtu.loop = true
  147. go dtu.StartServer(&dtu.lis, false)
  148. addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
  149. log.Info("Alternat port listen start ", addr)
  150. dtu.lisAlt, err = net.Listen("tcp", addr)
  151. if err != nil {
  152. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  153. return err
  154. }
  155. go dtu.StartServer(&dtu.lisAlt, true)
  156. }
  157. return nil
  158. }
  159. func (dtu *DtuServer) Uninit() {
  160. dtu.mutex.Lock()
  161. defer dtu.mutex.Unlock()
  162. if dtu.loop {
  163. dtu.loop = false
  164. for _, c := range dtu.connlist {
  165. c.Close()
  166. }
  167. dtu.lis.Close()
  168. dtu.connlist = make(map[int]net.Conn)
  169. }
  170. }
  171. func (dtu *DtuServer) OpenChannel(chn interface{}, router []chan interface{}) IChannel {
  172. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  173. panic("Open channel should be a uniq string")
  174. }
  175. ic := dtu.baseBus.OpenChannel(chn, router)
  176. return ic
  177. }
  178. func (dtu *DtuServer) CloseChannel(chn IChannel) error {
  179. dtu.baseBus.CloseChannel(chn)
  180. if dtu.loop {
  181. //idx := chn.(*BusChannel).conn.(int)
  182. for _, connID := range chn.(*BusChannel).conIdList {
  183. if conn, has := dtu.connlist[connID]; has {
  184. dtu.mutex.Lock()
  185. delete(dtu.connlist, connID)
  186. dtu.mutex.Unlock()
  187. conn.Close()
  188. }
  189. }
  190. }
  191. return nil
  192. }
  193. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  194. if ichn != nil {
  195. chn := ichn.(*BusChannel)
  196. for _, connID := range chn.conIdList {
  197. if connID >= 0 {
  198. if conn, has := dtu.connlist[connID]; has {
  199. conn.Write(buff.([]byte))
  200. }
  201. }
  202. }
  203. }
  204. return dtu.baseBus.Send(ichn, buff)
  205. }
  206. func GetFtpServerConfig(param []interface{}) int {
  207. Port := 0
  208. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  209. Port = param[0].(int)
  210. }
  211. if Port == 0 {
  212. Port = config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT).(int)
  213. }
  214. return Port
  215. }
  216. func NewDtuServer(param []interface{}) IBus {
  217. Port := GetFtpServerConfig(param)
  218. busid := GenDtuServerId(moduleName, param)
  219. b := &DtuServer{
  220. baseBus: baseBus{
  221. BusId: busid,
  222. mutex: &sync.Mutex{},
  223. },
  224. connlist: make(map[int]net.Conn),
  225. clientNum: 0,
  226. Port: Port,
  227. loop: false,
  228. }
  229. return b
  230. }
  231. func GenDtuServerId(name string, param []interface{}) string {
  232. Port := GetFtpServerConfig(param)
  233. return name + ":" + strconv.Itoa(Port)
  234. }
  235. func init() {
  236. BusReg[moduleName] = NewDtuServer
  237. BusGetID[moduleName] = GenDtuServerId
  238. }