ftpDtu.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package bus
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/yuguorong/go/log"
  11. "github.com/ammeter/config"
  12. )
  13. const (
  14. maxConnCount = 100
  15. moduleName = "dtuFtpServer"
  16. DEF_FTP_PORT = 10010
  17. )
  18. type DtuServer struct {
  19. baseBus
  20. Port int
  21. name string
  22. lis net.Listener
  23. connlist map[int]net.Conn
  24. clientNum int
  25. loop bool
  26. }
  27. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  28. }
  29. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  30. dtu.mutex.Lock()
  31. if _, has := dtu.connlist[connid]; has {
  32. delete(dtu.connlist, connid)
  33. dtu.clientNum--
  34. }
  35. dtu.mutex.Unlock()
  36. conn.Close()
  37. if chn != nil {
  38. basechn := chn.(*BusChannel)
  39. log.Infof("client [%s] close\n", basechn.chnID)
  40. basechn.mountCnt = 0
  41. if basechn.event != nil {
  42. basechn.event.OnDetach(chn)
  43. }
  44. for i, id := range basechn.conIdList {
  45. if id == connid {
  46. basechn.conIdList[i] = -1
  47. break
  48. }
  49. }
  50. }
  51. }
  52. func (dtu *DtuServer) LookupChannel(buf []byte, idx int, conn net.Conn) IChannel {
  53. return dtu.baseBus.ScanChannel(buf, idx)
  54. }
  55. func (dtu *DtuServer) ClientConnect(conn net.Conn, idx int) {
  56. var ic IChannel = nil
  57. remoteAddr := conn.RemoteAddr().String()
  58. defer dtu.closeConn(conn, idx, ic)
  59. var err error
  60. var buf [1024]byte
  61. n := 0
  62. for ic == nil {
  63. n, err = conn.Read(buf[:])
  64. if err != nil {
  65. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  66. return
  67. }
  68. ic = dtu.LookupChannel(buf[:n], idx, conn)
  69. }
  70. chnin := ic.GetChan(0)
  71. if chnin == nil {
  72. panic("no chan for read message")
  73. }
  74. chnout := ic.GetChan(1)
  75. dtu.name = hex.EncodeToString(buf[:n])
  76. for {
  77. smeter := hex.EncodeToString(buf[:n])
  78. dtu.mutex.Lock()
  79. chnin <- buf[:n]
  80. dtu.mutex.Unlock()
  81. if chnout != nil {
  82. after := time.After(time.Second * 60)
  83. select {
  84. case msg := <-chnout:
  85. conn.Write(msg.([]byte))
  86. case <-after:
  87. break
  88. }
  89. }
  90. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  91. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  92. if ic.(*BusChannel).timeout != 0 {
  93. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  94. }
  95. n, err = conn.Read(buf[:])
  96. if err != nil {
  97. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  98. dtu.closeConn(conn, idx, ic)
  99. break
  100. }
  101. }
  102. }
  103. func (dtu *DtuServer) StartServer() {
  104. defer func() {
  105. dtu.lis.Close()
  106. }()
  107. idx := 0
  108. for dtu.loop {
  109. if dtu.clientNum >= maxConnCount {
  110. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  111. time.Sleep(5 * time.Second)
  112. continue
  113. }
  114. conn, err := dtu.lis.Accept()
  115. if err != nil {
  116. log.Errorf("listen err:[%v]\n", err)
  117. }
  118. dtu.mutex.Lock()
  119. dtu.connlist[idx] = conn
  120. idx++
  121. dtu.clientNum++
  122. dtu.mutex.Unlock()
  123. go dtu.ClientConnect(conn, idx-1)
  124. }
  125. }
  126. func (dtu *DtuServer) Init() error {
  127. dtu.baseBus.Init()
  128. if !dtu.loop {
  129. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  130. log.Info("start ", addr)
  131. var err error = nil
  132. dtu.lis, err = net.Listen("tcp", addr)
  133. if err != nil {
  134. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  135. return err
  136. }
  137. dtu.mutex.Lock()
  138. defer dtu.mutex.Unlock()
  139. dtu.loop = true
  140. go dtu.StartServer()
  141. }
  142. return nil
  143. }
  144. func (dtu *DtuServer) Uninit() {
  145. dtu.mutex.Lock()
  146. defer dtu.mutex.Unlock()
  147. if dtu.loop {
  148. dtu.loop = false
  149. for _, c := range dtu.connlist {
  150. c.Close()
  151. }
  152. dtu.lis.Close()
  153. dtu.connlist = make(map[int]net.Conn)
  154. }
  155. }
  156. func (dtu *DtuServer) OpenChannel(chn interface{}, router []chan interface{}) IChannel {
  157. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  158. panic("Open channel should be a uniq string")
  159. }
  160. ic := dtu.baseBus.OpenChannel(chn, router)
  161. return ic
  162. }
  163. func (dtu *DtuServer) CloseChannel(chn IChannel) error {
  164. dtu.baseBus.CloseChannel(chn)
  165. if dtu.loop {
  166. //idx := chn.(*BusChannel).conn.(int)
  167. for _, connID := range chn.(*BusChannel).conIdList {
  168. if conn, has := dtu.connlist[connID]; has {
  169. dtu.mutex.Lock()
  170. delete(dtu.connlist, connID)
  171. dtu.mutex.Unlock()
  172. conn.Close()
  173. }
  174. }
  175. }
  176. return nil
  177. }
  178. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  179. if ichn != nil {
  180. chn := ichn.(*BusChannel)
  181. for _, connID := range chn.conIdList {
  182. if connID >= 0 {
  183. if conn, has := dtu.connlist[connID]; has {
  184. conn.Write(buff.([]byte))
  185. }
  186. }
  187. }
  188. }
  189. return 0, nil
  190. }
  191. func GetFtpServerConfig(param []interface{}) int {
  192. Port := 0
  193. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  194. Port = param[0].(int)
  195. }
  196. if Port == 0 {
  197. Port = config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT).(int)
  198. }
  199. return Port
  200. }
  201. func NewDtuServer(param []interface{}) IBus {
  202. Port := GetFtpServerConfig(param)
  203. busid := GenDtuServerId(moduleName, param)
  204. b := &DtuServer{
  205. baseBus: baseBus{
  206. BusId: busid,
  207. mutex: &sync.Mutex{},
  208. },
  209. connlist: make(map[int]net.Conn),
  210. clientNum: 0,
  211. Port: Port,
  212. loop: false,
  213. }
  214. return b
  215. }
  216. func GenDtuServerId(name string, param []interface{}) string {
  217. Port := GetFtpServerConfig(param)
  218. return name + ":" + strconv.Itoa(Port)
  219. }
  220. func init() {
  221. BusReg[moduleName] = NewDtuServer
  222. BusGetID[moduleName] = GenDtuServerId
  223. }