zrepl/daemon/job/active.go
Christian Schwarz aeb87ffbcf daemon/job/active: push mode: awful hack for handling of concurrent snapshots + stale remote operation
We have the problem that there are legitimate use cases where a user
does not want their machine to fill up with snapshots, even if it means
unreplicated snapshots must be destroyed.  This can be expressed by *not*
configuring the keep rule `not_replicated` for the snapshot-creating
side.  This commit only addresses push mode because we don't support
pruning in the source job. We advise users in the docs to use push mode
if they have the above use case, so this is fine - at least for 0.1.

Ideally, the replication.Replication would communicate to the pruner
which snapshots are currently part of the replication plan, and then
we'd need some conflict resolution to determine whether it's more
important to destroy the snapshots or to replicate them (destroy should
win?).

However, we don't have the infrastructure for this yet (we could parse
the replication report, but that's just ugly).  And we want to get 0.1
out, so showtime for a dirty hack:

We start replication, and ideally, replication and pruning are done
before new snapshots have been taken. If so: great. However, what happens
if snapshots have been taken and we are not done with replication and /
or pruning?

* If replication is making progress according to its state, let it run.
This covers the *important* situation of initial replication, where
replication may easily take longer than a single snapshotting interval.

* If replication is in an error state, cancel it through context
cancellation.
    * As with the pruner below, the main problem here is that
      status output will only contain "context cancelled" after the
      cancellation, instead of showing the reason why it was cancelled.
      Not nice, but oh well, the logs provide enough detail for this
      niche situation...

* If we are past replication, we're still pruning

* Leave the local (send-side) pruning alone.
Again, we only implement this hack for push, so we know sender is
local, and it will only fail hard, not retry.

* If the remote (receiver-side) pruner is in an error state, cancel it
through context cancellation.

* Otherwise, let it run.

Note that every time we "let it run", we tolerate a temporary excess of
snapshots, but given sufficiently aggressive timeouts and the assumption
that the snapshot interval is much greater than the timeouts, this is
not a significant problem in practice.
2018-10-12 22:47:06 +02:00

406 lines
11 KiB
Go

package job
import (
"context"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/prometheus/client_golang/prometheus"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job/reset"
"github.com/zrepl/zrepl/daemon/job/wakeup"
"github.com/zrepl/zrepl/daemon/transport/connecter"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/pruner"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/zfs"
"sync"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/snapper"
"time"
)
// ActiveSide is a job that connects to its peer through clientFactory and
// then runs replication followed by pruning. Whether it pushes or pulls is
// decided by the activeMode it was constructed with.
type ActiveSide struct {
	mode          activeMode
	name          string
	clientFactory *connecter.ClientFactory

	prunerFactory *pruner.PrunerFactory

	// Prometheus collectors, registered through RegisterMetrics.
	promRepStateSecs    *prometheus.HistogramVec // labels: state
	promPruneSecs       *prometheus.HistogramVec // labels: prune_side
	promBytesReplicated *prometheus.CounterVec   // labels: filesystem

	// tasksMtx guards tasks; access goes through updateTasks.
	tasksMtx sync.Mutex
	tasks    activeSideTasks
}
// activeSideTasks bundles the replication and the two pruners of the most
// recent invocation. A field is nil until the corresponding task has been
// created for that invocation.
type activeSideTasks struct {
	replication    *replication.Replication
	prunerSender   *pruner.Pruner
	prunerReceiver *pruner.Pruner
}
// updateTasks reads and optionally modifies the job's task set while holding
// tasksMtx.
//
// If u is nil, the current task set is returned unchanged. Otherwise u is
// applied to a copy of the task set, the modified copy is stored back, and
// that same copy is returned to the caller.
func (a *ActiveSide) updateTasks(u func(*activeSideTasks)) activeSideTasks {
	a.tasksMtx.Lock()
	defer a.tasksMtx.Unlock()

	snapshot := a.tasks
	if u != nil {
		u(&snapshot)
		a.tasks = snapshot
	}
	return snapshot
}
// activeMode abstracts the differences between the modes (push and pull)
// an ActiveSide job can operate in.
type activeMode interface {
	// SenderReceiver builds the replication endpoints for one invocation,
	// using client for the side that lives on the peer.
	SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error)

	// Type reports the job type of this mode.
	Type() Type

	// RunPeriodic signals on wakeUpCommon whenever the job should start a
	// new invocation. It is started in its own goroutine by ActiveSide.Run.
	RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{})
}
// modePush is the activeMode in which the local side is the sender and the
// peer is the receiver.
type modePush struct {
	// fsfilter is handed to the local sender endpoint.
	fsfilter endpoint.FSFilter
	// snapper drives the periodic wakeups of the job (see RunPeriodic).
	snapper *snapper.PeriodicOrManual
}
// SenderReceiver returns a local sender built from the configured filesystem
// filter and a remote receiver that talks to the peer through client.
// The returned error is always nil.
func (m *modePush) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) {
	var (
		local  = endpoint.NewSender(m.fsfilter)
		remote = endpoint.NewRemote(client)
	)
	return local, remote, nil
}
// Type reports TypePush.
func (m *modePush) Type() Type {
	return TypePush
}
// RunPeriodic delegates to the snapper, passing it wakeUpCommon so that it
// can signal the job.
func (m *modePush) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
	m.snapper.Run(ctx, wakeUpCommon)
}
// modePushFromConfig builds the push mode from the job configuration.
//
// It constructs the filesystem filter from in.Filesystems and the snapper
// from in.Snapshotting. Any failure is returned wrapped with a short
// description of the step that failed; in that case the returned mode is nil.
func modePushFromConfig(g *config.Global, in *config.PushJob) (*modePush, error) {
	fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
	if err != nil {
		return nil, errors.Wrap(err, "cannot build filesystem filter")
	}

	// The snapper uses the same filter as the sender endpoint.
	snap, err := snapper.FromConfig(g, fsf, in.Snapshotting)
	if err != nil {
		return nil, errors.Wrap(err, "cannot build snapper")
	}

	return &modePush{fsfilter: fsf, snapper: snap}, nil
}
// modePull is the activeMode in which the peer is the sender and the local
// side is the receiver.
type modePull struct {
	// rootFS is passed to the local receiver endpoint.
	rootFS *zfs.DatasetPath
	// interval is the period of the wakeup ticker in RunPeriodic.
	interval time.Duration
}
// SenderReceiver returns a remote sender that talks to the peer through
// client and a local receiver rooted at m.rootFS. The error, if any, is the
// one reported by the receiver constructor.
func (m *modePull) SenderReceiver(client *streamrpc.Client) (replication.Sender, replication.Receiver, error) {
	remote := endpoint.NewRemote(client)
	local, err := endpoint.NewReceiver(m.rootFS)
	return remote, local, err
}
// Type reports TypePull.
func (m *modePull) Type() Type {
	return TypePull
}
// RunPeriodic sends a wakeup on wakeUpCommon every m.interval until ctx is
// cancelled.
//
// If the receiving side is not ready when the ticker fires, a warning is
// logged and the send blocks until the wakeup can be delivered, so that the
// wakeup is queued rather than dropped. The blocking send is abandoned when
// ctx is cancelled, so this function always returns once ctx is done.
func (m *modePull) RunPeriodic(ctx context.Context, wakeUpCommon chan<- struct{}) {
	t := time.NewTicker(m.interval)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			select {
			case wakeUpCommon <- struct{}{}:
			default:
				GetLogger(ctx).
					WithField("pull_interval", m.interval).
					Warn("pull job took longer than pull interval")
				// Block anyway to queue up the wakeup, but do not outlive
				// the job: without the ctx case this goroutine would hang
				// forever if nobody receives from wakeUpCommon anymore.
				select {
				case wakeUpCommon <- struct{}{}:
				case <-ctx.Done():
					return
				}
			}
		case <-ctx.Done():
			return
		}
	}
}
// modePullFromConfig builds the pull mode from the job configuration.
//
// It fails if in.Interval is not positive, if in.RootFS cannot be parsed as
// a dataset path (the parser's error is attached as the cause), or if the
// parsed path is empty. On failure the returned mode is nil.
func modePullFromConfig(g *config.Global, in *config.PullJob) (m *modePull, err error) {
	if in.Interval <= 0 {
		return nil, errors.New("interval must be positive")
	}

	rootFS, err := zfs.NewDatasetPath(in.RootFS)
	if err != nil {
		// Keep the underlying cause so the user can see why the path was rejected.
		return nil, errors.Wrap(err, "RootFS is not a valid zfs filesystem path")
	}
	if rootFS.Length() <= 0 {
		return nil, errors.New("RootFS must not be empty") // duplicates error check of receiver
	}

	return &modePull{rootFS: rootFS, interval: in.Interval}, nil
}
// activeSide builds an ActiveSide job named in.Name that operates in the
// given mode.
//
// It creates the job's Prometheus collectors (each carrying the constant
// label zrepl_job=<job name>), the client factory from in.Connect and the
// pruner factory from in.Pruning. If either factory cannot be built, a nil
// job and the error are returned.
func activeSide(g *config.Global, in *config.ActiveJob, mode activeMode) (j *ActiveSide, err error) {
	// Every collector gets its own label map, as in a hand-written literal.
	jobLabels := func() prometheus.Labels {
		return prometheus.Labels{"zrepl_job": in.Name}
	}

	j = &ActiveSide{
		mode: mode,
		name: in.Name,
		promRepStateSecs: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace:   "zrepl",
			Subsystem:   "replication",
			Name:        "state_time",
			Help:        "seconds spent during replication",
			ConstLabels: jobLabels(),
		}, []string{"state"}),
		promBytesReplicated: prometheus.NewCounterVec(prometheus.CounterOpts{
			Namespace:   "zrepl",
			Subsystem:   "replication",
			Name:        "bytes_replicated",
			Help:        "number of bytes replicated from sender to receiver per filesystem",
			ConstLabels: jobLabels(),
		}, []string{"filesystem"}),
		promPruneSecs: prometheus.NewHistogramVec(prometheus.HistogramOpts{
			Namespace:   "zrepl",
			Subsystem:   "pruning",
			Name:        "time",
			Help:        "seconds spent in pruner",
			ConstLabels: jobLabels(),
		}, []string{"prune_side"}),
	}

	if j.clientFactory, err = connecter.FromConfig(g, in.Connect); err != nil {
		return nil, errors.Wrap(err, "cannot build client")
	}

	if j.prunerFactory, err = pruner.NewPrunerFactory(in.Pruning, j.promPruneSecs); err != nil {
		return nil, err
	}

	return j, nil
}
// RegisterMetrics registers the job's three Prometheus collectors with
// registerer. MustRegister is used, so a failed registration panics.
func (j *ActiveSide) RegisterMetrics(registerer prometheus.Registerer) {
	registerer.MustRegister(
		j.promRepStateSecs,
		j.promPruneSecs,
		j.promBytesReplicated,
	)
}
// Name returns the name the job was configured with.
func (j *ActiveSide) Name() string {
	return j.name
}
// ActiveSideStatus is the job-specific part of the status of an ActiveSide
// job. A field is nil if the corresponding task has not been created in the
// current invocation.
type ActiveSideStatus struct {
	Replication     *replication.Report
	PruningSender   *pruner.Report
	PruningReceiver *pruner.Report
}
// Status assembles a status report from the job's current tasks.
//
// The task set is read under the tasks mutex (via updateTasks with a nil
// updater); the individual reports are then collected from whichever tasks
// exist. Tasks that have not been created yet leave their field nil.
func (j *ActiveSide) Status() *Status {
	current := j.updateTasks(nil)
	jobType := j.mode.Type()

	var report ActiveSideStatus
	if r := current.replication; r != nil {
		report.Replication = r.Report()
	}
	if p := current.prunerSender; p != nil {
		report.PruningSender = p.Report()
	}
	if p := current.prunerReceiver; p != nil {
		report.PruningReceiver = p.Report()
	}

	return &Status{Type: jobType, JobSpecific: &report}
}
// Run is the job's main loop. It starts the mode's periodic wakeup source in
// a goroutine and then, until ctx is cancelled, waits for either an explicit
// wakeup or a periodic one and performs one invocation (see do) per wakeup.
//
// Invocations are numbered starting at 1; the number is attached to the
// invocation's logger as the field "invocation".
func (j *ActiveSide) Run(ctx context.Context) {
	log := GetLogger(ctx)
	ctx = logging.WithSubsystemLoggers(ctx, log)
	defer log.Info("job exiting")

	// The derived context stops the periodic goroutine when Run returns.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	periodicDone := make(chan struct{})
	go j.mode.RunPeriodic(ctx, periodicDone)

	for invocation := 1; ; invocation++ {
		log.Info("wait for wakeups")
		select {
		case <-ctx.Done():
			log.WithError(ctx.Err()).Info("context")
			return
		case <-wakeup.Wait(ctx):
		case <-periodicDone:
		}

		invLog := log.WithField("invocation", invocation)
		j.do(WithLogger(ctx, invLog), periodicDone)
	}
}
// do performs a single invocation of the job: it connects to the peer, runs
// replication, and afterwards prunes sender and receiver concurrently.
//
// ctx bounds the whole invocation; in addition, a reset signal cancels the
// invocation. periodicWakeup is the channel on which the mode announces
// periodic wakeups; while replication or receiver-side pruning is still
// running, a wakeup on a push job is taken as "new snapshots exist" and
// leads to cancellation of that step if it is in a planning or waiting
// (error) state.
//
// If the client cannot be created or the mode cannot provide sender and
// receiver, the error is logged and the invocation ends without touching the
// job's task set.
func (j *ActiveSide) do(ctx context.Context, periodicWakeup <-chan struct{}) {

	log := GetLogger(ctx)
	ctx = logging.WithSubsystemLoggers(ctx, log)

	// allow cancellation of an invocation (this function)
	ctx, cancelThisRun := context.WithCancel(ctx)
	defer cancelThisRun()
	// runDone lets the reset watcher below terminate when this function returns.
	runDone := make(chan struct{})
	defer close(runDone)
	go func() {
		select {
		case <-runDone:
		case <-reset.Wait(ctx):
			log.Info("reset received, cancelling current invocation")
			cancelThisRun()
		case <-ctx.Done():
		}
	}()

	client, err := j.clientFactory.NewClient()
	if err != nil {
		log.WithError(err).Error("factory cannot instantiate streamrpc client")
		// Without a client there is nothing to close and nothing to replicate with.
		return
	}
	defer client.Close(ctx)

	sender, receiver, err := j.mode.SenderReceiver(client)
	if err != nil {
		log.WithError(err).Error("cannot build sender and receiver")
		return
	}

	tasks := j.updateTasks(func(tasks *activeSideTasks) {
		// reset it
		*tasks = activeSideTasks{}
		tasks.replication = replication.NewReplication(j.promRepStateSecs, j.promBytesReplicated)
	})

	log.Info("start replication")
	replicationDone := make(chan struct{})
	replicationCtx, replicationCancel := context.WithCancel(ctx)
	defer replicationCancel()
	go func() {
		tasks.replication.Drive(replicationCtx, sender, receiver)
		close(replicationDone)
	}()
outer:
	for {
		select {
		case <-replicationDone:
			// fine!
			break outer
		case <-periodicWakeup:
			// Replication took longer than the periodic interval.
			//
			// For pull jobs, this isn't so bad because nothing changes on the active side
			// if replication doesn't go forward.
			//
			// For push jobs, this means snapshots were taken.
			// We need to invoke the pruner now, because otherwise an infinitely stuck replication
			// will cause this side to fill up with snapshots.
			//
			// However, there are cases where replication progresses and just takes longer,
			// and we don't want these situations be interrupted by a prune, which will require
			// re-planning and starting over (think of initial replication as an example).
			//
			// Therefore, we prohibit pruning of snapshots that are part of the current replication plan.
			// If there is no such plan, we kill the replication.
			if j.mode.Type() == TypePush {

				rep := tasks.replication.Report()
				state, err := replication.StateString(rep.Status)
				if err != nil {
					panic(err)
				}

				switch state {
				case replication.Planning:
					fallthrough
				case replication.PlanningError:
					fallthrough
				case replication.WorkingWait:
					log.WithField("repl_state", state.String()).
						Info("cancelling replication after new snapshots invalidated its current state")
					replicationCancel()
					log.Info("waiting for replication to stop")
					<-replicationDone // no need to wait for ctx.Done, replication is already bound to global cancel
					break outer
				default:
					log.WithField("repl_state", state.String()).
						Warn("new snapshots while replication is running and making progress")
				}
			}
		}
	}

	var pruningWg sync.WaitGroup
	log.Info("start pruning sender")
	pruningWg.Add(1)
	go func() {
		defer pruningWg.Done()
		tasks := j.updateTasks(func(tasks *activeSideTasks) {
			tasks.prunerSender = j.prunerFactory.BuildSenderPruner(ctx, sender, sender)
		})
		tasks.prunerSender.Prune()
		// FIXME no need to do the cancellation dance with sender, we know it's local for push
		// FIXME and we don't worry about pull ATM
	}()
	log.Info("start pruning receiver")
	pruningWg.Add(1)
	go func() {
		defer pruningWg.Done()

		// A dedicated context lets us cancel only the receiver-side pruner.
		receiverPrunerCtx, receiverPrunerCancel := context.WithCancel(ctx)
		defer receiverPrunerCancel()
		tasks := j.updateTasks(func(tasks *activeSideTasks) {
			tasks.prunerReceiver = j.prunerFactory.BuildReceiverPruner(receiverPrunerCtx, receiver, sender)
		})
		receiverPrunerDone := make(chan struct{})
		go func() {
			defer close(receiverPrunerDone)
			tasks.prunerReceiver.Prune()
		}()

	outer:
		for {
			select {
			case <-receiverPrunerDone:
				// fine!
				break outer
			case <-periodicWakeup:
				// see comments for similar approach with replication above
				if j.mode.Type() == TypePush {
					rep := tasks.prunerReceiver.Report()
					state, err := pruner.StateString(rep.State)
					if err != nil {
						panic(err)
					}
					switch state {
					case pruner.PlanWait:
						fallthrough
					case pruner.ExecWait:
						log.WithField("pruner_state", state.String()).
							Info("cancelling failing prune on receiver because new snapshots were taken on sender")
						receiverPrunerCancel()
						log.Info("waiting for receiver pruner to stop")
						<-receiverPrunerDone
						break outer
					default:
						log.WithField("pruner_state", state.String()).
							Warn("new snapshots while prune on receiver is still running")
					}
				}
			}
		}

	}()

	pruningWg.Wait() // if pruners handle ctx cancellation correctly, we don't need to wait for it here

}