package drviers

import (
	"encoding/hex"
	"errors"
	"fmt"
	"reflect"
	"sort"
	"strconv"
	"strings"
	"time"

	bus "github.com/ammeter/Bus"
	"github.com/ammeter/config"
	"github.com/yuguorong/go/log"
)

const (
	// DEF_SAMPLE_PERIOD is the pause applied to the sample timer once every
	// meter of a hub has been queried.
	DEF_SAMPLE_PERIOD = 10 * time.Minute
	// DEF_SAMPLE_PEER_DURATION is the pause between two consecutive meters of
	// the same polling round; it is also used when a channel attaches.
	DEF_SAMPLE_PEER_DURATION = 60 * time.Second
	// DEF_TCP_READ_TIMEOUT is handed to the channel in OnAttach.
	DEF_TCP_READ_TIMEOUT = 30 * time.Second
	// POSTPONE_PERSIST_TIME is how long energy readings are buffered before
	// they are written to the database.
	POSTPONE_PERSIST_TIME = 5 * time.Minute
)

// AmmeterModel is the configuration record of a single ammeter.
//
// Inside a hub the meters are stored in amList (map[string]*Ammeter) keyed by
// Code + Address. Each meter id must be unique, otherwise the ammeter will not
// work correctly.
//
// Careful: the address received from the cloud may not be the real address;
// varyAdress rewrites it to the real one.
type AmmeterModel struct {
	Id         string `json:"id"`
	DevName    string `json:"devName" gorm:"primary_key; unique"`
	Code       string `json:"code" `
	DutSn      string `json:"dutSn" `
	Address    string `json:"address" `
	Protocol   string `json:"protocol" `
	GwDevId    string `json:"gwDevId" `
	TransRadio string `json:"transformerRatio" ` // "deno" or "deno/div", see ParseTransRatio
	Type       string `json:"-" `
}

// meterPersist is the database record holding the last known energy reading
// of a meter, keyed by device name.
type meterPersist struct {
	DevName     string `json:"devName" gorm:"primary_key; unique"`
	TsSample    int64  `gorm:"autoUpdateTime"`
	TotalEnergy float64
	DBStatus    int32
}

// Ammeter is the runtime state of one meter attached to a hub.
type Ammeter struct {
	AmmeterModel
	TsSample    int64   // Unix seconds of the last forwarded sample
	TotalEnergy float64 // last scaled "TotalActivePower" value
	TransDeno   float64 // multiplier applied to every float telemetry value
	TransDiv    float64 // divisor applied to every float telemetry value
	varAddr     string
	DBStatus    int32 // 0 until the first sample has been processed
	TsUpdate    int64 // stamp of the CreateDevice call that last saw this meter
}

// AmMeterHub is the device the driver talks to; it polls the meters in amList
// one after another and forwards their telemetry.
type AmMeterHub struct {
	baseDevice
	DutSn       string
	DevCompCode string ` gorm:"-"`
	Protocol    string
	chnExit     chan bool
	loopserver  bool
	sampleTmr   *time.Timer
	amIndex     []string            //avoid map chaos order,even map read only
	amList      map[string]*Ammeter //key=[code + address], each meter id must be unique
	persitList  map[string]*meterPersist
	tmrPersist  *time.Timer
	queryIdx    int
}

// Close stops the sample timer, asks the Run loop to exit and closes the
// embedded base device. It always returns nil.
func (dev *AmMeterHub) Close() error {
	// Upper bound for handing the exit signal to Run.
	const exitSignalTimeout = time.Second

	// sampleTmr is only created in Open; Close may follow a failed Open.
	if dev.sampleTmr != nil {
		dev.sampleTmr.Stop()
	}
	dev.loopserver = false
	// chnExit is unbuffered: if Run is not receiving (never started, already
	// returned, or busy) an unconditional send would block forever. The
	// loopserver flag set above still ends the loop after the current event.
	select {
	case dev.chnExit <- true:
	case <-time.After(exitSignalTimeout):
	}
	dev.baseDevice.Close()
	return nil
}

