bus.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package bus
  2. import (
  3. "reflect"
  4. "strconv"
  5. "sync"
  6. "time"
  7. )
  8. const (
  9. CHAN_INPUT = 0
  10. CHAN_OUTPUT = 1
  11. )
  12. type CbBusChanDisp func(stream []byte, param interface{}) interface{}
  13. type BusChannel struct {
  14. chnID string
  15. mountCnt int
  16. event IBusEvent
  17. bus IBus
  18. timeout time.Duration
  19. conIdList []int
  20. evtArg interface{}
  21. chin chan interface{}
  22. chout chan interface{}
  23. }
  24. func (chn *BusChannel) ID() string {
  25. return chn.chnID
  26. }
  27. func (chn *BusChannel) GetChan(id int) chan interface{} {
  28. switch id {
  29. case 0:
  30. return chn.chin
  31. case 1:
  32. return chn.chout
  33. }
  34. return nil
  35. }
  36. func (chn *BusChannel) SetTimeout(timeout time.Duration) {
  37. chn.timeout = timeout
  38. }
  39. func (chn *BusChannel) SetEvent(evt IBusEvent, evtArgs interface{}) {
  40. chn.event = evt
  41. chn.evtArg = evtArgs
  42. }
  43. func (chn *BusChannel) GetEvent() IBusEvent {
  44. return chn.event
  45. }
  46. func (chn *BusChannel) GetBus() IBus {
  47. return chn.bus
  48. }
  49. type baseBus struct {
  50. BusId string
  51. TsLast uint64
  52. ChnList map[string]IChannel
  53. mutex *sync.Mutex
  54. }
  55. func (bus *baseBus) Init() error {
  56. if bus.ChnList == nil {
  57. bus.ChnList = make(map[string]IChannel)
  58. bus.mutex = new(sync.Mutex)
  59. }
  60. return nil
  61. }
  62. func (bus *baseBus) Uninit() {
  63. }
  64. func (bus *baseBus) stringId(chnID interface{}) string {
  65. switch reflect.TypeOf(chnID).Kind() {
  66. case reflect.String:
  67. return chnID.(string)
  68. case reflect.Int:
  69. return strconv.Itoa(chnID.(int))
  70. case reflect.Int64:
  71. return strconv.FormatInt(chnID.(int64), 10)
  72. }
  73. return ""
  74. }
  75. func (bus *baseBus) MountChannel(chnID interface{}, router []chan interface{}) IChannel {
  76. schnid := bus.stringId(chnID)
  77. if chn, has := bus.ChnList[schnid]; has {
  78. return chn
  79. }
  80. c := &BusChannel{
  81. chnID: schnid,
  82. event: nil,
  83. mountCnt: 0,
  84. bus: bus,
  85. conIdList: make([]int, 0),
  86. timeout: 0,
  87. }
  88. if len(router) > 1 {
  89. c.chout = router[1]
  90. }
  91. if len(router) > 0 {
  92. c.chin = router[0]
  93. }
  94. bus.mutex.Lock()
  95. bus.ChnList[schnid] = c
  96. bus.mutex.Unlock()
  97. return c
  98. }
  99. func (dtu *baseBus) ResetChannel(chn IChannel) {
  100. basechn := chn.(*BusChannel)
  101. basechn.mountCnt = 0
  102. }
  103. func (bus *baseBus) FreeChannel(chn IChannel) error {
  104. id := chn.ID()
  105. bus.mutex.Lock()
  106. delete(bus.ChnList, id)
  107. bus.mutex.Unlock()
  108. bus.ResetChannel(chn)
  109. basechn := chn.(*BusChannel)
  110. if basechn.event != nil {
  111. basechn.event.OnDetach(chn)
  112. }
  113. return nil
  114. }
  115. func (bus *baseBus) ScanChannel(stream []byte, connID int) IChannel {
  116. for _, ichn := range bus.ChnList {
  117. chn := ichn.(*BusChannel)
  118. if chn.event == nil || chn.mountCnt < 0 {
  119. continue
  120. }
  121. if ret := chn.event.ChannelDispatch(stream, chn.evtArg); ret != DispatchNone {
  122. chn.mountCnt += int(ret)
  123. chn.conIdList = append(chn.conIdList, connID)
  124. chn.event.OnAttach(ichn)
  125. return ichn
  126. }
  127. }
  128. return nil
  129. }
  130. func (bus *baseBus) Send(chn IChannel, buff interface{}) (int, error) {
  131. bus.TsLast = uint64(time.Now().Unix())
  132. return 0, nil
  133. }
  134. func (bus *baseBus) Recive(chn IChannel, buff interface{}) (int, error) {
  135. bus.TsLast = uint64(time.Now().Unix())
  136. return 0, nil
  137. }
  138. func (bus *baseBus) ApplyPatch(ptchName string, param ...interface{}) {
  139. }
  140. func (bus *baseBus) TimeStamp() int64 {
  141. return int64(bus.TsLast)
  142. }
  143. type funcRegBus func(param []interface{}) IBus
  144. type funcGetID func(string, []interface{}) string
  145. var BusList map[string]IBus
  146. var BusGetID map[string]funcGetID
  147. var BusReg map[string]funcRegBus
  148. func MountBus(name string, param []interface{}) IBus {
  149. busid := name
  150. if f, has := BusGetID[name]; has {
  151. busid = f(name, param)
  152. }
  153. if p, has := BusList[busid]; has && p != nil {
  154. return p
  155. }
  156. if f, has := BusReg[name]; has && f != nil {
  157. b := f(param)
  158. BusList[busid] = b
  159. return b
  160. }
  161. return nil
  162. }
  163. func init() {
  164. BusList = make(map[string]IBus)
  165. BusReg = make(map[string]funcRegBus)
  166. BusGetID = make(map[string]funcGetID)
  167. }