ftpDtu.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package bus
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "net"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "time"
  12. "github.com/yuguorong/go/log"
  13. "github.com/ammeter/config"
  14. "github.com/ammeter/util"
  15. )
  16. const (
  17. maxConnCount = 100
  18. moduleName = "dtuFtpServer"
  19. DEF_FTP_PORT = 10010
  20. )
  21. //var cmdQueryAddr []byte = []byte{0xfe, 0xfe, 0xfe, 0xfe, 0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  22. type DtuServer struct {
  23. baseBus
  24. Port int
  25. name string
  26. lis net.Listener
  27. listPassive net.Listener
  28. connlist map[int]net.Conn
  29. clientNum int
  30. loop bool
  31. patchs map[string][]byte
  32. }
  33. func (dtu *DtuServer) ResetChannel(chn IChannel) {
  34. dtu.baseBus.ResetChannel(chn)
  35. }
  36. func (dtu *DtuServer) closeConn(conn net.Conn, connid int, chn IChannel) {
  37. dtu.mutex.Lock()
  38. if _, has := dtu.connlist[connid]; has {
  39. delete(dtu.connlist, connid)
  40. dtu.clientNum--
  41. }
  42. dtu.mutex.Unlock()
  43. conn.Close()
  44. if chn != nil {
  45. basechn := chn.(*BusChannel)
  46. log.Infof("client [%s] close\n", basechn.chnID)
  47. basechn.mountCnt = 0
  48. if basechn.event != nil {
  49. basechn.event.OnDetach(chn)
  50. }
  51. for i, id := range basechn.conIdList {
  52. if id == connid {
  53. basechn.conIdList[i] = -1
  54. break
  55. }
  56. }
  57. }
  58. }
  59. func (dtu *DtuServer) ApplyPatch(patchName string, param ...interface{}) {
  60. if len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Slice {
  61. dtu.patchs[patchName] = param[0].([]byte)
  62. }
  63. }
  64. func (dtu *DtuServer) ClientConnect(conn net.Conn, connectID int, PassiveFlag bool) {
  65. var ic IChannel = nil
  66. remoteAddr := conn.RemoteAddr().String()
  67. //SrcPort := uint(conn.RemoteAddr().(*net.TCPAddr).Port)
  68. defer dtu.closeConn(conn, connectID, ic)
  69. var err error
  70. var buf [1024]byte
  71. n := 0
  72. for i := 0; i < 10; i++ {
  73. if PassiveFlag {
  74. for key, patchCmd := range dtu.patchs {
  75. if strings.Contains(key, "Passive") {
  76. conn.Write(patchCmd)
  77. }
  78. }
  79. }
  80. n, err = conn.Read(buf[:])
  81. if err != nil {
  82. log.Infof("read from %s header faild err:[%v]\n", remoteAddr, err)
  83. return
  84. }
  85. log.Infof("Connect %d (passive devices port:%t) with code %X", connectID, PassiveFlag, buf[:n])
  86. ic = dtu.baseBus.ScanChannel(buf[:n], connectID)
  87. if ic != nil {
  88. break
  89. }
  90. time.Sleep(time.Second * 5)
  91. }
  92. if ic == nil {
  93. return
  94. }
  95. chnin := ic.GetChan(0)
  96. if chnin == nil {
  97. panic("no chan for read message")
  98. }
  99. chnout := ic.GetChan(1)
  100. dtu.name = hex.EncodeToString(buf[:n])
  101. for {
  102. smeter := hex.EncodeToString(buf[:n])
  103. dtu.mutex.Lock()
  104. if n > 0 {
  105. chnin <- buf[:n]
  106. }
  107. dtu.mutex.Unlock()
  108. if chnout != nil {
  109. after := time.After(time.Second * 60)
  110. select {
  111. case msg := <-chnout:
  112. conn.Write(msg.([]byte))
  113. case <-after:
  114. log.Info("timeafter for write message")
  115. break
  116. }
  117. }
  118. // log.Printf("rev data from %s msg:%s\n", conn.RemoteAddr().String(), string(buf[:n]))
  119. log.Infof("[%s]rev data from %s msg(%d):[%s]\n", time.Now().Format("2006-01-02 15:04:05"), remoteAddr, n, smeter)
  120. if ic.(*BusChannel).timeout != 0 {
  121. conn.SetReadDeadline(time.Now().Add(ic.(*BusChannel).timeout))
  122. } else {
  123. conn.SetReadDeadline(time.Now().Add(30 * time.Minute))
  124. }
  125. n, err = conn.Read(buf[:])
  126. if err != nil {
  127. if strings.Contains(err.Error(), "timeout") || strings.Contains(err.Error(), " timed out") {
  128. log.Infof("read Next:[%s]\n", ic.ID())
  129. continue
  130. }
  131. log.Errorf("read from %s msg faild err:[%v]\n", ic.ID(), err)
  132. chnin <- "closed"
  133. break //return and close connection
  134. }
  135. }
  136. }
  137. func (dtu *DtuServer) Disconnect(chn IChannel) int {
  138. for _, conid := range chn.(*BusChannel).conIdList {
  139. if conn, has := dtu.connlist[conid]; has {
  140. dtu.closeConn(conn, conid, chn)
  141. }
  142. }
  143. return dtu.baseBus.Disconnect(chn)
  144. }
  145. func (dtu *DtuServer) StartServer(lister *net.Listener, PassiveFlag bool) {
  146. defer func() {
  147. (*lister).Close()
  148. }()
  149. connectID := 1000
  150. for dtu.loop {
  151. if dtu.clientNum >= maxConnCount {
  152. log.Infof("there is %d clients,is max num\n", dtu.clientNum)
  153. time.Sleep(5 * time.Second)
  154. continue
  155. }
  156. conn, err := (*lister).Accept()
  157. if err != nil {
  158. log.Errorf("listen err:[%v]\n", err)
  159. }
  160. dtu.mutex.Lock()
  161. connectID++
  162. dtu.connlist[connectID] = conn
  163. dtu.clientNum++
  164. dtu.mutex.Unlock()
  165. go dtu.ClientConnect(conn, connectID, PassiveFlag)
  166. }
  167. }
  168. func (dtu *DtuServer) Init() error {
  169. dtu.baseBus.Init()
  170. if !dtu.loop {
  171. addr := fmt.Sprintf("0.0.0.0:%d", dtu.Port)
  172. log.Info("start ", addr)
  173. var err error = nil
  174. dtu.lis, err = net.Listen("tcp", addr)
  175. if err != nil {
  176. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  177. return err
  178. }
  179. dtu.mutex.Lock()
  180. defer dtu.mutex.Unlock()
  181. dtu.loop = true
  182. go dtu.StartServer(&dtu.lis, false)
  183. addr = fmt.Sprintf("0.0.0.0:%d", dtu.Port+10)
  184. log.Info("Passive port listen start ", addr)
  185. dtu.listPassive, err = net.Listen("tcp", addr)
  186. if err != nil {
  187. log.Errorf("err! addr %s open faild err:[%v]\n", addr, err)
  188. return err
  189. }
  190. go dtu.StartServer(&dtu.listPassive, true)
  191. }
  192. return nil
  193. }
  194. func (dtu *DtuServer) Uninit() {
  195. dtu.mutex.Lock()
  196. defer dtu.mutex.Unlock()
  197. if dtu.loop {
  198. dtu.loop = false
  199. for _, c := range dtu.connlist {
  200. c.Close()
  201. }
  202. dtu.lis.Close()
  203. dtu.connlist = make(map[int]net.Conn)
  204. }
  205. }
  206. func (dtu *DtuServer) MountChannel(chn interface{}, router []chan interface{}) IChannel {
  207. if chn == nil || reflect.TypeOf(chn).Kind() != reflect.String {
  208. panic("Open channel should be a unique string")
  209. }
  210. ic := dtu.baseBus.MountChannel(chn, router)
  211. return ic
  212. }
  213. func (dtu *DtuServer) FreeChannel(chn IChannel) error {
  214. dtu.baseBus.FreeChannel(chn)
  215. if dtu.loop {
  216. //idx := chn.(*BusChannel).conn.(int)
  217. for _, connID := range chn.(*BusChannel).conIdList {
  218. if conn, has := dtu.connlist[connID]; has {
  219. dtu.mutex.Lock()
  220. delete(dtu.connlist, connID)
  221. dtu.mutex.Unlock()
  222. conn.Close()
  223. }
  224. }
  225. }
  226. return nil
  227. }
  228. func (dtu *DtuServer) Send(ichn IChannel, buff interface{}) (int, error) {
  229. if ichn != nil {
  230. chn := ichn.(*BusChannel)
  231. for _, connID := range chn.conIdList {
  232. if connID >= 0 {
  233. if conn, has := dtu.connlist[connID]; has {
  234. conn.Write(buff.([]byte))
  235. return len(buff.([]byte)), nil
  236. }
  237. }
  238. }
  239. }
  240. return 0, errors.New("no such connection")
  241. }
  242. func GetFtpServerConfig(param []interface{}) int {
  243. Port := 0
  244. if param != nil && len(param) > 0 && reflect.TypeOf(param[0]).Kind() == reflect.Int {
  245. Port = param[0].(int)
  246. }
  247. if Port == 0 {
  248. Port = util.ToInt(config.GetSysConfig().GetValue("Bus/DtuServer/Port", DEF_FTP_PORT))
  249. }
  250. return Port
  251. }
  252. func NewDtuServer(param []interface{}) IBus {
  253. Port := GetFtpServerConfig(param)
  254. busid := GenDtuServerId(moduleName, param)
  255. b := &DtuServer{
  256. baseBus: baseBus{
  257. BusId: busid,
  258. mutex: &sync.Mutex{},
  259. },
  260. connlist: make(map[int]net.Conn),
  261. patchs: make(map[string][]byte),
  262. clientNum: 0,
  263. Port: Port,
  264. loop: false,
  265. }
  266. return b
  267. }
  268. func GenDtuServerId(name string, param []interface{}) string {
  269. Port := GetFtpServerConfig(param)
  270. return name + ":" + strconv.Itoa(Port)
  271. }
  272. func init() {
  273. BusReg[moduleName] = NewDtuServer
  274. BusGetID[moduleName] = GenDtuServerId
  275. }