// OnAttach configures the read timeout of the attached channel, arms the
// sample timer and forwards the event to the base device.
func (dev *AmMeterHub) OnAttach(chn bus.IChannel) {
	log.Info(dev.DutSn, " Attached!")
	chn.SetTimeout(DEF_TCP_READ_TIMEOUT)
	if dev.sampleTmr != nil {
		dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
	}
	dev.baseDevice.OnAttach(chn)
}

// OnDetach stops sampling and forwards the event to the base device.
func (dev *AmMeterHub) OnDetach(chn bus.IChannel) {
	log.Info(dev.DutSn, " Detached!")
	if dev.sampleTmr != nil {
		dev.sampleTmr.Stop()
	}
	dev.baseDevice.OnDetach(chn)
}

// ChannelDispatch decides whether an incoming stream belongs to this hub.
//
// Streams longer than 8 bytes are only accepted when bytes 0 and 7 are 0x68,
// byte 8 is 0x93 and the packet parses to this hub's DutSn. Shorter streams
// are first offered to the base device and then compared, hex encoded, with
// the DutSn of every meter.
func (dev *AmMeterHub) ChannelDispatch(stream []byte, args interface{}) bus.ChnDispResult {
	if len(stream) > 8 {
		if stream[0] == 0x68 && stream[7] == 0x68 && stream[8] == 0x93 {
			devname := ""
			mret := dev.Route[1].iproto.ParsePacket(stream, &devname)
			if mret != nil && devname == dev.DutSn {
				return bus.DispatchSingle
			}
		}
		return bus.DispatchNone
	}

	if ret := dev.baseDevice.ChannelDispatch(stream, args); ret != bus.DispatchNone {
		return ret
	}

	sid := hex.EncodeToString(stream)
	for _, am := range dev.amList {
		if am.DutSn == sid {
			log.Info("MOUNT meter: ", am.DutSn, ",", sid)
			return bus.DispatchSingle
		}
	}
	return bus.DispatchNone
}

// IssueSampleCmd sends a "TotalActivePower" query to the meter selected by
// queryIdx and arms tmrTx with a one second response timeout.
//
// It returns the queried meter, or nil when there is no meter to query. The
// timeout is armed in either case so that SchedulNextSample keeps the polling
// cycle alive.
func (dev *AmMeterHub) IssueSampleCmd(tmrTx *time.Timer) (pam *Ammeter) {
	// The index may have shrunk since the cursor was last advanced.
	if dev.queryIdx < 0 || dev.queryIdx >= len(dev.amIndex) {
		dev.queryIdx = 0
	}
	if dev.queryIdx < len(dev.amIndex) {
		pam = dev.amList[dev.amIndex[dev.queryIdx]]
	}
	if pam != nil {
		pack := dev.Route[1].iproto.PackageCmd("TotalActivePower", pam.Code+pam.Address)
		dev.Route[1].ibus.Send(dev.Route[1].iChn, pack)
	}
	tmrTx.Reset(1 * time.Second)
	return pam
}

// SchedulNextSample stops the response timeout, advances the polling cursor
// and re-arms the sample timer: with the short pause while meters remain in
// this round, with the long period once the round is complete.
func (dev *AmMeterHub) SchedulNextSample(tmrTx *time.Timer) {
	tmrTx.Stop()
	dev.queryIdx++
	// The cursor indexes amIndex, so that is the length to wrap on.
	if dev.queryIdx >= len(dev.amIndex) {
		dev.queryIdx = 0
		dev.sampleTmr.Reset(DEF_SAMPLE_PERIOD)
	} else {
		dev.sampleTmr.Reset(DEF_SAMPLE_PEER_DURATION)
	}
}

