mirror of
https://github.com/openziti/zrok.git
synced 2025-06-19 17:27:54 +02:00
tweaks for non-zrok traffic
This commit is contained in:
parent
1578ecca98
commit
36d267256f
@ -209,14 +209,18 @@ mainLoop:
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case usage := <-a.queue:
|
case usage := <-a.queue:
|
||||||
if err := a.enforce(usage); err != nil {
|
if usage.ShareToken != "" {
|
||||||
logrus.Errorf("error running enforcement: %v", err)
|
if err := a.enforce(usage); err != nil {
|
||||||
}
|
logrus.Errorf("error running enforcement: %v", err)
|
||||||
if time.Since(lastCycle) > a.cfg.Cycle {
|
|
||||||
if err := a.relax(); err != nil {
|
|
||||||
logrus.Errorf("error running relax cycle: %v", err)
|
|
||||||
}
|
}
|
||||||
lastCycle = time.Now()
|
if time.Since(lastCycle) > a.cfg.Cycle {
|
||||||
|
if err := a.relax(); err != nil {
|
||||||
|
logrus.Errorf("error running relax cycle: %v", err)
|
||||||
|
}
|
||||||
|
lastCycle = time.Now()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warn("not enforcing for usage with no share token: %v", usage.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-time.After(a.cfg.Cycle):
|
case <-time.After(a.cfg.Cycle):
|
||||||
|
@ -46,12 +46,12 @@ func (a *Agent) Start() error {
|
|||||||
case event := <-a.events:
|
case event := <-a.events:
|
||||||
if usage, err := Ingest(event.Data()); err == nil {
|
if usage, err := Ingest(event.Data()); err == nil {
|
||||||
if err := a.cache.addZrokDetail(usage); err != nil {
|
if err := a.cache.addZrokDetail(usage); err != nil {
|
||||||
logrus.Error(err)
|
logrus.Errorf("unable to add zrok detail for: %v: %v", usage.String(), err)
|
||||||
}
|
}
|
||||||
shouldAck := true
|
shouldAck := true
|
||||||
for _, snk := range a.snks {
|
for _, snk := range a.snks {
|
||||||
if err := snk.Handle(usage); err != nil {
|
if err := snk.Handle(usage); err != nil {
|
||||||
logrus.Error(err)
|
logrus.Errorf("error handling usage: %v", err)
|
||||||
if shouldAck {
|
if shouldAck {
|
||||||
shouldAck = false
|
shouldAck = false
|
||||||
}
|
}
|
||||||
@ -59,11 +59,14 @@ func (a *Agent) Start() error {
|
|||||||
}
|
}
|
||||||
if shouldAck {
|
if shouldAck {
|
||||||
if err := event.Ack(); err != nil {
|
if err := event.Ack(); err != nil {
|
||||||
logrus.Error("unable to Ack message", err)
|
logrus.Errorf("unable to ack handled message: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logrus.Error(err)
|
logrus.Errorf("unable to ingest '%v': %v", event.Data(), err)
|
||||||
|
if err := event.Ack(); err != nil {
|
||||||
|
logrus.Errorf("unable to ack unparseable message: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -83,14 +83,6 @@ mainLoop:
|
|||||||
msgLoop:
|
msgLoop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-s.msgs:
|
|
||||||
if event.Body != nil {
|
|
||||||
s.events <- &ZitiEventAMQP{
|
|
||||||
data: ZitiEventJson(event.Body),
|
|
||||||
msg: event,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
case err, ok := <-s.errs:
|
case err, ok := <-s.errs:
|
||||||
if err != nil || !ok {
|
if err != nil || !ok {
|
||||||
logrus.Error(err)
|
logrus.Error(err)
|
||||||
@ -99,6 +91,21 @@ mainLoop:
|
|||||||
|
|
||||||
case <-s.close:
|
case <-s.close:
|
||||||
break mainLoop
|
break mainLoop
|
||||||
|
|
||||||
|
case event, ok := <-s.msgs:
|
||||||
|
if !ok {
|
||||||
|
logrus.Warn("selecting on msg !ok")
|
||||||
|
break msgLoop
|
||||||
|
}
|
||||||
|
if event.Body != nil {
|
||||||
|
s.events <- &ZitiEventAMQP{
|
||||||
|
data: ZitiEventJson(event.Body),
|
||||||
|
msg: event,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logrus.Warn("event body was nil!")
|
||||||
|
break msgLoop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -22,39 +22,41 @@ func newInfluxWriter(cfg *InfluxConfig) *influxWriter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *influxWriter) Handle(u *Usage) error {
|
func (w *influxWriter) Handle(u *Usage) error {
|
||||||
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
|
if u.ShareToken != "" {
|
||||||
|
out := fmt.Sprintf("share: %v, circuit: %v", u.ShareToken, u.ZitiCircuitId)
|
||||||
|
|
||||||
envId := fmt.Sprintf("%d", u.EnvironmentId)
|
envId := fmt.Sprintf("%d", u.EnvironmentId)
|
||||||
acctId := fmt.Sprintf("%d", u.AccountId)
|
acctId := fmt.Sprintf("%d", u.AccountId)
|
||||||
|
|
||||||
var pts []*write.Point
|
var pts []*write.Point
|
||||||
circuitPt := influxdb2.NewPoint("circuits",
|
circuitPt := influxdb2.NewPoint("circuits",
|
||||||
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
|
map[string]string{"share": u.ShareToken, "envId": envId, "acctId": acctId},
|
||||||
map[string]interface{}{"circuit": u.ZitiCircuitId},
|
map[string]interface{}{"circuit": u.ZitiCircuitId},
|
||||||
u.IntervalStart)
|
|
||||||
pts = append(pts, circuitPt)
|
|
||||||
|
|
||||||
if u.BackendTx > 0 || u.BackendRx > 0 {
|
|
||||||
pt := influxdb2.NewPoint("xfer",
|
|
||||||
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
|
||||||
map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx},
|
|
||||||
u.IntervalStart)
|
u.IntervalStart)
|
||||||
pts = append(pts, pt)
|
pts = append(pts, circuitPt)
|
||||||
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
|
|
||||||
}
|
|
||||||
if u.FrontendTx > 0 || u.FrontendRx > 0 {
|
|
||||||
pt := influxdb2.NewPoint("xfer",
|
|
||||||
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
|
||||||
map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx},
|
|
||||||
u.IntervalStart)
|
|
||||||
pts = append(pts, pt)
|
|
||||||
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil {
|
if u.BackendTx > 0 || u.BackendRx > 0 {
|
||||||
logrus.Info(out)
|
pt := influxdb2.NewPoint("xfer",
|
||||||
} else {
|
map[string]string{"namespace": "backend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
||||||
return err
|
map[string]interface{}{"rx": u.BackendRx, "tx": u.BackendTx},
|
||||||
|
u.IntervalStart)
|
||||||
|
pts = append(pts, pt)
|
||||||
|
out += fmt.Sprintf(" backend {rx: %v, tx: %v}", util.BytesToSize(u.BackendRx), util.BytesToSize(u.BackendTx))
|
||||||
|
}
|
||||||
|
if u.FrontendTx > 0 || u.FrontendRx > 0 {
|
||||||
|
pt := influxdb2.NewPoint("xfer",
|
||||||
|
map[string]string{"namespace": "frontend", "share": u.ShareToken, "envId": envId, "acctId": acctId},
|
||||||
|
map[string]interface{}{"rx": u.FrontendRx, "tx": u.FrontendTx},
|
||||||
|
u.IntervalStart)
|
||||||
|
pts = append(pts, pt)
|
||||||
|
out += fmt.Sprintf(" frontend {rx: %v, tx: %v}", util.BytesToSize(u.FrontendRx), util.BytesToSize(u.FrontendTx))
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.writeApi.WritePoint(context.Background(), pts...); err == nil {
|
||||||
|
logrus.Info(out)
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
Loading…
x
Reference in New Issue
Block a user