Merge pull request #353 from openziti/v0.4.0_controller_profiling

AMQP Connection Handling Fixes; Profiling Support
This commit is contained in:
Michael Quigley 2023-06-16 14:00:37 -04:00 committed by GitHub
commit ebfb039687
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 96 additions and 66 deletions

View File

@ -12,6 +12,8 @@ FEATURE: The invite mechanism has been reworked to improve user experience. The
FEATURE: New password strength checking rules and configuration. See the example configuration file (`etc/ctrl.yml`) for details about how to configure the strength checking rules (https://github.com/openziti/zrok/issues/167)
FEATURE: A new `admin/profile_endpoint` configuration option is available to start a `net/http/pprof` listener. See `etc/ctrl.yml` for details.
CHANGE: The controller configuration version bumps from `v: 2` to `v: 3` to support all of the new `v0.4` functionality. See the [example ctrl.yml](etc/ctrl.yml) for details on the new configuration.
CHANGE: The underlying database store now utilizes a `deleted` flag on all tables to implement "soft deletes". This was necessary for the new metrics infrastructure, where we need to account for metrics data that arrived after the lifetime of a share or environment; and also we're going to need this for limits, where we need to see historical information about activity in the past (https://github.com/openziti/zrok/issues/262)

View File

@ -34,8 +34,9 @@ type Config struct {
}
type AdminConfig struct {
Secrets []string `cf:"+secret"`
TouLink string
Secrets []string `cf:"+secret"`
TouLink string
ProfileEndpoint string
}
type EndpointConfig struct {

View File

@ -2,11 +2,13 @@ package controller
import (
"context"
"github.com/openziti/zrok/controller/config"
"github.com/openziti/zrok/controller/limits"
"github.com/openziti/zrok/controller/metrics"
"github.com/sirupsen/logrus"
"log"
"net/http"
_ "net/http/pprof"
"github.com/go-openapi/loads"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
@ -26,6 +28,12 @@ var limitsAgent *limits.Agent
func Run(inCfg *config.Config) error {
cfg = inCfg
if cfg.Admin != nil && cfg.Admin.ProfileEndpoint != "" {
go func() {
log.Println(http.ListenAndServe(cfg.Admin.ProfileEndpoint, nil))
}()
}
swaggerSpec, err := loads.Embedded(rest_server_zrok.SwaggerJSON, rest_server_zrok.FlatSwaggerJSON)
if err != nil {
return errors.Wrap(err, "error loading embedded swagger spec")

View File

@ -209,14 +209,18 @@ mainLoop:
for {
select {
case usage := <-a.queue:
if err := a.enforce(usage); err != nil {
logrus.Errorf("error running enforcement: %v", err)
}
if time.Since(lastCycle) > a.cfg.Cycle {
if err := a.relax(); err != nil {
logrus.Errorf("error running relax cycle: %v", err)
if usage.ShareToken != "" {
if err := a.enforce(usage); err != nil {
logrus.Errorf("error running enforcement: %v", err)
}
lastCycle = time.Now()
if time.Since(lastCycle) > a.cfg.Cycle {
if err := a.relax(); err != nil {
logrus.Errorf("error running relax cycle: %v", err)
}
lastCycle = time.Now()
}
} else {
logrus.Warnf("not enforcing for usage with no share token: %v", usage.String())
}
case <-time.After(a.cfg.Cycle):

View File

@ -45,13 +45,15 @@ func (a *Agent) Start() error {
select {
case event := <-a.events:
if usage, err := Ingest(event.Data()); err == nil {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Error(err)
if usage.ZitiServiceId != "" {
if err := a.cache.addZrokDetail(usage); err != nil {
logrus.Errorf("unable to add zrok detail for: %v: %v", usage.String(), err)
}
}
shouldAck := true
for _, snk := range a.snks {
if err := snk.Handle(usage); err != nil {
logrus.Error(err)
logrus.Errorf("error handling usage: %v", err)
if shouldAck {
shouldAck = false
}
@ -59,11 +61,11 @@ func (a *Agent) Start() error {
}
if shouldAck {
if err := event.Ack(); err != nil {
logrus.Error("unable to Ack message", err)
logrus.Errorf("unable to ack handled message: %v", err)
}
}
} else {
logrus.Error(err)
logrus.Errorf("unable to ingest '%v': %v", event.Data(), err)
}
}
}

View File

@ -83,14 +83,6 @@ mainLoop:
msgLoop:
for {
select {
case event := <-s.msgs:
if event.Body != nil {
s.events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body),
msg: event,
}
}
case err, ok := <-s.errs:
if err != nil || !ok {
logrus.Error(err)
@ -99,6 +91,21 @@ mainLoop:
case <-s.close:
break mainLoop
case event, ok := <-s.msgs:
if !ok {
logrus.Debug("selecting on msg !ok")
break msgLoop
}
if event.Body != nil {
s.events <- &ZitiEventAMQP{
data: ZitiEventJson(event.Body),
msg: event,
}
} else {
logrus.Debug("event body was nil!")
break msgLoop
}
}
}
}

View File

@ -22,39 +22,41 @@ func newInfluxWriter(cfg *InfluxConfig) *influxWriter {
}
func (w *influxWriter) Handle(u *Usage) error {
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
if u.ShareToken != "" {
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
envId := fmt.Sprintf("%d", u.EnvironmentId)
acctId := fmt.Sprintf("%d", u.AccountId)
envId := fmt.Sprintf("%d", u.EnvironmentId)
acctId := fmt.Sprintf("%d", u.AccountId)
var pts []*write.Point
circuitPt := influxdb2.NewPoint("circuits",
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"circuit": u.ZitiCircuitId},
u.IntervalStart)
pts = append(pts, circuitPt)
if u.BackendTx > 0 || u.BackendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx},
var pts []*write.Point
circuitPt := influxdb2.NewPoint("circuits",
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"circuit": u.ZitiCircuitId},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
}
if u.FrontendTx > 0 || u.FrontendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
}
pts = append(pts, circuitPt)
if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Info(out)
} else {
return err
if u.BackendTx > 0 || u.BackendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
}
if u.FrontendTx > 0 || u.FrontendRx > 0 {
pt := influxdb2.NewPoint("xfer",
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx},
u.IntervalStart)
pts = append(pts, pt)
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
}
if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil {
logrus.Info(out)
} else {
return err
}
}
return nil

View File

@ -17,10 +17,10 @@ func Ingest(event ZitiEventJson) (*Usage, error) {
if vFloat64, ok := v.(float64); ok {
u.IntervalStart = time.Unix(int64(vFloat64), 0)
} else {
logrus.Error("unable to assert 'interval_start_utc'")
logrus.Errorf("unable to assert 'interval_start_utc': %v", event)
}
} else {
logrus.Error("missing 'interval_start_utc'")
logrus.Errorf("missing 'interval_start_utc': %v", event)
}
if v, found := eventMap["tags"]; found {
if tags, ok := v.(map[string]interface{}); ok {
@ -28,16 +28,16 @@ func Ingest(event ZitiEventJson) (*Usage, error) {
if vStr, ok := v.(string); ok {
u.ZitiServiceId = vStr
} else {
logrus.Error("unable to assert 'tags/serviceId'")
logrus.Errorf("unable to assert 'tags/serviceId': %v", event)
}
} else {
logrus.Error("missing 'tags/serviceId'")
logrus.Errorf("missing 'tags/serviceId': %v", event)
}
} else {
logrus.Errorf("unable to assert 'tags'")
logrus.Errorf("unable to assert 'tags': %v", event)
}
} else {
logrus.Errorf("missing 'tags'")
logrus.Errorf("missing 'tags': %v", event)
}
if v, found := eventMap["usage"]; found {
if usage, ok := v.(map[string]interface{}); ok {
@ -45,47 +45,47 @@ func Ingest(event ZitiEventJson) (*Usage, error) {
if vFloat64, ok := v.(float64); ok {
u.FrontendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.tx'")
logrus.Errorf("unable to assert 'usage/ingress.tx': %v", event)
}
}
if v, found := usage["ingress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.FrontendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/ingress.rx")
logrus.Errorf("unable to assert 'usage/ingress.rx': %v", event)
}
}
if v, found := usage["egress.tx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendTx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.tx'")
logrus.Errorf("unable to assert 'usage/egress.tx': %v", event)
}
}
if v, found := usage["egress.rx"]; found {
if vFloat64, ok := v.(float64); ok {
u.BackendRx = int64(vFloat64)
} else {
logrus.Error("unable to assert 'usage/egress.rx'")
logrus.Errorf("unable to assert 'usage/egress.rx': %v", event)
}
}
} else {
logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event)
}
} else {
logrus.Warnf("missing 'usage'")
logrus.Warnf("missing 'usage': %v", event)
}
if v, found := eventMap["circuit_id"]; found {
if vStr, ok := v.(string); ok {
u.ZitiCircuitId = vStr
} else {
logrus.Error("unable to assert 'circuit_id'")
logrus.Errorf("unable to assert 'circuit_id': %v", event)
}
} else {
logrus.Warn("missing 'circuit_id'")
logrus.Warnf("missing 'circuit_id': %v", event)
}
} else {
logrus.Errorf("not 'fabric.usage'")
logrus.Errorf("not 'fabric.usage': %v", event)
}
return u, nil
} else {

View File

@ -23,6 +23,10 @@ admin:
# If `tou_link` is present, the frontend will display the "Terms of Use" link on the login and registration forms
#
tou_link: '<a href="https://google.com" target="_">Terms and Conditions</a>'
#
# If `profile_endpoint` is present, the controller will start a `net/http/pprof` endpoint at the specified host:port
#
#profile_endpoint: localhost:6060
# The `bridge` section configures the `zrok controller metrics bridge`, specifying the source and sink where OpenZiti
# `fabric.usage` events are consumed and then sent into `zrok`. For production environments, we recommend that you use