// AdjustTelemetry scales every float64 entry of mval by the meter's
// transformer ratio (TransDeno / TransDiv) in place. When a float64
// "TotalActivePower" entry is present it also stores the new total on the
// meter, adds an "ActivePowerIncrement" entry and queues the reading for
// persistence.
//
// mval is expected to be a map[string]interface{}; any other value is ignored.
func (dev *AmMeterHub) AdjustTelemetry(mval interface{}, am *Ammeter) {
	vlist, ok := mval.(map[string]interface{})
	if !ok || am == nil {
		return
	}

	for k, v := range vlist {
		if valItem, isFloat := v.(float64); isFloat {
			vlist[k] = (valItem * am.TransDeno) / am.TransDiv
		}
	}

	total, ok := vlist["TotalActivePower"].(float64)
	if !ok {
		return
	}

	diff := total - am.TotalEnergy
	if am.DBStatus == 0 {
		// The database holds no previous reading (reset to 0), so the first
		// sample has no meaningful increment.
		diff = 0
		am.DBStatus = 1
	}
	am.TotalEnergy = total
	// Added after the scaling loop, so the increment is not scaled twice.
	vlist["ActivePowerIncrement"] = diff

	// The first queued reading starts the postponed write.
	if len(dev.persitList) == 0 && dev.tmrPersist != nil {
		dev.tmrPersist.Reset(POSTPONE_PERSIST_TIME)
	}
	if dev.persitList == nil {
		dev.persitList = make(map[string]*meterPersist)
	}
	dev.persitList[am.DevName] = &meterPersist{
		TotalEnergy: am.TotalEnergy,
		TsSample:    am.TsSample,
		DBStatus:    am.DBStatus,
		DevName:     am.DevName,
	}
}

// Run is the hub's event loop. It forwards meter responses as telemetry,
// drives the polling timers and periodically writes buffered readings to the
// database. It returns nil once the loop has been asked to stop.
func (dev *AmMeterHub) Run() error {
	dev.loopserver = true
	tmrTxTimout := time.NewTimer(5 * time.Second)
	dev.tmrPersist = time.NewTimer(POSTPONE_PERSIST_TIME)
	tmrTxTimout.Stop()

	for dev.loopserver {
		select {
		case msg, open := <-dev.Route[1].router[0]:
			if !open {
				// A closed channel would deliver nil forever.
				log.Info(dev.DutSn, " route channel closed, sampling loop stopped")
				dev.loopserver = false
				break
			}
			b, isBytes := msg.([]byte)
			if !isBytes {
				break
			}
			log.Info("[", dev.DutSn, "]:", hex.EncodeToString(b)) //[9521003697]:fefefefe68040042050821689108333333338b694c331c16
			devid := ""
			mret := dev.Route[1].iproto.ParsePacket(b, &devid)
			if mret != nil && reflect.TypeOf(mret).Kind() == reflect.Map && devid != "" {
				log.Info(devid, mret) //210805420004 map[TotalActivePower:1936.58]
				if am, has := dev.amList[devid]; has {
					dev.AdjustTelemetry(mret, am)
					telemetry := dev.Route[0].iproto.PackageCmd(am.DevName, mret)
					dev.Route[0].ibus.Send(dev.Route[0].iChn, telemetry)
					am.TsSample = time.Now().Unix()
				}
				dev.SchedulNextSample(tmrTxTimout)
			}
		case <-tmrTxTimout.C:
			// No answer within the timeout: move on to the next meter.
			dev.SchedulNextSample(tmrTxTimout)
		case <-dev.sampleTmr.C:
			dev.IssueSampleCmd(tmrTxTimout)
		case <-dev.chnExit:
			dev.loopserver = false
		case <-dev.tmrPersist.C:
			for k, eng := range dev.persitList {
				config.GetDB().Save(eng)
				delete(dev.persitList, k)
			}
		}
	}
	return nil
}

// DutchanDispatch returns the hex encoding of rxin when it is an 8 byte
// []byte, and nil otherwise.
func DutchanDispatch(rxin interface{}, param interface{}) interface{} {
	log.Info("DutchanDispatch")
	b, ok := rxin.([]byte)
	if !ok {
		return nil
	}
	if len(b) == 8 {
		return hex.EncodeToString(b)
	}
	return nil
}

// Open sets up the default route (when the hub has a DutSn), creates the
// stopped sample timer and starts the Run loop in its own goroutine.
//
// It returns an error when the default route cannot be set.
func (dev *AmMeterHub) Open(param ...interface{}) error {
	if dev.DutSn != "" {
		if dev.SetRoute(dev, dev.Protocol, "dtuFtpServer", dev.DutSn, 0) == 0 {
			return errors.New("can not set default rout!")
		}
		r := dev.Route[len(dev.Route)-1]
		r.ibus.ApplyPatch("PassiveAddress", r.iproto.PackageCmd("ReadAddress"))
	}
	// The timer stays stopped until OnAttach arms it.
	dev.sampleTmr = time.NewTimer(DEF_SAMPLE_PERIOD)
	dev.sampleTmr.Stop()
	go dev.Run()
	return nil
}

