generalize trigger kinds

This commit is contained in:
Christian Schwarz 2023-12-22 14:40:53 +00:00
parent b0caa2d151
commit c8afaf83ab
13 changed files with 168 additions and 57 deletions

View File

@ -157,6 +157,11 @@ type ReplicationTriggerPeriodic struct {
Interval *PositiveDuration `yaml:"interval"`
}
type ReplicationTriggerCron struct {
Type string `yaml:"type"`
Cron CronSpec `yaml:"cron"`
}
type PropertyRecvOptions struct {
Inherit []zfsprop.Property `yaml:"inherit,optional"`
Override map[zfsprop.Property]string `yaml:"override,optional"`
@ -179,7 +184,6 @@ func (j *PushJob) GetSendOptions() *SendOptions { return j.Send }
type PullJob struct {
ActiveJob `yaml:",inline"`
RootFS string `yaml:"root_fs"`
Interval PositiveDurationOrManual `yaml:"interval"`
Recv *RecvOptions `yaml:"recv,fromdefaults,optional"`
}

View File

@ -93,7 +93,7 @@ type activeMode interface {
SenderReceiver() (logic.Sender, logic.Receiver)
Type() Type
PlannerPolicy() logic.PlannerPolicy
RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger)
RunPeriodic(ctx context.Context, wakeReplication trigger.Trigger
SnapperReport() *snapper.Report
ResetConnectBackoff()
}
@ -135,7 +135,7 @@ func (m *modePush) Type() Type { return TypePush }
func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }
func (m *modePush) RunPeriodic(ctx context.Context, trigger *trigger.Trigger) {
func (m *modePush) RunPeriodic(ctx context.Context, trigger trigger.Trigger {
m.snapper.Run(ctx, trigger)
}
@ -224,7 +224,7 @@ func (*modePull) Type() Type { return TypePull }
func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy }
func (m *modePull) RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger) {
func (m *modePull) RunPeriodic(ctx context.Context, wakeReplication trigger.Trigger {
if m.interval.Manual {
GetLogger(ctx).Info("manual pull configured, periodic pull disabled")
// "waiting for wakeups" is printed in common ActiveSide.do

View File

@ -115,7 +115,7 @@ func (j *SnapJob) Run(ctx context.Context) {
go j.snapper.Run(periodicCtx, snapshottingTrigger)
triggers := trigger.Empty()
triggered, endTask := triggers.Spawn(ctx, []*trigger.Trigger{snapshottingTrigger, wakeupTrigger})
triggered, endTask := triggers.Spawn(ctx, []trigger.TriggersnapshottingTrigger, wakeupTrigger})
defer endTask()
invocationCount := 0

View File

@ -0,0 +1,23 @@
package trigger
import (
"context"
"github.com/robfig/cron/v3"
)
type Cron struct {
spec cron.Schedule
}
var _ Trigger = &Cron{}
func NewCron(spec cron.Schedule) *Cron {
return &Cron{spec: spec}
}
func (t *Cron) ID() string { return "cron" }
func (t *Cron) run(ctx context.Context, signal chan<- struct{}) {
panic("unimpl: extract from cron snapper")
}

View File

@ -0,0 +1,30 @@
package trigger
import (
"fmt"
"github.com/zrepl/zrepl/config"
)
func FromConfig(in []*config.ReplicationTriggerEnum) (*Triggers, error) {
triggers := make([]Trigger, len(in))
for i, e := range in {
var t Trigger = nil
switch te := e.Ret.(type) {
case *config.ReplicationTriggerManual:
// not a trigger
t = NewManual("manual")
case *config.ReplicationTriggerPeriodic:
t = NewPeriodic(te.Interval.Duration())
case *config.ReplicationTriggerCron:
t = NewCron(te.Cron.Schedule)
default:
return nil, fmt.Errorf("unknown trigger type %T", te)
}
triggers[i] = t
}
return &Triggers{
spawned: false,
triggers: triggers,
}, nil
}

View File

@ -0,0 +1,33 @@
package trigger
import "context"
type Manual struct {
id string
signal chan<- struct{}
}
var _ Trigger = &Manual{}
func NewManual(id string) *Manual {
return &Manual{
id: id,
signal: nil,
}
}
func (t *Manual) ID() string {
return t.id
}
func (t *Manual) run(ctx context.Context, signal chan<- struct{}) {
if t.signal != nil {
panic("run must only be called once")
}
t.signal = signal
}
// Panics if called before the trigger has been spanwed as part of a `Triggers`.
func (t *Manual) Fire() {
t.signal <- struct{}{}
}

View File

@ -0,0 +1,33 @@
package trigger
import (
"context"
"time"
)
type Periodic struct {
interval time.Duration
}
var _ Trigger = &Periodic{}
func NewPeriodic(interval time.Duration) *Periodic {
return &Periodic{
interval: interval,
}
}
func (p *Periodic) ID() string { return "periodic" }
func (p *Periodic) run(ctx context.Context, signal chan<- struct{}) {
t := time.NewTicker(p.interval)
defer t.Stop()
for {
select {
case <-t.C:
signal <- struct{}{}
case <-ctx.Done():
return
}
}
}

View File

@ -2,74 +2,58 @@ package trigger
import (
"context"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/logging/trace"
)
type Triggers struct {
spawned bool
triggers []*Trigger
triggers []Trigger
}
type Trigger struct {
id string
signal chan struct{}
}
func FromConfig([]*config.ReplicationTriggerEnum) (*Triggers, error) {
panic("unimpl")
return &Triggers{
spawned: false,
}, nil
type Trigger interface {
ID() string
run(context.Context, chan<- struct{})
}
func Empty() *Triggers {
panic("unimpl")
}
func New(id string) *Trigger {
return &Trigger{
id: id,
signal: make(chan struct{}),
return &Triggers{
spawned: false,
triggers: nil,
}
}
func (t *Trigger) ID() string {
return t.id
}
func (t *Trigger) Fire() error {
panic("unimpl")
}
func (t *Triggers) Spawn(ctx context.Context, additionalTriggers []*Trigger) (chan *Trigger, trace.DoneFunc) {
func (t *Triggers) Spawn(ctx context.Context, additionalTriggers []Trigger) (chan Trigger, trace.DoneFunc) {
if t.spawned {
panic("must only spawn once")
}
t.spawned = true
t.triggers = append(t.triggers, additionalTriggers...)
childCtx, endTask := trace.WithTask(ctx, "triggers")
sink := make(chan *Trigger)
go t.task(childCtx, sink)
sink := make(chan Trigger)
endTask := t.spawn(ctx, sink)
return sink, endTask
}
type triggering struct {
trigger *Trigger
trigger Trigger
handled chan struct{}
}
func (t *Triggers) task(ctx context.Context, sink chan *Trigger) {
func (t *Triggers) spawn(ctx context.Context, sink chan Trigger) trace.DoneFunc {
ctx, endTask := trace.WithTask(ctx, "triggers")
ctx, add, wait := trace.WithTaskGroup(ctx, "trigger-tasks")
triggered := make(chan triggering, len(t.triggers))
for _, t := range t.triggers {
t := t
signal := make(chan struct{})
go add(func(ctx context.Context) {
t.run(ctx, signal)
})
go func() {
for {
select {
case <-ctx.Done():
return
case <-t.signal:
case <-signal:
handled := make(chan struct{})
select {
case triggered <- triggering{trigger: t, handled: handled}:
@ -85,19 +69,23 @@ func (t *Triggers) task(ctx context.Context, sink chan *Trigger) {
}
}()
}
for {
select {
case <-ctx.Done():
return
case triggering := <-triggered:
go func() {
defer wait()
for {
select {
case sink <- triggering.trigger:
default:
getLogger(ctx).
WithField("trigger_id", triggering.trigger.id).
Warn("dropping triggering because job is busy")
case <-ctx.Done():
return
case triggering := <-triggered:
select {
case sink <- triggering.trigger:
default:
getLogger(ctx).
WithField("trigger_id", triggering.trigger.ID()).
Warn("dropping triggering because job is busy")
}
close(triggering.handled)
}
close(triggering.handled)
}
}
}()
return endTask
}

View File

@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} {
return wc
}
func Trigger(ctx context.Context) *trigger.Trigger {
func Trigger(ctx context.Context) trigger.Trigger {
panic("unimpl")
}

View File

@ -43,7 +43,7 @@ type Cron struct {
wakeupWhileRunningCount int
}
func (s *Cron) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
func (s *Cron) Run(ctx context.Context, snapshotsTaken trigger.Trigger) {
for {
now := time.Now()

View File

@ -8,7 +8,7 @@ import (
type manual struct{}
func (s *manual) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
func (s *manual) Run(ctx context.Context, snapshotsTaken trigger.Trigger {
// nothing to do
}

View File

@ -104,7 +104,7 @@ func (s State) sf() state {
type updater func(u func(*Periodic)) State
type state func(a periodicArgs, u updater) state
func (s *Periodic) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) {
func (s *Periodic) Run(ctx context.Context, snapshotsTaken trigger.Trigger {
defer trace.WithSpanFromStackUpdateCtx(&ctx)()
getLogger(ctx).Debug("start")
defer getLogger(ctx).Debug("stop")

View File

@ -18,7 +18,7 @@ const (
)
type Snapper interface {
Run(ctx context.Context, snapshotsTaken *trigger.Trigger)
Run(ctx context.Context, snapshotsTaken trigger.Trigger
Report() Report
}