ameter.go 11 KB


  1. package drviers
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "time"
  10. bus "github.com/ammeter/Bus"
  11. "github.com/ammeter/config"
  12. "github.com/ammeter/util"
  13. "github.com/yuguorong/go/log"
  14. )
  15. const (
  16. DEF_SAMPLE_PERIOD = 10 * time.Minute
  17. DEF_SAMPLE_PEER_DURATION = 5 * time.Second
  18. DEF_TCP_READ_TIMEOUT = (1 * time.Minute)
  19. POSTPONE_PERSIST_TIME = 5 * time.Minute
  20. MIN_TIMEOUT_TIMES = 2
  21. )
  22. //amList map[string]*Ammeter
  23. //amList : key=[code + address], each meter id must be unique, otherwise ammeter would not work correctly!
  24. //Careful! address from cloud may not real address. software varAddress functon would change it to real address
  25. type AmmeterModel struct {
  26. Id string `json:"id"`
  27. DevName string `json:"devName" gorm:"primary_key; unique"`
  28. Code string `json:"code" `
  29. DutSn string `json:"dutSn" `
  30. Address string `json:"address" `
  31. Protocol string `json:"protocol" `
  32. GwDevId string `json:"gwDevId" `
  33. TransRadio string `json:"transformerRatio" `
  34. Type string `json:"-" `
  35. }
  36. type meterPersist struct {
  37. DevName string `json:"devName" gorm:"primary_key; unique"`
  38. TsSample int64 `gorm:"autoUpdateTime"`
  39. TotalEnergy float64
  40. DBStatus int32
  41. }
  42. type Ammeter struct {
  43. AmmeterModel
  44. TsSample int64
  45. TotalEnergy float64
  46. TransDeno float64
  47. TransDiv float64
  48. varAddr string
  49. DBStatus int32
  50. TsUpdate int64
  51. }
  52. type AmMeterHub struct {
  53. baseDevice
  54. DutSn string
  55. DevCompCode string ` gorm:"-"`
  56. Protocol string
  57. chnExit chan bool
  58. loopserver bool
  59. sampleTmr *time.Timer
  60. errCount []int
  61. amIndex []string //avoid map chaos order,even map read only
  62. amList map[string]*Ammeter //key=[code + address], each meter id must be unique
  63. persitList map[string]*meterPersist
  64. tmrPersist *time.Timer
  65. queryIdx int
  66. }
  67. func (dev *AmMeterHub) Close() error {
  68. dev.sampleTmr.Stop()
  69. dev.loopserver = false
  70. dev.chnExit <- true
  71. dev.baseDevice.Close()
  72. return nil
  73. }
  74. func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
  75. log.Info(dev.DutSn, " Attached!")
  76. for i := 0; i < len(dev.errCount); i++ {
  77. dev.errCount[i] = 0
  78. }
  79. chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
  80. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  81. dev.baseDevice.OnAttach(chn)
  82. }
  83. func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
  84. log.Info(dev.DutSn, " Detached!")
  85. dev.sampleTmr.Stop()
  86. dev.baseDevice.OnDetach(chn)
  87. }
  88. func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
  89. if len(stream) > 8 {
  90. for i := 0; i < len(stream); i++ {
  91. if stream[i] != 0xFE {
  92. stream = stream[i:]
  93. break
  94. }
  95. }
  96. if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
  97. devname := ""
  98. mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
  99. if mret != nil && devname == dev.DutSn {
  100. return bus.DispatchSingle
  101. }
  102. }
  103. return bus.DispatchNone
  104. }
  105. if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
  106. return ret
  107. }
  108. sid := hex.EncodeToString(stream)
  109. for _, am := range dev.amList {
  110. if am.DutSn == sid {
  111. log.Info("MOUNT meter: ", am.DutSn, ",", sid)
  112. return bus.DispatchSingle
  113. }
  114. }
  115. return bus.DispatchNone
  116. }
  117. func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
  118. pam = dev.amList[dev.amIndex[dev.queryIdx]]
  119. pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
  120. if _, err := dev.Route[1].ibus.Send(dev.Route[1].iChn, pack); err != nil {
  121. dev.OnDetach(dev.Route[1].iChn)
  122. } else {
  123. tmrTx.Reset(DEF_TCP_READ_TIMEOUT)
  124. }
  125. return pam
  126. }
  127. func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer, timoutdiff time.Duration) {
  128. tmrTx.Stop()
  129. dev.queryIdx++
  130. if dev.queryIdx >= len(dev.amList) {
  131. dev.queryIdx = 0
  132. dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD - timoutdiff)
  133. } else {
  134. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION - timoutdiff)
  135. }
  136. }
  137. func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
  138. vlist := mval.(map[string]interface{})
  139. for k, v := range vlist {
  140. if reflect.TypeOf(v).Kind() == reflect.Float64 {
  141. valItem := v.(float64)
  142. vlist[k] = (valItem * am.TransDeno) / am.TransDiv
  143. }
  144. }
  145. if val, has := vlist["TotalActivePower"]; has {
  146. v := val.(float64)
  147. if v < 0.001 {
  148. v = 0.001 //hack for cloud query
  149. }
  150. diff := v - am.TotalEnergy
  151. if am.DBStatus == 0 { //数据库清0
  152. diff = 0.001 //hack for cloud query
  153. am.DBStatus = 1
  154. }
  155. am.TotalEnergy = v
  156. vlist["ActivePowerIncrement"] = diff
  157. if len(dev.persitList) == 0 {
  158. dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
  159. }
  160. energe := &meterPersist{
  161. TotalEnergy: am.TotalEnergy,
  162. TsSample: am.TsSample,
  163. DBStatus: am.DBStatus,
  164. DevName: am.DevName,
  165. }
  166. dev.persitList[am.DevName] = energe
  167. }
  168. // 增加在线状态上报
  169. vlist["onOffLine"] = 1
  170. }
  171. func (dev *AmMeterHub) Run() error {
  172. dev.loopserver = true
  173. tmrTxTimout := time.NewTimer(5 * time.Second)
  174. dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
  175. tmrTxTimout.Stop()
  176. //var pam *Ammeter = nil
  177. for dev.loopserver {
  178. select {
  179. case msg := <-dev.Route[1].router[0]:
  180. dev.errCount[dev.queryIdx] = 0
  181. if reflect.TypeOf(msg).Kind() == reflect.Slice {
  182. b := msg.([]byte)
  183. log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
  184. devid := ""
  185. mret := dev.Route[1].iproto.ParsePacket(b, &devid)
  186. if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
  187. log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
  188. if am, has := dev.amList[devid]; has {
  189. dev.AdjustTelemetry(mret, am)
  190. telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
  191. dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
  192. am.TsSample = time.Now().Unix()
  193. }
  194. dev.SchedulNextSample(tmrTxTimout, 0)
  195. }
  196. } else if reflect.TypeOf(msg).Kind() == reflect.String {
  197. if msg.(string) == "closed" {
  198. dev.OnDetach(dev.Route[1].iChn)
  199. }
  200. }
  201. case <-tmrTxTimout.C:
  202. dev.errCount[dev.queryIdx]++
  203. keepconnect := false
  204. for errcnt := range dev.errCount {
  205. if errcnt < MIN_TIMEOUT_TIMES {
  206. dev.SchedulNextSample(tmrTxTimout, DEF_TCP_READ_TIMEOUT)
  207. keepconnect = true
  208. break
  209. }
  210. }
  211. if !keepconnect {
  212. dev.Route[1].ibus.Disconnect(dev.Route[1].iChn)
  213. }
  214. case <-dev.sampleTmr.C:
  215. dev.IssueSampleCmd(tmrTxTimout)
  216. case <-dev.chnExit:
  217. dev.loopserver = false
  218. case <-dev.tmrPersist.C:
  219. for k, eng := range dev.persitList {
  220. config.GetDB().Save(eng)
  221. delete(dev.persitList, k)
  222. }
  223. }
  224. }
  225. return nil
  226. }
  227. func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
  228. log.Info("DutchanDispatch")
  229. b := rxin.([]byte)
  230. if len(b) == 8 {
  231. return hex.EncodeToString(b)
  232. }
  233. return nil
  234. }
  235. func (dev *AmMeterHub) Open(param ...interface{}) error {
  236. if dev.DutSn != "" {
  237. if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
  238. return errors.New("can not set default rout!")
  239. }
  240. r := dev.Route[len(dev.Route)-1]
  241. r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
  242. }
  243. dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
  244. dev.sampleTmr.Stop()
  245. go dev.Run()
  246. return nil
  247. }
  248. func (dev *AmMeterHub) GetDevice(devname string) interface{} {
  249. for _, am := range dev.amList {
  250. if am.DevName == devname {
  251. return dev
  252. }
  253. }
  254. return nil
  255. }
  256. func (dev *AmMeterHub) ListDevices() interface{} {
  257. mdev := make(map[string]interface{})
  258. mdev["State"] = dev.Status
  259. dinfo := make(map[string]interface{})
  260. for _, subdev := range dev.amList {
  261. meterval := make(map[string]string)
  262. meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
  263. if subdev.TsSample != 0 {
  264. meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
  265. }
  266. dinfo[subdev.DevName] = meterval
  267. }
  268. mdev["Sub Meters"] = dinfo
  269. return mdev
  270. }
  271. /*
  272. [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
  273. {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
  274. {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
  275. */
  276. type AmmeterDrv struct {
  277. baseDriver
  278. }
  279. func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
  280. Trans := strings.Split(dev.TransRadio, "/")
  281. dev.TransDeno, _ = strconv.ParseFloat(util.NumberFilter(Trans[0], util.NUMBER_CONTINUE), 64)
  282. if len(Trans) > 1 {
  283. dev.TransDiv, _ = strconv.ParseFloat(util.NumberFilter(Trans[1], util.NUMBER_CONTINUE), 64)
  284. }
  285. }
  286. func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
  287. defer func() {
  288. recover()
  289. }()
  290. iaddr, _ := strconv.Atoi(addr)
  291. isub, _ := strconv.Atoi(subid)
  292. if iaddr != 0 && isub > 0 {
  293. iaddr = iaddr + (isub-1)*3
  294. m.Address = strconv.Itoa(iaddr)
  295. }
  296. }
  297. func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
  298. devCompID := m.DutSn
  299. if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
  300. addr := m.Address
  301. ss := strings.Split(addr, "-")
  302. m.DutSn = m.Code + ss[0]
  303. devCompID = "0000"
  304. if len(ss) > 1 {
  305. drv.varyAdress(m, ss[0], ss[1])
  306. }
  307. }
  308. if hub, has := drv.DevList[m.DutSn]; has {
  309. return hub
  310. }
  311. hub := &AmMeterHub{
  312. DutSn: m.DutSn,
  313. Protocol: m.Protocol,
  314. chnExit: make(chan bool),
  315. loopserver: false,
  316. sampleTmr: nil,
  317. amList: make(map[string]*Ammeter),
  318. persitList: make(map[string]*meterPersist),
  319. queryIdx: 0,
  320. DevCompCode: devCompID,
  321. }
  322. hub.Probe(m.DutSn, drv)
  323. drv.DevList[m.DutSn] = hub
  324. *list = append(*list, hub)
  325. return hub
  326. }
  327. func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
  328. ts := time.Now().UnixMilli()
  329. devlist := make([]IDevice, 0)
  330. if model != nil {
  331. devModels := model.(*[]AmmeterModel)
  332. for _, mod := range *devModels {
  333. if len(mod.Code) < 4 || len(mod.Address) < 8 {
  334. continue
  335. }
  336. //AmMeterHub is device!!! communication target is the hub
  337. hub := drv.mountHub(&mod, &devlist)
  338. id := mod.Code + mod.Address
  339. am, has := hub.(*AmMeterHub).amList[id]
  340. if !has {
  341. eng := &meterPersist{}
  342. config.GetDB().Find(eng, "dev_name='"+mod.DevName+"'")
  343. am = &Ammeter{
  344. TotalEnergy: eng.TotalEnergy,
  345. DBStatus: eng.DBStatus,
  346. TsSample: eng.TsSample,
  347. TransDiv: 1,
  348. TransDeno: 1,
  349. }
  350. am.AmmeterModel = mod
  351. drv.ParseTransRatio(am)
  352. hub.(*AmMeterHub).amList[id] = am
  353. }
  354. am.TsUpdate = ts
  355. }
  356. }
  357. for _, hub := range drv.DevList {
  358. hub.(*AmMeterHub).amIndex = make([]string, 0)
  359. for id, am := range hub.(*AmMeterHub).amList {
  360. if am.TsUpdate != ts {
  361. delete(hub.(*AmMeterHub).amList, id)
  362. log.Info("Delete ammeter:", id)
  363. } else {
  364. hub.(*AmMeterHub).amIndex = append(hub.(*AmMeterHub).amIndex, id)
  365. }
  366. }
  367. hub.(*AmMeterHub).errCount = make([]int, len(hub.(*AmMeterHub).amList))
  368. }
  369. if len(drv.DevList) == 0 {
  370. drv.Uninstall()
  371. }
  372. return len(devlist), devlist
  373. }
  374. func (drv *AmmeterDrv) GetModel() interface{} {
  375. return &[]AmmeterModel{}
  376. }
  377. func NewAmMeter(param interface{}) IDriver {
  378. am := new(AmmeterDrv)
  379. return am
  380. }
  381. func init() {
  382. driverReg["ammeter"] = NewAmMeter
  383. config.GetDB().CreateTbl(&AmmeterModel{})
  384. config.GetDB().CreateTbl(&meterPersist{})
  385. }