// GetDevice returns the hub itself when one of its meters is named devname,
// and nil otherwise.
func (dev *AmMeterHub) GetDevice(devname string) interface{} {
	for _, am := range dev.amList {
		if am.DevName == devname {
			return dev
		}
	}
	return nil
}

// ListDevices returns a map with the hub status under "State" and, under
// "Sub Meters", the total energy and last sample time of every meter keyed by
// device name.
func (dev *AmMeterHub) ListDevices() interface{} {
	mdev := make(map[string]interface{})
	mdev["State"] = dev.Status
	dinfo := make(map[string]interface{})
	for _, subdev := range dev.amList {
		meterval := make(map[string]string)
		meterval["TotalEnergy"] = fmt.Sprintf("%f", subdev.TotalEnergy)
		if subdev.TsSample != 0 {
			meterval["TsSample"] = time.Unix(subdev.TsSample, 0).Format("2006-01-02 15:04:05")
		}
		dinfo[subdev.DevName] = meterval
	}
	mdev["Sub Meters"] = dinfo
	return mdev
}

/*
[{"code":"26462285","devName":"SAIR10-0000000026462285","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84cc494690bf52b38d8579115b","protocol":"HJ212"},
{"code":"61748803","devName":"SAIR10-0000000061748803","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2d84eb428160bf52b38d8579115b","protocol":"HJ212"}]
{"address":"05420001","code":"2108","devName":"AM10-2108054200019521","dutSn":"9521003712","gwDevId":"1ec2d8421b2ed30bf52b38d8579115b","id":"1ec2de6df5a2e40bf52b38d8579115b","protocol":"DLT645-2007","transformerRatio":"80"},
*/

// AmmeterDrv is the driver that creates and tracks AmMeterHub devices.
type AmmeterDrv struct {
	baseDriver
}

// ParseTransRatio parses dev.TransRadio, written as "deno" or "deno/div", into
// dev.TransDeno and dev.TransDiv.
//
// A part that is missing, unparsable or zero leaves the current value in
// place; a current value of zero falls back to 1, so the ratio can neither
// wipe the telemetry nor divide by zero.
func (drv *AmmeterDrv) ParseTransRatio(dev *Ammeter) {
	parts := strings.Split(dev.TransRadio, "/")

	if v, err := strconv.ParseFloat(strings.TrimSpace(parts[0]), 64); err == nil && v != 0 {
		dev.TransDeno = v
	}
	if len(parts) > 1 {
		if v, err := strconv.ParseFloat(strings.TrimSpace(parts[1]), 64); err == nil && v != 0 {
			dev.TransDiv = v
		}
	}

	if dev.TransDeno == 0 {
		dev.TransDeno = 1
	}
	if dev.TransDiv == 0 {
		dev.TransDiv = 1
	}
}

// varyAdress rewrites m.Address to addr + (subid-1)*3.
//
// The result is zero padded to the width of addr, so an address such as
// "05420001" keeps its leading zero. Nothing is changed when addr is not a
// non-zero number or subid is not a positive number.
func (drv *AmmeterDrv) varyAdress(m *AmmeterModel, addr string, subid string) {
	if m == nil {
		return
	}
	iaddr, err := strconv.Atoi(addr)
	if err != nil || iaddr == 0 {
		return
	}
	isub, err := strconv.Atoi(subid)
	if err != nil || isub <= 0 {
		return
	}
	iaddr += (isub - 1) * 3
	m.Address = fmt.Sprintf("%0*d", len(addr), iaddr)
}

