mirror of
https://github.com/zrepl/zrepl.git
synced 2024-12-22 07:00:44 +01:00
replication: report a filesystem is active vs. blocked on something
- `BlockedOn` prop in JSON report - Bring back the `*` in front of the filesystem report as an activity indicator. fixes https://github.com/zrepl/zrepl/issues/505
This commit is contained in:
parent
1850a332ed
commit
ac147b5a6f
@ -216,7 +216,7 @@ func drawJob(t *stringbuilder.B, name string, v *job.Status, history *bytesProgr
|
||||
}
|
||||
}
|
||||
|
||||
func printFilesystemStatus(t *stringbuilder.B, rep *report.FilesystemReport, active bool, maxFS int) {
|
||||
func printFilesystemStatus(t *stringbuilder.B, rep *report.FilesystemReport, maxFS int) {
|
||||
|
||||
expected, replicated, containsInvalidSizeEstimates := rep.BytesSum()
|
||||
sizeEstimationImpreciseNotice := ""
|
||||
@ -235,7 +235,8 @@ func printFilesystemStatus(t *stringbuilder.B, rep *report.FilesystemReport, act
|
||||
)
|
||||
|
||||
activeIndicator := " "
|
||||
if active {
|
||||
if rep.BlockedOn == report.FsBlockedOnNothing &&
|
||||
(rep.State == report.FilesystemPlanning || rep.State == report.FilesystemStepping) {
|
||||
activeIndicator = "*"
|
||||
}
|
||||
t.AddIndent(1)
|
||||
@ -385,7 +386,7 @@ func renderReplicationReport(t *stringbuilder.B, rep *report.Report, history *by
|
||||
}
|
||||
}
|
||||
for _, fs := range latest.Filesystems {
|
||||
printFilesystemStatus(t, fs, false, maxFSLen) // FIXME bring 'active' flag back
|
||||
printFilesystemStatus(t, fs, maxFSLen)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -151,6 +151,8 @@ type fs struct {
|
||||
|
||||
l *chainlock.L
|
||||
|
||||
blockedOn report.FsBlockedOn
|
||||
|
||||
// ordering relationship that must be maintained for initial replication
|
||||
initialRepOrd struct {
|
||||
parents, children []*fs
|
||||
@ -158,8 +160,9 @@ type fs struct {
|
||||
}
|
||||
|
||||
planning struct {
|
||||
done bool
|
||||
err *timedError
|
||||
waitingForStepQueue bool
|
||||
done bool
|
||||
err *timedError
|
||||
}
|
||||
|
||||
// valid iff planning.done && planning.err == nil
|
||||
@ -337,8 +340,9 @@ func (a *attempt) doGlobalPlanning(ctx context.Context, prev *attempt) map[*fs]*
|
||||
|
||||
for _, pfs := range pfss {
|
||||
fs := &fs{
|
||||
fs: pfs,
|
||||
l: a.l,
|
||||
fs: pfs,
|
||||
l: a.l,
|
||||
blockedOn: report.FsBlockedOnNothing,
|
||||
}
|
||||
fs.initialRepOrd.parentDidUpdate = make(chan struct{}, 1)
|
||||
a.fss = append(a.fss, fs)
|
||||
@ -448,6 +452,10 @@ func (a *attempt) doFilesystems(ctx context.Context, prevs map[*fs]*fs) {
|
||||
ctx, endTask := trace.WithTaskAndSpan(ctx, "repl-fs", f.report().Info.Name)
|
||||
defer endTask()
|
||||
f.do(ctx, stepQueue, prevs[f])
|
||||
f.l.HoldWhile(func() {
|
||||
// every return from f means it's unblocked...
|
||||
f.blockedOn = report.FsBlockedOnNothing
|
||||
})
|
||||
}(f)
|
||||
}
|
||||
a.l.DropWhile(func() {
|
||||
@ -486,11 +494,16 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
var psteps []Step
|
||||
var errTime time.Time
|
||||
var err error
|
||||
f.blockedOn = report.FsBlockedOnPlanningStepQueue
|
||||
f.l.DropWhile(func() {
|
||||
// TODO hacky
|
||||
// choose target time that is earlier than any snapshot, so fs planning is always prioritized
|
||||
targetDate := time.Unix(0, 0)
|
||||
defer pq.WaitReady(ctx, f, targetDate)()
|
||||
f.l.HoldWhile(func() {
|
||||
// transition before we call PlanFS
|
||||
f.blockedOn = report.FsBlockedOnNothing
|
||||
})
|
||||
psteps, err = f.fs.PlanFS(ctx) // no shadow
|
||||
errTime = time.Now() // no shadow
|
||||
})
|
||||
@ -545,6 +558,7 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
f.planning.done = true
|
||||
|
||||
// wait for parents' initial replication
|
||||
f.blockedOn = report.FsBlockedOnParentInitialRepl
|
||||
var parents []string
|
||||
for _, p := range f.initialRepOrd.parents {
|
||||
parents = append(parents, p.fs.ReportInfo().Name)
|
||||
@ -613,7 +627,6 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
f.planned.stepErr = newTimedError(ctx.Err(), time.Now())
|
||||
return
|
||||
case <-f.initialRepOrd.parentDidUpdate:
|
||||
// loop
|
||||
}
|
||||
@ -631,7 +644,9 @@ func (f *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
|
||||
f.l.DropWhile(func() {
|
||||
// wait for parallel replication
|
||||
targetDate := s.step.TargetDate()
|
||||
f.l.HoldWhile(func() { f.blockedOn = report.FsBlockedOnReplStepQueue })
|
||||
defer pq.WaitReady(ctx, f, targetDate)()
|
||||
f.l.HoldWhile(func() { f.blockedOn = report.FsBlockedOnNothing })
|
||||
// do the step
|
||||
ctx, endSpan := trace.WithSpan(ctx, fmt.Sprintf("%#v", s.step.ReportInfo()))
|
||||
defer endSpan()
|
||||
@ -725,6 +740,7 @@ func (f *fs) report() *report.FilesystemReport {
|
||||
r := &report.FilesystemReport{
|
||||
Info: f.fs.ReportInfo(),
|
||||
State: state,
|
||||
BlockedOn: f.blockedOn,
|
||||
PlanError: f.planning.err.IntoReportError(),
|
||||
StepError: f.planned.stepErr.IntoReportError(),
|
||||
Steps: make([]*report.StepReport, len(f.planned.steps)),
|
||||
|
@ -62,11 +62,23 @@ const (
|
||||
FilesystemDone FilesystemState = "done"
|
||||
)
|
||||
|
||||
type FsBlockedOn string
|
||||
|
||||
const (
|
||||
FsBlockedOnNothing FsBlockedOn = "nothing"
|
||||
FsBlockedOnPlanningStepQueue FsBlockedOn = "plan-queue"
|
||||
FsBlockedOnParentInitialRepl FsBlockedOn = "parent-initial-repl"
|
||||
FsBlockedOnReplStepQueue FsBlockedOn = "repl-queue"
|
||||
)
|
||||
|
||||
type FilesystemReport struct {
|
||||
Info *FilesystemInfo
|
||||
|
||||
State FilesystemState
|
||||
|
||||
// Always valid.
|
||||
BlockedOn FsBlockedOn
|
||||
|
||||
// Valid in State = FilesystemPlanningErrored
|
||||
PlanError *TimedError
|
||||
// Valid in State = FilesystemSteppingErrored
|
||||
|
Loading…
Reference in New Issue
Block a user