mirror of
https://github.com/openziti/zrok.git
synced 2024-11-25 01:23:49 +01:00
controller, store infrastructure; services -> shares (#144)
This commit is contained in:
parent
fcb0873784
commit
9572ed3c73
@ -45,12 +45,12 @@ func (h *accessHandler) Handle(params service.AccessParams, principal *rest_mode
|
||||
}
|
||||
|
||||
svcToken := params.Body.SvcToken
|
||||
ssvc, err := str.FindServiceWithToken(svcToken, tx)
|
||||
sshr, err := str.FindShareWithToken(svcToken, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error finding service")
|
||||
return service.NewAccessNotFound()
|
||||
}
|
||||
if ssvc == nil {
|
||||
if sshr == nil {
|
||||
logrus.Errorf("unable to find service '%v' for user '%v'", svcToken, principal.Email)
|
||||
return service.NewAccessNotFound()
|
||||
}
|
||||
@ -76,7 +76,7 @@ func (h *accessHandler) Handle(params service.AccessParams, principal *rest_mode
|
||||
"zrokFrontendToken": feToken,
|
||||
"zrokServiceToken": svcToken,
|
||||
}
|
||||
if err := zrokEdgeSdk.CreateServicePolicyDial(envZId+"-"+ssvc.ZId+"-dial", ssvc.ZId, []string{envZId}, addlTags, edge); err != nil {
|
||||
if err := zrokEdgeSdk.CreateServicePolicyDial(envZId+"-"+sshr.ZId+"-dial", sshr.ZId, []string{envZId}, addlTags, edge); err != nil {
|
||||
logrus.Errorf("unable to create dial policy: %v", err)
|
||||
return service.NewAccessInternalServerError()
|
||||
}
|
||||
|
@ -80,12 +80,12 @@ func (h *disableHandler) removeServicesForEnvironment(envId int, tx *sqlx.Tx, ed
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
svcs, err := str.FindServicesForEnvironment(envId, tx)
|
||||
shrs, err := str.FindSharesForEnvironment(envId, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, svc := range svcs {
|
||||
svcToken := svc.Token
|
||||
for _, shr := range shrs {
|
||||
svcToken := shr.Token
|
||||
logrus.Infof("garbage collecting service '%v' for environment '%v'", svcToken, env.ZId)
|
||||
if err := zrokEdgeSdk.DeleteServiceEdgeRouterPolicy(env.ZId, svcToken, edge); err != nil {
|
||||
logrus.Error(err)
|
||||
@ -99,22 +99,22 @@ func (h *disableHandler) removeServicesForEnvironment(envId int, tx *sqlx.Tx, ed
|
||||
if err := zrokEdgeSdk.DeleteConfig(env.ZId, svcToken, edge); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
if err := zrokEdgeSdk.DeleteService(env.ZId, svc.ZId, edge); err != nil {
|
||||
if err := zrokEdgeSdk.DeleteService(env.ZId, shr.ZId, edge); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
logrus.Infof("removed service '%v' for environment '%v'", svc.Token, env.ZId)
|
||||
logrus.Infof("removed service '%v' for environment '%v'", shr.Token, env.ZId)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *disableHandler) removeEnvironment(envId int, tx *sqlx.Tx) error {
|
||||
svcs, err := str.FindServicesForEnvironment(envId, tx)
|
||||
shrs, err := str.FindSharesForEnvironment(envId, tx)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error finding services for environment '%d'", envId)
|
||||
}
|
||||
for _, svc := range svcs {
|
||||
if err := str.DeleteService(svc.Id, tx); err != nil {
|
||||
return errors.Wrapf(err, "error deleting service '%d' for environment '%d'", svc.Id, envId)
|
||||
for _, shr := range shrs {
|
||||
if err := str.DeleteShare(shr.Id, tx); err != nil {
|
||||
return errors.Wrapf(err, "error deleting service '%d' for environment '%d'", shr.Id, envId)
|
||||
}
|
||||
}
|
||||
if err := str.DeleteEnvironment(envId, tx); err != nil {
|
||||
|
@ -35,44 +35,44 @@ func (h *environmentDetailHandler) Handle(params metadata.GetEnvironmentDetailPa
|
||||
ZID: senv.ZId,
|
||||
},
|
||||
}
|
||||
svcs, err := str.FindServicesForEnvironment(senv.Id, tx)
|
||||
shrs, err := str.FindSharesForEnvironment(senv.Id, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error finding services for environment '%v': %v", senv.ZId, err)
|
||||
return metadata.NewGetEnvironmentDetailInternalServerError()
|
||||
}
|
||||
var sparkData map[string][]int64
|
||||
if cfg.Influx != nil {
|
||||
sparkData, err = sparkDataForServices(svcs)
|
||||
sparkData, err = sparkDataForServices(shrs)
|
||||
if err != nil {
|
||||
logrus.Errorf("error querying spark data for services: %v", err)
|
||||
return metadata.NewGetEnvironmentDetailInternalServerError()
|
||||
}
|
||||
}
|
||||
for _, svc := range svcs {
|
||||
for _, shr := range shrs {
|
||||
feEndpoint := ""
|
||||
if svc.FrontendEndpoint != nil {
|
||||
feEndpoint = *svc.FrontendEndpoint
|
||||
if shr.FrontendEndpoint != nil {
|
||||
feEndpoint = *shr.FrontendEndpoint
|
||||
}
|
||||
feSelection := ""
|
||||
if svc.FrontendSelection != nil {
|
||||
feSelection = *svc.FrontendSelection
|
||||
if shr.FrontendSelection != nil {
|
||||
feSelection = *shr.FrontendSelection
|
||||
}
|
||||
beProxyEndpoint := ""
|
||||
if svc.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *svc.BackendProxyEndpoint
|
||||
if shr.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *shr.BackendProxyEndpoint
|
||||
}
|
||||
es.Services = append(es.Services, &rest_model_zrok.Service{
|
||||
Token: svc.Token,
|
||||
ZID: svc.ZId,
|
||||
ShareMode: svc.ShareMode,
|
||||
BackendMode: svc.BackendMode,
|
||||
Token: shr.Token,
|
||||
ZID: shr.ZId,
|
||||
ShareMode: shr.ShareMode,
|
||||
BackendMode: shr.BackendMode,
|
||||
FrontendSelection: feSelection,
|
||||
FrontendEndpoint: feEndpoint,
|
||||
BackendProxyEndpoint: beProxyEndpoint,
|
||||
Reserved: svc.Reserved,
|
||||
Metrics: sparkData[svc.Token],
|
||||
CreatedAt: svc.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: svc.UpdatedAt.UnixMilli(),
|
||||
Reserved: shr.Reserved,
|
||||
Metrics: sparkData[shr.Token],
|
||||
CreatedAt: shr.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: shr.UpdatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
return metadata.NewGetEnvironmentDetailOK().WithPayload(es)
|
||||
|
@ -37,13 +37,13 @@ func GC(inCfg *Config) error {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
dbSvcs, err := str.GetAllServices(tx)
|
||||
sshrs, err := str.GetAllShares(tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
liveMap := make(map[string]struct{})
|
||||
for _, dbSvc := range dbSvcs {
|
||||
liveMap[dbSvc.Token] = struct{}{}
|
||||
for _, sshr := range sshrs {
|
||||
liveMap[sshr.Token] = struct{}{}
|
||||
}
|
||||
if err := gcServices(edge, liveMap); err != nil {
|
||||
return errors.Wrap(err, "error garbage collecting services")
|
||||
|
@ -18,7 +18,7 @@ func getServiceHandler(params service.GetServiceParams, principal *rest_model_zr
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
ssvc, err := str.FindServiceWithToken(svcToken, tx)
|
||||
sshr, err := str.FindShareWithToken(svcToken, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error finding service with token '%v': %v", svcToken, err)
|
||||
return service.NewGetServiceNotFound()
|
||||
@ -30,7 +30,7 @@ func getServiceHandler(params service.GetServiceParams, principal *rest_model_zr
|
||||
}
|
||||
envFound := false
|
||||
for _, senv := range senvs {
|
||||
if senv.Id == ssvc.EnvironmentId && senv.ZId == envZId {
|
||||
if senv.Id == sshr.EnvironmentId && senv.ZId == envZId {
|
||||
envFound = true
|
||||
break
|
||||
}
|
||||
@ -40,24 +40,24 @@ func getServiceHandler(params service.GetServiceParams, principal *rest_model_zr
|
||||
return service.NewGetServiceNotFound()
|
||||
}
|
||||
|
||||
svc := &rest_model_zrok.Service{
|
||||
Token: ssvc.Token,
|
||||
ZID: ssvc.ZId,
|
||||
ShareMode: ssvc.ShareMode,
|
||||
BackendMode: ssvc.BackendMode,
|
||||
Reserved: ssvc.Reserved,
|
||||
CreatedAt: ssvc.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: ssvc.UpdatedAt.UnixMilli(),
|
||||
shr := &rest_model_zrok.Service{
|
||||
Token: sshr.Token,
|
||||
ZID: sshr.ZId,
|
||||
ShareMode: sshr.ShareMode,
|
||||
BackendMode: sshr.BackendMode,
|
||||
Reserved: sshr.Reserved,
|
||||
CreatedAt: sshr.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: sshr.UpdatedAt.UnixMilli(),
|
||||
}
|
||||
if ssvc.FrontendSelection != nil {
|
||||
svc.FrontendSelection = *ssvc.FrontendSelection
|
||||
if sshr.FrontendSelection != nil {
|
||||
shr.FrontendSelection = *sshr.FrontendSelection
|
||||
}
|
||||
if ssvc.FrontendEndpoint != nil {
|
||||
svc.FrontendEndpoint = *ssvc.FrontendEndpoint
|
||||
if sshr.FrontendEndpoint != nil {
|
||||
shr.FrontendEndpoint = *sshr.FrontendEndpoint
|
||||
}
|
||||
if ssvc.BackendProxyEndpoint != nil {
|
||||
svc.BackendProxyEndpoint = *ssvc.BackendProxyEndpoint
|
||||
if sshr.BackendProxyEndpoint != nil {
|
||||
shr.BackendProxyEndpoint = *sshr.BackendProxyEndpoint
|
||||
}
|
||||
|
||||
return service.NewGetServiceOK().WithPayload(svc)
|
||||
return service.NewGetServiceOK().WithPayload(shr)
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ func (ma *metricsAgent) processMetrics(m *model.Metrics) error {
|
||||
for k, v := range m.Sessions {
|
||||
if ma.writeApi != nil {
|
||||
pt := influxdb2.NewPoint("xfer",
|
||||
map[string]string{"namespace": m.Namespace, "service": k},
|
||||
map[string]string{"namespace": m.Namespace, "share": k},
|
||||
map[string]interface{}{"bytesRead": v.BytesRead, "bytesWritten": v.BytesWritten},
|
||||
time.UnixMilli(v.LastUpdate))
|
||||
pts = append(pts, pt)
|
||||
|
@ -21,7 +21,7 @@ func overviewHandler(_ metadata.OverviewParams, principal *rest_model_zrok.Princ
|
||||
}
|
||||
var out rest_model_zrok.EnvironmentServicesList
|
||||
for _, env := range envs {
|
||||
svcs, err := str.FindServicesForEnvironment(env.Id, tx)
|
||||
shrs, err := str.FindSharesForEnvironment(env.Id, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error finding services for environment '%v': %v", env.ZId, err)
|
||||
return metadata.NewOverviewInternalServerError()
|
||||
@ -37,30 +37,30 @@ func overviewHandler(_ metadata.OverviewParams, principal *rest_model_zrok.Princ
|
||||
},
|
||||
}
|
||||
|
||||
for _, svc := range svcs {
|
||||
for _, shr := range shrs {
|
||||
feEndpoint := ""
|
||||
if svc.FrontendEndpoint != nil {
|
||||
feEndpoint = *svc.FrontendEndpoint
|
||||
if shr.FrontendEndpoint != nil {
|
||||
feEndpoint = *shr.FrontendEndpoint
|
||||
}
|
||||
feSelection := ""
|
||||
if svc.FrontendSelection != nil {
|
||||
feSelection = *svc.FrontendSelection
|
||||
if shr.FrontendSelection != nil {
|
||||
feSelection = *shr.FrontendSelection
|
||||
}
|
||||
beProxyEndpoint := ""
|
||||
if svc.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *svc.BackendProxyEndpoint
|
||||
if shr.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *shr.BackendProxyEndpoint
|
||||
}
|
||||
es.Services = append(es.Services, &rest_model_zrok.Service{
|
||||
Token: svc.Token,
|
||||
ZID: svc.ZId,
|
||||
ShareMode: svc.ShareMode,
|
||||
BackendMode: svc.BackendMode,
|
||||
Token: shr.Token,
|
||||
ZID: shr.ZId,
|
||||
ShareMode: shr.ShareMode,
|
||||
BackendMode: shr.BackendMode,
|
||||
FrontendSelection: feSelection,
|
||||
FrontendEndpoint: feEndpoint,
|
||||
BackendProxyEndpoint: beProxyEndpoint,
|
||||
Reserved: svc.Reserved,
|
||||
CreatedAt: svc.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: svc.UpdatedAt.UnixMilli(),
|
||||
Reserved: shr.Reserved,
|
||||
CreatedAt: shr.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: shr.UpdatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
out = append(out, es)
|
||||
|
@ -21,7 +21,7 @@ func (h *serviceDetailHandler) Handle(params metadata.GetServiceDetailParams, pr
|
||||
return metadata.NewGetServiceDetailInternalServerError()
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
svc, err := str.FindServiceWithToken(params.SvcToken, tx)
|
||||
shr, err := str.FindShareWithToken(params.SvcToken, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error finding service '%v': %v", params.SvcToken, err)
|
||||
return metadata.NewGetServiceDetailNotFound()
|
||||
@ -33,7 +33,7 @@ func (h *serviceDetailHandler) Handle(params metadata.GetServiceDetailParams, pr
|
||||
}
|
||||
found := false
|
||||
for _, env := range envs {
|
||||
if svc.EnvironmentId == env.Id {
|
||||
if shr.EnvironmentId == env.Id {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
@ -44,35 +44,35 @@ func (h *serviceDetailHandler) Handle(params metadata.GetServiceDetailParams, pr
|
||||
}
|
||||
var sparkData map[string][]int64
|
||||
if cfg.Influx != nil {
|
||||
sparkData, err = sparkDataForServices([]*store.Service{svc})
|
||||
sparkData, err = sparkDataForServices([]*store.Share{shr})
|
||||
if err != nil {
|
||||
logrus.Errorf("error querying spark data for services: %v", err)
|
||||
return metadata.NewGetEnvironmentDetailInternalServerError()
|
||||
}
|
||||
}
|
||||
feEndpoint := ""
|
||||
if svc.FrontendEndpoint != nil {
|
||||
feEndpoint = *svc.FrontendEndpoint
|
||||
if shr.FrontendEndpoint != nil {
|
||||
feEndpoint = *shr.FrontendEndpoint
|
||||
}
|
||||
feSelection := ""
|
||||
if svc.FrontendSelection != nil {
|
||||
feSelection = *svc.FrontendSelection
|
||||
if shr.FrontendSelection != nil {
|
||||
feSelection = *shr.FrontendSelection
|
||||
}
|
||||
beProxyEndpoint := ""
|
||||
if svc.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *svc.BackendProxyEndpoint
|
||||
if shr.BackendProxyEndpoint != nil {
|
||||
beProxyEndpoint = *shr.BackendProxyEndpoint
|
||||
}
|
||||
return metadata.NewGetServiceDetailOK().WithPayload(&rest_model_zrok.Service{
|
||||
Token: svc.Token,
|
||||
ZID: svc.ZId,
|
||||
ShareMode: svc.ShareMode,
|
||||
BackendMode: svc.BackendMode,
|
||||
Token: shr.Token,
|
||||
ZID: shr.ZId,
|
||||
ShareMode: shr.ShareMode,
|
||||
BackendMode: shr.BackendMode,
|
||||
FrontendSelection: feSelection,
|
||||
FrontendEndpoint: feEndpoint,
|
||||
BackendProxyEndpoint: beProxyEndpoint,
|
||||
Reserved: svc.Reserved,
|
||||
Metrics: sparkData[svc.Token],
|
||||
CreatedAt: svc.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: svc.UpdatedAt.UnixMilli(),
|
||||
Reserved: shr.Reserved,
|
||||
Metrics: sparkData[shr.Token],
|
||||
CreatedAt: shr.CreatedAt.UnixMilli(),
|
||||
UpdatedAt: shr.UpdatedAt.UnixMilli(),
|
||||
})
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ func (h *shareHandler) Handle(params service.ShareParams, principal *rest_model_
|
||||
logrus.Debugf("allocated service '%v'", svcToken)
|
||||
|
||||
reserved := params.Body.Reserved
|
||||
ssvc := &store.Service{
|
||||
sshr := &store.Share{
|
||||
ZId: svcZId,
|
||||
Token: svcToken,
|
||||
ShareMode: params.Body.ShareMode,
|
||||
@ -109,12 +109,12 @@ func (h *shareHandler) Handle(params service.ShareParams, principal *rest_model_
|
||||
Reserved: reserved,
|
||||
}
|
||||
if len(frontendEndpoints) > 0 {
|
||||
ssvc.FrontendEndpoint = &frontendEndpoints[0]
|
||||
} else if ssvc.ShareMode == "private" {
|
||||
ssvc.FrontendEndpoint = &ssvc.ShareMode
|
||||
sshr.FrontendEndpoint = &frontendEndpoints[0]
|
||||
} else if sshr.ShareMode == "private" {
|
||||
sshr.FrontendEndpoint = &sshr.ShareMode
|
||||
}
|
||||
|
||||
sid, err := str.CreateService(envId, ssvc, tx)
|
||||
sid, err := str.CreateShare(envId, sshr, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("error creating service record: %v", err)
|
||||
return service.NewShareInternalServerError()
|
||||
|
@ -6,13 +6,13 @@ import (
|
||||
"github.com/openziti-test-kitchen/zrok/controller/store"
|
||||
)
|
||||
|
||||
func sparkDataForServices(svcs []*store.Service) (map[string][]int64, error) {
|
||||
func sparkDataForServices(shrs []*store.Share) (map[string][]int64, error) {
|
||||
out := make(map[string][]int64)
|
||||
|
||||
if len(svcs) > 0 {
|
||||
if len(shrs) > 0 {
|
||||
qapi := idb.QueryAPI(cfg.Influx.Org)
|
||||
|
||||
result, err := qapi.Query(context.Background(), sparkFluxQuery(svcs))
|
||||
result, err := qapi.Query(context.Background(), sparkFluxQuery(shrs))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -27,22 +27,22 @@ func sparkDataForServices(svcs []*store.Service) (map[string][]int64, error) {
|
||||
if writeRate != nil {
|
||||
combinedRate += writeRate.(int64)
|
||||
}
|
||||
svcToken := result.Record().ValueByKey("service").(string)
|
||||
svcMetrics := out[svcToken]
|
||||
svcMetrics = append(svcMetrics, combinedRate)
|
||||
out[svcToken] = svcMetrics
|
||||
shrToken := result.Record().ValueByKey("share").(string)
|
||||
shrMetrics := out[shrToken]
|
||||
shrMetrics = append(shrMetrics, combinedRate)
|
||||
out[shrToken] = shrMetrics
|
||||
}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func sparkFluxQuery(svcs []*store.Service) string {
|
||||
func sparkFluxQuery(shrs []*store.Share) string {
|
||||
svcFilter := "|> filter(fn: (r) =>"
|
||||
for i, svc := range svcs {
|
||||
for i, shr := range shrs {
|
||||
if i > 0 {
|
||||
svcFilter += " or"
|
||||
}
|
||||
svcFilter += fmt.Sprintf(" r[\"service\"] == \"%v\"", svc.Token)
|
||||
svcFilter += fmt.Sprintf(" r[\"share\"] == \"%v\"", shr.Token)
|
||||
}
|
||||
svcFilter += ")"
|
||||
query := "read = from(bucket: \"zrok\")" +
|
||||
|
@ -1,104 +0,0 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
Model
|
||||
EnvironmentId int
|
||||
ZId string
|
||||
Token string
|
||||
ShareMode string
|
||||
BackendMode string
|
||||
FrontendSelection *string
|
||||
FrontendEndpoint *string
|
||||
BackendProxyEndpoint *string
|
||||
Reserved bool
|
||||
}
|
||||
|
||||
func (self *Store) CreateService(envId int, svc *Service, tx *sqlx.Tx) (int, error) {
|
||||
stmt, err := tx.Prepare("insert into services (environment_id, z_id, token, share_mode, backend_mode, frontend_selection, frontend_endpoint, backend_proxy_endpoint, reserved) values ($1, $2, $3, $4, $5, $6, $7, $8, $9) returning id")
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "error preparing services insert statement")
|
||||
}
|
||||
var id int
|
||||
if err := stmt.QueryRow(envId, svc.ZId, svc.Token, svc.ShareMode, svc.BackendMode, svc.FrontendSelection, svc.FrontendEndpoint, svc.BackendProxyEndpoint, svc.Reserved).Scan(&id); err != nil {
|
||||
return 0, errors.Wrap(err, "error executing services insert statement")
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (self *Store) GetService(id int, tx *sqlx.Tx) (*Service, error) {
|
||||
svc := &Service{}
|
||||
if err := tx.QueryRowx("select * from services where id = $1", id).StructScan(svc); err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting service by id")
|
||||
}
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
func (self *Store) GetAllServices(tx *sqlx.Tx) ([]*Service, error) {
|
||||
rows, err := tx.Queryx("select * from services order by id")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting all services")
|
||||
}
|
||||
var svcs []*Service
|
||||
for rows.Next() {
|
||||
svc := &Service{}
|
||||
if err := rows.StructScan(svc); err != nil {
|
||||
return nil, errors.Wrap(err, "error scanning service")
|
||||
}
|
||||
svcs = append(svcs, svc)
|
||||
}
|
||||
return svcs, nil
|
||||
}
|
||||
|
||||
func (self *Store) FindServiceWithToken(svcToken string, tx *sqlx.Tx) (*Service, error) {
|
||||
svc := &Service{}
|
||||
if err := tx.QueryRowx("select * from services where token = $1", svcToken).StructScan(svc); err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting service by name")
|
||||
}
|
||||
return svc, nil
|
||||
}
|
||||
|
||||
func (self *Store) FindServicesForEnvironment(envId int, tx *sqlx.Tx) ([]*Service, error) {
|
||||
rows, err := tx.Queryx("select services.* from services where environment_id = $1", envId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting services by environment id")
|
||||
}
|
||||
var svcs []*Service
|
||||
for rows.Next() {
|
||||
svc := &Service{}
|
||||
if err := rows.StructScan(svc); err != nil {
|
||||
return nil, errors.Wrap(err, "error scanning service")
|
||||
}
|
||||
svcs = append(svcs, svc)
|
||||
}
|
||||
return svcs, nil
|
||||
}
|
||||
|
||||
func (self *Store) UpdateService(svc *Service, tx *sqlx.Tx) error {
|
||||
sql := "update services set z_id = $1, token = $2, share_mode = $3, backend_mode = $4, frontend_selection = $5, frontend_endpoint = $6, backend_proxy_endpoint = $7, reserved = $8, updated_at = current_timestamp where id = $9"
|
||||
stmt, err := tx.Prepare(sql)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error preparing services update statement")
|
||||
}
|
||||
_, err = stmt.Exec(svc.ZId, svc.Token, svc.ShareMode, svc.BackendMode, svc.FrontendSelection, svc.FrontendEndpoint, svc.BackendProxyEndpoint, svc.Reserved, svc.Id)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing services update statement")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Store) DeleteService(id int, tx *sqlx.Tx) error {
|
||||
stmt, err := tx.Prepare("delete from services where id = $1")
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error preparing services delete statement")
|
||||
}
|
||||
_, err = stmt.Exec(id)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing services delete statement")
|
||||
}
|
||||
return nil
|
||||
}
|
104
controller/store/share.go
Normal file
104
controller/store/share.go
Normal file
@ -0,0 +1,104 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type Share struct {
|
||||
Model
|
||||
EnvironmentId int
|
||||
ZId string
|
||||
Token string
|
||||
ShareMode string
|
||||
BackendMode string
|
||||
FrontendSelection *string
|
||||
FrontendEndpoint *string
|
||||
BackendProxyEndpoint *string
|
||||
Reserved bool
|
||||
}
|
||||
|
||||
func (self *Store) CreateShare(envId int, shr *Share, tx *sqlx.Tx) (int, error) {
|
||||
stmt, err := tx.Prepare("insert into shares (environment_id, z_id, token, share_mode, backend_mode, frontend_selection, frontend_endpoint, backend_proxy_endpoint, reserved) values ($1, $2, $3, $4, $5, $6, $7, $8, $9) returning id")
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "error preparing shares insert statement")
|
||||
}
|
||||
var id int
|
||||
if err := stmt.QueryRow(envId, shr.ZId, shr.Token, shr.ShareMode, shr.BackendMode, shr.FrontendSelection, shr.FrontendEndpoint, shr.BackendProxyEndpoint, shr.Reserved).Scan(&id); err != nil {
|
||||
return 0, errors.Wrap(err, "error executing shares insert statement")
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (self *Store) GetShare(id int, tx *sqlx.Tx) (*Share, error) {
|
||||
shr := &Share{}
|
||||
if err := tx.QueryRowx("select * from shares where id = $1", id).StructScan(shr); err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting share by id")
|
||||
}
|
||||
return shr, nil
|
||||
}
|
||||
|
||||
func (self *Store) GetAllShares(tx *sqlx.Tx) ([]*Share, error) {
|
||||
rows, err := tx.Queryx("select * from shares order by id")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting all shares")
|
||||
}
|
||||
var shrs []*Share
|
||||
for rows.Next() {
|
||||
shr := &Share{}
|
||||
if err := rows.StructScan(shr); err != nil {
|
||||
return nil, errors.Wrap(err, "error scanning share")
|
||||
}
|
||||
shrs = append(shrs, shr)
|
||||
}
|
||||
return shrs, nil
|
||||
}
|
||||
|
||||
func (self *Store) FindShareWithToken(shrToken string, tx *sqlx.Tx) (*Share, error) {
|
||||
shr := &Share{}
|
||||
if err := tx.QueryRowx("select * from shares where token = $1", shrToken).StructScan(shr); err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting share by token")
|
||||
}
|
||||
return shr, nil
|
||||
}
|
||||
|
||||
func (self *Store) FindSharesForEnvironment(envId int, tx *sqlx.Tx) ([]*Share, error) {
|
||||
rows, err := tx.Queryx("select shares.* from shares where environment_id = $1", envId)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error selecting shares by environment id")
|
||||
}
|
||||
var shrs []*Share
|
||||
for rows.Next() {
|
||||
shr := &Share{}
|
||||
if err := rows.StructScan(shr); err != nil {
|
||||
return nil, errors.Wrap(err, "error scanning share")
|
||||
}
|
||||
shrs = append(shrs, shr)
|
||||
}
|
||||
return shrs, nil
|
||||
}
|
||||
|
||||
func (self *Store) UpdateShare(shr *Share, tx *sqlx.Tx) error {
|
||||
sql := "update shares set z_id = $1, token = $2, share_mode = $3, backend_mode = $4, frontend_selection = $5, frontend_endpoint = $6, backend_proxy_endpoint = $7, reserved = $8, updated_at = current_timestamp where id = $9"
|
||||
stmt, err := tx.Prepare(sql)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error preparing shares update statement")
|
||||
}
|
||||
_, err = stmt.Exec(shr.ZId, shr.Token, shr.ShareMode, shr.BackendMode, shr.FrontendSelection, shr.FrontendEndpoint, shr.BackendProxyEndpoint, shr.Reserved, shr.Id)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing shares update statement")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *Store) DeleteShare(id int, tx *sqlx.Tx) error {
|
||||
stmt, err := tx.Prepare("delete from shares where id = $1")
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error preparing shares delete statement")
|
||||
}
|
||||
_, err = stmt.Exec(id)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error executing shares delete statement")
|
||||
}
|
||||
return nil
|
||||
}
|
@ -58,15 +58,15 @@ func (h *unshareHandler) Handle(params service.UnshareParams, principal *rest_mo
|
||||
return service.NewUnshareNotFound()
|
||||
}
|
||||
|
||||
var ssvc *store.Service
|
||||
if svcs, err := str.FindServicesForEnvironment(senv.Id, tx); err == nil {
|
||||
var sshr *store.Share
|
||||
if svcs, err := str.FindSharesForEnvironment(senv.Id, tx); err == nil {
|
||||
for _, svc := range svcs {
|
||||
if svc.ZId == svcZId {
|
||||
ssvc = svc
|
||||
sshr = svc
|
||||
break
|
||||
}
|
||||
}
|
||||
if ssvc == nil {
|
||||
if sshr == nil {
|
||||
err := errors.Errorf("service with id '%v' not found for '%v'", svcZId, principal.Email)
|
||||
logrus.Error(err)
|
||||
return service.NewUnshareNotFound()
|
||||
@ -76,16 +76,16 @@ func (h *unshareHandler) Handle(params service.UnshareParams, principal *rest_mo
|
||||
return service.NewUnshareInternalServerError()
|
||||
}
|
||||
|
||||
if ssvc.Reserved == params.Body.Reserved {
|
||||
if sshr.Reserved == params.Body.Reserved {
|
||||
// single tag-based service deallocator; should work regardless of sharing mode
|
||||
if err := h.deallocateResources(senv, svcToken, svcZId, edge); err != nil {
|
||||
logrus.Errorf("error unsharing ziti resources for '%v': %v", ssvc, err)
|
||||
logrus.Errorf("error unsharing ziti resources for '%v': %v", sshr, err)
|
||||
return service.NewUnshareInternalServerError()
|
||||
}
|
||||
|
||||
logrus.Debugf("deallocated service '%v'", svcToken)
|
||||
|
||||
if err := str.DeleteService(ssvc.Id, tx); err != nil {
|
||||
if err := str.DeleteShare(sshr.Id, tx); err != nil {
|
||||
logrus.Errorf("error deactivating service '%v': %v", svcZId, err)
|
||||
return service.NewUnshareInternalServerError()
|
||||
}
|
||||
|
@ -24,7 +24,7 @@ func (h *updateShareHandler) Handle(params service.UpdateShareParams, principal
|
||||
}
|
||||
defer func() { _ = tx.Rollback() }()
|
||||
|
||||
ssvc, err := str.FindServiceWithToken(svcToken, tx)
|
||||
sshr, err := str.FindShareWithToken(svcToken, tx)
|
||||
if err != nil {
|
||||
logrus.Errorf("service '%v' not found: %v", svcToken, err)
|
||||
return service.NewUpdateShareNotFound()
|
||||
@ -38,7 +38,7 @@ func (h *updateShareHandler) Handle(params service.UpdateShareParams, principal
|
||||
|
||||
envFound := false
|
||||
for _, senv := range senvs {
|
||||
if senv.Id == ssvc.Id {
|
||||
if senv.Id == sshr.Id {
|
||||
envFound = true
|
||||
break
|
||||
}
|
||||
@ -48,8 +48,8 @@ func (h *updateShareHandler) Handle(params service.UpdateShareParams, principal
|
||||
return service.NewUpdateShareNotFound()
|
||||
}
|
||||
|
||||
ssvc.BackendProxyEndpoint = &backendProxyEndpoint
|
||||
if err := str.UpdateService(ssvc, tx); err != nil {
|
||||
sshr.BackendProxyEndpoint = &backendProxyEndpoint
|
||||
if err := str.UpdateShare(sshr, tx); err != nil {
|
||||
logrus.Errorf("error updating service '%v': %v", svcToken, err)
|
||||
return service.NewUpdateShareInternalServerError()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user