// mountHub returns the hub the meter m belongs to, creating and registering it
// (in drv.DevList and in *list) when it does not exist yet.
//
// When m.DutSn is shorter than 10 characters the hub serial is derived from
// Code plus the part of Address before "-", and an optional "-sub" suffix is
// resolved into the real address by varyAdress. m is modified in place.
func (drv *AmmeterDrv) mountHub(m *AmmeterModel, list *[]IDevice) IDevice {
	devCompID := m.DutSn
	if m.DutSn == "" || m.DutSn == "0000" || len(m.DutSn) < 10 {
		addr := m.Address
		ss := strings.Split(addr, "-")
		m.DutSn = m.Code + ss[0]
		devCompID = "0000"
		if len(ss) > 1 {
			drv.varyAdress(m, ss[0], ss[1])
		}
	}

	if hub, has := drv.DevList[m.DutSn]; has {
		return hub
	}

	hub := &AmMeterHub{
		DutSn:       m.DutSn,
		Protocol:    m.Protocol,
		chnExit:     make(chan bool),
		loopserver:  false,
		sampleTmr:   nil,
		amList:      make(map[string]*Ammeter),
		persitList:  make(map[string]*meterPersist),
		queryIdx:    0,
		DevCompCode: devCompID,
	}
	hub.Probe(m.DutSn, drv)
	drv.DevList[m.DutSn] = hub
	*list = append(*list, hub)
	return hub
}

// CreateDevice synchronises the hubs and their meters with model, which is
// expected to be a *[]AmmeterModel.
//
// Meters with a Code shorter than 4 or an Address shorter than 8 characters
// are skipped. Meters that are no longer part of model are removed from their
// hub, and every hub's polling index is rebuilt. It returns the hubs created
// by this call together with their count.
func (drv *AmmeterDrv) CreateDevice(model interface{}) (int, []IDevice) {
	ts := time.Now().UnixMilli()
	devlist := make([]IDevice, 0)

	if devModels, ok := model.(*[]AmmeterModel); ok && devModels != nil {
		for _, mod := range *devModels {
			if len(mod.Code) < 4 || len(mod.Address) < 8 {
				continue
			}
			//AmMeterHub is device!!! communication target is the hub
			hub, isHub := drv.mountHub(&mod, &devlist).(*AmMeterHub)
			if !isHub {
				continue
			}
			id := mod.Code + mod.Address
			am, has := hub.amList[id]
			if !has {
				eng := &meterPersist{}
				// DevName comes from configuration: double any single quote so
				// it cannot terminate the SQL string literal.
				// NOTE(review): prefer a parameterised condition if the DB
				// wrapper's Find supports one.
				safeName := strings.ReplaceAll(mod.DevName, "'", "''")
				config.GetDB().Find(eng, "dev_name='"+safeName+"'")
				am = &Ammeter{
					TotalEnergy: eng.TotalEnergy,
					DBStatus:    eng.DBStatus,
					TsSample:    eng.TsSample,
					TransDiv:    1,
					TransDeno:   1,
				}
				am.AmmeterModel = mod
				drv.ParseTransRatio(am)
				hub.amList[id] = am
			}
			// Mark the meter as seen by this call.
			am.TsUpdate = ts
		}
	}

	for _, dev := range drv.DevList {
		hub, isHub := dev.(*AmMeterHub)
		if !isHub {
			continue
		}
		index := make([]string, 0, len(hub.amList))
		for id, am := range hub.amList {
			if am.TsUpdate != ts {
				delete(hub.amList, id)
				log.Info("Delete ammeter:", id)
			} else {
				index = append(index, id)
			}
		}
		// Map iteration order is random; sort so the polling order is stable
		// across calls.
		sort.Strings(index)
		hub.amIndex = index
	}

	if len(drv.DevList) == 0 {
		drv.Uninstall()
	}
	return len(devlist), devlist
}

// GetModel returns an empty *[]AmmeterModel, the model type CreateDevice
// expects.
func (drv *AmmeterDrv) GetModel() interface{} {
	return &[]AmmeterModel{}
}

// NewAmMeter creates the ammeter driver; param is not used.
func NewAmMeter(param interface{}) IDriver {
	am := new(AmmeterDrv)
	return am
}

// init registers the driver under "ammeter" and creates the database tables
// for the model and the persisted readings.
func init() {
	driverReg["ammeter"] = NewAmMeter
	config.GetDB().CreateTbl(&AmmeterModel{})
	config.GetDB().CreateTbl(&meterPersist{})
}