WIP: generic activation through + new interval-based replication trigger

This commit is contained in:
Christian Schwarz 2023-12-22 14:00:20 +00:00
parent ebc46cf1c0
commit b0caa2d151
11 changed files with 197 additions and 43 deletions

View File

@ -121,6 +121,7 @@ type BandwidthLimit struct {
}
type Replication struct {
Triggers []*ReplicationTriggerEnum
Protection *ReplicationOptionsProtection `yaml:"protection,optional,fromdefaults"`
Concurrency *ReplicationOptionsConcurrency `yaml:"concurrency,optional,fromdefaults"`
}
@ -135,6 +136,27 @@ type ReplicationOptionsConcurrency struct {
SizeEstimates int `yaml:"size_estimates,optional,default=4"`
}
type ReplicationTriggerEnum struct {
Ret interface{}
}
func (t *ReplicationTriggerEnum) UnmarshalYAML(u func(interface{}, bool) error) (err error) {
t.Ret, err = enumUnmarshal(u, map[string]interface{}{
"manual": &ReplicationTriggerManual{},
"periodic": &ReplicationTriggerPeriodic{},
})
return
}
type ReplicationTriggerManual struct {
Type string `yaml:"type"`
}
type ReplicationTriggerPeriodic struct {
Type string `yaml:"type"`
Interval *PositiveDuration `yaml:"interval"`
}
type PropertyRecvOptions struct {
Inherit []zfsprop.Property `yaml:"inherit,optional"`
Override map[zfsprop.Property]string `yaml:"override,optional"`

View File

@ -15,6 +15,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
@ -44,6 +45,8 @@ type ActiveSide struct {
promReplicationErrors prometheus.Gauge
promLastSuccessful prometheus.Gauge
triggers *trigger.Triggers
tasksMtx sync.Mutex
tasks activeSideTasks
}
@ -90,7 +93,7 @@ type activeMode interface {
SenderReceiver() (logic.Sender, logic.Receiver)
Type() Type
PlannerPolicy() logic.PlannerPolicy
RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{})
RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger)
SnapperReport() *snapper.Report
ResetConnectBackoff()
}
@ -132,8 +135,8 @@ func (m *modePush) Type() Type { return TypePush }
func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }
func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
m.snapper.Run(ctx, wakeUpCommon)
func (m *modePush) RunPeriodic(ctx context.Context, trigger *trigger.Trigger) {
m.snapper.Run(ctx, trigger)
}
func (m *modePush) SnapperReport() *snapper.Report {
@ -221,7 +224,7 @@ func (*modePull) Type() Type { return TypePull }
func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }
func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
func (m *modePull) RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger) {
if m.interval.Manual {
GetLogger(ctx).Info("manual pull configured, periodic pull disabled")
// "waiting for wakeups" is printed in common ActiveSide.do
@ -232,14 +235,7 @@ func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}
for {
select {
case <-t.C:
select {
case wakeUpCommon <- struct{}{}:
default:
GetLogger(ctx).
WithField("pull_interval", m.interval).
Warn("pull job took longer than pull interval")
wakeUpCommon <- struct{}{} // block anyways, to queue up the wakeup
}
wakeReplication.Fire()
case <-ctx.Done():
return
}
@ -370,6 +366,11 @@ func activeSide(g *config.Global, in *config.ActiveJob, configJob interface{}, p
return nil, errors.Wrap(err, "cannot build replication driver config")
}
j.triggers, err = trigger.FromConfig(in.Replication.Triggers)
if err != nil {
return nil, errors.Wrap(err, "cannot build triggers")
}
return j, nil
}
@ -444,12 +445,18 @@ func (j *ActiveSide) Run(ctx context.Context) {
defer log.Info("job exiting")
periodicDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
periodicCtx, endTask := trace.WithTask(ctx, "periodic")
periodicTrigger := trigger.New("periodic")
periodCtx, endTask := trace.WithTask(ctx, "periodic")
defer endTask()
go j.mode.RunPeriodic(periodCtx, periodicTrigger)
wakeupTrigger := wakeup.Trigger(ctx)
triggered, endTask := j.triggers.Spawn(ctx, []*trigger.Trigger{periodicTrigger, wakeupTrigger})
defer endTask()
go j.mode.RunPeriodic(periodicCtx, periodicDone)
invocationCount := 0
outer:
@ -459,10 +466,15 @@ outer:
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
case <-wakeup.Wait(ctx):
j.mode.ResetConnectBackoff()
case <-periodicDone:
case trigger := <-triggered:
log :=
log.WithField("trigger_id", trigger.ID())
log.Info("triggered")
switch trigger {
case wakeupTrigger:
log.Info("trigger is wakeup command, resetting connection backoff")
j.mode.ResetConnectBackoff()
}
}
invocationCount++
invocationCtx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("invocation-%d", invocationCount))

View File

@ -15,6 +15,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/daemon/snapper"
@ -104,12 +105,18 @@ func (j *SnapJob) Run(ctx context.Context) {
defer log.Info("job exiting")
periodicDone := make(chan struct{})
wakeupTrigger := wakeup.Trigger(ctx)
snapshottingTrigger := trigger.New("periodic")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
periodicCtx, endTask := trace.WithTask(ctx, "snapshotting")
defer endTask()
go j.snapper.Run(periodicCtx, periodicDone)
go j.snapper.Run(periodicCtx, snapshottingTrigger)
triggers := trigger.Empty()
triggered, endTask := triggers.Spawn(ctx, []*trigger.Trigger{snapshottingTrigger, wakeupTrigger})
defer endTask()
invocationCount := 0
outer:
@ -119,9 +126,7 @@ outer:
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
case <-wakeup.Wait(ctx):
case <-periodicDone:
case <-triggered:
}
invocationCount++

