diff --git a/cmd/adaptors.go b/cmd/adaptors.go
index 6423518..86e7c37 100644
--- a/cmd/adaptors.go
+++ b/cmd/adaptors.go
@@ -2,12 +2,15 @@ package cmd
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"net"
+	"strings"
 	"time"
 
 	"github.com/problame/go-streamrpc"
 	"github.com/zrepl/zrepl/util"
+	"github.com/zrepl/zrepl/logger"
 )
 
 type logNetConnConnecter struct {
@@ -74,3 +77,29 @@ func (netsshConnToNetConnAdatper) SetDeadline(t time.Time) error { return nil }
 func (netsshConnToNetConnAdatper) SetReadDeadline(t time.Time) error { return nil }
 func (netsshConnToNetConnAdatper) SetWriteDeadline(t time.Time) error { return nil }
+
+type streamrpcLogAdaptor = twoClassLogAdaptor
+type replicationLogAdaptor = twoClassLogAdaptor
+
+type twoClassLogAdaptor struct {
+	logger.Logger
+}
+
+var _ streamrpc.Logger = twoClassLogAdaptor{}
+
+func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) {
+	const errorSuffix = ": %s"
+	if len(args) == 1 {
+		if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) {
+			msg := strings.TrimSuffix(fmtStr, errorSuffix)
+			a.WithError(err).Error(msg)
+			return
+		}
+	}
+	a.Logger.Error(fmt.Sprintf(fmtStr, args...))
+}
+
+func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) {
+	a.Logger.Info(fmt.Sprintf(fmtStr, args...))
+}
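
The adaptor above bridges streamrpc's printf-style Logger to zrepl's structured logger, special-casing the common `Errorf("...: %s", err)` pattern so the error lands in a structured field instead of being flattened into the message. A standalone sketch of that rewrite rule (names here are illustrative, not part of the patch):

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    // errorfToStructured splits 'Errorf("op failed: %s", err)' into a clean
    // message plus a separate error value, mirroring twoClassLogAdaptor.Errorf.
    func errorfToStructured(fmtStr string, args ...interface{}) (msg string, errField error) {
        const errorSuffix = ": %s"
        if len(args) == 1 {
            if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) {
                return strings.TrimSuffix(fmtStr, errorSuffix), err
            }
        }
        return fmt.Sprintf(fmtStr, args...), nil
    }

    func main() {
        msg, err := errorfToStructured("connect failed: %s", errors.New("timeout"))
        fmt.Printf("msg=%q err=%q\n", msg, err) // msg="connect failed" err="timeout"
    }
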
diff --git a/cmd/config_job_control.go b/cmd/config_job_control.go
index 3baf715..1e7baae 100644
--- a/cmd/config_job_control.go
+++ b/cmd/config_job_control.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"net"
 	"net/http"
+	"github.com/zrepl/zrepl/logger"
 )
 
 type ControlJob struct {
@@ -126,7 +127,7 @@ func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 type requestLogger struct {
-	log         Logger
+	log         logger.Logger
 	handler     http.Handler
 	handlerFunc http.HandlerFunc
 }
diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go
index 88316d4..d66bdce 100644
--- a/cmd/config_job_local.go
+++ b/cmd/config_job_local.go
@@ -146,12 +146,7 @@ outer:
 
 		j.mainTask.Log().Debug("replicating from lhs to rhs")
 		j.mainTask.Enter("replicate")
-		replication.Replicate(
-			ctx,
-			replication.NewEndpointPairPull(sender, receiver),
-			replication.NewIncrementalPathReplicator(),
-			nil, //FIXME
-		)
+		replication.Replicate(ctx, replication.NewEndpointPairPull(sender, receiver))
 
 		j.mainTask.Finish()
 
diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go
index ab65c5b..85685e4 100644
--- a/cmd/config_job_pull.go
+++ b/cmd/config_job_pull.go
@@ -1,15 +1,18 @@
 package cmd
 
 import (
+	"os"
+	"os/signal"
+	"syscall"
 	"time"
 
 	"context"
 	"fmt"
+	"github.com/mitchellh/mapstructure"
 	"github.com/pkg/errors"
-	"github.com/zrepl/zrepl/util"
-	"github.com/zrepl/zrepl/cmd/replication"
 	"github.com/problame/go-streamrpc"
+	"github.com/zrepl/zrepl/cmd/replication"
 )
 
 type PullJob struct {
@@ -91,7 +94,7 @@ func parsePullJob(c JobParsingContext, name string, i map[string]interface{}) (j
 	if j.Debug.Conn.ReadDump != "" || j.Debug.Conn.WriteDump != "" {
 		logConnecter := logNetConnConnecter{
 			Connecter: j.Connect,
-			ReadDump: j.Debug.Conn.ReadDump,
+			ReadDump:  j.Debug.Conn.ReadDump,
 			WriteDump: j.Debug.Conn.WriteDump,
 		}
 		j.Connect = logConnecter
@@ -113,6 +116,9 @@ func (j *PullJob) JobStart(ctx context.Context) {
 
 	j.task = NewTask("main", j, log)
 
 	// j.task is idle here
+	usr1 := make(chan os.Signal)
+	signal.Notify(usr1, syscall.SIGUSR1)
+	defer signal.Stop(usr1)
 
 	ticker := time.NewTicker(j.Interval)
 	for {
@@ -130,23 +136,25 @@ func (j *PullJob) JobStart(ctx context.Context) {
 			j.task.Log().WithError(ctx.Err()).Info("context")
 			return
 		case <-ticker.C:
+		case <-usr1:
 		}
 	}
 }
 
 var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability
-	RxHeaderMaxLen: 4096,
-	RxStructuredMaxLen: 4096 * 4096,
+	RxHeaderMaxLen:       4096,
+	RxStructuredMaxLen:   4096 * 4096,
 	RxStreamMaxChunkSize: 4096 * 4096,
-	TxChunkSize: 4096 * 4096,
+	TxChunkSize:          4096 * 4096,
 	RxTimeout: streamrpc.Timeout{
-		Progress: 10*time.Second,
+		Progress: 10 * time.Second,
 	},
 	TxTimeout: streamrpc.Timeout{
-		Progress: 10*time.Second,
+		Progress: 10 * time.Second,
 	},
 }
+
 func (j *PullJob) doRun(ctx context.Context) {
 
 	j.task.Enter("run")
@@ -174,26 +182,10 @@ func (j *PullJob) doRun(ctx context.Context) {
 		return
 	}
 
-	replicator := replication.NewIncrementalPathReplicator()
-	ctx = context.WithValue(ctx, replication.ContextKeyLog, j.task.Log())
-	ctx = context.WithValue(ctx, streamrpc.ContextKeyLogger, j.task.Log())
-	ctx, enforceDeadline := util.ContextWithOptionalDeadline(ctx)
-
-	// Try replicating each file system regardless of j.Interval
-	// (this does not solve the underlying problem that j.Interval is too short,
-	// but it covers the case of initial replication taking longer than all
-	// incremental replications afterwards)
-	allTriedOnce := make(chan struct{})
-	replicationBegin := time.Now()
-	go func() {
-		select {
-		case <-allTriedOnce:
-			enforceDeadline(replicationBegin.Add(j.Interval))
-		case <-ctx.Done():
-		}
-	}()
-	replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller), replicator, allTriedOnce)
-
+	ctx = replication.ContextWithLogger(ctx, replicationLogAdaptor{j.task.Log().WithField("subsystem", "replication")})
+	ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{j.task.Log().WithField("subsystem", "rpc.protocol")})
+	ctx = context.WithValue(ctx, contextKeyLog, j.task.Log().WithField("subsystem", "rpc.endpoint"))
+	replication.Replicate(ctx, replication.NewEndpointPairPull(sender, puller))
 	client.Close()
 
 	j.task.Finish()
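
The SIGUSR1 hunk above lets an operator wake the pull loop early with `kill -USR1 <pid>` instead of waiting out the interval. One detail worth noting: the documentation for signal.Notify recommends a buffered channel, because signal delivery uses a non-blocking send, so the unbuffered channel in the hunk can miss a signal that arrives while the goroutine is not parked in the select. A runnable sketch of the pattern with a buffered channel:

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "syscall"
        "time"
    )

    func main() {
        usr1 := make(chan os.Signal, 1) // buffered: delivery is a non-blocking send
        signal.Notify(usr1, syscall.SIGUSR1)
        defer signal.Stop(usr1)

        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()
        for i := 0; i < 3; i++ {
            select {
            case <-ticker.C:
                fmt.Println("woken by interval")
            case <-usr1:
                fmt.Println("woken early by SIGUSR1")
            }
        }
    }
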
diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go
index ef6e991..f6b7851 100644
--- a/cmd/config_job_source.go
+++ b/cmd/config_job_source.go
@@ -71,8 +71,8 @@ func parseSourceJob(c JobParsingContext, name string, i map[string]interface{})
 	if j.Debug.Conn.ReadDump != "" || j.Debug.Conn.WriteDump != "" {
 		logServe := logListenerFactory{
 			ListenerFactory: j.Serve,
-			ReadDump: j.Debug.Conn.ReadDump,
-			WriteDump: j.Debug.Conn.WriteDump,
+			ReadDump:        j.Debug.Conn.ReadDump,
+			WriteDump:       j.Debug.Conn.WriteDump,
 		}
 		j.Serve = logServe
 	}
@@ -209,17 +209,13 @@ func (j *SourceJob) handleConnection(conn net.Conn, task *Task) {
 
 	task.Log().Info("handling client connection")
 
-	senderEP := NewSenderEndpoint(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
-	handler := HandlerAdaptor{senderEP, task.Log()}
-	// FIXME logging support or erase config
-	//if j.Debug.RPC.Log {
-	//	rpclog := task.Log().WithField("subsystem", "rpc")
-	//	rpcServer.SetLogger(rpclog, true)
-	//}
-
-	if err := streamrpc.ServeConn(context.TODO(), conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
+	senderEP := NewSenderEndpoint(j.Filesystems, NewPrefixFilter(j.SnapshotPrefix))
+	ctx := context.Background()
+	ctx = context.WithValue(ctx, contextKeyLog, task.Log().WithField("subsystem", "rpc.endpoint"))
+	ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{task.Log().WithField("subsystem", "rpc.protocol")})
+	handler := HandlerAdaptor{senderEP}
+	if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
 		task.Log().WithError(err).Error("error serving connection")
 	} else {
 		task.Log().Info("client closed connection")
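
handleConnection now builds one context per connection and hangs differently tagged child loggers off the same task logger. The design relies on WithField forking an immutable child rather than mutating the parent, so the endpoint and protocol subsystems cannot clobber each other's fields. A self-contained illustration with toy types (not zrepl's actual logger):

    package main

    import "fmt"

    type fields map[string]string

    type toyLogger struct{ f fields }

    // WithField copies the parent's fields into a new child, so siblings
    // derived from the same logger never see each other's additions.
    func (l toyLogger) WithField(k, v string) toyLogger {
        child := make(fields, len(l.f)+1)
        for k2, v2 := range l.f {
            child[k2] = v2
        }
        child[k] = v
        return toyLogger{child}
    }

    func main() {
        task := toyLogger{fields{"job": "source_prod"}}
        endpoint := task.WithField("subsystem", "rpc.endpoint")
        protocol := task.WithField("subsystem", "rpc.protocol")
        fmt.Println(endpoint.f, protocol.f, task.f) // children diverge, parent unchanged
    }
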
diff --git a/cmd/daemon.go b/cmd/daemon.go
index 46746a2..81a718a 100644
--- a/cmd/daemon.go
+++ b/cmd/daemon.go
@@ -9,7 +9,7 @@ import (
 	"io"
 	"os"
 	"os/signal"
-	"strings"
+	//"strings"
 	"sync"
 	"syscall"
 	"time"
@@ -88,6 +88,10 @@ const (
 	contextKeyDaemon contextKey = contextKey("daemon")
 )
 
+func getLogger(ctx context.Context) Logger {
+	return ctx.Value(contextKeyLog).(Logger)
+}
+
 type Daemon struct {
 	conf      *Config
 	startedAt time.Time
@@ -276,7 +280,7 @@ func (p *taskProgress) Read() (out taskProgress) {
 type taskActivity struct {
 	name string
 	idle bool
-	logger *logger.Logger
+	logger logger.Logger
 	// The progress of the task that is updated by UpdateIO() and UpdateLogEntry()
 	//
 	// Progress happens on a task-level and is thus global to the task.
@@ -285,7 +289,7 @@ type taskActivity struct {
 	progress *taskProgress
 }
 
-func NewTask(name string, parent Job, lg *logger.Logger) *Task {
+func NewTask(name string, parent Job, lg logger.Logger) *Task {
 	t := &Task{
 		name:   name,
 		parent: parent,
@@ -336,9 +340,10 @@ func (t *Task) Enter(activity string) {
 	}
 	act := &taskActivity{activity, false, nil, prev.progress}
 	t.activities.PushFront(act)
-	stack := t.buildActivityStack()
-	activityField := strings.Join(stack, ".")
-	act.logger = prev.logger.ReplaceField(logTaskField, activityField)
+	//stack := t.buildActivityStack()
+	//activityField := strings.Join(stack, ".")
+	act.logger = prev.logger
+	// act.logger = prev.logger.ReplaceField(logTaskField, activityField)
 
 	t.activitiesLastUpdate = time.Now()
 }
@@ -425,7 +430,7 @@ func (t *Task) Finish() {
 
 // Returns a logger derived from the logger passed to the constructor function.
 // The logger's task field contains the current activity stack joined by '.'.
-func (t *Task) Log() *logger.Logger {
+func (t *Task) Log() logger.Logger {
 	t.rwl.RLock()
 	defer t.rwl.RUnlock()
 	// FIXME should influence TaskStatus's LastUpdate field
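
getLogger in daemon.go uses a bare type assertion, so a context without a logger panics at the call site; the replication package's getLogger (later in this patch) instead uses the comma-ok form and falls back to a null logger. The difference in miniature, with local stand-in types:

    package main

    import (
        "context"
        "fmt"
    )

    type ctxKey int

    const keyLog ctxKey = 0

    func main() {
        ctx := context.Background() // nothing stored under keyLog
        if l, ok := ctx.Value(keyLog).(string); ok {
            fmt.Println("logger:", l)
        } else {
            fmt.Println("no logger in context") // a bare .(string) assertion would panic here
        }
    }
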
+ Error("cannot create placeholder filesystem") visitErr = err return false } } - os.Stderr.WriteString(v.Path.ToString() + " exists\n") + getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists") return true // leave this fs as is }) + getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk") if visitErr != nil { return visitErr @@ -210,7 +212,7 @@ func (e *ReceiverEndpoint) Receive(ctx context.Context, req *replication.Receive args = append(args, "-F") } - os.Stderr.WriteString("receiving...\n") + getLogger(ctx).Debug("start receive command") if err := zfs.ZFSRecv(lp.ToString(), sendStream, args...); err != nil { return err @@ -322,16 +324,10 @@ func (s RemoteEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq, type HandlerAdaptor struct { ep replication.ReplicationEndpoint - log Logger } func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructured *bytes.Buffer, reqStream io.ReadCloser) (resStructured *bytes.Buffer, resStream io.ReadCloser, err error) { - if a.log != nil { - // FIXME validate type conversion here? - ctx = context.WithValue(ctx, streamrpc.ContextKeyLogger, a.log) - } - switch endpoint { case RPCListFilesystems: var req replication.ListFilesystemReq diff --git a/cmd/replication/pdu_extras.go b/cmd/replication/pdu_extras.go index b962059..0848bd6 100644 --- a/cmd/replication/pdu_extras.go +++ b/cmd/replication/pdu_extras.go @@ -41,6 +41,10 @@ func FilesystemVersionFromZFS(fsv zfs.FilesystemVersion) *FilesystemVersion { } } +func (v *FilesystemVersion) CreationAsTime() (time.Time, error) { + return time.Parse(time.RFC3339, v.Creation) +} + func (v *FilesystemVersion) ZFSFilesystemVersion() *zfs.FilesystemVersion { ct := time.Time{} if v.Creation != "" { diff --git a/cmd/replication/replication.go b/cmd/replication/replication.go index 1b6e37f..8f9ea4d 100644 --- a/cmd/replication/replication.go +++ b/cmd/replication/replication.go @@ -2,18 +2,20 @@ package replication import ( "context" - "io" - "container/list" "fmt" + "github.com/zrepl/zrepl/logger" + "io" "net" + "sort" + "time" ) type ReplicationEndpoint interface { // Does not include placeholder filesystems ListFilesystems(ctx context.Context) ([]*Filesystem, error) ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS - Sender - Receiver + Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error) + Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error } type FilteredError struct{ fs string } @@ -73,62 +75,175 @@ func (p EndpointPair) Mode() ReplicationMode { type contextKey int const ( - ContextKeyLog contextKey = iota + contextKeyLog contextKey = iota ) -type Logger interface{ - Printf(fmt string, args ... 
diff --git a/cmd/main.go b/cmd/main.go
index 4a5c626..242367b 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -24,7 +24,7 @@ var (
 	zreplVersion string // set by build infrastructure
 )
 
-type Logger = *logger.Logger
+type Logger = logger.Logger
 
 var RootCmd = &cobra.Command{
 	Use:   "zrepl",
diff --git a/cmd/replication.go b/cmd/replication.go
index d5ce665..ff18bb0 100644
--- a/cmd/replication.go
+++ b/cmd/replication.go
@@ -9,7 +9,6 @@ import (
 	"github.com/pkg/errors"
 	"github.com/golang/protobuf/proto"
 	"bytes"
-	"os"
 	"context"
 )
 
@@ -71,7 +70,6 @@ func (p *SenderEndpoint) ListFilesystemVersions(ctx context.Context, fs string)
 }
 
 func (p *SenderEndpoint) Send(ctx context.Context, r *replication.SendReq) (*replication.SendRes, io.ReadCloser, error) {
-	os.Stderr.WriteString("sending " + r.String() + "\n")
 	dp, err := zfs.NewDatasetPath(r.Filesystem)
 	if err != nil {
 		return nil, nil, err
@@ -175,23 +173,27 @@ func (e *ReceiverEndpoint) Receive(ctx context.Context, req *replication.Receive
 	var visitErr error
 	f := zfs.NewDatasetPathForest()
 	f.Add(lp)
+	getLogger(ctx).Debug("begin tree-walk")
 	f.WalkTopDown(func(v zfs.DatasetPathVisit) (visitChildTree bool) {
 		if v.Path.Equal(lp) {
 			return false
 		}
 		_, err := zfs.ZFSGet(v.Path, []string{zfs.ZREPL_PLACEHOLDER_PROPERTY_NAME})
 		if err != nil {
-			os.Stderr.WriteString("error zfsget " + err.Error() + "\n")
 			// interpret this as an early exit of the zfs binary due to the fs not existing
 			if err := zfs.ZFSCreatePlaceholderFilesystem(v.Path); err != nil {
-				os.Stderr.WriteString("error creating placeholder " + v.Path.ToString() + "\n")
+				getLogger(ctx).
+					WithError(err).
+					WithField("placeholder_fs", v.Path).
+					Error("cannot create placeholder filesystem")
 				visitErr = err
 				return false
 			}
 		}
-		os.Stderr.WriteString(v.Path.ToString() + " exists\n")
+		getLogger(ctx).WithField("filesystem", v.Path.ToString()).Debug("exists")
 		return true // leave this fs as is
 	})
+	getLogger(ctx).WithField("visitErr", visitErr).Debug("complete tree-walk")
 
 	if visitErr != nil {
 		return visitErr
@@ -210,7 +212,7 @@ func (e *ReceiverEndpoint) Receive(ctx context.Context, req *replication.Receive
 		args = append(args, "-F")
 	}
 
-	os.Stderr.WriteString("receiving...\n")
+	getLogger(ctx).Debug("start receive command")
 
 	if err := zfs.ZFSRecv(lp.ToString(), sendStream, args...); err != nil {
 		return err
@@ -322,16 +324,10 @@ func (s RemoteEndpoint) Receive(ctx context.Context, r *replication.ReceiveReq,
 
 type HandlerAdaptor struct {
 	ep  replication.ReplicationEndpoint
-	log Logger
 }
 
 func (a *HandlerAdaptor) Handle(ctx context.Context, endpoint string, reqStructured *bytes.Buffer, reqStream io.ReadCloser) (resStructured *bytes.Buffer, resStream io.ReadCloser, err error) {
-
-	if a.log != nil {
-		// FIXME validate type conversion here?
-		ctx = context.WithValue(ctx, streamrpc.ContextKeyLogger, a.log)
-	}
-
 	switch endpoint {
 	case RPCListFilesystems:
 		var req replication.ListFilesystemReq
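
For context on the tree-walk being instrumented above: receiving a dataset like pool/backups/host1/zroot requires every ancestor dataset to exist, so missing ancestors are created as placeholder filesystems while the target itself is left to zfs recv. A simplified model, with a map standing in for the zfs get/create calls:

    package main

    import (
        "fmt"
        "strings"
    )

    // ensurePlaceholders creates each missing ancestor of target; the target
    // itself is excluded (i < len(parts)), analogous to the early return for
    // v.Path.Equal(lp) in the walk above.
    func ensurePlaceholders(existing map[string]bool, target string) {
        parts := strings.Split(target, "/")
        for i := 1; i < len(parts); i++ {
            prefix := strings.Join(parts[:i], "/")
            if !existing[prefix] {
                fmt.Println("creating placeholder", prefix)
                existing[prefix] = true
            }
        }
    }

    func main() {
        ensurePlaceholders(map[string]bool{"pool": true}, "pool/backups/host1/zroot")
        // creating placeholder pool/backups
        // creating placeholder pool/backups/host1
    }
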
log.Printf("aborting replication due to context error: %s", ctx.Err()) + log.WithError(ctx.Err()).Info("aborting replication due to context error") return default: } - w := ws.Remove(ws.Front()).(*replicationWork) - res := f(w) - fCalls++ - if fCalls == initialLen { - select { - case allTriedOnce <- struct{}{}: - default: + // FIXME poor man's nested priority queue + pending := make([]*replicateFSWork, 0, len(p.fsws)) + for _, fsw := range p.fsws { + if fsw.CurrentStep() != nil { + pending = append(pending, fsw) } } - if res.Done { - log.Printf("finished replication of %s", w.fs.Path) - continue + sort.Slice(pending, func(i, j int) bool { + if pending[i].errorCount == pending[j].errorCount { + return pending[i].CurrentStepDate().Before(pending[j].CurrentStepDate()) + } + return pending[i].errorCount < pending[j].errorCount + }) + // pending is now sorted ascending by errorCount,CurrentStep().Creation + + if len(pending) == 0 { + log.Info("replication complete") + return } - if res.Unfixable { - log.Printf("aborting replication of %s after unfixable error", w.fs.Path) - continue + fsw := pending[0] + step := fsw.CurrentStep() + if step == nil { + panic("implementation error") + } + + log.WithField("step", step).Info("begin replication step") + res := doStep(step.fswork.fs, step.from, step.to) + + if res.done { + log.Info("replication step successful") + fsw.errorCount = 0 + fsw.CompleteStep() + } else { + log.Error("replication step failed, queuing for retry result") + fsw.errorCount++ } - log.Printf("queuing replication of %s for retry", w.fs.Path) - ws.PushBack(w) } + } func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { @@ -137,7 +252,7 @@ func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { // FIXME hard-coded replication policy: most recent // snapshot as source var mostRecentSnap *FilesystemVersion - for n := len(noCommonAncestor.SortedSenderVersions) -1; n >= 0; n-- { + for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- { if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot { mostRecentSnap = noCommonAncestor.SortedSenderVersions[n] break @@ -146,7 +261,7 @@ func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { if mostRecentSnap == nil { return nil, "no snapshots available on sender side" } - return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap) + return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName()) } } return nil, "no automated way to handle conflict type" @@ -160,43 +275,144 @@ func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) { // If an error occurs when replicating a filesystem, that error is logged to the logger in ctx. // Replicate continues with the replication of the remaining file systems. // Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO). 
diff --git a/cmd/replication/replication.go b/cmd/replication/replication.go
index 1b6e37f..8f9ea4d 100644
--- a/cmd/replication/replication.go
+++ b/cmd/replication/replication.go
@@ -2,18 +2,20 @@ package replication
 
 import (
 	"context"
-	"io"
-	"container/list"
 	"fmt"
+	"github.com/zrepl/zrepl/logger"
+	"io"
 	"net"
+	"sort"
+	"time"
 )
 
 type ReplicationEndpoint interface {
 	// Does not include placeholder filesystems
 	ListFilesystems(ctx context.Context) ([]*Filesystem, error)
 	ListFilesystemVersions(ctx context.Context, fs string) ([]*FilesystemVersion, error) // fix depS
-	Sender
-	Receiver
+	Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error)
+	Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) error
 }
 
 type FilteredError struct{ fs string }
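
Spelling out Send and Receive on ReplicationEndpoint (instead of embedding the now-removed Sender and Receiver interfaces) means a single test double can stand in for either end of the pair. A minimal stub in that shape, using simplified request types rather than the generated PDU structs:

    package main

    import (
        "context"
        "fmt"
        "io"
        "strings"
    )

    // sendReq is a simplified stand-in for the generated *SendReq PDU.
    type sendReq struct{ Filesystem, From, To string }

    // stubEndpoint provides a Send in the flattened-interface style; enough
    // to drive replication logic in a test without touching ZFS.
    type stubEndpoint struct{}

    func (stubEndpoint) Send(ctx context.Context, r *sendReq) (io.ReadCloser, error) {
        payload := fmt.Sprintf("fake stream for %s %s=>%s", r.Filesystem, r.From, r.To)
        return io.NopCloser(strings.NewReader(payload)), nil
    }

    func main() {
        stream, err := stubEndpoint{}.Send(context.Background(),
            &sendReq{Filesystem: "zroot/data", From: "@a", To: "@b"})
        if err != nil {
            panic(err)
        }
        defer stream.Close()
        b, _ := io.ReadAll(stream)
        fmt.Println(string(b))
    }
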
@@ -73,62 +75,175 @@ func (p EndpointPair) Mode() ReplicationMode {
 
 type contextKey int
 
 const (
-	ContextKeyLog contextKey = iota
+	contextKeyLog contextKey = iota
 )
 
-type Logger interface{
-	Printf(fmt string, args ... interface{})
+//type Logger interface {
+//	Infof(fmt string, args ...interface{})
+//	Errorf(fmt string, args ...interface{})
+//}
+
+//var _ Logger = nullLogger{}
+
+//type nullLogger struct{}
+//
+//func (nullLogger) Infof(fmt string, args ...interface{}) {}
+//func (nullLogger) Errorf(fmt string, args ...interface{}) {}
+
+type Logger = logger.Logger
+
+func ContextWithLogger(ctx context.Context, l Logger) context.Context {
+	return context.WithValue(ctx, contextKeyLog, l)
 }
 
-type replicationWork struct {
-	fs *Filesystem
-}
-
-type FilesystemReplicationResult struct {
-	Done bool
-	Retry bool
-	Unfixable bool
-}
-
-func handleGenericEndpointError(err error) FilesystemReplicationResult {
-	if _, ok := err.(net.Error); ok {
-		return FilesystemReplicationResult{Retry: true}
+func getLogger(ctx context.Context) Logger {
+	l, ok := ctx.Value(contextKeyLog).(Logger)
+	if !ok {
+		l = logger.NewNullLogger()
 	}
-	return FilesystemReplicationResult{Unfixable: true}
+	return l
 }
 
-func driveFSReplication(ctx context.Context, ws *list.List, allTriedOnce chan struct{}, log Logger, f func(w *replicationWork) FilesystemReplicationResult) {
-	initialLen, fCalls := ws.Len(), 0
-	for ws.Len() > 0 {
+type replicationStep struct {
+	from, to *FilesystemVersion
+	fswork   *replicateFSWork
+}
+
+func (s *replicationStep) String() string {
+	if s.from == nil { // FIXME: ZFS semantics are that to is nil on non-incremental send
+		return fmt.Sprintf("%s%s (full)", s.fswork.fs.Path, s.to.RelName())
+	} else {
+		return fmt.Sprintf("%s(%s => %s)", s.fswork.fs.Path, s.from.RelName(), s.to.RelName())
+	}
+}
+
+func newReplicationStep(from, to *FilesystemVersion) *replicationStep {
+	return &replicationStep{from: from, to: to}
+}
+
+type replicateFSWork struct {
+	fs          *Filesystem
+	steps       []*replicationStep
+	currentStep int
+	errorCount  int
+}
+
+func newReplicateFSWork(fs *Filesystem) *replicateFSWork {
+	if fs == nil {
+		panic("implementation error")
+	}
+	return &replicateFSWork{
+		fs:    fs,
+		steps: make([]*replicationStep, 0),
+	}
+}
+
+func newReplicateFSWorkWithConflict(fs *Filesystem, conflict error) *replicateFSWork {
+	// FIXME ignore conflict for now, but will be useful later when we make the replicationPlan exportable
+	return &replicateFSWork{
+		fs:    fs,
+		steps: make([]*replicationStep, 0),
+	}
+}
+
+func (r *replicateFSWork) AddStep(step *replicationStep) {
+	if step == nil {
+		panic("implementation error")
+	}
+	if step.fswork != nil {
+		panic("implementation error")
+	}
+	step.fswork = r
+	r.steps = append(r.steps, step)
+}
+
+func (w *replicateFSWork) CurrentStepDate() time.Time {
+	if len(w.steps) == 0 {
+		return time.Time{}
+	}
+	toTime, err := w.steps[w.currentStep].to.CreationAsTime()
+	if err != nil {
+		panic(err) // implementation inconsistent: should not admit invalid FilesystemVersion objects
+	}
+	return toTime
+}
+
+func (w *replicateFSWork) CurrentStep() *replicationStep {
+	if w.currentStep >= len(w.steps) {
+		return nil
+	}
+	return w.steps[w.currentStep]
+}
+
+func (w *replicateFSWork) CompleteStep() {
+	w.currentStep++
+}
+
+type replicationPlan struct {
+	fsws []*replicateFSWork
+}
+
+func newReplicationPlan() *replicationPlan {
+	return &replicationPlan{
+		fsws: make([]*replicateFSWork, 0),
+	}
+}
+
+func (p *replicationPlan) addWork(work *replicateFSWork) {
+	p.fsws = append(p.fsws, work)
+}
+
+func (p *replicationPlan) executeOldestFirst(ctx context.Context, doStep func(fs *Filesystem, from, to *FilesystemVersion) tryRes) {
+	log := getLogger(ctx)
+
+	for {
 		select {
 		case <-ctx.Done():
-			log.Printf("aborting replication due to context error: %s", ctx.Err())
+			log.WithError(ctx.Err()).Info("aborting replication due to context error")
 			return
 		default:
 		}
 
-		w := ws.Remove(ws.Front()).(*replicationWork)
-		res := f(w)
-		fCalls++
-		if fCalls == initialLen {
-			select {
-			case allTriedOnce <- struct{}{}:
-			default:
+		// FIXME poor man's nested priority queue
+		pending := make([]*replicateFSWork, 0, len(p.fsws))
+		for _, fsw := range p.fsws {
+			if fsw.CurrentStep() != nil {
+				pending = append(pending, fsw)
 			}
 		}
-		if res.Done {
-			log.Printf("finished replication of %s", w.fs.Path)
-			continue
+		sort.Slice(pending, func(i, j int) bool {
+			if pending[i].errorCount == pending[j].errorCount {
+				return pending[i].CurrentStepDate().Before(pending[j].CurrentStepDate())
+			}
+			return pending[i].errorCount < pending[j].errorCount
+		})
+		// pending is now sorted ascending by errorCount, CurrentStep().Creation
+
+		if len(pending) == 0 {
+			log.Info("replication complete")
+			return
 		}
 
-		if res.Unfixable {
-			log.Printf("aborting replication of %s after unfixable error", w.fs.Path)
-			continue
+		fsw := pending[0]
+		step := fsw.CurrentStep()
+		if step == nil {
+			panic("implementation error")
+		}
+
+		log.WithField("step", step).Info("begin replication step")
+		res := doStep(step.fswork.fs, step.from, step.to)
+
+		if res.done {
+			log.Info("replication step successful")
+			fsw.errorCount = 0
+			fsw.CompleteStep()
+		} else {
+			log.Error("replication step failed, queuing for retry")
+			fsw.errorCount++
 		}
-
-		log.Printf("queuing replication of %s for retry", w.fs.Path)
-		ws.PushBack(w)
 	}
+
 }
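
The sort in executeOldestFirst is the heart of the new scheduler: among filesystems with pending steps, it picks the lowest consecutive-error count first and breaks ties by the oldest target-snapshot creation date, so a single failing filesystem cannot starve the rest. The rule in isolation, with illustrative names:

    package main

    import (
        "fmt"
        "sort"
        "time"
    )

    type work struct {
        name       string
        errorCount int
        stepDate   time.Time
    }

    func main() {
        day := 24 * time.Hour
        base := time.Date(2018, 8, 1, 0, 0, 0, 0, time.UTC)
        pending := []work{
            {"tank/flaky", 3, base},          // oldest, but failing repeatedly
            {"tank/old", 0, base.Add(1 * day)},
            {"tank/new", 0, base.Add(2 * day)},
        }
        sort.Slice(pending, func(i, j int) bool {
            if pending[i].errorCount == pending[j].errorCount {
                return pending[i].stepDate.Before(pending[j].stepDate)
            }
            return pending[i].errorCount < pending[j].errorCount
        })
        fmt.Println(pending[0].name) // tank/old runs first despite tank/flaky being older
    }
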
 
 func resolveConflict(conflict error) (path []*FilesystemVersion, msg string) {
@@ -137,7 +252,7 @@
 		// FIXME hard-coded replication policy: most recent
 		// snapshot as source
 		var mostRecentSnap *FilesystemVersion
-		for n := len(noCommonAncestor.SortedSenderVersions) -1; n >= 0; n-- {
+		for n := len(noCommonAncestor.SortedSenderVersions) - 1; n >= 0; n-- {
 			if noCommonAncestor.SortedSenderVersions[n].Type == FilesystemVersion_Snapshot {
 				mostRecentSnap = noCommonAncestor.SortedSenderVersions[n]
 				break
@@ -146,7 +261,7 @@
 		if mostRecentSnap == nil {
 			return nil, "no snapshots available on sender side"
 		}
-		return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap)
+		return []*FilesystemVersion{mostRecentSnap}, fmt.Sprintf("start replication at most recent snapshot %s", mostRecentSnap.RelName())
 	}
 	return nil, "no automated way to handle conflict type"
@@ -160,43 +275,144 @@
 // If an error occurs when replicating a filesystem, that error is logged to the logger in ctx.
 // Replicate continues with the replication of the remaining file systems.
 // Depending on the type of error, failed replications are retried in an unspecified order (currently FIFO).
-func Replicate(ctx context.Context, ep EndpointPair, ipr IncrementalPathReplicator, allTriedOnce chan struct{}) {
+func Replicate(ctx context.Context, ep EndpointPair) {
 
-	log := ctx.Value(ContextKeyLog).(Logger)
+	log := getLogger(ctx)
+
+	retryPlanTicker := time.NewTicker(15 * time.Second) // FIXME make configurable
+	defer retryPlanTicker.Stop()
+
+	var (
+		plan *replicationPlan
+		res  tryRes
+	)
+	for {
+		log.Info("build replication plan")
+		plan, res = tryBuildReplicationPlan(ctx, ep)
+		if plan != nil {
+			break
+		}
+		log.WithField("result", res).Error("building replication plan failed, wait for retry timer")
+		select {
+		case <-ctx.Done():
+			log.WithError(ctx.Err()).Info("aborting replication because context is done")
+			return
+		case <-retryPlanTicker.C:
+			// TODO also accept an external channel that allows us to tick
+		}
+	}
+	retryPlanTicker.Stop()
+
+	mainlog := log
+	plan.executeOldestFirst(ctx, func(fs *Filesystem, from, to *FilesystemVersion) tryRes {
+
+		log := mainlog.WithField("filesystem", fs.Path)
+
+		// FIXME refresh fs resume token
+		fs.ResumeToken = ""
+
+		var sr *SendReq
+		if fs.ResumeToken != "" {
+			sr = &SendReq{
+				Filesystem:  fs.Path,
+				ResumeToken: fs.ResumeToken,
+			}
+		} else if from == nil {
+			sr = &SendReq{
+				Filesystem: fs.Path,
+				From:       to.RelName(), // FIXME fix protocol to use To, like zfs does internally
+			}
+		} else {
+			sr = &SendReq{
+				Filesystem: fs.Path,
+				From:       from.RelName(),
+				To:         to.RelName(),
+			}
+		}
+
+		log.WithField("request", sr).Debug("initiate send request")
+		sres, sstream, err := ep.Sender().Send(ctx, sr)
+		if err != nil {
+			log.WithError(err).Error("send request failed")
+			return tryResFromEndpointError(err)
+		}
+		if sstream == nil {
+			log.Error("send request did not return a stream, broken endpoint implementation")
+			return tryRes{unfixable: true}
+		}
+
+		rr := &ReceiveReq{
+			Filesystem:       fs.Path,
+			ClearResumeToken: !sres.UsedResumeToken,
+		}
+		log.WithField("request", rr).Debug("initiate receive request")
+		err = ep.Receiver().Receive(ctx, rr, sstream)
+		if err != nil {
+			log.WithError(err).Error("receive request failed (might also be error on sender)")
+			sstream.Close()
+			// This failure could be due to
+			//  - an unexpected exit of ZFS on the sending side
+			//  - an unexpected exit of ZFS on the receiving side
+			//  - a connectivity issue
+			return tryResFromEndpointError(err)
+		}
+		log.Info("receive finished")
+		return tryRes{done: true}
+
+	})
+
+}
+
+type tryRes struct {
+	done      bool
+	retry     bool
+	unfixable bool
+}
+
+func tryResFromEndpointError(err error) tryRes {
+	if _, ok := err.(net.Error); ok {
+		return tryRes{retry: true}
+	}
+	return tryRes{unfixable: true}
+}
+
+func tryBuildReplicationPlan(ctx context.Context, ep EndpointPair) (*replicationPlan, tryRes) {
+
+	log := getLogger(ctx)
+
+	early := func(err error) (*replicationPlan, tryRes) {
+		return nil, tryResFromEndpointError(err)
+	}
 
 	sfss, err := ep.Sender().ListFilesystems(ctx)
 	if err != nil {
-		log.Printf("error listing sender filesystems: %s", err)
-		return
+		log.WithError(err).Error("error listing sender filesystems")
+		return early(err)
 	}
 
 	rfss, err := ep.Receiver().ListFilesystems(ctx)
 	if err != nil {
-		log.Printf("error listing receiver filesystems: %s", err)
-		return
+		log.WithError(err).Error("error listing receiver filesystems")
+		return early(err)
 	}
 
-	wq := list.New()
+	plan := newReplicationPlan()
+	mainlog := log
 	for _, fs := range sfss {
-		wq.PushBack(&replicationWork{
-			fs: fs,
-		})
-	}
 
-	driveFSReplication(ctx, wq, allTriedOnce, log, func(w *replicationWork) FilesystemReplicationResult {
-		fs := w.fs
+		log := mainlog.WithField("filesystem", fs.Path)
 
-		log.Printf("replicating %s", fs.Path)
+		log.Info("assessing filesystem")
 
 		sfsvs, err := ep.Sender().ListFilesystemVersions(ctx, fs.Path)
 		if err != nil {
-			log.Printf("cannot get remote filesystem versions: %s", err)
-			return handleGenericEndpointError(err)
+			log.WithError(err).Error("cannot get remote filesystem versions")
+			return early(err)
 		}
 
 		if len(sfsvs) <= 1 {
-			log.Printf("sender does not have any versions")
-			return FilesystemReplicationResult{Unfixable: true}
+			log.Error("sender does not have any versions")
+			return nil, tryRes{unfixable: true}
 		}
 
 		receiverFSExists := false
@@ -211,11 +427,11 @@
 			rfsvs, err = ep.Receiver().ListFilesystemVersions(ctx, fs.Path)
 			if err != nil {
 				if _, ok := err.(FilteredError); ok {
-					log.Printf("receiver does not map %s", fs.Path)
-					return FilesystemReplicationResult{Done: true}
+					log.Info("receiver ignores filesystem")
+					continue
 				}
-				log.Printf("receiver error %s", err)
-				return handleGenericEndpointError(err)
+				log.WithError(err).Error("receiver error")
+				return early(err)
 			}
 		} else {
 			rfsvs = []*FilesystemVersion{}
@@ -223,130 +439,34 @@
 		path, conflict := IncrementalPath(rfsvs, sfsvs)
 		if conflict != nil {
-			log.Printf("conflict: %s", conflict)
 			var msg string
-			path, msg = resolveConflict(conflict)
+			path, msg = resolveConflict(conflict) // no shadowing allowed!
 			if path != nil {
-				log.Printf("conflict resolved: %s", msg)
+				log.WithField("conflict", conflict).Info("conflict")
+				log.WithField("resolution", msg).Info("automatically resolved")
 			} else {
-				log.Printf("%s", msg)
+				log.WithField("conflict", conflict).Error("conflict")
+				log.WithField("problem", msg).Error("cannot resolve conflict")
 			}
 		}
 		if path == nil {
-			return FilesystemReplicationResult{Unfixable: true}
+			plan.addWork(newReplicateFSWorkWithConflict(fs, conflict))
+			continue
 		}
 
-		return ipr.Replicate(ctx, ep.Sender(), ep.Receiver(), NewCopier(), fs, path)
+		w := newReplicateFSWork(fs)
+		if len(path) == 1 {
+			step := newReplicationStep(nil, path[0])
+			w.AddStep(step)
+		} else {
+			for i := 0; i < len(path)-1; i++ {
+				step := newReplicationStep(path[i], path[i+1])
+				w.AddStep(step)
+			}
+		}
+		plan.addWork(w)
 
-	})
-
-}
-
-type Sender interface {
-	Send(ctx context.Context, r *SendReq) (*SendRes, io.ReadCloser, error)
-}
-
-type Receiver interface {
-	Receive(ctx context.Context, r *ReceiveReq, sendStream io.ReadCloser) (error)
-}
-
-type Copier interface {
-	Copy(writer io.Writer, reader io.Reader) (int64, error)
-}
-
-type copier struct{}
-
-func (copier) Copy(writer io.Writer, reader io.Reader) (int64, error) {
-	return io.Copy(writer, reader)
-}
-
-func NewCopier() Copier {
-	return copier{}
-}
-
-type IncrementalPathReplicator interface {
-	Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs *Filesystem, path []*FilesystemVersion) FilesystemReplicationResult
-}
-
-type incrementalPathReplicator struct{}
-
-func NewIncrementalPathReplicator() IncrementalPathReplicator {
-	return incrementalPathReplicator{}
-}
-
-func (incrementalPathReplicator) Replicate(ctx context.Context, sender Sender, receiver Receiver, copier Copier, fs *Filesystem, path []*FilesystemVersion) FilesystemReplicationResult {
-
-	log := ctx.Value(ContextKeyLog).(Logger)
-
-	if len(path) == 0 {
-		log.Printf("nothing to do")
-		return FilesystemReplicationResult{Done: true}
-	}
-
-	if len(path) == 1 {
-		log.Printf("full send of version %s", path[0])
-
-		sr := &SendReq{
-			Filesystem:  fs.Path,
-			From:        path[0].RelName(),
-			ResumeToken: fs.ResumeToken,
-		}
-		sres, sstream, err := sender.Send(ctx, sr)
-		if err != nil {
-			log.Printf("send request failed: %s", err)
-			return handleGenericEndpointError(err)
-		}
-
-		rr := &ReceiveReq{
-			Filesystem:       fs.Path,
-			ClearResumeToken: fs.ResumeToken != "" && !sres.UsedResumeToken,
-		}
-		err = receiver.Receive(ctx, rr, sstream)
-		if err != nil {
-			log.Printf("receive request failed (might also be error on sender): %s", err)
-			sstream.Close()
-			// This failure could be due to
-			//  - an unexpected exit of ZFS on the sending side
-			//  - an unexpected exit of ZFS on the receiving side
-			//  - a connectivity issue
-			return handleGenericEndpointError(err)
-		}
-		return FilesystemReplicationResult{Done: true}
-	}
-
-	usedResumeToken := false
-
-	for j := 0; j < len(path)-1; j++ {
-		rt := ""
-		if !usedResumeToken { // only send resume token for first increment
-			rt = fs.ResumeToken
-			usedResumeToken = true
-		}
-		sr := &SendReq{
-			Filesystem:  fs.Path,
-			From:        path[j].RelName(),
-			To:          path[j+1].RelName(),
-			ResumeToken: rt,
-		}
-		sres, sstream, err := sender.Send(ctx, sr)
-		if err != nil {
-			log.Printf("send request failed: %s", err)
-			return handleGenericEndpointError(err)
-		}
-		// try to consume stream
-
-		rr := &ReceiveReq{
-			Filesystem:       fs.Path,
-			ClearResumeToken: rt != "" && !sres.UsedResumeToken,
-		}
-		err = receiver.Receive(ctx, rr, sstream)
-		if err != nil {
-			log.Printf("receive request failed: %s", err)
-			return handleGenericEndpointError(err) // FIXME resume state on receiver -> update ResumeToken
-		}
-
-		// FIXME handle properties from sres
-	}
-
-	return FilesystemReplicationResult{Done: true}
+	}
+
+	return plan, tryRes{done: true}
 }
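
tryBuildReplicationPlan turns each incremental path into per-snapshot steps up front, which is what lets the executor interleave filesystems and retry at step granularity. How a path maps to steps, in isolation (plain strings instead of FilesystemVersion):

    package main

    import "fmt"

    // pathToSteps: a path [a b c] becomes the steps a=>b and b=>c; a
    // single-element path means no common ancestor was found and a full
    // send of that version is queued (empty "from").
    func pathToSteps(path []string) [][2]string {
        if len(path) == 1 {
            return [][2]string{{"", path[0]}}
        }
        steps := make([][2]string, 0, len(path)-1)
        for i := 0; i < len(path)-1; i++ {
            steps = append(steps, [2]string{path[i], path[i+1]})
        }
        return steps
    }

    func main() {
        fmt.Println(pathToSteps([]string{"@a", "@b", "@c"})) // [[@a @b] [@b @c]]
        fmt.Println(pathToSteps([]string{"@a"}))             // [[ @a]]
    }
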
diff --git a/cmd/replication/replication_test.go b/cmd/replication/replication_test.go
index 2e6072b..36faf55 100644
--- a/cmd/replication/replication_test.go
+++ b/cmd/replication/replication_test.go
@@ -51,131 +51,131 @@ func (m *MockIncrementalPathRecorder) Finished() bool {
 	return m.Pos == len(m.Sequence)
 }
 
-type DiscardCopier struct{}
-
-func (DiscardCopier) Copy(writer io.Writer, reader io.Reader) (int64, error) {
-	return 0, nil
-}
-
-type IncrementalPathReplicatorTest struct {
-	Msg        string
-	Filesystem *replication.Filesystem
-	Path       []*replication.FilesystemVersion
-	Steps      []IncrementalPathSequenceStep
-}
-
-func (test *IncrementalPathReplicatorTest) Test(t *testing.T) {
-
-	t.Log(test.Msg)
-
-	rec := &MockIncrementalPathRecorder{
-		T:        t,
-		Sequence: test.Steps,
-	}
-
-	ctx := context.WithValue(context.Background(), replication.ContextKeyLog, testLog{t})
-
-	ipr := replication.NewIncrementalPathReplicator()
-	ipr.Replicate(
-		ctx,
-		rec,
-		rec,
-		DiscardCopier{},
-		test.Filesystem,
-		test.Path,
-	)
-
-	assert.True(t, rec.Finished())
-
-}
+//type IncrementalPathReplicatorTest struct {
+//	Msg        string
+//	Filesystem *replication.Filesystem
+//	Path       []*replication.FilesystemVersion
+//	Steps      []IncrementalPathSequenceStep
+//}
+//
+//func (test *IncrementalPathReplicatorTest) Test(t *testing.T) {
+//
+//	t.Log(test.Msg)
+//
+//	rec := &MockIncrementalPathRecorder{
+//		T:        t,
+//		Sequence: test.Steps,
+//	}
+//
+//	ctx := replication.ContextWithLogger(context.Background(), testLog{t})
+//
+//	ipr := replication.NewIncrementalPathReplicator()
+//	ipr.Replicate(
+//		ctx,
+//		rec,
+//		rec,
+//		DiscardCopier{},
+//		test.Filesystem,
+//		test.Path,
+//	)
+//
+//	assert.True(t, rec.Finished())
+//
+//}
 
 type testLog struct {
 	t *testing.T
 }
 
-func (t testLog) Printf(fmt string, args ...interface{}) {
+var _ replication.Logger = testLog{}
+
+func (t testLog) Infof(fmt string, args ...interface{}) {
+	t.t.Logf(fmt, args)
+}
+func (t testLog) Errorf(fmt string, args ...interface{}) {
 	t.t.Logf(fmt, args)
 }
 
-func TestIncrementalPathReplicator_Replicate(t *testing.T) {
-
-	tbl := []IncrementalPathReplicatorTest{
-		{
-			Msg: "generic happy place with resume token",
-			Filesystem: &replication.Filesystem{
-				Path:        "foo/bar",
-				ResumeToken: "blafoo",
-			},
-			Path: fsvlist("@a,1", "@b,2", "@c,3"),
-			Steps: []IncrementalPathSequenceStep{
-				{
-					SendRequest: &replication.SendReq{
-						Filesystem:  "foo/bar",
-						From:        "@a,1",
-						To:          "@b,2",
-						ResumeToken: "blafoo",
-					},
-					SendResponse: &replication.SendRes{
-						UsedResumeToken: true,
-					},
-				},
-				{
-					ReceiveRequest: &replication.ReceiveReq{
-						Filesystem:       "foo/bar",
-						ClearResumeToken: false,
-					},
-				},
-				{
-					SendRequest: &replication.SendReq{
-						Filesystem: "foo/bar",
-						From:       "@b,2",
-						To:         "@c,3",
-					},
-				},
-				{
-					ReceiveRequest: &replication.ReceiveReq{
-						Filesystem: "foo/bar",
-					},
-				},
-			},
-		},
-		{
-			Msg: "no action on empty sequence",
-			Filesystem: &replication.Filesystem{
-				Path: "foo/bar",
-			},
-			Path:  fsvlist(),
-			Steps: []IncrementalPathSequenceStep{},
-		},
-		{
-			Msg: "full send on single entry path",
-			Filesystem: &replication.Filesystem{
-				Path: "foo/bar",
-			},
-			Path: fsvlist("@justone,1"),
-			Steps: []IncrementalPathSequenceStep{
-				{
-					SendRequest: &replication.SendReq{
-						Filesystem: "foo/bar",
-						From:       "@justone,1",
-						To:         "", // empty means full send
-					},
-					SendResponse: &replication.SendRes{
-						UsedResumeToken: false,
-					},
-				},
-				{
-					ReceiveRequest: &replication.ReceiveReq{
-						Filesystem:       "foo/bar",
-						ClearResumeToken: false,
-					},
-				},
-			},
-		},
-	}
-
-	for _, test := range tbl {
-		test.Test(t)
-	}
-
-}
+//func TestIncrementalPathReplicator_Replicate(t *testing.T) {
+//
+//	tbl := []IncrementalPathReplicatorTest{
+//		{
+//			Msg: "generic happy place with resume token",
+//			Filesystem: &replication.Filesystem{
+//				Path:        "foo/bar",
+//				ResumeToken: "blafoo",
+//			},
+//			Path: fsvlist("@a,1", "@b,2", "@c,3"),
+//			Steps: []IncrementalPathSequenceStep{
+//				{
+//					SendRequest: &replication.SendReq{
+//						Filesystem:  "foo/bar",
+//						From:        "@a,1",
+//						To:          "@b,2",
+//						ResumeToken: "blafoo",
+//					},
+//					SendResponse: &replication.SendRes{
+//						UsedResumeToken: true,
+//					},
+//				},
+//				{
+//					ReceiveRequest: &replication.ReceiveReq{
+//						Filesystem:       "foo/bar",
+//						ClearResumeToken: false,
+//					},
+//				},
+//				{
+//					SendRequest: &replication.SendReq{
+//						Filesystem: "foo/bar",
+//						From:       "@b,2",
+//						To:         "@c,3",
+//					},
+//				},
+//				{
+//					ReceiveRequest: &replication.ReceiveReq{
+//						Filesystem: "foo/bar",
+//					},
+//				},
+//			},
+//		},
+//		{
+//			Msg: "no action on empty sequence",
+//			Filesystem: &replication.Filesystem{
+//				Path: "foo/bar",
+//			},
+//			Path:  fsvlist(),
+//			Steps: []IncrementalPathSequenceStep{},
+//		},
+//		{
+//			Msg: "full send on single entry path",
+//			Filesystem: &replication.Filesystem{
+//				Path: "foo/bar",
+//			},
+//			Path: fsvlist("@justone,1"),
+//			Steps: []IncrementalPathSequenceStep{
+//				{
+//					SendRequest: &replication.SendReq{
+//						Filesystem: "foo/bar",
+//						From:       "@justone,1",
+//						To:         "", // empty means full send
+//					},
+//					SendResponse: &replication.SendRes{
+//						UsedResumeToken: false,
+//					},
+//				},
+//				{
+//					ReceiveRequest: &replication.ReceiveReq{
+//						Filesystem:       "foo/bar",
+//						ClearResumeToken: false,
+//					},
+//				},
+//			},
+//		},
+//	}
+//
+//	for _, test := range tbl {
+//		test.Test(t)
+//	}
+//
+//}
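
One thing to be aware of in testLog above (present both before and after this patch): `t.t.Logf(fmt, args)` passes the args slice as a single operand, whereas splicing the values back requires `args...`. The difference in isolation:

    package main

    import "fmt"

    func forwardWrong(format string, args ...interface{}) string {
        return fmt.Sprintf(format, args) // one operand: the []interface{} itself
    }

    func forwardRight(format string, args ...interface{}) string {
        return fmt.Sprintf(format, args...) // spliced back into separate operands
    }

    func main() {
        fmt.Println(forwardWrong("%s=%d", "x", 1)) // slice consumed by %s, %d reports MISSING
        fmt.Println(forwardRight("%s=%d", "x", 1)) // x=1
    }
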
diff --git a/logger/logger.go b/logger/logger.go
index fdfe103..b007267 100644
--- a/logger/logger.go
+++ b/logger/logger.go
@@ -14,7 +14,20 @@ const (
 
 const DefaultUserFieldCapacity = 5
 
-type Logger struct {
+type Logger interface {
+	WithOutlet(outlet Outlet, level Level) Logger
+	ReplaceField(field string, val interface{}) Logger
+	WithField(field string, val interface{}) Logger
+	WithFields(fields Fields) Logger
+	WithError(err error) Logger
+	Debug(msg string)
+	Info(msg string)
+	Warn(msg string)
+	Error(msg string)
+	Printf(format string, args ...interface{})
+}
+
+type loggerImpl struct {
 	fields        Fields
 	outlets       *Outlets
 	outletTimeout time.Duration
@@ -22,8 +35,10 @@ type Logger struct {
 	mtx *sync.Mutex
 }
 
-func NewLogger(outlets *Outlets, outletTimeout time.Duration) *Logger {
-	return &Logger{
+var _ Logger = &loggerImpl{}
+
+func NewLogger(outlets *Outlets, outletTimeout time.Duration) Logger {
+	return &loggerImpl{
 		make(Fields, DefaultUserFieldCapacity),
 		outlets,
 		outletTimeout,
@@ -36,7 +51,7 @@ type outletResult struct {
 	Error error
 }
 
-func (l *Logger) logInternalError(outlet Outlet, err string) {
+func (l *loggerImpl) logInternalError(outlet Outlet, err string) {
 	fields := Fields{}
 	if outlet != nil {
 		if _, ok := outlet.(fmt.Stringer); ok {
@@ -54,7 +69,7 @@ func (l *Logger) logInternalError(outlet Outlet, err string) {
 	l.outlets.GetLoggerErrorOutlet().WriteEntry(entry)
 }
 
-func (l *Logger) log(level Level, msg string) {
+func (l *loggerImpl) log(level Level, msg string) {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 
@@ -78,12 +93,12 @@ func (l *Logger) log(level Level, msg string) {
 
 }
 
-func (l *Logger) WithOutlet(outlet Outlet, level Level) *Logger {
+func (l *loggerImpl) WithOutlet(outlet Outlet, level Level) Logger {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 	newOutlets := l.outlets.DeepCopy()
 	newOutlets.Add(outlet, level)
-	child := &Logger{
+	child := &loggerImpl{
 		fields:        l.fields,
 		outlets:       newOutlets,
 		outletTimeout: l.outletTimeout,
@@ -93,9 +108,9 @@ func (l *Logger) WithOutlet(outlet Outlet, level Level) *Logger {
 }
 
 // callers must hold l.mtx
-func (l *Logger) forkLogger(field string, val interface{}) *Logger {
+func (l *loggerImpl) forkLogger(field string, val interface{}) *loggerImpl {
 
-	child := &Logger{
+	child := &loggerImpl{
 		fields:        make(Fields, len(l.fields)+1),
 		outlets:       l.outlets,
 		outletTimeout: l.outletTimeout,
@@ -109,13 +124,13 @@ func (l *Logger) forkLogger(field string, val interface{}) *Logger {
 	return child
 }
 
-func (l *Logger) ReplaceField(field string, val interface{}) *Logger {
+func (l *loggerImpl) ReplaceField(field string, val interface{}) Logger {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 	return l.forkLogger(field, val)
 }
 
-func (l *Logger) WithField(field string, val interface{}) *Logger {
+func (l *loggerImpl) WithField(field string, val interface{}) Logger {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
 	if val, ok := l.fields[field]; ok && val != nil {
@@ -125,16 +140,16 @@ func (l *Logger) WithField(field string, val interface{}) *Logger {
 	return l.forkLogger(field, val)
 }
 
-func (l *Logger) WithFields(fields Fields) (ret *Logger) {
+func (l *loggerImpl) WithFields(fields Fields) Logger {
 	// TODO optimize
-	ret = l
+	var ret Logger = l
 	for field, value := range fields {
 		ret = ret.WithField(field, value)
 	}
 	return ret
 }
 
-func (l *Logger) WithError(err error) *Logger {
+func (l *loggerImpl) WithError(err error) Logger {
 	val := interface{}(nil)
 	if err != nil {
 		val = err.Error()
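
Turning logger.Logger into an interface backed by loggerImpl is what allows the nullLogger below, and test fakes like testLog, to slot in. The `var _ Logger = &loggerImpl{}` line above is the standard compile-time conformance assertion; the pattern in miniature:

    package main

    type Greeter interface{ Greet() string }

    type impl struct{}

    func (impl) Greet() string { return "hi" }

    // Compile-time conformance check: the build breaks if impl ever
    // stops satisfying Greeter, with no runtime cost.
    var _ Greeter = impl{}

    func main() { println(impl{}.Greet()) }
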
@@ -142,22 +157,22 @@ func (l *Logger) WithError(err error) *Logger {
 	return l.WithField(FieldError, val)
 }
 
-func (l *Logger) Debug(msg string) {
+func (l *loggerImpl) Debug(msg string) {
 	l.log(Debug, msg)
 }
 
-func (l *Logger) Info(msg string) {
+func (l *loggerImpl) Info(msg string) {
 	l.log(Info, msg)
 }
 
-func (l *Logger) Warn(msg string) {
+func (l *loggerImpl) Warn(msg string) {
 	l.log(Warn, msg)
 }
 
-func (l *Logger) Error(msg string) {
+func (l *loggerImpl) Error(msg string) {
 	l.log(Error, msg)
 }
 
-func (l *Logger) Printf(format string, args ...interface{}) {
+func (l *loggerImpl) Printf(format string, args ...interface{}) {
 	l.log(Error, fmt.Sprintf(format, args...))
 }
diff --git a/logger/nulllogger.go b/logger/nulllogger.go
new file mode 100644
index 0000000..3233dfe
--- /dev/null
+++ b/logger/nulllogger.go
@@ -0,0 +1,22 @@
+package logger
+
+
+type nullLogger struct{}
+
+var _ Logger = nullLogger{}
+
+func NewNullLogger() Logger {
+	return nullLogger{}
+}
+
+func (n nullLogger) WithOutlet(outlet Outlet, level Level) Logger      { return n }
+func (n nullLogger) ReplaceField(field string, val interface{}) Logger { return n }
+func (n nullLogger) WithField(field string, val interface{}) Logger    { return n }
+func (n nullLogger) WithFields(fields Fields) Logger                   { return n }
+func (n nullLogger) WithError(err error) Logger                        { return n }
+func (nullLogger) Debug(msg string)                                    {}
+func (nullLogger) Info(msg string)                                     {}
+func (nullLogger) Warn(msg string)                                     {}
+func (nullLogger) Error(msg string)                                    {}
+func (nullLogger) Printf(format string, args ...interface{})           {}
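
NewNullLogger is the null-object counterpart to the interface extraction: cmd/replication's getLogger returns it when the context carries no logger, so logging call sites never branch on nil. The same idea reduced to a few lines:

    package main

    import "fmt"

    type Logger interface{ Info(msg string) }

    type printLogger struct{}

    func (printLogger) Info(msg string) { fmt.Println("info:", msg) }

    type nullLogger struct{}

    func (nullLogger) Info(msg string) {} // deliberately does nothing

    // pick substitutes a do-nothing implementation instead of returning nil,
    // so callers can always invoke Info without a nil check.
    func pick(l Logger) Logger {
        if l == nil {
            return nullLogger{}
        }
        return l
    }

    func main() {
        pick(nil).Info("dropped silently")
        pick(printLogger{}).Info("printed")
    }
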