ameter.go 9.9 KB


  1. package drviers
import (
	"encoding/hex"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"time"

	bus "github.com/ammeter/Bus"
	"github.com/ammeter/config"
	"github.com/yuguorong/go/log"
)
  14. const (
  15. DEF_SAMPLE_PERIOD = 30 * time.Second // time.Minute
  16. DEF_SAMPLE_PEER_DURATION = 10 * time.Second
  17. DEF_TCP_READ_TIMEOUT = 1 * time.Minute
  18. POSTPONE_PERSIST_TIME = 1 * time.Minute
  19. )
  20. //amList map[string]*Ammeter
  21. //amList : key=[code + address], each meter id must be unique, otherwise ammeter would not work correctly!
  22. //Careful! address from cloud may not real address. software varAddress functon would change it to real address
  23. type AmmeterModel struct {
  24. Id string `json:"id"`
  25. DevName string `json:"devName" gorm:"primary_key; unique"`
  26. Code string `json:"code" `
  27. DutSn string `json:"dutSn" `
  28. Address string `json:"address" `
  29. Protocol string `json:"protocol" `
  30. GwDevId string `json:"gwDevId" `
  31. TransRadio string `json:"transformerRatio" `
  32. Type string `json:"-" `
  33. }
  34. type meterPersist struct {
  35. DevName string `json:"devName" gorm:"primary_key; unique"`
  36. TsSample int64 `gorm:"autoUpdateTime"`
  37. TotalEnergy float64
  38. DBStatus int32
  39. }
  40. type Ammeter struct {
  41. AmmeterModel
  42. TsSample int64
  43. TotalEnergy float64
  44. TransDeno float64
  45. TransDiv float64
  46. varAddr string
  47. DBStatus int32
  48. TsUpdate int64
  49. }
  50. type AmMeterHub struct {
  51. baseDevice
  52. DutSn string
  53. DevCompCode string ` gorm:"-"`
  54. Protocol string
  55. chnExit chan bool
  56. loopserver bool
  57. sampleTmr *time.Timer
  58. amList map[string]*Ammeter //key=[code + address], each meter id must be unique
  59. persitList map[string]*meterPersist
  60. tmrPersist *time.Timer
  61. queryIdx int
  62. }
  63. func (dev *AmMeterHub) Close() error {
  64. dev.sampleTmr.Stop()
  65. dev.loopserver = false
  66. dev.chnExit <- true
  67. dev.baseDevice.Close()
  68. return nil
  69. }
  70. func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
  71. log.Info(dev.DutSn, " Attached!")
  72. chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
  73. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  74. dev.baseDevice.OnAttach(chn)
  75. }
  76. func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
  77. log.Info(dev.DutSn, " Detached!")
  78. dev.sampleTmr.Stop()
  79. dev.baseDevice.OnDetach(chn)
  80. }
  81. func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
  82. if len(stream) > 8 {
  83. if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
  84. devname := ""
  85. mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
  86. if mret != nil && devname == dev.DutSn {
  87. return bus.DispatchSingle
  88. }
  89. }
  90. return bus.DispatchNone
  91. }
  92. if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
  93. return ret
  94. }
  95. sid := hex.EncodeToString(stream)
  96. for _, am := range dev.amList {
  97. if am.DutSn == sid {
  98. log.Info("MOUNT meter: ", am.DutSn, ",", sid)
  99. return bus.DispatchSingle
  100. }
  101. }
  102. return bus.DispatchNone
  103. }
  104. func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
  105. i := 0
  106. for _, am := range dev.amList {
  107. if i == 0 {
  108. pam = am
  109. }
  110. if i == dev.queryIdx {
  111. pam = am
  112. break
  113. }
  114. i++
  115. }
  116. if pam == nil {
  117. return nil
  118. }
  119. pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
  120. dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
  121. tmrTx.Reset(1 * time.Second)
  122. return pam
  123. }
  124. func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
  125. tmrTx.Stop()
  126. dev.queryIdx++
  127. log.Info("Schedul next:(%d/%d)", dev.queryIdx, len(dev.amList))
  128. if dev.queryIdx >= len(dev.amList) {
  129. dev.queryIdx = 0
  130. dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
  131. } else {
  132. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  133. }
  134. }
  135. func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
  136. vlist := mval.(map[string]interface{})
  137. for k, v := range vlist {
  138. if reflect.TypeOf(v).Kind() == reflect.Float64 {
  139. valItem := v.(float64)
  140. vlist[k] = (valItem * am.TransDeno) / am.TransDiv
  141. }
  142. }
  143. if val, has := vlist["TotalActivePower"]; has {
  144. diff := val.(float64) - am.TotalEnergy
  145. if am.DBStatus == 0 { //数据库清0
  146. diff = 0
  147. am.DBStatus = 1
  148. }
  149. am.TotalEnergy = val.(float64)
  150. vlist["ActivePowerIncrement"] = diff
  151. if len(dev.persitList) == 0 {
  152. dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
  153. }
  154. energe := &meterPersist{
  155. TotalEnergy: am.TotalEnergy,
  156. TsSample: am.TsSample,
  157. DBStatus: am.DBStatus,
  158. DevName: am.DevName,
  159. }
  160. dev.persitList[am.DevName] = energe
  161. }
  162. }
  163. func (dev *AmMeterHub) Run() error {
  164. dev.loopserver = true
  165. tmrTxTimout := time.NewTimer(5 * time.Second)
  166. dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
  167. tmrTxTimout.Stop()
  168. //var pam *Ammeter = nil
  169. for dev.loopserver {
  170. select {
  171. case msg := <-dev.Route[1].router[0]:
  172. if reflect.TypeOf(msg).Kind() == reflect.Slice {
  173. b := msg.([]byte)
  174. log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
  175. devid := ""
  176. mret := dev.Route[1].iproto.ParsePacket(b, &devid)
  177. if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
  178. log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
  179. if am, has := dev.amList[devid]; has {
  180. dev.AdjustTelemetry(mret, am)
  181. telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
  182. dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
  183. am.TsSample = time.Now().Unix()
  184. }
  185. dev.SchedulNextSample(tmrTxTimout)
  186. }
  187. }
  188. case <-tmrTxTimout.C:
  189. dev.SchedulNextSample(tmrTxTimout)
  190. case <-dev.sampleTmr.C:
  191. dev.IssueSampleCmd(tmrTxTimout)
  192. case <-dev.chnExit:
  193. dev.loopserver = false
  194. case <-dev.tmrPersist.C:
  195. for k, eng := range dev.persitList {
  196. config.GetDB().Save(eng)
  197. delete(dev.persitList, k)
  198. }
  199. }
  200. }
  201. return nil
  202. }
  203. func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
  204. log.Info("DutchanDispatch")
  205. b := rxin.([]byte)
  206. if len(b) == 8 {
  207. return hex.EncodeToString(b)
  208. }
  209. return nil
  210. }
  211. func (dev *AmMeterHub) Open(param ...interface{}) error {
  212. if dev.DutSn != "" {
  213. if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
  214. return errors.New("can not set default rout!")
  215. }
  216. r := dev.Route[len(dev.Route)-1]
  217. r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
  218. }
  219. dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
  220. dev.sampleTmr.Stop()
  221. go dev.Run()
  222. return nil
  223. }
  224. func (dev *AmMeterHub) GetDevice(devname string) interface{} {
  225. for _, am := range dev.amList {
  226. if am.DevName == devname {
  227. return dev
  228. }
  229. }
  230. return nil
  231. }
  232. func (dev *AmMeterHub) ListDevices() interface{} {
  233. mdev := make(map[string]interface{})
  234. mdev["State"] = dev.Status
  235. dinfo := make(map[string]interface{})
  236. for _, subdev := range dev.amList {
  237. meterval := make(map[string]string)
  238. meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
  239. if subdev.TsSample != 0 {
  240. meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
  241. }
  242. dinfo[subdev.DevName] = meterval
  243. }
  244. mdev["Sub Meters"] = dinfo
  245. return mdev
  246. }
  247. /*
  248. [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
  249. {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
  250. {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
  251. */
  252. type AmmeterDrv struct {
  253. baseDriver
  254. }
  255. func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
  256. Trans := strings.Split(dev.TransRadio, "/")
  257. dev.TransDeno, _ = strconv.ParseFloat(Trans[0], 64)
  258. if len(Trans) > 1 {
  259. dev.TransDiv, _ = strconv.ParseFloat(Trans[1], 64)
  260. }
  261. }
  262. func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
  263. defer func() {
  264. recover()
  265. }()
  266. iaddr, _ := strconv.Atoi(addr)
  267. isub, _ := strconv.Atoi(subid)
  268. if iaddr != 0 && isub > 0 {
  269. iaddr = iaddr + (isub-1)*3
  270. m.Address = strconv.Itoa(iaddr)
  271. }
  272. }
  273. func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
  274. devCompID := m.DutSn
  275. if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
  276. addr := m.Address
  277. ss := strings.Split(addr, "-")
  278. m.DutSn = m.Code + ss[0]
  279. devCompID = "0000"
  280. if len(ss) > 1 {
  281. drv.varyAdress(m, ss[0], ss[1])
  282. }
  283. }
  284. if hub, has := drv.DevList[m.DutSn]; has {
  285. return hub
  286. }
  287. hub := &AmMeterHub{
  288. DutSn: m.DutSn,
  289. Protocol: m.Protocol,
  290. chnExit: make(chan bool),
  291. loopserver: false,
  292. sampleTmr: nil,
  293. amList: make(map[string]*Ammeter),
  294. persitList: make(map[string]*meterPersist),
  295. queryIdx: 0,
  296. DevCompCode: devCompID,
  297. }
  298. hub.Probe(m.DutSn, drv)
  299. drv.DevList[m.DutSn] = hub
  300. *list = append(*list, hub)
  301. return hub
  302. }
  303. func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
  304. ts := time.Now().UnixMilli()
  305. devlist := make([]IDevice, 0)
  306. if model != nil {
  307. devModels := model.(*[]AmmeterModel)
  308. for _, mod := range *devModels {
  309. if len(mod.Code) < 4 || len(mod.Address) < 8 {
  310. continue
  311. }
  312. hub := drv.mountHub(&mod, &devlist)
  313. id := mod.Code + mod.Address
  314. am, has := hub.(*AmMeterHub).amList[id]
  315. if !has {
  316. eng := &meterPersist{}
  317. config.GetDB().Find(eng, "dev_name='"+mod.DevName+"'")
  318. am = &Ammeter{
  319. TotalEnergy: eng.TotalEnergy,
  320. DBStatus: eng.DBStatus,
  321. TsSample: eng.TsSample,
  322. TransDiv: 1,
  323. TransDeno: 1,
  324. }
  325. am.AmmeterModel = mod
  326. drv.ParseTransRatio(am)
  327. hub.(*AmMeterHub).amList[id] = am
  328. }
  329. am.TsUpdate = ts
  330. }
  331. }
  332. for _, hub := range drv.DevList {
  333. for id, am := range hub.(*AmMeterHub).amList {
  334. if am.TsUpdate != ts {
  335. delete(hub.(*AmMeterHub).amList, id)
  336. }
  337. }
  338. }
  339. return int(ts), devlist
  340. }
  341. func (drv *AmmeterDrv) GetModel() interface{} {
  342. return &[]AmmeterModel{}
  343. }
  344. func NewAmMeter(param interface{}) IDriver {
  345. am := new(AmmeterDrv)
  346. return am
  347. }
  348. func init() {
  349. driverReg["ammeter"] = NewAmMeter
  350. config.GetDB().CreateTbl(&AmmeterModel{})
  351. config.GetDB().CreateTbl(&meterPersist{})
  352. }