diff --git a/config/config.go b/config/config.go index 50e4a9e..6e7263e 100644 --- a/config/config.go +++ b/config/config.go @@ -157,6 +157,11 @@ type ReplicationTriggerPeriodic struct { Interval *PositiveDuration `yaml:"interval"` } +type ReplicationTriggerCron struct { + Type string `yaml:"type"` + Cron CronSpec `yaml:"cron"` +} + type PropertyRecvOptions struct { Inherit []zfsprop.Property `yaml:"inherit,optional"` Override map[zfsprop.Property]string `yaml:"override,optional"` @@ -179,7 +184,6 @@ func (j *PushJob) GetSendOptions() *SendOptions { return j.Send } type PullJob struct { ActiveJob `yaml:",inline"` RootFS string `yaml:"root_fs"` - Interval PositiveDurationOrManual `yaml:"interval"` Recv *RecvOptions `yaml:"recv,fromdefaults,optional"` } diff --git a/daemon/job/active.go b/daemon/job/active.go index 0cd36bd..b516156 100644 --- a/daemon/job/active.go +++ b/daemon/job/active.go @@ -93,7 +93,7 @@ type activeMode interface { SenderReceiver() (logic.Sender, logic.Receiver) Type() Type PlannerPolicy() logic.PlannerPolicy - RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger) + RunPeriodic(ctx context.Context, wakeReplication trigger.Trigger SnapperReport() *snapper.Report ResetConnectBackoff() } @@ -135,7 +135,7 @@ func (m *modePush) Type() Type { return TypePush } func (m *modePush) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePush) RunPeriodic(ctx context.Context, trigger *trigger.Trigger) { +func (m *modePush) RunPeriodic(ctx context.Context, trigger trigger.Trigger { m.snapper.Run(ctx, trigger) } @@ -224,7 +224,7 @@ func (*modePull) Type() Type { return TypePull } func (m *modePull) PlannerPolicy() logic.PlannerPolicy { return *m.plannerPolicy } -func (m *modePull) RunPeriodic(ctx context.Context, wakeReplication *trigger.Trigger) { +func (m *modePull) RunPeriodic(ctx context.Context, wakeReplication trigger.Trigger { if m.interval.Manual { GetLogger(ctx).Info("manual pull configured, periodic pull disabled") // "waiting for wakeups" is printed in common ActiveSide.do diff --git a/daemon/job/snapjob.go b/daemon/job/snapjob.go index 0eae3d2..e591f18 100644 --- a/daemon/job/snapjob.go +++ b/daemon/job/snapjob.go @@ -115,7 +115,7 @@ func (j *SnapJob) Run(ctx context.Context) { go j.snapper.Run(periodicCtx, snapshottingTrigger) triggers := trigger.Empty() - triggered, endTask := triggers.Spawn(ctx, []*trigger.Trigger{snapshottingTrigger, wakeupTrigger}) + triggered, endTask := triggers.Spawn(ctx, []trigger.TriggersnapshottingTrigger, wakeupTrigger}) defer endTask() invocationCount := 0 diff --git a/daemon/job/trigger/cron.go b/daemon/job/trigger/cron.go new file mode 100644 index 0000000..ad41d50 --- /dev/null +++ b/daemon/job/trigger/cron.go @@ -0,0 +1,23 @@ +package trigger + +import ( + "context" + + "github.com/robfig/cron/v3" +) + +type Cron struct { + spec cron.Schedule +} + +var _ Trigger = &Cron{} + +func NewCron(spec cron.Schedule) *Cron { + return &Cron{spec: spec} +} + +func (t *Cron) ID() string { return "cron" } + +func (t *Cron) run(ctx context.Context, signal chan<- struct{}) { + panic("unimpl: extract from cron snapper") +} diff --git a/daemon/job/trigger/from_config.go b/daemon/job/trigger/from_config.go new file mode 100644 index 0000000..956bd2c --- /dev/null +++ b/daemon/job/trigger/from_config.go @@ -0,0 +1,30 @@ +package trigger + +import ( + "fmt" + + "github.com/zrepl/zrepl/config" +) + +func FromConfig(in []*config.ReplicationTriggerEnum) (*Triggers, error) { + triggers := make([]Trigger, len(in)) + for i, e := range in { + var t Trigger = nil + switch te := e.Ret.(type) { + case *config.ReplicationTriggerManual: + // not a trigger + t = NewManual("manual") + case *config.ReplicationTriggerPeriodic: + t = NewPeriodic(te.Interval.Duration()) + case *config.ReplicationTriggerCron: + t = NewCron(te.Cron.Schedule) + default: + return nil, fmt.Errorf("unknown trigger type %T", te) + } + triggers[i] = t + } + return &Triggers{ + spawned: false, + triggers: triggers, + }, nil +} diff --git a/daemon/job/trigger/manual.go b/daemon/job/trigger/manual.go new file mode 100644 index 0000000..44fe5e5 --- /dev/null +++ b/daemon/job/trigger/manual.go @@ -0,0 +1,33 @@ +package trigger + +import "context" + +type Manual struct { + id string + signal chan<- struct{} +} + +var _ Trigger = &Manual{} + +func NewManual(id string) *Manual { + return &Manual{ + id: id, + signal: nil, + } +} + +func (t *Manual) ID() string { + return t.id +} + +func (t *Manual) run(ctx context.Context, signal chan<- struct{}) { + if t.signal != nil { + panic("run must only be called once") + } + t.signal = signal +} + +// Panics if called before the trigger has been spanwed as part of a `Triggers`. +func (t *Manual) Fire() { + t.signal <- struct{}{} +} diff --git a/daemon/job/trigger/periodic.go b/daemon/job/trigger/periodic.go new file mode 100644 index 0000000..dac4724 --- /dev/null +++ b/daemon/job/trigger/periodic.go @@ -0,0 +1,33 @@ +package trigger + +import ( + "context" + "time" +) + +type Periodic struct { + interval time.Duration +} + +var _ Trigger = &Periodic{} + +func NewPeriodic(interval time.Duration) *Periodic { + return &Periodic{ + interval: interval, + } +} + +func (p *Periodic) ID() string { return "periodic" } + +func (p *Periodic) run(ctx context.Context, signal chan<- struct{}) { + t := time.NewTicker(p.interval) + defer t.Stop() + for { + select { + case <-t.C: + signal <- struct{}{} + case <-ctx.Done(): + return + } + } +} diff --git a/daemon/job/trigger/trigger.go b/daemon/job/trigger/trigger.go index 14ca204..3993423 100644 --- a/daemon/job/trigger/trigger.go +++ b/daemon/job/trigger/trigger.go @@ -2,74 +2,58 @@ package trigger import ( "context" - - "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/logging/trace" ) type Triggers struct { spawned bool - triggers []*Trigger + triggers []Trigger } -type Trigger struct { - id string - signal chan struct{} -} - -func FromConfig([]*config.ReplicationTriggerEnum) (*Triggers, error) { - panic("unimpl") - return &Triggers{ - spawned: false, - }, nil +type Trigger interface { + ID() string + run(context.Context, chan<- struct{}) } func Empty() *Triggers { - panic("unimpl") -} - -func New(id string) *Trigger { - return &Trigger{ - id: id, - signal: make(chan struct{}), + return &Triggers{ + spawned: false, + triggers: nil, } } -func (t *Trigger) ID() string { - return t.id -} - -func (t *Trigger) Fire() error { - panic("unimpl") -} - -func (t *Triggers) Spawn(ctx context.Context, additionalTriggers []*Trigger) (chan *Trigger, trace.DoneFunc) { +func (t *Triggers) Spawn(ctx context.Context, additionalTriggers []Trigger) (chan Trigger, trace.DoneFunc) { if t.spawned { panic("must only spawn once") } t.spawned = true t.triggers = append(t.triggers, additionalTriggers...) - childCtx, endTask := trace.WithTask(ctx, "triggers") - sink := make(chan *Trigger) - go t.task(childCtx, sink) + sink := make(chan Trigger) + endTask := t.spawn(ctx, sink) return sink, endTask } type triggering struct { - trigger *Trigger + trigger Trigger handled chan struct{} } -func (t *Triggers) task(ctx context.Context, sink chan *Trigger) { +func (t *Triggers) spawn(ctx context.Context, sink chan Trigger) trace.DoneFunc { + ctx, endTask := trace.WithTask(ctx, "triggers") + ctx, add, wait := trace.WithTaskGroup(ctx, "trigger-tasks") triggered := make(chan triggering, len(t.triggers)) for _, t := range t.triggers { t := t + signal := make(chan struct{}) + go add(func(ctx context.Context) { + t.run(ctx, signal) + }) go func() { for { select { case <-ctx.Done(): return - case <-t.signal: + case <-signal: handled := make(chan struct{}) select { case triggered <- triggering{trigger: t, handled: handled}: @@ -85,19 +69,23 @@ func (t *Triggers) task(ctx context.Context, sink chan *Trigger) { } }() } - for { - select { - case <-ctx.Done(): - return - case triggering := <-triggered: + go func() { + defer wait() + for { select { - case sink <- triggering.trigger: - default: - getLogger(ctx). - WithField("trigger_id", triggering.trigger.id). - Warn("dropping triggering because job is busy") + case <-ctx.Done(): + return + case triggering := <-triggered: + select { + case sink <- triggering.trigger: + default: + getLogger(ctx). + WithField("trigger_id", triggering.trigger.ID()). + Warn("dropping triggering because job is busy") + } + close(triggering.handled) } - close(triggering.handled) } - } + }() + return endTask } diff --git a/daemon/job/wakeup/wakeup.go b/daemon/job/wakeup/wakeup.go index 4d8525d..3b34d13 100644 --- a/daemon/job/wakeup/wakeup.go +++ b/daemon/job/wakeup/wakeup.go @@ -19,7 +19,7 @@ func Wait(ctx context.Context) <-chan struct{} { return wc } -func Trigger(ctx context.Context) *trigger.Trigger { +func Trigger(ctx context.Context) trigger.Trigger { panic("unimpl") } diff --git a/daemon/snapper/cron.go b/daemon/snapper/cron.go index fd5433e..33f521c 100644 --- a/daemon/snapper/cron.go +++ b/daemon/snapper/cron.go @@ -43,7 +43,7 @@ type Cron struct { wakeupWhileRunningCount int } -func (s *Cron) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) { +func (s *Cron) Run(ctx context.Context, snapshotsTaken trigger.Trigger) { for { now := time.Now() diff --git a/daemon/snapper/manual.go b/daemon/snapper/manual.go index cbd2f03..9f17a24 100644 --- a/daemon/snapper/manual.go +++ b/daemon/snapper/manual.go @@ -8,7 +8,7 @@ import ( type manual struct{} -func (s *manual) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) { +func (s *manual) Run(ctx context.Context, snapshotsTaken trigger.Trigger { // nothing to do } diff --git a/daemon/snapper/periodic.go b/daemon/snapper/periodic.go index 568fb9f..ddf302c 100644 --- a/daemon/snapper/periodic.go +++ b/daemon/snapper/periodic.go @@ -104,7 +104,7 @@ func (s State) sf() state { type updater func(u func(*Periodic)) State type state func(a periodicArgs, u updater) state -func (s *Periodic) Run(ctx context.Context, snapshotsTaken *trigger.Trigger) { +func (s *Periodic) Run(ctx context.Context, snapshotsTaken trigger.Trigger { defer trace.WithSpanFromStackUpdateCtx(&ctx)() getLogger(ctx).Debug("start") defer getLogger(ctx).Debug("stop") diff --git a/daemon/snapper/snapper.go b/daemon/snapper/snapper.go index ba1b545..b24894c 100644 --- a/daemon/snapper/snapper.go +++ b/daemon/snapper/snapper.go @@ -18,7 +18,7 @@ const ( ) type Snapper interface { - Run(ctx context.Context, snapshotsTaken *trigger.Trigger) + Run(ctx context.Context, snapshotsTaken trigger.Trigger Report() Report }