ftpDtu.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package bus
  2. import (
  3. "encoding/hex"
  4. "fmt"
  5. "net"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/yuguorong/go/log"
  12. "github.com/ammeter/config"
  13. "github.com/ammeter/util"
  14. )
  15. const (
  16. maxConnCount = 100
  17. moduleName = "dtuFtpServer"
  18. DEF_FTP_PORT = 10010
  19. )
  20. //var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  21. type DtuServer struct {
  22. baseBus
  23. Port int
  24. name string
  25. lis net.Listener
  26. listPassive net.Listener
  27. connlist map[int]net.Conn
  28. clientNum int
  29. loop bool
  30. patchs map[string][]byte
  31. }
  32. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  33. }
  34. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  35. dtu.mutex.Lock()
  36. if _, has := dtu.connlist[connid]; has {
  37. delete(dtu.connlist, connid)
  38. dtu.clientNum--
  39. }
  40. dtu.mutex.Unlock()
  41. conn.Close()
  42. if chn != nil {
  43. basechn := chn.(*BusChannel)
  44. log.Infof("client [%s] close\n", basechn.chnID)
  45. basechn.mountCnt = 0
  46. if basechn.event != nil {
  47. basechn.event.OnDetach(chn)
  48. }
  49. for i, id := range basechn.conIdList {
  50. if id == connid {
  51. basechn.conIdList[i] = -1
  52. break
  53. }
  54. }
  55. }
  56. }
  57. func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) {
  58. if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice {
  59. dtu.patchs[patchName] = param[0].([]byte)
  60. }
  61. }
  62. func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) {
  63. var ic IChannel = nil
  64. remoteAddr := conn.RemoteAddr().String()
  65. //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
  66. defer dtu.closeConn(conn, connectID, ic)
  67. var err error
  68. var buf [1024]byte
  69. n := 0
  70. for i := 0; i < 10; i++ {
  71. if PassiveFlag {
  72. for key, patchCmd := range dtu.patchs {
  73. if strings.Contains(key, "Passive") {
  74. conn.Write(patchCmd)
  75. }
  76. }
  77. }
  78. n, err = conn.Read(buf[:])
  79. if err != nil {
  80. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  81. return
  82. }
  83. log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
  84. ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
  85. if ic != nil {
  86. break
  87. }
  88. time.Sleep(time.Second * 5)
  89. }
  90. if ic == nil {
  91. return
  92. }
  93. chnin := ic.GetChan(0)
  94. if chnin == nil {
  95. panic("no chan for read message")
  96. }
  97. chnout := ic.GetChan(1)
  98. dtu.name = hex.EncodeToString(buf[:n])
  99. for {
  100. smeter := hex.EncodeToString(buf[:n])
  101. dtu.mutex.Lock()
  102. chnin <- buf[:n]
  103. dtu.mutex.Unlock()
  104. if chnout != nil {
  105. after := time.After(time.Second * 60)
  106. select {
  107. case msg := <-chnout:
  108. conn.Write(msg.([]byte))
  109. case <-after:
  110. break
  111. }
  112. }
  113. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  114. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  115. if ic.(*BusChannel).timeout != 0 {
  116. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  117. }
  118. n, err = conn.Read(buf[:])
  119. if err != nil {
  120. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  121. dtu.closeConn(conn, connectID, ic)
  122. break
  123. }
  124. }
  125. }
  126. func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
  127. defer func() {
  128. (*lister).Close()
  129. }()
  130. connectID := 1000
  131. for dtu.loop {
  132. if dtu.clientNum >= maxConnCount {
  133. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  134. time.Sleep(5 * time.Second)
  135. continue
  136. }
  137. conn, err := (*lister).Accept()
  138. if err != nil {
  139. log.Errorf("listen err:[%v]\n", err)
  140. }
  141. dtu.mutex.Lock()
  142. connectID++
  143. dtu.connlist[connectID] = conn
  144. dtu.clientNum++
  145. dtu.mutex.Unlock()
  146. go dtu.ClientConnect(conn, connectID, PassiveFlag)
  147. }
  148. }
  149. func (dtu *DtuServer) Init() error {
  150. dtu.baseBus.Init()
  151. if !dtu.loop {
  152. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  153. log.Info("start ", addr)
  154. var err error = nil
  155. dtu.lis, err = net.Listen("tcp", addr)
  156. if err != nil {
  157. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  158. return err
  159. }
  160. dtu.mutex.Lock()
  161. defer dtu.mutex.Unlock()
  162. dtu.loop = true
  163. go dtu.StartServer(&dtu.lis, false)
  164. addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
  165. log.Info("Passive port listen start ", addr)
  166. dtu.listPassive, err = net.Listen("tcp", addr)
  167. if err != nil {
  168. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  169. return err
  170. }
  171. go dtu.StartServer(&dtu.listPassive, true)
  172. }
  173. return nil
  174. }
  175. func (dtu *DtuServer) Uninit() {
  176. dtu.mutex.Lock()
  177. defer dtu.mutex.Unlock()
  178. if dtu.loop {
  179. dtu.loop = false
  180. for _, c := range dtu.connlist {
  181. c.Close()
  182. }
  183. dtu.lis.Close()
  184. dtu.connlist = make(map[int]net.Conn)
  185. }
  186. }
  187. func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel {
  188. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  189. panic("Open channel should be a unique string")
  190. }
  191. ic := dtu.baseBus.MountChannel(chn, router)
  192. return ic
  193. }
  194. func (dtu *DtuServer) FreeChannel(chn IChannel) error {
  195. dtu.baseBus.FreeChannel(chn)
  196. if dtu.loop {
  197. //idx := chn.(*BusChannel).conn.(int)
  198. for _, connID := range chn.(*BusChannel).conIdList {
  199. if conn, has := dtu.connlist[connID]; has {
  200. dtu.mutex.Lock()
  201. delete(dtu.connlist, connID)
  202. dtu.mutex.Unlock()
  203. conn.Close()
  204. }
  205. }
  206. }
  207. return nil
  208. }
  209. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  210. if ichn != nil {
  211. chn := ichn.(*BusChannel)
  212. for _, connID := range chn.conIdList {
  213. if connID >= 0 {
  214. if conn, has := dtu.connlist[connID]; has {
  215. conn.Write(buff.([]byte))
  216. }
  217. }
  218. }
  219. }
  220. return dtu.baseBus.Send(ichn, buff)
  221. }
  222. func GetFtpServerConfig(param []interface{}) int {
  223. Port := 0
  224. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  225. Port = param[0].(int)
  226. }
  227. if Port == 0 {
  228. Port = util.ToInt(config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT))
  229. }
  230. return Port
  231. }
  232. func NewDtuServer(param []interface{}) IBus {
  233. Port := GetFtpServerConfig(param)
  234. busid := GenDtuServerId(moduleName, param)
  235. b := &DtuServer{
  236. baseBus: baseBus{
  237. BusId: busid,
  238. mutex: &sync.Mutex{},
  239. },
  240. connlist: make(map[int]net.Conn),
  241. patchs: make(map[string][]byte),
  242. clientNum: 0,
  243. Port: Port,
  244. loop: false,
  245. }
  246. return b
  247. }
  248. func GenDtuServerId(name string, param []interface{}) string {
  249. Port := GetFtpServerConfig(param)
  250. return name + ":" + strconv.Itoa(Port)
  251. }
  252. func init() {
  253. BusReg[moduleName] = NewDtuServer
  254. BusGetID[moduleName] = GenDtuServerId
  255. }