ameter.go 6.8 KB


  1. package drviers
import (
	"encoding/hex"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"time"

	bus "github.com/ammeter/Bus"
	"github.com/ammeter/config"
	"github.com/yuguorong/go/log"
)
  12. const (
  13. CMD_ENERGY_TOTAL = "00000000"
  14. DEF_SAMPLE_PERIOD = 5 * time.Minute
  15. DEF_SAMPLE_PEER_DURATION = 30 * time.Second
  16. DEF_TCP_READ_TIMEOUT = 1 * time.Minute
  17. POSTPONE_PERSIST_TIME = 1 * time.Minute
  18. )
  19. type AmmeterModel struct {
  20. Id string `json:"id" gorm:"-"`
  21. DevName string `json:"devName" gorm:"primarykey"`
  22. Code string `json:"code" gorm:"-"`
  23. DutSn string `json:"dutSn" gorm:"-"`
  24. Address string `json:"address" gorm:"-"`
  25. Protocol string `json:"protocol" gorm:"-"`
  26. GwDevId string `json:"gwDevId" gorm:"-"`
  27. TransRadio string `json:"transformerRatio" gorm:"-"`
  28. }
  29. type Ammeter struct {
  30. AmmeterModel
  31. timestamp int32
  32. TotalEnergy float64
  33. TransDeno float64 ` gorm:"-"`
  34. TransDiv float64 ` gorm:"-"`
  35. }
  36. type AmMeterHub struct {
  37. baseDevice
  38. DutSn string
  39. Protocol string
  40. chnExit chan bool
  41. loopserver bool
  42. sampleTmr *time.Timer
  43. amList map[string]*Ammeter
  44. persitList map[string]*Ammeter
  45. tmrPersist *time.Timer
  46. queryIdx int
  47. }
  48. func (dev *AmMeterHub) Close() error {
  49. dev.sampleTmr.Stop()
  50. dev.loopserver = false
  51. dev.chnExit <- true
  52. return nil
  53. }
  54. func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
  55. log.Info(dev.DutSn, " Attached!")
  56. chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
  57. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  58. }
  59. func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
  60. log.Info(dev.DutSn, " Detached!")
  61. dev.sampleTmr.Stop()
  62. }
  63. func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
  64. if len(stream) != 5 {
  65. return bus.DispatchNone
  66. }
  67. if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
  68. return ret
  69. }
  70. sid := hex.EncodeToString(stream)
  71. for _, am := range dev.amList {
  72. if am.DutSn == sid {
  73. log.Info("MOUNT meter: ", am.DutSn, ",", sid)
  74. return bus.DispatchSingle
  75. }
  76. }
  77. return bus.DispatchNone
  78. }
  79. func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
  80. i := 0
  81. for _, am := range dev.amList {
  82. if i == dev.queryIdx {
  83. pam = am
  84. break
  85. }
  86. i++
  87. }
  88. if pam == nil {
  89. return nil
  90. }
  91. pack := dev.Route[1].iproto.PackageCmd(CMD_ENERGY_TOTAL, pam.Code+pam.Address)
  92. dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
  93. tmrTx.Reset(1 * time.Second)
  94. return pam
  95. }
  96. func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
  97. tmrTx.Stop()
  98. dev.queryIdx++
  99. if dev.queryIdx >= len(dev.amList) {
  100. dev.queryIdx = 0
  101. dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
  102. } else {
  103. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  104. }
  105. }
  106. func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, devname string) {
  107. vlist := mval.(map[string]interface{})
  108. if am, has := dev.amList[devname]; has {
  109. for k, v := range vlist {
  110. if reflect.TypeOf(v).Kind() == reflect.Float64 {
  111. valItem := v.(float64)
  112. vlist[k] = (valItem * am.TransDeno) / am.TransDiv
  113. }
  114. }
  115. if val, has := vlist["TotalActivePower"]; has {
  116. diff := val.(float64) - am.TotalEnergy
  117. am.TotalEnergy = val.(float64)
  118. vlist["ActivePowerIncrement"] = diff
  119. if len(dev.persitList) == 0 {
  120. dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
  121. }
  122. dev.persitList[am.DevName] = am
  123. }
  124. }
  125. }
  126. func (dev *AmMeterHub) Run() error {
  127. dev.loopserver = true
  128. tmrTxTimout := time.NewTimer(5 * time.Second)
  129. dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
  130. tmrTxTimout.Stop()
  131. //var pam *Ammeter = nil
  132. for dev.loopserver {
  133. select {
  134. case msg := <-dev.Route[1].router[0]:
  135. if reflect.TypeOf(msg).Kind() == reflect.Slice {
  136. b := msg.([]byte)
  137. log.Info(hex.EncodeToString(b))
  138. devname := "" //pam.DevName
  139. mret := dev.Route[1].iproto.ParsePacket(b, &devname)
  140. if mret != nil && devname != "" {
  141. log.Info(devname, mret)
  142. devname = "AM10-" + devname + dev.DutSn[:4]
  143. dev.AdjustTelemetry(mret, devname)
  144. telemetry := dev.Route[0].iproto.PackageCmd(devname, mret)
  145. dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
  146. dev.SchedulNextSample(tmrTxTimout)
  147. }
  148. }
  149. case <-tmrTxTimout.C:
  150. dev.SchedulNextSample(tmrTxTimout)
  151. case <-dev.sampleTmr.C:
  152. dev.IssueSampleCmd(tmrTxTimout)
  153. case <-dev.chnExit:
  154. dev.loopserver = false
  155. case <-dev.tmrPersist.C:
  156. for k, am := range dev.persitList {
  157. config.GetDB().Save(am)
  158. delete(dev.persitList, k)
  159. }
  160. }
  161. }
  162. return nil
  163. }
  164. func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
  165. log.Info("DutchanDispatch")
  166. b := rxin.([]byte)
  167. if len(b) == 8 {
  168. return hex.EncodeToString(b)
  169. }
  170. return nil
  171. }
  172. func (dev *AmMeterHub) Open(param ...interface{}) error {
  173. if dev.DutSn != "" {
  174. dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0)
  175. }
  176. dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
  177. dev.sampleTmr.Stop()
  178. go dev.Run()
  179. return nil
  180. }
  181. func (dev *AmMeterHub) GetDevice(devname string) interface{} {
  182. if dev, has := dev.amList[devname]; has {
  183. return dev
  184. }
  185. return nil
  186. }
  187. /*
  188. [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
  189. {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
  190. {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
  191. */
  192. type AmmeterDrv struct {
  193. baseDriver
  194. }
  195. func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
  196. Trans := strings.Split(dev.TransRadio, "/")
  197. dev.TransDeno, _ = strconv.ParseFloat(Trans[0], 64)
  198. if len(Trans) > 1 {
  199. dev.TransDiv, _ = strconv.ParseFloat(Trans[1], 64)
  200. }
  201. }
  202. func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
  203. if model != nil {
  204. models := model.(*[]AmmeterModel)
  205. for _, m := range *models {
  206. var hub IDevice = nil
  207. var has bool
  208. if hub, has = drv.DevList[m.DutSn]; !has {
  209. hub = &AmMeterHub{
  210. DutSn: m.DutSn,
  211. Protocol: m.Protocol,
  212. chnExit: make(chan bool),
  213. loopserver: false,
  214. sampleTmr: nil,
  215. amList: make(map[string]*Ammeter),
  216. persitList: make(map[string]*Ammeter),
  217. queryIdx: 0,
  218. }
  219. hub.Probe(m.DutSn, drv)
  220. drv.DevList[m.DutSn] = hub
  221. }
  222. dev := &Ammeter{
  223. TotalEnergy: 0,
  224. timestamp: 0,
  225. TransDiv: 1,
  226. TransDeno: 1,
  227. }
  228. config.GetDB().Find(dev, "dev_name='"+m.DevName+"'")
  229. dev.AmmeterModel = m
  230. drv.ParseTransRatio(dev)
  231. hub.(*AmMeterHub).amList[dev.DevName] = dev
  232. }
  233. }
  234. return drv.baseDriver.CreateDevice(model)
  235. }
  236. func (drv *AmmeterDrv) GetModel() interface{} {
  237. return &[]AmmeterModel{}
  238. }
  239. func NewAmMeter(param interface{}) IDriver {
  240. am := new(AmmeterDrv)
  241. return am
  242. }
  243. func init() {
  244. driverReg["ammeter"] = NewAmMeter
  245. config.GetDB().CreateTbl(&Ammeter{})
  246. }