From e8e0167a5144d84a464259b6af5298b1699da2d7 Mon Sep 17 00:00:00 2001 From: Michael Quigley Date: Thu, 6 Oct 2022 14:52:52 -0400 Subject: [PATCH] batch, offline garbage collection (#75) --- controller/edge.go | 130 ++++++++++++++++++++++++++++++++++++ controller/gc.go | 36 +++++++++- controller/store/service.go | 16 +++++ controller/untunnel.go | 130 ++---------------------------------- 4 files changed, 186 insertions(+), 126 deletions(-) create mode 100644 controller/edge.go diff --git a/controller/edge.go b/controller/edge.go new file mode 100644 index 00000000..92f0618f --- /dev/null +++ b/controller/edge.go @@ -0,0 +1,130 @@ +package controller + +import ( + "context" + "fmt" + "github.com/openziti/edge/rest_management_api_client" + "github.com/openziti/edge/rest_management_api_client/config" + "github.com/openziti/edge/rest_management_api_client/service" + "github.com/openziti/edge/rest_management_api_client/service_edge_router_policy" + "github.com/openziti/edge/rest_management_api_client/service_policy" + "github.com/sirupsen/logrus" + "time" +) + +func deleteServiceEdgeRouterPolicy(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { + filter := fmt.Sprintf("name=\"%v\"", svcName) + limit := int64(1) + offset := int64(0) + listReq := &service_edge_router_policy.ListServiceEdgeRouterPoliciesParams{ + Filter: &filter, + Limit: &limit, + Offset: &offset, + Context: context.Background(), + } + listReq.SetTimeout(30 * time.Second) + listResp, err := edge.ServiceEdgeRouterPolicy.ListServiceEdgeRouterPolicies(listReq, nil) + if err != nil { + return err + } + if len(listResp.Payload.Data) == 1 { + serpId := *(listResp.Payload.Data[0].ID) + req := &service_edge_router_policy.DeleteServiceEdgeRouterPolicyParams{ + ID: serpId, + Context: context.Background(), + } + req.SetTimeout(30 * time.Second) + _, err := edge.ServiceEdgeRouterPolicy.DeleteServiceEdgeRouterPolicy(req, nil) + if err != nil { + return err + } + logrus.Infof("deleted service edge router policy '%v'", serpId) + } else { + logrus.Infof("did not find a service edge router policy") + } + return nil +} + +func deleteServicePolicyBind(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { + return deleteServicePolicy(fmt.Sprintf("name=\"%v-backend\"", svcName), edge) +} + +func deleteServicePolicyDial(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { + return deleteServicePolicy(fmt.Sprintf("name=\"%v-dial\"", svcName), edge) +} + +func deleteServicePolicy(filter string, edge *rest_management_api_client.ZitiEdgeManagement) error { + limit := int64(1) + offset := int64(0) + listReq := &service_policy.ListServicePoliciesParams{ + Filter: &filter, + Limit: &limit, + Offset: &offset, + Context: context.Background(), + } + listReq.SetTimeout(30 * time.Second) + listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil) + if err != nil { + return err + } + if len(listResp.Payload.Data) == 1 { + spId := *(listResp.Payload.Data[0].ID) + req := &service_policy.DeleteServicePolicyParams{ + ID: spId, + Context: context.Background(), + } + req.SetTimeout(30 * time.Second) + _, err := edge.ServicePolicy.DeleteServicePolicy(req, nil) + if err != nil { + return err + } + logrus.Infof("deleted service policy '%v'", spId) + } else { + logrus.Infof("did not find a service policy") + } + return nil +} + +func deleteService(svcId string, edge *rest_management_api_client.ZitiEdgeManagement) error { + req := &service.DeleteServiceParams{ + ID: svcId, + Context: context.Background(), + } + req.SetTimeout(30 * time.Second) + _, err := edge.Service.DeleteService(req, nil) + if err != nil { + return err + } + logrus.Infof("deleted service '%v'", svcId) + return nil +} + +func deleteConfig(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { + filter := fmt.Sprintf("name=\"%v\"", svcName) + limit := int64(0) + offset := int64(0) + listReq := &config.ListConfigsParams{ + Filter: &filter, + Limit: &limit, + Offset: &offset, + Context: context.Background(), + } + listReq.SetTimeout(30 * time.Second) + listResp, err := edge.Config.ListConfigs(listReq, nil) + if err != nil { + return err + } + for _, cfg := range listResp.Payload.Data { + deleteReq := &config.DeleteConfigParams{ + ID: *cfg.ID, + Context: context.Background(), + } + deleteReq.SetTimeout(30 * time.Second) + _, err := edge.Config.DeleteConfig(deleteReq, nil) + if err != nil { + return err + } + logrus.Infof("deleted config '%v'", *cfg.ID) + } + return nil +} diff --git a/controller/gc.go b/controller/gc.go index 272799d5..1c376294 100644 --- a/controller/gc.go +++ b/controller/gc.go @@ -31,6 +31,19 @@ func gcServices(cfg *Config, str *store.Store) error { if err != nil { return err } + tx, err := str.Begin() + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + dbSvcs, err := str.GetAllServices(tx) + if err != nil { + return err + } + liveMap := make(map[string]struct{}) + for _, dbSvc := range dbSvcs { + liveMap[dbSvc.ZrokServiceId] = struct{}{} + } filter := "tags.zrok != null" limit := int64(0) offset := int64(0) @@ -43,8 +56,29 @@ func gcServices(cfg *Config, str *store.Store) error { listReq.SetTimeout(30 * time.Second) if listResp, err := edge.Service.ListServices(listReq, nil); err == nil { for _, svc := range listResp.Payload.Data { - logrus.Infof("found svcId='%v', name='%v'", *svc.ID, *svc.Name) + if _, found := liveMap[*svc.Name]; !found { + logrus.Infof("garbage collecting, zitiSvcId='%v', zrokSvcId='%v'", *svc.ID, *svc.Name) + if err := deleteServiceEdgeRouterPolicy(*svc.Name, edge); err != nil { + logrus.Errorf("error garbage collecting service edge router policy: %v", err) + } + if err := deleteServicePolicyDial(*svc.Name, edge); err != nil { + logrus.Errorf("error garbage collecting service dial policy: %v", err) + } + if err := deleteServicePolicyBind(*svc.Name, edge); err != nil { + logrus.Errorf("error garbage collecting service bind policy: %v", err) + } + if err := deleteConfig(*svc.Name, edge); err != nil { + logrus.Errorf("error garbage collecting config: %v", err) + } + if err := deleteService(*svc.ID, edge); err != nil { + logrus.Errorf("error garbage collecting service: %v", err) + } + } else { + logrus.Infof("remaining live, zitiSvcId='%v', zrokSvcId='%v'", *svc.ID, *svc.Name) + } } + } else { + return errors.Wrap(err, "error listing services") } return nil } diff --git a/controller/store/service.go b/controller/store/service.go index 4cbbddd7..6febbf10 100644 --- a/controller/store/service.go +++ b/controller/store/service.go @@ -39,6 +39,22 @@ func (self *Store) GetService(id int, tx *sqlx.Tx) (*Service, error) { return svc, nil } +func (self *Store) GetAllServices(tx *sqlx.Tx) ([]*Service, error) { + rows, err := tx.Queryx("select * from services order by id") + if err != nil { + return nil, errors.Wrap(err, "error selecting all services") + } + var svcs []*Service + for rows.Next() { + svc := &Service{} + if err := rows.StructScan(svc); err != nil { + return nil, errors.Wrap(err, "error scanning service") + } + svcs = append(svcs, svc) + } + return svcs, nil +} + func (self *Store) FindServicesForEnvironment(envId int, tx *sqlx.Tx) ([]*Service, error) { rows, err := tx.Queryx("select services.* from services where environment_id = ?", envId) if err != nil { diff --git a/controller/untunnel.go b/controller/untunnel.go index 6949c5e2..20fe41c1 100644 --- a/controller/untunnel.go +++ b/controller/untunnel.go @@ -8,10 +8,7 @@ import ( "github.com/openziti-test-kitchen/zrok/rest_model_zrok" "github.com/openziti-test-kitchen/zrok/rest_server_zrok/operations/tunnel" "github.com/openziti/edge/rest_management_api_client" - "github.com/openziti/edge/rest_management_api_client/config" "github.com/openziti/edge/rest_management_api_client/service" - "github.com/openziti/edge/rest_management_api_client/service_edge_router_policy" - "github.com/openziti/edge/rest_management_api_client/service_policy" "github.com/pkg/errors" "github.com/sirupsen/logrus" "time" @@ -82,23 +79,23 @@ func (self *untunnelHandler) Handle(params tunnel.UntunnelParams, principal *res return tunnel.NewUntunnelInternalServerError() } - if err := self.deleteServiceEdgeRouterPolicy(svcName, edge); err != nil { + if err := deleteServiceEdgeRouterPolicy(svcName, edge); err != nil { logrus.Error(err) return tunnel.NewUntunnelInternalServerError() } - if err := self.deleteServicePolicyDial(svcName, edge); err != nil { + if err := deleteServicePolicyDial(svcName, edge); err != nil { logrus.Error(err) return tunnel.NewUntunnelInternalServerError() } - if err := self.deleteServicePolicyBind(svcName, edge); err != nil { + if err := deleteServicePolicyBind(svcName, edge); err != nil { logrus.Error(err) return tunnel.NewUntunnelInternalServerError() } - if err := self.deleteConfig(svcName, edge); err != nil { + if err := deleteConfig(svcName, edge); err != nil { logrus.Error(err) return tunnel.NewTunnelInternalServerError() } - if err := self.deleteService(svcId, edge); err != nil { + if err := deleteService(svcId, edge); err != nil { logrus.Error(err) return tunnel.NewUntunnelInternalServerError() } @@ -138,120 +135,3 @@ func (_ *untunnelHandler) findServiceId(svcName string, edge *rest_management_ap } return "", errors.Errorf("service '%v' not found", svcName) } - -func (_ *untunnelHandler) deleteServiceEdgeRouterPolicy(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { - filter := fmt.Sprintf("name=\"%v\"", svcName) - limit := int64(1) - offset := int64(0) - listReq := &service_edge_router_policy.ListServiceEdgeRouterPoliciesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - Context: context.Background(), - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.ServiceEdgeRouterPolicy.ListServiceEdgeRouterPolicies(listReq, nil) - if err != nil { - return err - } - if len(listResp.Payload.Data) == 1 { - serpId := *(listResp.Payload.Data[0].ID) - req := &service_edge_router_policy.DeleteServiceEdgeRouterPolicyParams{ - ID: serpId, - Context: context.Background(), - } - req.SetTimeout(30 * time.Second) - _, err := edge.ServiceEdgeRouterPolicy.DeleteServiceEdgeRouterPolicy(req, nil) - if err != nil { - return err - } - logrus.Infof("deleted service edge router policy '%v'", serpId) - } else { - logrus.Infof("did not find a service edge router policy") - } - return nil -} - -func (self *untunnelHandler) deleteServicePolicyBind(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { - return self.deleteServicePolicy(fmt.Sprintf("name=\"%v-backend\"", svcName), edge) -} - -func (self *untunnelHandler) deleteServicePolicyDial(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { - return self.deleteServicePolicy(fmt.Sprintf("name=\"%v-dial\"", svcName), edge) -} - -func (_ *untunnelHandler) deleteServicePolicy(filter string, edge *rest_management_api_client.ZitiEdgeManagement) error { - limit := int64(1) - offset := int64(0) - listReq := &service_policy.ListServicePoliciesParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - Context: context.Background(), - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.ServicePolicy.ListServicePolicies(listReq, nil) - if err != nil { - return err - } - if len(listResp.Payload.Data) == 1 { - spId := *(listResp.Payload.Data[0].ID) - req := &service_policy.DeleteServicePolicyParams{ - ID: spId, - Context: context.Background(), - } - req.SetTimeout(30 * time.Second) - _, err := edge.ServicePolicy.DeleteServicePolicy(req, nil) - if err != nil { - return err - } - logrus.Infof("deleted service policy '%v'", spId) - } else { - logrus.Infof("did not find a service policy") - } - return nil -} - -func (_ *untunnelHandler) deleteConfig(svcName string, edge *rest_management_api_client.ZitiEdgeManagement) error { - filter := fmt.Sprintf("name=\"%v\"", svcName) - limit := int64(0) - offset := int64(0) - listReq := &config.ListConfigsParams{ - Filter: &filter, - Limit: &limit, - Offset: &offset, - Context: context.Background(), - } - listReq.SetTimeout(30 * time.Second) - listResp, err := edge.Config.ListConfigs(listReq, nil) - if err != nil { - return err - } - for _, cfg := range listResp.Payload.Data { - deleteReq := &config.DeleteConfigParams{ - ID: *cfg.ID, - Context: context.Background(), - } - deleteReq.SetTimeout(30 * time.Second) - _, err := edge.Config.DeleteConfig(deleteReq, nil) - if err != nil { - return err - } - logrus.Infof("deleted config '%v'", *cfg.ID) - } - return nil -} - -func (_ *untunnelHandler) deleteService(svcId string, edge *rest_management_api_client.ZitiEdgeManagement) error { - req := &service.DeleteServiceParams{ - ID: svcId, - Context: context.Background(), - } - req.SetTimeout(30 * time.Second) - _, err := edge.Service.DeleteService(req, nil) - if err != nil { - return err - } - logrus.Infof("deleted service '%v'", svcId) - return nil -}