ameter.go 8.5 KB


  1. package drviers
import (
	"encoding/hex"
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"time"

	bus "github.com/ammeter/Bus"
	"github.com/ammeter/config"
	"github.com/yuguorong/go/log"
)
  13. const (
  14. CMD_ENERGY_TOTAL = "00000000"
  15. DEF_SAMPLE_PERIOD = 5 * time.Minute
  16. DEF_SAMPLE_PEER_DURATION = 30 * time.Second
  17. DEF_TCP_READ_TIMEOUT = 1 * time.Minute
  18. POSTPONE_PERSIST_TIME = 1 * time.Minute
  19. )
  20. type AmmeterModel struct {
  21. Id string `json:"id" gorm:"-"`
  22. DevName string `json:"devName" gorm:"primarykey"`
  23. Code string `json:"code" gorm:"-"`
  24. DutSn string `json:"dutSn" gorm:"-"`
  25. Address string `json:"address" gorm:"-"`
  26. Protocol string `json:"protocol" gorm:"-"`
  27. GwDevId string `json:"gwDevId" gorm:"-"`
  28. TransRadio string `json:"transformerRatio" gorm:"-"`
  29. }
  30. type Ammeter struct {
  31. AmmeterModel
  32. TsSample int64
  33. TotalEnergy float64
  34. TransDeno float64 `gorm:"-"`
  35. TransDiv float64 `gorm:"-"`
  36. DBStatus int `json:"-"`
  37. }
  38. type AmMeterHub struct {
  39. baseDevice
  40. DutSn string
  41. DevCompCode string ` gorm:"-"`
  42. Protocol string
  43. chnExit chan bool
  44. loopserver bool
  45. sampleTmr *time.Timer
  46. amList map[string]*Ammeter
  47. persitList map[string]*Ammeter
  48. tmrPersist *time.Timer
  49. queryIdx int
  50. }
  51. func (dev *AmMeterHub) Close() error {
  52. dev.sampleTmr.Stop()
  53. dev.loopserver = false
  54. dev.chnExit <- true
  55. dev.baseDevice.Close()
  56. return nil
  57. }
  58. func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
  59. log.Info(dev.DutSn, " Attached!")
  60. chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
  61. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  62. dev.baseDevice.OnAttach(chn)
  63. }
  64. func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
  65. log.Info(dev.DutSn, " Detached!")
  66. dev.sampleTmr.Stop()
  67. dev.baseDevice.OnDetach(chn)
  68. }
  69. func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
  70. if len(stream) > 8 {
  71. if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
  72. devname := ""
  73. mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
  74. if mret != nil && devname == dev.DeviceName {
  75. return bus.DispatchSingle
  76. }
  77. // for i := 0; i < 6; i++ {
  78. // if stream[i+1] != stream[i+10] {
  79. // return bus.DispatchNone
  80. // }
  81. // }
  82. // log.Infof("MOUNT meter:0000[%x]", stream[1:7])
  83. // return bus.DispatchSingle
  84. }
  85. return bus.DispatchNone
  86. }
  87. if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
  88. return ret
  89. }
  90. sid := hex.EncodeToString(stream)
  91. for _, am := range dev.amList {
  92. if am.DutSn == sid {
  93. log.Info("MOUNT meter: ", am.DutSn, ",", sid)
  94. return bus.DispatchSingle
  95. }
  96. }
  97. return bus.DispatchNone
  98. }
  99. func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
  100. i := 0
  101. for _, am := range dev.amList {
  102. if i == dev.queryIdx {
  103. pam = am
  104. break
  105. }
  106. i++
  107. }
  108. if pam == nil {
  109. return nil
  110. }
  111. pack := dev.Route[1].iproto.PackageCmd(CMD_ENERGY_TOTAL, pam.Code+pam.Address)
  112. dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
  113. tmrTx.Reset(1 * time.Second)
  114. return pam
  115. }
  116. func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
  117. tmrTx.Stop()
  118. dev.queryIdx++
  119. if dev.queryIdx >= len(dev.amList) {
  120. dev.queryIdx = 0
  121. dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
  122. } else {
  123. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  124. }
  125. }
  126. func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, devname string) {
  127. vlist := mval.(map[string]interface{})
  128. if am, has := dev.amList[devname]; has {
  129. for k, v := range vlist {
  130. if reflect.TypeOf(v).Kind() == reflect.Float64 {
  131. valItem := v.(float64)
  132. vlist[k] = (valItem * am.TransDeno) / am.TransDiv
  133. }
  134. }
  135. if val, has := vlist["TotalActivePower"]; has {
  136. diff := val.(float64) - am.TotalEnergy
  137. if am.DBStatus == 0 { //数据库清0
  138. diff = 0
  139. am.DBStatus = 1
  140. }
  141. am.TotalEnergy = val.(float64)
  142. vlist["ActivePowerIncrement"] = diff
  143. if len(dev.persitList) == 0 {
  144. dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
  145. }
  146. dev.persitList[am.DevName] = am
  147. }
  148. }
  149. }
  150. func (dev *AmMeterHub) Run() error {
  151. dev.loopserver = true
  152. tmrTxTimout := time.NewTimer(5 * time.Second)
  153. dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
  154. tmrTxTimout.Stop()
  155. //var pam *Ammeter = nil
  156. for dev.loopserver {
  157. select {
  158. case msg := <-dev.Route[1].router[0]:
  159. if reflect.TypeOf(msg).Kind() == reflect.Slice {
  160. b := msg.([]byte)
  161. log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b))
  162. devname := "" //pam.DevName
  163. mret := dev.Route[1].iproto.ParsePacket(b, &devname)
  164. if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devname != "" {
  165. log.Info(devname, mret)
  166. devname = "AM10-" + devname + dev.DevCompCode[:4]
  167. dev.AdjustTelemetry(mret, devname)
  168. telemetry := dev.Route[0].iproto.PackageCmd(devname, mret)
  169. dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
  170. if meter, has := dev.amList[devname]; has {
  171. meter.TsSample = time.Now().Unix()
  172. }
  173. dev.SchedulNextSample(tmrTxTimout)
  174. }
  175. }
  176. case <-tmrTxTimout.C:
  177. dev.SchedulNextSample(tmrTxTimout)
  178. case <-dev.sampleTmr.C:
  179. dev.IssueSampleCmd(tmrTxTimout)
  180. case <-dev.chnExit:
  181. dev.loopserver = false
  182. case <-dev.tmrPersist.C:
  183. for k, am := range dev.persitList {
  184. if am.DBStatus == 0 {
  185. }
  186. config.GetDB().Save(am)
  187. delete(dev.persitList, k)
  188. }
  189. }
  190. }
  191. return nil
  192. }
  193. func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
  194. log.Info("DutchanDispatch")
  195. b := rxin.([]byte)
  196. if len(b) == 8 {
  197. return hex.EncodeToString(b)
  198. }
  199. return nil
  200. }
  201. func (dev *AmMeterHub) Open(param ...interface{}) error {
  202. if dev.DutSn != "" {
  203. dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0)
  204. }
  205. dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
  206. dev.sampleTmr.Stop()
  207. go dev.Run()
  208. return nil
  209. }
  210. func (dev *AmMeterHub) GetDevice(devname string) interface{} {
  211. if dev, has := dev.amList[devname]; has {
  212. return dev
  213. }
  214. return nil
  215. }
  216. func (dev *AmMeterHub) ListDevices() interface{} {
  217. mdev := make(map[string]interface{})
  218. mdev["State"] = dev.Status
  219. dinfo := make(map[string]interface{})
  220. for sname, subdev := range dev.amList {
  221. meterval := make(map[string]string)
  222. meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
  223. if subdev.TsSample != 0 {
  224. meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
  225. }
  226. dinfo[sname] = meterval
  227. }
  228. mdev["Sub Meters"] = dinfo
  229. return mdev
  230. }
  231. /*
  232. [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
  233. {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
  234. {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
  235. */
  236. type AmmeterDrv struct {
  237. baseDriver
  238. }
  239. func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
  240. Trans := strings.Split(dev.TransRadio, "/")
  241. dev.TransDeno, _ = strconv.ParseFloat(Trans[0], 64)
  242. if len(Trans) > 1 {
  243. dev.TransDiv, _ = strconv.ParseFloat(Trans[1], 64)
  244. }
  245. }
  246. func (drv *AmmeterDrv) CreateDevice(model interface{}, filterGwId string) (int, []IDevice) {
  247. if model != nil {
  248. models := model.(*[]AmmeterModel)
  249. for _, m := range *models {
  250. var hub IDevice = nil
  251. var has bool
  252. if m.GwDevId != filterGwId {
  253. continue
  254. }
  255. if len(m.Code) < 4 || len(m.Address) < 8 {
  256. continue
  257. }
  258. devCompID := m.DutSn
  259. if m.DutSn == "" {
  260. m.DutSn = m.Code + m.Address
  261. devCompID = "0000"
  262. }
  263. if hub, has = drv.DevList[m.DutSn]; !has {
  264. hub = &AmMeterHub{
  265. DutSn: m.DutSn,
  266. Protocol: m.Protocol,
  267. chnExit: make(chan bool),
  268. loopserver: false,
  269. sampleTmr: nil,
  270. amList: make(map[string]*Ammeter),
  271. persitList: make(map[string]*Ammeter),
  272. queryIdx: 0,
  273. DevCompCode: devCompID,
  274. }
  275. hub.Probe(m.DutSn, drv)
  276. drv.DevList[m.DutSn] = hub
  277. }
  278. if _, has := hub.(*AmMeterHub).amList[m.DevName]; !has {
  279. dev := &Ammeter{
  280. TotalEnergy: 0,
  281. TsSample: 0,
  282. TransDiv: 1,
  283. TransDeno: 1,
  284. DBStatus: 0,
  285. }
  286. config.GetDB().Find(dev, "dev_name='"+m.DevName+"'")
  287. dev.AmmeterModel = m
  288. drv.ParseTransRatio(dev)
  289. hub.(*AmMeterHub).amList[dev.DevName] = dev
  290. }
  291. }
  292. }
  293. return drv.baseDriver.CreateDevice(model, filterGwId)
  294. }
  295. func (drv *AmmeterDrv) GetModel() interface{} {
  296. return &[]AmmeterModel{}
  297. }
  298. func NewAmMeter(param interface{}) IDriver {
  299. am := new(AmmeterDrv)
  300. return am
  301. }
  302. func init() {
  303. driverReg["ammeter"] = NewAmMeter
  304. config.GetDB().CreateTbl(&Ammeter{})
  305. }