mirror of
https://github.com/openziti/zrok.git
synced 2024-12-27 00:58:49 +01:00
better account and environment total efficiency and accuracy; 'bytesRead'/'bytesWritten' -> 'rx'/'tx' (#271)
This commit is contained in:
parent
8bf6875c3f
commit
40ae2da2c9
@ -40,27 +40,15 @@ func (a *Agent) Stop() {
|
||||
|
||||
func (a *Agent) Handle(u *metrics.Usage) error {
|
||||
logrus.Infof("handling: %v", u)
|
||||
acctRx, err := a.ifx.totalRxForAccount(u.AccountId, 24*time.Hour)
|
||||
acctRx, acctTx, err := a.ifx.totalForAccount(u.AccountId, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
acctTx, err := a.ifx.totalTxForAccount(u.AccountId, 24*time.Hour)
|
||||
envRx, envTx, err := a.ifx.totalForEnvironment(u.EnvironmentId, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
envRx, err := a.ifx.totalRxForEnvironment(u.EnvironmentId, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
envTx, err := a.ifx.totalTxForEnvironment(u.EnvironmentId, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
shareRx, err := a.ifx.totalRxForShare(u.ShareToken, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
shareTx, err := a.ifx.totalTxForShare(u.ShareToken, 24*time.Hour)
|
||||
shareRx, shareTx, err := a.ifx.totalForShare(u.ShareToken, 24*time.Hour)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
@ -81,7 +69,7 @@ mainLoop:
|
||||
for {
|
||||
select {
|
||||
case <-time.After(a.cfg.Cycle):
|
||||
logrus.Info("insepection cycle")
|
||||
logrus.Info("inspection cycle")
|
||||
|
||||
case <-a.close:
|
||||
close(a.join)
|
||||
|
@ -24,90 +24,65 @@ func newInfluxReader(cfg *metrics.InfluxConfig) *influxReader {
|
||||
return &influxReader{cfg, idb, queryApi}
|
||||
}
|
||||
|
||||
func (r *influxReader) totalRxForAccount(acctId int64, duration time.Duration) (int64, error) {
|
||||
func (r *influxReader) totalForAccount(acctId int64, duration time.Duration) (int64, int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
|
||||
"|> set(key: \"share\", value: \"*\")\n" +
|
||||
"|> drop(columns: [\"share\", \"envId\"])\n" +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) totalTxForAccount(acctId int64, duration time.Duration) (int64, error) {
|
||||
func (r *influxReader) totalForEnvironment(envId int64, duration time.Duration) (int64, int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"acctId\"] == \"%d\")\n", acctId) +
|
||||
"|> set(key: \"share\", value: \"*\")\n" +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) totalRxForEnvironment(envId int64, duration time.Duration) (int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
|
||||
"|> set(key: \"share\", value: \"*\")\n" +
|
||||
"|> drop(columns: [\"share\", \"acctId\"])\n" +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) totalTxForEnvironment(envId int64, duration time.Duration) (int64, error) {
|
||||
func (r *influxReader) totalForShare(shrToken string, duration time.Duration) (int64, int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"envId\"] == \"%d\")\n", envId) +
|
||||
"|> set(key: \"share\", value: \"*\")\n" +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) totalRxForShare(shrToken string, duration time.Duration) (int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) totalTxForShare(shrToken string, duration time.Duration) (int64, error) {
|
||||
query := fmt.Sprintf("from(bucket: \"%v\")\n", r.cfg.Bucket) +
|
||||
fmt.Sprintf("|> range(start: -%v)\n", duration) +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")\n" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesWritten\")\n" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")\n" +
|
||||
fmt.Sprintf("|> filter(fn: (r) => r[\"share\"] == \"%v\")\n", shrToken) +
|
||||
"|> sum()"
|
||||
return r.runQueryForSum(query)
|
||||
}
|
||||
|
||||
func (r *influxReader) runQueryForSum(query string) (int64, error) {
|
||||
func (r *influxReader) runQueryForSum(query string) (rx int64, tx int64, err error) {
|
||||
result, err := r.queryApi.Query(context.Background(), query)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
return -1, -1, err
|
||||
}
|
||||
|
||||
if result.Next() {
|
||||
count := 0
|
||||
for result.Next() {
|
||||
if v, ok := result.Record().Value().(int64); ok {
|
||||
return v, nil
|
||||
switch result.Record().Field() {
|
||||
case "tx":
|
||||
tx = v
|
||||
case "rx":
|
||||
rx = v
|
||||
default:
|
||||
logrus.Warnf("field '%v'?", result.Record().Field())
|
||||
}
|
||||
} else {
|
||||
return -1, errors.New("error asserting result type")
|
||||
return -1, -1, errors.New("error asserting value type")
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
logrus.Warnf("empty read result set for '%v'", strings.ReplaceAll(query, "\n", ""))
|
||||
return 0, nil
|
||||
if count != 2 {
|
||||
return -1, -1, errors.Errorf("expected 2 results; got '%d' (%v)", count, strings.ReplaceAll(query, "\n", ""))
|
||||
}
|
||||
return rx, tx, nil
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ func (w *influxWriter) Handle(u *Usage) error {
|
||||
if u.BackendTx > 0 || u.BackendRx > 0 {
|
||||
pt := influxdb2.NewPoint("xfer",
|
||||
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
||||
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
|
||||
map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx},
|
||||
u.IntervalStart)
|
||||
pts = append(pts, pt)
|
||||
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
|
||||
@ -45,7 +45,7 @@ func (w *influxWriter) Handle(u *Usage) error {
|
||||
if u.FrontendTx > 0 || u.FrontendRx > 0 {
|
||||
pt := influxdb2.NewPoint("xfer",
|
||||
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
||||
map[string]interface{}{"bytesRead": u.FrontendRx, "bytesWritten": u.FrontendTx},
|
||||
map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx},
|
||||
u.IntervalStart)
|
||||
pts = append(pts, pt)
|
||||
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
|
||||
|
@ -19,11 +19,11 @@ func sparkDataForShares(shrs []*store.Share) (map[string][]int64, error) {
|
||||
|
||||
for result.Next() {
|
||||
combinedRate := int64(0)
|
||||
readRate := result.Record().ValueByKey("bytesRead")
|
||||
readRate := result.Record().ValueByKey("tx")
|
||||
if readRate != nil {
|
||||
combinedRate += readRate.(int64)
|
||||
}
|
||||
writeRate := result.Record().ValueByKey("bytesWritten")
|
||||
writeRate := result.Record().ValueByKey("tx")
|
||||
if writeRate != nil {
|
||||
combinedRate += writeRate.(int64)
|
||||
}
|
||||
@ -48,7 +48,7 @@ func sparkFluxQuery(shrs []*store.Share) string {
|
||||
query := "read = from(bucket: \"zrok\")" +
|
||||
"|> range(start: -5m)" +
|
||||
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" +
|
||||
"|> filter(fn: (r) => r[\"_field\"] == \"rx\" or r[\"_field\"] == \"tx\")" +
|
||||
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
|
||||
shrFilter +
|
||||
"|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
|
||||
|
Loading…
Reference in New Issue
Block a user