ameter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. package drviers
  2. import (
  3. "encoding/hex"
  4. "errors"
  5. "fmt"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "time"
  10. bus "github.com/ammeter/Bus"
  11. "github.com/ammeter/config"
  12. "github.com/ammeter/util"
  13. "github.com/yuguorong/go/log"
  14. )
  15. const (
  16. DEF_SAMPLE_PERIOD = 10 * time.Minute
  17. DEF_SAMPLE_PEER_DURATION = 60 * time.Second
  18. DEF_TCP_READ_TIMEOUT = 30 * time.Second
  19. POSTPONE_PERSIST_TIME = 5 * time.Minute
  20. )
  21. //amList map[string]*Ammeter
  22. //amList : key=[code + address], each meter id must be unique, otherwise ammeter would not work correctly!
  23. //Careful! address from cloud may not real address. software varAddress functon would change it to real address
  24. type AmmeterModel struct {
  25. Id string `json:"id"`
  26. DevName string `json:"devName" gorm:"primary_key; unique"`
  27. Code string `json:"code" `
  28. DutSn string `json:"dutSn" `
  29. Address string `json:"address" `
  30. Protocol string `json:"protocol" `
  31. GwDevId string `json:"gwDevId" `
  32. TransRadio string `json:"transformerRatio" `
  33. Type string `json:"-" `
  34. }
  35. type meterPersist struct {
  36. DevName string `json:"devName" gorm:"primary_key; unique"`
  37. TsSample int64 `gorm:"autoUpdateTime"`
  38. TotalEnergy float64
  39. DBStatus int32
  40. }
  41. type Ammeter struct {
  42. AmmeterModel
  43. TsSample int64
  44. TotalEnergy float64
  45. TransDeno float64
  46. TransDiv float64
  47. varAddr string
  48. DBStatus int32
  49. TsUpdate int64
  50. }
  51. type AmMeterHub struct {
  52. baseDevice
  53. DutSn string
  54. DevCompCode string ` gorm:"-"`
  55. Protocol string
  56. chnExit chan bool
  57. loopserver bool
  58. sampleTmr *time.Timer
  59. amIndex []string //avoid map chaos order,even map read only
  60. amList map[string]*Ammeter //key=[code + address], each meter id must be unique
  61. persitList map[string]*meterPersist
  62. tmrPersist *time.Timer
  63. queryIdx int
  64. }
  65. func (dev *AmMeterHub) Close() error {
  66. dev.sampleTmr.Stop()
  67. dev.loopserver = false
  68. dev.chnExit <- true
  69. dev.baseDevice.Close()
  70. return nil
  71. }
  72. func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
  73. log.Info(dev.DutSn, " Attached!")
  74. chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
  75. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  76. dev.baseDevice.OnAttach(chn)
  77. }
  78. func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
  79. log.Info(dev.DutSn, " Detached!")
  80. dev.sampleTmr.Stop()
  81. dev.baseDevice.OnDetach(chn)
  82. }
  83. func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
  84. if len(stream) > 8 {
  85. for i := 0; i < len(stream); i++ {
  86. if stream[i] != 0xFE {
  87. stream = stream[i:]
  88. break
  89. }
  90. }
  91. if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
  92. devname := ""
  93. mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
  94. if mret != nil && devname == dev.DutSn {
  95. return bus.DispatchSingle
  96. }
  97. }
  98. return bus.DispatchNone
  99. }
  100. if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
  101. return ret
  102. }
  103. sid := hex.EncodeToString(stream)
  104. for _, am := range dev.amList {
  105. if am.DutSn == sid {
  106. log.Info("MOUNT meter: ", am.DutSn, ",", sid)
  107. return bus.DispatchSingle
  108. }
  109. }
  110. return bus.DispatchNone
  111. }
  112. func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
  113. pam = dev.amList[dev.amIndex[dev.queryIdx]]
  114. pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
  115. if _, err := dev.Route[1].ibus.Send(dev.Route[1].iChn, pack); err != nil {
  116. dev.OnDetach(dev.Route[1].iChn)
  117. } else {
  118. tmrTx.Reset(1 * time.Second)
  119. }
  120. return pam
  121. }
  122. func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
  123. tmrTx.Stop()
  124. dev.queryIdx++
  125. if dev.queryIdx >= len(dev.amList) {
  126. dev.queryIdx = 0
  127. dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
  128. } else {
  129. dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
  130. }
  131. }
  132. func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
  133. vlist := mval.(map[string]interface{})
  134. for k, v := range vlist {
  135. if reflect.TypeOf(v).Kind() == reflect.Float64 {
  136. valItem := v.(float64)
  137. vlist[k] = (valItem * am.TransDeno) / am.TransDiv
  138. }
  139. }
  140. if val, has := vlist["TotalActivePower"]; has {
  141. v := val.(float64)
  142. if v < 0.001 {
  143. v = 0.001 //hack for cloud query
  144. }
  145. diff := v - am.TotalEnergy
  146. if am.DBStatus == 0 { //数据库清0
  147. diff = 0.001 //hack for cloud query
  148. am.DBStatus = 1
  149. }
  150. am.TotalEnergy = v
  151. vlist["ActivePowerIncrement"] = diff
  152. if len(dev.persitList) == 0 {
  153. dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
  154. }
  155. energe := &meterPersist{
  156. TotalEnergy: am.TotalEnergy,
  157. TsSample: am.TsSample,
  158. DBStatus: am.DBStatus,
  159. DevName: am.DevName,
  160. }
  161. dev.persitList[am.DevName] = energe
  162. }
  163. // 增加在线状态上报
  164. vlist["onOffLine"] = 1
  165. }
  166. func (dev *AmMeterHub) Run() error {
  167. dev.loopserver = true
  168. tmrTxTimout := time.NewTimer(5 * time.Second)
  169. dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
  170. tmrTxTimout.Stop()
  171. //var pam *Ammeter = nil
  172. for dev.loopserver {
  173. select {
  174. case msg := <-dev.Route[1].router[0]:
  175. if reflect.TypeOf(msg).Kind() == reflect.Slice {
  176. b := msg.([]byte)
  177. log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
  178. devid := ""
  179. mret := dev.Route[1].iproto.ParsePacket(b, &devid)
  180. if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
  181. log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
  182. if am, has := dev.amList[devid]; has {
  183. dev.AdjustTelemetry(mret, am)
  184. telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
  185. dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
  186. am.TsSample = time.Now().Unix()
  187. }
  188. dev.SchedulNextSample(tmrTxTimout)
  189. }
  190. } else if reflect.TypeOf(msg).Kind() == reflect.String {
  191. if msg.(string) == "closed" {
  192. dev.OnDetach(dev.Route[1].iChn)
  193. }
  194. }
  195. case <-tmrTxTimout.C:
  196. dev.SchedulNextSample(tmrTxTimout)
  197. case <-dev.sampleTmr.C:
  198. dev.IssueSampleCmd(tmrTxTimout)
  199. case <-dev.chnExit:
  200. dev.loopserver = false
  201. case <-dev.tmrPersist.C:
  202. for k, eng := range dev.persitList {
  203. config.GetDB().Save(eng)
  204. delete(dev.persitList, k)
  205. }
  206. }
  207. }
  208. return nil
  209. }
  210. func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
  211. log.Info("DutchanDispatch")
  212. b := rxin.([]byte)
  213. if len(b) == 8 {
  214. return hex.EncodeToString(b)
  215. }
  216. return nil
  217. }
  218. func (dev *AmMeterHub) Open(param ...interface{}) error {
  219. if dev.DutSn != "" {
  220. if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
  221. return errors.New("can not set default rout!")
  222. }
  223. r := dev.Route[len(dev.Route)-1]
  224. r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
  225. }
  226. dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
  227. dev.sampleTmr.Stop()
  228. go dev.Run()
  229. return nil
  230. }
  231. func (dev *AmMeterHub) GetDevice(devname string) interface{} {
  232. for _, am := range dev.amList {
  233. if am.DevName == devname {
  234. return dev
  235. }
  236. }
  237. return nil
  238. }
  239. func (dev *AmMeterHub) ListDevices() interface{} {
  240. mdev := make(map[string]interface{})
  241. mdev["State"] = dev.Status
  242. dinfo := make(map[string]interface{})
  243. for _, subdev := range dev.amList {
  244. meterval := make(map[string]string)
  245. meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
  246. if subdev.TsSample != 0 {
  247. meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
  248. }
  249. dinfo[subdev.DevName] = meterval
  250. }
  251. mdev["Sub Meters"] = dinfo
  252. return mdev
  253. }
  254. /*
  255. [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
  256. {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
  257. {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
  258. */
  259. type AmmeterDrv struct {
  260. baseDriver
  261. }
  262. func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
  263. Trans := strings.Split(dev.TransRadio, "/")
  264. dev.TransDeno, _ = strconv.ParseFloat(util.NumberFilter(Trans[0], util.NUMBER_CONTINUE), 64)
  265. if len(Trans) > 1 {
  266. dev.TransDiv, _ = strconv.ParseFloat(util.NumberFilter(Trans[1], util.NUMBER_CONTINUE), 64)
  267. }
  268. }
  269. func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
  270. defer func() {
  271. recover()
  272. }()
  273. iaddr, _ := strconv.Atoi(addr)
  274. isub, _ := strconv.Atoi(subid)
  275. if iaddr != 0 && isub > 0 {
  276. iaddr = iaddr + (isub-1)*3
  277. m.Address = strconv.Itoa(iaddr)
  278. }
  279. }
  280. func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
  281. devCompID := m.DutSn
  282. if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
  283. addr := m.Address
  284. ss := strings.Split(addr, "-")
  285. m.DutSn = m.Code + ss[0]
  286. devCompID = "0000"
  287. if len(ss) > 1 {
  288. drv.varyAdress(m, ss[0], ss[1])
  289. }
  290. }
  291. if hub, has := drv.DevList[m.DutSn]; has {
  292. return hub
  293. }
  294. hub := &AmMeterHub{
  295. DutSn: m.DutSn,
  296. Protocol: m.Protocol,
  297. chnExit: make(chan bool),
  298. loopserver: false,
  299. sampleTmr: nil,
  300. amList: make(map[string]*Ammeter),
  301. persitList: make(map[string]*meterPersist),
  302. queryIdx: 0,
  303. DevCompCode: devCompID,
  304. }
  305. hub.Probe(m.DutSn, drv)
  306. drv.DevList[m.DutSn] = hub
  307. *list = append(*list, hub)
  308. return hub
  309. }
  310. func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
  311. ts := time.Now().UnixMilli()
  312. devlist := make([]IDevice, 0)
  313. if model != nil {
  314. devModels := model.(*[]AmmeterModel)
  315. for _, mod := range *devModels {
  316. if len(mod.Code) < 4 || len(mod.Address) < 8 {
  317. continue
  318. }
  319. //AmMeterHub is device!!! communication target is the hub
  320. hub := drv.mountHub(&mod, &devlist)
  321. id := mod.Code + mod.Address
  322. am, has := hub.(*AmMeterHub).amList[id]
  323. if !has {
  324. eng := &meterPersist{}
  325. config.GetDB().Find(eng, "dev_name='"+mod.DevName+"'")
  326. am = &Ammeter{
  327. TotalEnergy: eng.TotalEnergy,
  328. DBStatus: eng.DBStatus,
  329. TsSample: eng.TsSample,
  330. TransDiv: 1,
  331. TransDeno: 1,
  332. }
  333. am.AmmeterModel = mod
  334. drv.ParseTransRatio(am)
  335. hub.(*AmMeterHub).amList[id] = am
  336. }
  337. am.TsUpdate = ts
  338. }
  339. }
  340. for _, hub := range drv.DevList {
  341. hub.(*AmMeterHub).amIndex = make([]string, 0)
  342. for id, am := range hub.(*AmMeterHub).amList {
  343. if am.TsUpdate != ts {
  344. delete(hub.(*AmMeterHub).amList, id)
  345. log.Info("Delete ammeter:", id)
  346. } else {
  347. hub.(*AmMeterHub).amIndex = append(hub.(*AmMeterHub).amIndex, id)
  348. }
  349. }
  350. }
  351. if len(drv.DevList) == 0 {
  352. drv.Uninstall()
  353. }
  354. return len(devlist), devlist
  355. }
  356. func (drv *AmmeterDrv) GetModel() interface{} {
  357. return &[]AmmeterModel{}
  358. }
  359. func NewAmMeter(param interface{}) IDriver {
  360. am := new(AmmeterDrv)
  361. return am
  362. }
  363. func init() {
  364. driverReg["ammeter"] = NewAmMeter
  365. config.GetDB().CreateTbl(&AmmeterModel{})
  366. config.GetDB().CreateTbl(&meterPersist{})
  367. }