store the usage in influx (#128)

This commit is contained in:
Michael Quigley 2023-03-07 16:29:39 -05:00
parent 97f20acd87
commit 7ac9e25611
No known key found for this signature in database
GPG Key ID: 9B60314A9DD20A62
5 changed files with 25 additions and 2 deletions

View File

@ -31,6 +31,12 @@ func Run(cfg *Config) (*MetricsAgent, error) {
return nil, errors.New("invalid 'source'; exiting")
}
if cfg.Influx == nil {
return nil, errors.New("no 'influx' configured; exiting")
}
idb := openInfluxDb(cfg.Influx)
events := make(chan map[string]interface{})
join, err := src.Start(events)
if err != nil {
@ -43,7 +49,10 @@ func Run(cfg *Config) (*MetricsAgent, error) {
case event := <-events:
usage := Ingest(event)
if shrToken, err := cache.getToken(usage.ZitiServiceId); err == nil {
logrus.Infof("share: %v, circuit: %v, rx: %d, tx: %d", shrToken, usage.ZitiCircuitId, usage.BackendRx, usage.BackendTx)
usage.ShareToken = shrToken
if err := idb.Write(usage); err != nil {
logrus.Error(err)
}
} else {
logrus.Error(err)
}

View File

@ -1,8 +1,10 @@
package metrics
import (
"context"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/sirupsen/logrus"
)
type influxDb struct {
@ -17,5 +19,14 @@ func openInfluxDb(cfg *InfluxConfig) *influxDb {
}
func (i *influxDb) Write(u *Usage) error {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken},
map[string]interface{}{"bytesRead": u.BackendRx, "bytesWritten": u.BackendTx},
u.IntervalStart)
if err := i.writeApi.WritePoint(context.Background(), pt); err == nil {
logrus.Infof("share: %v, circuit: %v, rx: %d, tx: %d", u.ShareToken, u.ZitiCircuitId, u.BackendRx, u.BackendTx)
} else {
return err
}
return nil
}

View File

@ -11,6 +11,7 @@ type Usage struct {
IntervalStart time.Time
ZitiServiceId string
ZitiCircuitId string
ShareToken string
FrontendTx int64
FrontendRx int64
BackendTx int64
@ -23,6 +24,7 @@ func (u Usage) String() string {
out += ", " + fmt.Sprintf("interval '%v'", u.IntervalStart)
out += ", " + fmt.Sprintf("service '%v'", u.ZitiServiceId)
out += ", " + fmt.Sprintf("circuit '%v'", u.ZitiCircuitId)
out += ", " + fmt.Sprintf("share '%v'", u.ShareToken)
out += ", " + fmt.Sprintf("fe {rx %v, tx %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
out += ", " + fmt.Sprintf("be {rx %v, tx %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
out += "}"

View File

@ -45,6 +45,7 @@ func (h *shareDetailHandler) Handle(params metadata.GetShareDetailParams, princi
var sparkData map[string][]int64
if cfg.Influx != nil {
sparkData, err = sparkDataForShares([]*store.Share{shr})
logrus.Info(sparkData)
if err != nil {
logrus.Errorf("error querying spark data for share: %v", err)
}

View File

@ -49,7 +49,7 @@ func sparkFluxQuery(shrs []*store.Share) string {
"|> range(start: -5m)" +
"|> filter(fn: (r) => r[\"_measurement\"] == \"xfer\")" +
"|> filter(fn: (r) => r[\"_field\"] == \"bytesRead\" or r[\"_field\"] == \"bytesWritten\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"frontend\")" +
"|> filter(fn: (r) => r[\"namespace\"] == \"backend\")" +
shrFilter +
"|> aggregateWindow(every: 5s, fn: sum, createEmpty: true)\n" +
"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +