ftpDtu.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package bus
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/yuguorong/go/log"
  13. "github.com/ammeter/config"
  14. "github.com/ammeter/util"
  15. )
  16. const (
  17. maxConnCount = 100
  18. moduleName = "dtuFtpServer"
  19. DEF_FTP_PORT = 10010
  20. )
  21. //var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  22. type DtuServer struct {
  23. baseBus
  24. Port int
  25. name string
  26. lis net.Listener
  27. listPassive net.Listener
  28. connlist map[int]net.Conn
  29. clientNum int
  30. loop bool
  31. patchs map[string][]byte
  32. }
  33. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  34. dtu.baseBus.ResetChannel(chn)
  35. }
  36. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  37. dtu.mutex.Lock()
  38. if _, has := dtu.connlist[connid]; has {
  39. delete(dtu.connlist, connid)
  40. dtu.clientNum--
  41. }
  42. dtu.mutex.Unlock()
  43. conn.Close()
  44. if chn != nil {
  45. basechn := chn.(*BusChannel)
  46. log.Infof("client [%s] close\n", basechn.chnID)
  47. basechn.mountCnt = 0
  48. if basechn.event != nil {
  49. basechn.event.OnDetach(chn)
  50. }
  51. for i, id := range basechn.conIdList {
  52. if id == connid {
  53. basechn.conIdList[i] = -1
  54. break
  55. }
  56. }
  57. }
  58. }
  59. func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) {
  60. if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice {
  61. dtu.patchs[patchName] = param[0].([]byte)
  62. }
  63. }
  64. func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) {
  65. var ic IChannel = nil
  66. remoteAddr := conn.RemoteAddr().String()
  67. //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
  68. defer dtu.closeConn(conn, connectID, ic)
  69. var err error
  70. var buf [1024]byte
  71. n := 0
  72. for i := 0; i < 10; i++ {
  73. if PassiveFlag {
  74. for key, patchCmd := range dtu.patchs {
  75. if strings.Contains(key, "Passive") {
  76. conn.Write(patchCmd)
  77. }
  78. }
  79. }
  80. n, err = conn.Read(buf[:])
  81. if err != nil {
  82. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  83. return
  84. }
  85. log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
  86. ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
  87. if ic != nil {
  88. break
  89. }
  90. time.Sleep(time.Second * 5)
  91. }
  92. if ic == nil {
  93. return
  94. }
  95. chnin := ic.GetChan(0)
  96. if chnin == nil {
  97. panic("no chan for read message")
  98. }
  99. chnout := ic.GetChan(1)
  100. dtu.name = hex.EncodeToString(buf[:n])
  101. for {
  102. smeter := hex.EncodeToString(buf[:n])
  103. dtu.mutex.Lock()
  104. chnin <- buf[:n]
  105. dtu.mutex.Unlock()
  106. if chnout != nil {
  107. after := time.After(time.Second * 60)
  108. select {
  109. case msg := <-chnout:
  110. conn.Write(msg.([]byte))
  111. case <-after:
  112. log.Info("timeafter for write message")
  113. break
  114. }
  115. }
  116. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  117. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  118. if ic.(*BusChannel).timeout != 0 {
  119. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  120. } else {
  121. conn.SetReadDeadline(time.Now().Add(30 * time.Minute))
  122. }
  123. n, err = conn.Read(buf[:])
  124. if err != nil {
  125. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  126. chnin <- "closed"
  127. break //return and close connection
  128. }
  129. }
  130. }
  131. func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
  132. defer func() {
  133. (*lister).Close()
  134. }()
  135. connectID := 1000
  136. for dtu.loop {
  137. if dtu.clientNum >= maxConnCount {
  138. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  139. time.Sleep(5 * time.Second)
  140. continue
  141. }
  142. conn, err := (*lister).Accept()
  143. if err != nil {
  144. log.Errorf("listen err:[%v]\n", err)
  145. }
  146. dtu.mutex.Lock()
  147. connectID++
  148. dtu.connlist[connectID] = conn
  149. dtu.clientNum++
  150. dtu.mutex.Unlock()
  151. go dtu.ClientConnect(conn, connectID, PassiveFlag)
  152. }
  153. }
  154. func (dtu *DtuServer) Init() error {
  155. dtu.baseBus.Init()
  156. if !dtu.loop {
  157. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  158. log.Info("start ", addr)
  159. var err error = nil
  160. dtu.lis, err = net.Listen("tcp", addr)
  161. if err != nil {
  162. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  163. return err
  164. }
  165. dtu.mutex.Lock()
  166. defer dtu.mutex.Unlock()
  167. dtu.loop = true
  168. go dtu.StartServer(&dtu.lis, false)
  169. addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
  170. log.Info("Passive port listen start ", addr)
  171. dtu.listPassive, err = net.Listen("tcp", addr)
  172. if err != nil {
  173. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  174. return err
  175. }
  176. go dtu.StartServer(&dtu.listPassive, true)
  177. }
  178. return nil
  179. }
  180. func (dtu *DtuServer) Uninit() {
  181. dtu.mutex.Lock()
  182. defer dtu.mutex.Unlock()
  183. if dtu.loop {
  184. dtu.loop = false
  185. for _, c := range dtu.connlist {
  186. c.Close()
  187. }
  188. dtu.lis.Close()
  189. dtu.connlist = make(map[int]net.Conn)
  190. }
  191. }
  192. func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel {
  193. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  194. panic("Open channel should be a unique string")
  195. }
  196. ic := dtu.baseBus.MountChannel(chn, router)
  197. return ic
  198. }
  199. func (dtu *DtuServer) FreeChannel(chn IChannel) error {
  200. dtu.baseBus.FreeChannel(chn)
  201. if dtu.loop {
  202. //idx := chn.(*BusChannel).conn.(int)
  203. for _, connID := range chn.(*BusChannel).conIdList {
  204. if conn, has := dtu.connlist[connID]; has {
  205. dtu.mutex.Lock()
  206. delete(dtu.connlist, connID)
  207. dtu.mutex.Unlock()
  208. conn.Close()
  209. }
  210. }
  211. }
  212. return nil
  213. }
  214. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  215. if ichn != nil {
  216. chn := ichn.(*BusChannel)
  217. for _, connID := range chn.conIdList {
  218. if connID >= 0 {
  219. if conn, has := dtu.connlist[connID]; has {
  220. conn.Write(buff.([]byte))
  221. return len(buff.([]byte)), nil
  222. }
  223. }
  224. }
  225. }
  226. return 0, errors.New("no such connection")
  227. }
  228. func GetFtpServerConfig(param []interface{}) int {
  229. Port := 0
  230. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  231. Port = param[0].(int)
  232. }
  233. if Port == 0 {
  234. Port = util.ToInt(config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT))
  235. }
  236. return Port
  237. }
  238. func NewDtuServer(param []interface{}) IBus {
  239. Port := GetFtpServerConfig(param)
  240. busid := GenDtuServerId(moduleName, param)
  241. b := &DtuServer{
  242. baseBus: baseBus{
  243. BusId: busid,
  244. mutex: &sync.Mutex{},
  245. },
  246. connlist: make(map[int]net.Conn),
  247. patchs: make(map[string][]byte),
  248. clientNum: 0,
  249. Port: Port,
  250. loop: false,
  251. }
  252. return b
  253. }
  254. func GenDtuServerId(name string, param []interface{}) string {
  255. Port := GetFtpServerConfig(param)
  256. return name + ":" + strconv.Itoa(Port)
  257. }
  258. func init() {
  259. BusReg[moduleName] = NewDtuServer
  260. BusGetID[moduleName] = GenDtuServerId
  261. }