|
- package drviers
- import (
- "encoding/hex"
- "errors"
- "fmt"
- "reflect"
- "strconv"
- "strings"
- "time"
- bus "github.com/ammeter/Bus"
- "github.com/ammeter/config"
- "github.com/ammeter/util"
- "github.com/yuguorong/go/log"
- )
- const (
- DEF_SAMPLE_PERIOD = 10 * time.Minute
- DEF_SAMPLE_PEER_DURATION = 60 * time.Second
- DEF_TCP_READ_TIMEOUT = (DEF_SAMPLE_PERIOD + 10*time.Minute)
- POSTPONE_PERSIST_TIME = 5 * time.Minute
- )
- //amList map[string]*Ammeter
- //amList : key=[code + address], each meter id must be unique, otherwise ammeter would not work correctly!
- //Careful! address from cloud may not real address. software varAddress functon would change it to real address
- type AmmeterModel struct {
- Id string `json:"id"`
- DevName string `json:"devName" gorm:"primary_key; unique"`
- Code string `json:"code" `
- DutSn string `json:"dutSn" `
- Address string `json:"address" `
- Protocol string `json:"protocol" `
- GwDevId string `json:"gwDevId" `
- TransRadio string `json:"transformerRatio" `
- Type string `json:"-" `
- }
- type meterPersist struct {
- DevName string `json:"devName" gorm:"primary_key; unique"`
- TsSample int64 `gorm:"autoUpdateTime"`
- TotalEnergy float64
- DBStatus int32
- }
- type Ammeter struct {
- AmmeterModel
- TsSample int64
- TotalEnergy float64
- TransDeno float64
- TransDiv float64
- varAddr string
- DBStatus int32
- TsUpdate int64
- }
- type AmMeterHub struct {
- baseDevice
- DutSn string
- DevCompCode string ` gorm:"-"`
- Protocol string
- chnExit chan bool
- loopserver bool
- sampleTmr *time.Timer
- amIndex []string //avoid map chaos order,even map read only
- amList map[string]*Ammeter //key=[code + address], each meter id must be unique
- persitList map[string]*meterPersist
- tmrPersist *time.Timer
- queryIdx int
- }
- func (dev *AmMeterHub) Close() error {
- dev.sampleTmr.Stop()
- dev.loopserver = false
- dev.chnExit <- true
- dev.baseDevice.Close()
- return nil
- }
- func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
- log.Info(dev.DutSn, " Attached!")
- chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
- dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
- dev.baseDevice.OnAttach(chn)
- }
- func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
- log.Info(dev.DutSn, " Detached!")
- dev.sampleTmr.Stop()
- dev.baseDevice.OnDetach(chn)
- }
- func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
- if len(stream) > 8 {
- for i := 0; i < len(stream); i++ {
- if stream[i] != 0xFE {
- stream = stream[i:]
- break
- }
- }
- if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
- devname := ""
- mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
- if mret != nil && devname == dev.DutSn {
- return bus.DispatchSingle
- }
- }
- return bus.DispatchNone
- }
- if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
- return ret
- }
- sid := hex.EncodeToString(stream)
- for _, am := range dev.amList {
- if am.DutSn == sid {
- log.Info("MOUNT meter: ", am.DutSn, ",", sid)
- return bus.DispatchSingle
- }
- }
- return bus.DispatchNone
- }
- func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
- pam = dev.amList[dev.amIndex[dev.queryIdx]]
- pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
- if _, err := dev.Route[1].ibus.Send(dev.Route[1].iChn, pack); err != nil {
- dev.OnDetach(dev.Route[1].iChn)
- } else {
- tmrTx.Reset(1 * time.Second)
- }
- return pam
- }
- func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
- tmrTx.Stop()
- dev.queryIdx++
- if dev.queryIdx >= len(dev.amList) {
- dev.queryIdx = 0
- dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
- } else {
- dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
- }
- }
- func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
- vlist := mval.(map[string]interface{})
- for k, v := range vlist {
- if reflect.TypeOf(v).Kind() == reflect.Float64 {
- valItem := v.(float64)
- vlist[k] = (valItem * am.TransDeno) / am.TransDiv
- }
- }
- if val, has := vlist["TotalActivePower"]; has {
- v := val.(float64)
- if v < 0.001 {
- v = 0.001 //hack for cloud query
- }
- diff := v - am.TotalEnergy
- if am.DBStatus == 0 { //数据库清0
- diff = 0.001 //hack for cloud query
- am.DBStatus = 1
- }
- am.TotalEnergy = v
- vlist["ActivePowerIncrement"] = diff
- if len(dev.persitList) == 0 {
- dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
- }
- energe := &meterPersist{
- TotalEnergy: am.TotalEnergy,
- TsSample: am.TsSample,
- DBStatus: am.DBStatus,
- DevName: am.DevName,
- }
- dev.persitList[am.DevName] = energe
- }
- // 增加在线状态上报
- vlist["onOffLine"] = 1
- }
- func (dev *AmMeterHub) Run() error {
- dev.loopserver = true
- tmrTxTimout := time.NewTimer(5 * time.Second)
- dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
- tmrTxTimout.Stop()
- //var pam *Ammeter = nil
- for dev.loopserver {
- select {
- case msg := <-dev.Route[1].router[0]:
- if reflect.TypeOf(msg).Kind() == reflect.Slice {
- b := msg.([]byte)
- log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
- devid := ""
- mret := dev.Route[1].iproto.ParsePacket(b, &devid)
- if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
- log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
- if am, has := dev.amList[devid]; has {
- dev.AdjustTelemetry(mret, am)
- telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
- dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
- am.TsSample = time.Now().Unix()
- }
- dev.SchedulNextSample(tmrTxTimout)
- }
- } else if reflect.TypeOf(msg).Kind() == reflect.String {
- if msg.(string) == "closed" {
- dev.OnDetach(dev.Route[1].iChn)
- }
- }
- case <-tmrTxTimout.C:
- dev.SchedulNextSample(tmrTxTimout)
- case <-dev.sampleTmr.C:
- dev.IssueSampleCmd(tmrTxTimout)
- case <-dev.chnExit:
- dev.loopserver = false
- case <-dev.tmrPersist.C:
- for k, eng := range dev.persitList {
- config.GetDB().Save(eng)
- delete(dev.persitList, k)
- }
- }
- }
- return nil
- }
- func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
- log.Info("DutchanDispatch")
- b := rxin.([]byte)
- if len(b) == 8 {
- return hex.EncodeToString(b)
- }
- return nil
- }
- func (dev *AmMeterHub) Open(param ...interface{}) error {
- if dev.DutSn != "" {
- if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
- return errors.New("can not set default rout!")
- }
- r := dev.Route[len(dev.Route)-1]
- r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
- }
- dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
- dev.sampleTmr.Stop()
- go dev.Run()
- return nil
- }
- func (dev *AmMeterHub) GetDevice(devname string) interface{} {
- for _, am := range dev.amList {
- if am.DevName == devname {
- return dev
- }
- }
- return nil
- }
- func (dev *AmMeterHub) ListDevices() interface{} {
- mdev := make(map[string]interface{})
- mdev["State"] = dev.Status
- dinfo := make(map[string]interface{})
- for _, subdev := range dev.amList {
- meterval := make(map[string]string)
- meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
- if subdev.TsSample != 0 {
- meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
- }
- dinfo[subdev.DevName] = meterval
- }
- mdev["Sub Meters"] = dinfo
- return mdev
- }
- /*
- [{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
- {"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
- {"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
- */
- type AmmeterDrv struct {
- baseDriver
- }
- func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
- Trans := strings.Split(dev.TransRadio, "/")
- dev.TransDeno, _ = strconv.ParseFloat(util.NumberFilter(Trans[0], util.NUMBER_CONTINUE), 64)
- if len(Trans) > 1 {
- dev.TransDiv, _ = strconv.ParseFloat(util.NumberFilter(Trans[1], util.NUMBER_CONTINUE), 64)
- }
- }
- func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
- defer func() {
- recover()
- }()
- iaddr, _ := strconv.Atoi(addr)
- isub, _ := strconv.Atoi(subid)
- if iaddr != 0 && isub > 0 {
- iaddr = iaddr + (isub-1)*3
- m.Address = strconv.Itoa(iaddr)
- }
- }
- func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
- devCompID := m.DutSn
- if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
- addr := m.Address
- ss := strings.Split(addr, "-")
- m.DutSn = m.Code + ss[0]
- devCompID = "0000"
- if len(ss) > 1 {
- drv.varyAdress(m, ss[0], ss[1])
- }
- }
- if hub, has := drv.DevList[m.DutSn]; has {
- return hub
- }
- hub := &AmMeterHub{
- DutSn: m.DutSn,
- Protocol: m.Protocol,
- chnExit: make(chan bool),
- loopserver: false,
- sampleTmr: nil,
- amList: make(map[string]*Ammeter),
- persitList: make(map[string]*meterPersist),
- queryIdx: 0,
- DevCompCode: devCompID,
- }
- hub.Probe(m.DutSn, drv)
- drv.DevList[m.DutSn] = hub
- *list = append(*list, hub)
- return hub
- }
- func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
- ts := time.Now().UnixMilli()
- devlist := make([]IDevice, 0)
- if model != nil {
- devModels := model.(*[]AmmeterModel)
- for _, mod := range *devModels {
- if len(mod.Code) < 4 || len(mod.Address) < 8 {
- continue
- }
- //AmMeterHub is device!!! communication target is the hub
- hub := drv.mountHub(&mod, &devlist)
- id := mod.Code + mod.Address
- am, has := hub.(*AmMeterHub).amList[id]
- if !has {
- eng := &meterPersist{}
- config.GetDB().Find(eng, "dev_name='"+mod.DevName+"'")
- am = &Ammeter{
- TotalEnergy: eng.TotalEnergy,
- DBStatus: eng.DBStatus,
- TsSample: eng.TsSample,
- TransDiv: 1,
- TransDeno: 1,
- }
- am.AmmeterModel = mod
- drv.ParseTransRatio(am)
- hub.(*AmMeterHub).amList[id] = am
- }
- am.TsUpdate = ts
- }
- }
- for _, hub := range drv.DevList {
- hub.(*AmMeterHub).amIndex = make([]string, 0)
- for id, am := range hub.(*AmMeterHub).amList {
- if am.TsUpdate != ts {
- delete(hub.(*AmMeterHub).amList, id)
- log.Info("Delete ammeter:", id)
- } else {
- hub.(*AmMeterHub).amIndex = append(hub.(*AmMeterHub).amIndex, id)
- }
- }
- }
- if len(drv.DevList) == 0 {
- drv.Uninstall()
- }
- return len(devlist), devlist
- }
- func (drv *AmmeterDrv) GetModel() interface{} {
- return &[]AmmeterModel{}
- }
- func NewAmMeter(param interface{}) IDriver {
- am := new(AmmeterDrv)
- return am
- }
- func init() {
- driverReg["ammeter"] = NewAmMeter
- config.GetDB().CreateTbl(&AmmeterModel{})
- config.GetDB().CreateTbl(&meterPersist{})
- }
|