mirror of
https://github.com/openziti/zrok.git
synced 2025-02-17 02:30:50 +01:00
working (but not correct) values for account and environment rx/tx (#271)
This commit is contained in:
parent
e3222555e2
commit
6813f3a336
@ -40,15 +40,36 @@ func (a *Agent) Stop() {
|
|||||||
|
|
||||||
func (a *Agent) Handle(u *metrics.Usage) error {
|
func (a *Agent) Handle(u *metrics.Usage) error {
|
||||||
logrus.Infof("handling: %v", u)
|
logrus.Infof("handling: %v", u)
|
||||||
rxTotal, err := a.ifx.totalRxForShare(u.ShareToken, 24*time.Hour)
|
acctRx, err := a.ifx.totalRxForAccount(u.AccountId, 24*time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
}
|
}
|
||||||
txTotal, err := a.ifx.totalTxForShare(u.ShareToken, 24*time.Hour)
|
acctTx, err := a.ifx.totalTxForAccount(u.AccountId, 24*time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
}
|
}
|
||||||
logrus.Infof("'%v': {rx: %v, tx: %v}", u.ShareToken, util.BytesToSize(rxTotal), util.BytesToSize(txTotal))
|
envRx, err := a.ifx.totalRxForEnvironment(u.EnvironmentId, 24*time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
envTx, err := a.ifx.totalTxForEnvironment(u.EnvironmentId, 24*time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
shareRx, err := a.ifx.totalRxForShare(u.ShareToken, 24*time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
shareTx, err := a.ifx.totalTxForShare(u.ShareToken, 24*time.Hour)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
logrus.Infof("'%v': acct:{rx: %v, tx: %v}, env:{rx: %v, tx: %v}, share:{rx: %v, tx: %v}",
|
||||||
|
u.ShareToken,
|
||||||
|
util.BytesToSize(acctRx), util.BytesToSize(acctTx),
|
||||||
|
util.BytesToSize(envRx), util.BytesToSize(envTx),
|
||||||
|
util.BytesToSize(shareRx), util.BytesToSize(shareTx),
|
||||||
|
)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,6 +24,50 @@ func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader {
|
|||||||
return &influxReader{cfg, idb, queryApi}
|
return &influxReader{cfg, idb, queryApi}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *influxReader) totalRxForAccount(acctId int64, duration time.Duration) (int64, error) {
|
||||||
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||||
|
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||||
|
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||||
|
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
|
||||||
|
"|> sum()"
|
||||||
|
return r.runQueryForSum(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *influxReader) totalTxForAccount(acctId int64, duration time.Duration) (int64, error) {
|
||||||
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||||
|
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||||
|
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||||
|
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
|
||||||
|
"|> sum()"
|
||||||
|
return r.runQueryForSum(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *influxReader) totalRxForEnvironment(envId int64, duration time.Duration) (int64, error) {
|
||||||
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||||
|
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||||
|
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||||
|
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
|
||||||
|
"|> sum()"
|
||||||
|
return r.runQueryForSum(query)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *influxReader) totalTxForEnvironment(envId int64, duration time.Duration) (int64, error) {
|
||||||
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||||
|
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||||
|
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
|
||||||
|
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||||
|
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
|
||||||
|
"|> sum()"
|
||||||
|
return r.runQueryForSum(query)
|
||||||
|
}
|
||||||
|
|
||||||
func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) {
|
func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) {
|
||||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||||
|
Loading…
Reference in New Issue
Block a user