View File

@ -0,0 +1,12 @@
package trigger
import (
"context"
"github.com/zrepl/zrepl/logger"
)
func getLogger(ctx context.Context) logger.Logger {
panic("unimpl")
}

View File

@ -0,0 +1,103 @@
package trigger
import (
"context"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
type Triggers struct {
spawned bool
triggers []*Trigger
}
type Trigger struct {
id string
signal chan struct{}
}
func FromConfig([]*config.ReplicationTriggerEnum) (*Triggers, error) {
panic("unimpl")
return &Triggers{
spawned: false,
}, nil
}
func Empty() *Triggers {
panic("unimpl")
}
func New(id string) *Trigger {
return &Trigger{
id: id,
signal: make(chan struct{}),
}
}
func (t *Trigger) ID() string {
return t.id
}
func (t *Trigger) Fire() error {
panic("unimpl")
}
func (t *Triggers) Spawn(ctx context.Context, additionalTriggers []*Trigger) (chan *Trigger, trace.DoneFunc) {
if t.spawned {
panic("must only spawn once")
}
t.spawned = true
t.triggers = append(t.triggers, additionalTriggers...)
childCtx, endTask := trace.WithTask(ctx, "triggers")
sink := make(chan *Trigger)
go t.task(childCtx, sink)
return sink, endTask
}
type triggering struct {
trigger *Trigger
handled chan struct{}
}
func (t *Triggers) task(ctx context.Context, sink chan *Trigger) {
triggered := make(chan triggering, len(t.triggers))
for _, t := range t.triggers {
t := t
go func() {
for {
select {
case <-ctx.Done():
return
case <-t.signal:
handled := make(chan struct{})
select {
case triggered <- triggering{trigger: t, handled: handled}:
default:
panic("this funtion ensures that there's always room in the channel")
}
select {
case <-handled:
case <-ctx.Done():
return
}
}
}
}()
}
for {
select {
case <-ctx.Done():
return
case triggering := <-triggered:
select {
case sink <- triggering.trigger:
default:
getLogger(ctx).
WithField("trigger_id", triggering.trigger.id).
Warn("dropping triggering because job is busy")
}
close(triggering.handled)
}
}
}

View File

@ -3,6 +3,8 @@ package wakeup
import (
"context"
"errors"
"github.com/zrepl/zrepl/daemon/job/trigger"
)
type contextKey int
@ -17,6 +19,10 @@ func Wait(ctx context.Context) <-chan struct{} {
return wc
}
func Trigger(ctx context.Context) *trigger.Trigger {
panic("unimpl")
}
type Func func() error
var AlreadyWokenUp = errors.New("already woken up")

View File

@ -63,6 +63,7 @@ type Subsystem string
const (
SubsysMeta Subsystem = "meta"
SubsysJob Subsystem = "job"
SubsysTrigger Subsystem = "trigger"
SubsysReplication Subsystem = "repl"
SubsysEndpoint Subsystem = "endpoint"
SubsysPruning Subsystem = "pruning"

View File

@ -10,6 +10,7 @@ import (
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/hooks"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/util/suspendresumesafetimer"
"github.com/zrepl/zrepl/zfs"
)
@ -42,7 +43,7 @@ type Cron struct {
wakeupWhileRunningCount int
}
func (s *Cron) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
func (s *Cron) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
for {
now := time.Now()
@ -75,13 +76,7 @@ func (s *Cron) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
s.running = false
s.mtx.Unlock()
select {
case snapshotsTaken <- struct{}{}:
default:
if snapshotsTaken != nil {
getLogger(ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
snapshotsTaken.Fire()
}()
}

View File

@ -2,11 +2,13 @@ package snapper
import (
"context"
"github.com/zrepl/zrepl/daemon/job/trigger"
)
type manual struct{}
func (s *manual) Run(ctx context.Context, wakeUpCommon chan<- struct{}) {
func (s *manual) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
// nothing to do
}

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/daemon/logging/trace"
"github.com/zrepl/zrepl/config"
@ -51,7 +52,7 @@ type periodicArgs struct {
interval time.Duration
fsf zfs.DatasetFilter
planArgs planArgs
snapshotsTaken chan<- struct{}
snapshotsTaken *trigger.Trigger
dryRun bool
}
@ -103,7 +104,7 @@ func (s State) sf() state {
type updater func(u func(*Periodic)) State
type state func(a periodicArgs, u updater) state
func (s *Periodic) Run(ctx context.Context, snapshotsTaken chan<- struct{}) {
func (s *Periodic) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")
@ -207,13 +208,7 @@ func periodicStateSnapshot(a periodicArgs, u updater) state {
ok := plan.execute(a.ctx, false)
select {
case a.snapshotsTaken <- struct{}{}:
default:
if a.snapshotsTaken != nil {
getLogger(a.ctx).Warn("callback channel is full, discarding snapshot update event")
}
}
a.snapshotsTaken.Fire()
return u(func(snapper *Periodic) {
if !ok {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/trigger"
"github.com/zrepl/zrepl/zfs"
)
@ -17,7 +18,7 @@ const (
)
type Snapper interface {
Run(ctx context.Context, snapshotsTaken chan<- struct{})
Run(ctx context.Context, snapshotsTaken *trigger.Trigger)
Report() Report
}