diff --git a/CHANGELOG.md b/CHANGELOG.md index 92bf4a56..dff57d2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ FEATURE: The invite mechanism has been reworked to improve user experience. The FEATURE: New password strength checking rules and configuration. See the example configuration file (`etc/ctrl.yml`) for details about how to configure the strength checking rules (https://github.com/openziti/zrok/issues/167) +FEATURE: A new `admin/profile_endpoint` configuration option is available to start a `net/http/pprof` listener. See `etc/ctrl.yml` for details. + CHANGE: The controller configuration version bumps from `v: 2` to `v: 3` to support all of the new `v0.4` functionality. See the [example ctrl.yml](etc/ctrl.yml) for details on the new configuration. CHANGE: The underlying database store now utilizes a `deleted` flag on all tables to implement "soft deletes". This was necessary for the new metrics infrastructure, where we need to account for metrics data that arrived after the lifetime of a share or environment; and also we're going to need this for limits, where we need to see historical information about activity in the past (https://github.com/openziti/zrok/issues/262) diff --git a/controller/config/config.go b/controller/config/config.go index 1f09d0c5..396bd3d1 100644 --- a/controller/config/config.go +++ b/controller/config/config.go @@ -34,8 +34,9 @@ type Config struct { } type AdminConfig struct { - Secrets []string `cf:"+secret"` - TouLink string + Secrets []string `cf:"+secret"` + TouLink string + ProfileEndpoint string } type EndpointConfig struct { diff --git a/controller/controller.go b/controller/controller.go index 712d234f..a5974e7f 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -2,11 +2,13 @@ package controller import ( "context" - "github.com/openziti/zrok/controller/config" "github.com/openziti/zrok/controller/limits" "github.com/openziti/zrok/controller/metrics" "github.com/sirupsen/logrus" + "log" + "net/http" + _ "net/http/pprof" "github.com/go-openapi/loads" influxdb2 "github.com/influxdata/influxdb-client-go/v2" @@ -26,6 +28,12 @@ var limitsAgent *limits.Agent func Run(inCfg *config.Config) error { cfg = inCfg + if cfg.Admin != nil && cfg.Admin.ProfileEndpoint != "" { + go func() { + log.Println(http.ListenAndServe(cfg.Admin.ProfileEndpoint, nil)) + }() + } + swaggerSpec, err := loads.Embedded(rest_server_zrok.SwaggerJSON, rest_server_zrok.FlatSwaggerJSON) if err != nil { return errors.Wrap(err, "error loading embedded swagger spec") diff --git a/controller/limits/agent.go b/controller/limits/agent.go index 02a7f99b..8df3ecb4 100644 --- a/controller/limits/agent.go +++ b/controller/limits/agent.go @@ -209,14 +209,18 @@ mainLoop: for { select { case usage := <-a.queue: - if err := a.enforce(usage); err != nil { - logrus.Errorf("error running enforcement: %v", err) - } - if time.Since(lastCycle) > a.cfg.Cycle { - if err := a.relax(); err != nil { - logrus.Errorf("error running relax cycle: %v", err) + if usage.ShareToken != "" { + if err := a.enforce(usage); err != nil { + logrus.Errorf("error running enforcement: %v", err) } - lastCycle = time.Now() + if time.Since(lastCycle) > a.cfg.Cycle { + if err := a.relax(); err != nil { + logrus.Errorf("error running relax cycle: %v", err) + } + lastCycle = time.Now() + } + } else { + logrus.Warnf("not enforcing for usage with no share token: %v", usage.String()) } case <-time.After(a.cfg.Cycle): diff --git a/controller/metrics/agent.go b/controller/metrics/agent.go index 23dcc6f2..c2f19bfd 100644 --- a/controller/metrics/agent.go +++ b/controller/metrics/agent.go @@ -45,13 +45,15 @@ func (a *Agent) Start() error { select { case event := <-a.events: if usage, err := Ingest(event.Data()); err == nil { - if err := a.cache.addZrokDetail(usage); err != nil { - logrus.Error(err) + if usage.ZitiServiceId != "" { + if err := a.cache.addZrokDetail(usage); err != nil { + logrus.Errorf("unable to add zrok detail for: %v: %v", usage.String(), err) + } } shouldAck := true for _, snk := range a.snks { if err := snk.Handle(usage); err != nil { - logrus.Error(err) + logrus.Errorf("error handling usage: %v", err) if shouldAck { shouldAck = false } @@ -59,11 +61,11 @@ func (a *Agent) Start() error { } if shouldAck { if err := event.Ack(); err != nil { - logrus.Error("unable to Ack message", err) + logrus.Errorf("unable to ack handled message: %v", err) } } } else { - logrus.Error(err) + logrus.Errorf("unable to ingest '%v': %v", event.Data(), err) } } } diff --git a/controller/metrics/amqpSource.go b/controller/metrics/amqpSource.go index 5de51fcb..811f038d 100644 --- a/controller/metrics/amqpSource.go +++ b/controller/metrics/amqpSource.go @@ -83,14 +83,6 @@ mainLoop: msgLoop: for { select { - case event := <-s.msgs: - if event.Body != nil { - s.events <- &ZitiEventAMQP{ - data: ZitiEventJson(event.Body), - msg: event, - } - } - case err, ok := <-s.errs: if err != nil || !ok { logrus.Error(err) @@ -99,6 +91,21 @@ mainLoop: case <-s.close: break mainLoop + + case event, ok := <-s.msgs: + if !ok { + logrus.Debug("selecting on msg !ok") + break msgLoop + } + if event.Body != nil { + s.events <- &ZitiEventAMQP{ + data: ZitiEventJson(event.Body), + msg: event, + } + } else { + logrus.Debug("event body was nil!") + break msgLoop + } } } } diff --git a/controller/metrics/influxWriter.go b/controller/metrics/influxWriter.go index 154f3e08..0af2a120 100644 --- a/controller/metrics/influxWriter.go +++ b/controller/metrics/influxWriter.go @@ -22,39 +22,41 @@ func newInfluxWriter(cfg *InfluxConfig) *influxWriter { } func (w *influxWriter) Handle(u *Usage) error { - out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId) + if u.ShareToken != "" { + out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId) - envId := fmt.Sprintf("%d", u.EnvironmentId) - acctId := fmt.Sprintf("%d", u.AccountId) + envId := fmt.Sprintf("%d", u.EnvironmentId) + acctId := fmt.Sprintf("%d", u.AccountId) - var pts []*write.Point - circuitPt := influxdb2.NewPoint("circuits", - map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"circuit": u.ZitiCircuitId}, - u.IntervalStart) - pts = append(pts, circuitPt) - - if u.BackendTx > 0 || u.BackendRx > 0 { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx}, + var pts []*write.Point + circuitPt := influxdb2.NewPoint("circuits", + map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId}, + map[string]interface{}{"circuit": u.ZitiCircuitId}, u.IntervalStart) - pts = append(pts, pt) - out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) - } - if u.FrontendTx > 0 || u.FrontendRx > 0 { - pt := influxdb2.NewPoint("xfer", - map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, - map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx}, - u.IntervalStart) - pts = append(pts, pt) - out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) - } + pts = append(pts, circuitPt) - if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil { - logrus.Info(out) - } else { - return err + if u.BackendTx > 0 || u.BackendRx > 0 { + pt := influxdb2.NewPoint("xfer", + map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, + map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx}, + u.IntervalStart) + pts = append(pts, pt) + out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx)) + } + if u.FrontendTx > 0 || u.FrontendRx > 0 { + pt := influxdb2.NewPoint("xfer", + map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId}, + map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx}, + u.IntervalStart) + pts = append(pts, pt) + out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx)) + } + + if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil { + logrus.Info(out) + } else { + return err + } } return nil diff --git a/controller/metrics/usageIngest.go b/controller/metrics/usageIngest.go index e92542ee..239a6dbd 100644 --- a/controller/metrics/usageIngest.go +++ b/controller/metrics/usageIngest.go @@ -17,10 +17,10 @@ func Ingest(event ZitiEventJson) (*Usage, error) { if vFloat64, ok := v.(float64); ok { u.IntervalStart = time.Unix(int64(vFloat64), 0) } else { - logrus.Error("unable to assert 'interval_start_utc'") + logrus.Errorf("unable to assert 'interval_start_utc': %v", event) } } else { - logrus.Error("missing 'interval_start_utc'") + logrus.Errorf("missing 'interval_start_utc': %v", event) } if v, found := eventMap["tags"]; found { if tags, ok := v.(map[string]interface{}); ok { @@ -28,16 +28,16 @@ func Ingest(event ZitiEventJson) (*Usage, error) { if vStr, ok := v.(string); ok { u.ZitiServiceId = vStr } else { - logrus.Error("unable to assert 'tags/serviceId'") + logrus.Errorf("unable to assert 'tags/serviceId': %v", event) } } else { - logrus.Error("missing 'tags/serviceId'") + logrus.Errorf("missing 'tags/serviceId': %v", event) } } else { - logrus.Errorf("unable to assert 'tags'") + logrus.Errorf("unable to assert 'tags': %v", event) } } else { - logrus.Errorf("missing 'tags'") + logrus.Errorf("missing 'tags': %v", event) } if v, found := eventMap["usage"]; found { if usage, ok := v.(map[string]interface{}); ok { @@ -45,47 +45,47 @@ func Ingest(event ZitiEventJson) (*Usage, error) { if vFloat64, ok := v.(float64); ok { u.FrontendTx = int64(vFloat64) } else { - logrus.Error("unable to assert 'usage/ingress.tx'") + logrus.Errorf("unable to assert 'usage/ingress.tx': %v", event) } } if v, found := usage["ingress.rx"]; found { if vFloat64, ok := v.(float64); ok { u.FrontendRx = int64(vFloat64) } else { - logrus.Error("unable to assert 'usage/ingress.rx") + logrus.Errorf("unable to assert 'usage/ingress.rx': %v", event) } } if v, found := usage["egress.tx"]; found { if vFloat64, ok := v.(float64); ok { u.BackendTx = int64(vFloat64) } else { - logrus.Error("unable to assert 'usage/egress.tx'") + logrus.Errorf("unable to assert 'usage/egress.tx': %v", event) } } if v, found := usage["egress.rx"]; found { if vFloat64, ok := v.(float64); ok { u.BackendRx = int64(vFloat64) } else { - logrus.Error("unable to assert 'usage/egress.rx'") + logrus.Errorf("unable to assert 'usage/egress.rx': %v", event) } } } else { logrus.Errorf("unable to assert 'usage' (%v) %v", reflect.TypeOf(v), event) } } else { - logrus.Warnf("missing 'usage'") + logrus.Warnf("missing 'usage': %v", event) } if v, found := eventMap["circuit_id"]; found { if vStr, ok := v.(string); ok { u.ZitiCircuitId = vStr } else { - logrus.Error("unable to assert 'circuit_id'") + logrus.Errorf("unable to assert 'circuit_id': %v", event) } } else { - logrus.Warn("missing 'circuit_id'") + logrus.Warnf("missing 'circuit_id': %v", event) } } else { - logrus.Errorf("not 'fabric.usage'") + logrus.Errorf("not 'fabric.usage': %v", event) } return u, nil } else { diff --git a/etc/ctrl.yml b/etc/ctrl.yml index c88a4023..b8a5ccf5 100644 --- a/etc/ctrl.yml +++ b/etc/ctrl.yml @@ -23,6 +23,10 @@ admin: # If `tou_link` is present, the frontend will display the "Terms of Use" link on the login and registration forms # tou_link: 'Terms and Conditions' + # + # If `profile_endpoint` is present, the controller will start a `net/http/pprof` endpoint at the specified host:port + # + #profile_endpoint: localhost:6060 # The `bridge` section configures the `zrok controller metrics bridge`, specifying the source and sink where OpenZiti # `fabric.usage` events are consumed and then sent into `zrok`. For production environments, we recommend that you use