Mirror of https://github.com/zrepl/zrepl.git, synced 2024-11-21 16:03:32 +01:00
Spellcheck all files

Signed-off-by: InsanePrawn <insane.prawny@gmail.com>

Commit: 44bd354eae
Parent: 94caf8b8db
@@ -136,7 +136,7 @@ However, new contributions & patches should fix naming without further notice in
 ### RPC debugging
-Optionally, there are various RPC-related environment varibles, that if set to something != `""` will produce additional debug output on stderr:
+Optionally, there are various RPC-related environment variables, that if set to something != `""` will produce additional debug output on stderr:
 https://github.com/zrepl/zrepl/blob/master/rpc/rpc_debug.go#L11

@@ -165,7 +165,7 @@ func doMigrateReplicationCursor(sc *cli.Subcommand, args []string) error {
 var hadError bool
 for _, fs := range fss {
-bold.Printf("INSPECT FILESYTEM %q\n", fs.ToString())
+bold.Printf("INSPECT FILESYSTEM %q\n", fs.ToString())
 err := doMigrateReplicationCursorFS(ctx, v1cursorJobs, fs)
 if err == migrateReplicationCursorSkipSentinel {

@@ -11,7 +11,7 @@ import (
 "sync"
 "time"
-// tcell is the termbox-compatbile library for abstracting away escape sequences, etc.
+// tcell is the termbox-compatible library for abstracting away escape sequences, etc.
 // as of tcell#252, the number of default distributed terminals is relatively limited
 // additional terminal definitions can be included via side-effect import
 // See https://github.com/gdamore/tcell/blob/master/terminfo/base/base.go

@@ -264,7 +264,7 @@ loop:
 }
-func (t *tui) getReplicationProgresHistory(jobName string) *bytesProgressHistory {
+func (t *tui) getReplicationProgressHistory(jobName string) *bytesProgressHistory {
 p, ok := t.replicationProgress[jobName]
 if !ok {
 p = &bytesProgressHistory{}

@@ -329,7 +329,7 @@ func (t *tui) draw() {
 t.printf("Replication:")
 t.newline()
 t.addIndent(1)
-t.renderReplicationReport(activeStatus.Replication, t.getReplicationProgresHistory(k))
+t.renderReplicationReport(activeStatus.Replication, t.getReplicationProgressHistory(k))
 t.addIndent(-1)
 t.printf("Pruning Sender:")

@@ -697,7 +697,7 @@ func rightPad(str string, length int, pad string) string {
 var arrowPositions = `>\|/`
-// changeCount = 0 indicates stall / no progresss
+// changeCount = 0 indicates stall / no progress
 func (t *tui) drawBar(length int, bytes, totalBytes int64, changeCount int) {
 var completedLength int
 if totalBytes > 0 {
@@ -620,7 +620,7 @@ func ParseConfigBytes(bytes []byte) (*Config, error) {
 var durationStringRegex *regexp.Regexp = regexp.MustCompile(`^\s*(\d+)\s*(s|m|h|d|w)\s*$`)
-func parsePostitiveDuration(e string) (d time.Duration, err error) {
+func parsePositiveDuration(e string) (d time.Duration, err error) {
 comps := durationStringRegex.FindStringSubmatch(e)
 if len(comps) != 3 {
 err = fmt.Errorf("does not match regex: %s %#v", e, comps)

@@ -21,7 +21,7 @@ jobs:
 clients: {
 "10.0.0.1":"foo"
 }
-root_fs: zoot/foo
+root_fs: zroot/foo
 `
 _, err := ParseConfigBytes([]byte(jobdef))
 require.NoError(t, err)

@@ -42,7 +42,7 @@ func TestSampleConfigsAreParsedWithoutErrors(t *testing.T) {
 }
-// template must be a template/text template with a single '{{ . }}' as placehodler for val
+// template must be a template/text template with a single '{{ . }}' as placeholder for val
 //nolint[:deadcode,unused]
 func testValidConfigTemplate(t *testing.T, tmpl string, val string) *Config {
 tmp, err := template.New("master").Parse(tmpl)

@@ -64,7 +64,7 @@ func parseRetentionGridIntervalString(e string) (intervals []RetentionInterval,
 return nil, fmt.Errorf("contains factor <= 0")
 }
-duration, err := parsePostitiveDuration(comps[2])
+duration, err := parsePositiveDuration(comps[2])
 if err != nil {
 return nil, err
 }

@@ -64,7 +64,7 @@ func (j *controlJob) RegisterMetrics(registerer prometheus.Registerer) {
 Namespace: "zrepl",
 Subsystem: "control",
 Name: "request_finished",
-Help: "time it took a request to finih",
+Help: "time it took a request to finish",
 Buckets: []float64{1e-6, 10e-6, 100e-6, 500e-6, 1e-3, 10e-3, 100e-3, 200e-3, 400e-3, 800e-3, 1, 10, 20},
 }, []string{"endpoint"})
 registerer.MustRegister(promControl.requestBegin)

@@ -250,7 +250,7 @@ func (j jsonRequestResponder) ServeHTTP(w http.ResponseWriter, r *http.Request)
 var buf bytes.Buffer
 encodeErr := json.NewEncoder(&buf).Encode(res)
 if encodeErr != nil {
-j.log.WithError(producerErr).Error("control handler json marhsal error")
+j.log.WithError(producerErr).Error("control handler json marshal error")
 w.WriteHeader(http.StatusInternalServerError)
 _, err := io.WriteString(w, encodeErr.Error())
 logIoErr(err)
@@ -161,7 +161,7 @@ func (m DatasetMapFilter) Filter(p *zfs.DatasetPath) (pass bool, err error) {
 }
 // Construct a new filter-only DatasetMapFilter from a mapping
-// The new filter allows excactly those paths that were not forbidden by the mapping.
+// The new filter allows exactly those paths that were not forbidden by the mapping.
 func (m DatasetMapFilter) InvertedFilter() (inv *DatasetMapFilter, err error) {
 if m.filterMode {

@@ -147,7 +147,7 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
 fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
 if err != nil {
-return nil, errors.Wrap(err, "cannnot build filesystem filter")
+return nil, errors.Wrap(err, "cannot build filesystem filter")
 }
 m.senderConfig = &endpoint.SenderConfig{

@@ -75,7 +75,7 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
 m = &modeSource{}
 fsf, err := filters.DatasetMapFilterFromConfig(in.Filesystems)
 if err != nil {
-return nil, errors.Wrap(err, "cannnot build filesystem filter")
+return nil, errors.Wrap(err, "cannot build filesystem filter")
 }
 m.senderConfig = &endpoint.SenderConfig{
 FSF: fsf,

@@ -70,7 +70,7 @@ type Subsystem string
 const (
 SubsysReplication Subsystem = "repl"
-SubsyEndpoint Subsystem = "endpoint"
+SubsysEndpoint Subsystem = "endpoint"
 SubsysPruning Subsystem = "pruning"
 SubsysSnapshot Subsystem = "snapshot"
 SubsysHooks Subsystem = "hook"

@@ -84,7 +84,7 @@ const (
 func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context {
 ctx = logic.WithLogger(ctx, log.WithField(SubsysField, SubsysReplication))
 ctx = driver.WithLogger(ctx, log.WithField(SubsysField, SubsysReplication))
-ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, SubsyEndpoint))
+ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, SubsysEndpoint))
 ctx = pruner.WithLogger(ctx, log.WithField(SubsysField, SubsysPruning))
 ctx = snapper.WithLogger(ctx, log.WithField(SubsysField, SubsysSnapshot))
 ctx = hooks.WithLogger(ctx, log.WithField(SubsysField, SubsysHooks))

@@ -211,7 +211,7 @@ func logfmtTryEncodeKeyval(enc *logfmt.Encoder, field, value interface{}) error
 case logfmt.ErrUnsupportedValueType:
 err := enc.EncodeKeyval(field, fmt.Sprintf("<%T>", value))
 if err != nil {
-return errors.Wrap(err, "cannot encode unsuuported value type Go type")
+return errors.Wrap(err, "cannot encode unsupported value type Go type")
 }
 return nil
 }
@@ -63,7 +63,7 @@ func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *
 return
 }
-entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previos is in io.Copy()
+entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previous is in io.Copy()
 o := &TCPOutlet{
 formatter: formatter,

@@ -18,13 +18,13 @@ import (
 "github.com/zrepl/zrepl/util/envconst"
 )
-// Try to keep it compatible with gitub.com/zrepl/zrepl/endpoint.Endpoint
+// Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint
 type History interface {
 ReplicationCursor(ctx context.Context, req *pdu.ReplicationCursorReq) (*pdu.ReplicationCursorRes, error)
 ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
 }
-// Try to keep it compatible with gitub.com/zrepl/zrepl/endpoint.Endpoint
+// Try to keep it compatible with github.com/zrepl/zrepl/endpoint.Endpoint
 type Target interface {
 ListFilesystems(ctx context.Context, req *pdu.ListFilesystemReq) (*pdu.ListFilesystemRes, error)
 ListFilesystemVersions(ctx context.Context, req *pdu.ListFilesystemVersionsReq) (*pdu.ListFilesystemVersionsRes, error)

@@ -277,7 +277,7 @@ func snapshot(a args, u updater) state {
 jobCallback := hooks.NewCallbackHookForFilesystem("snapshot", fs, func(_ context.Context) (err error) {
 l.Debug("create snapshot")
-err = zfs.ZFSSnapshot(fs, snapname, false) // TODO propagagte context to ZFSSnapshot
+err = zfs.ZFSSnapshot(fs, snapname, false) // TODO propagate context to ZFSSnapshot
 if err != nil {
 l.WithError(err).Error("cannot create snapshot")
 }
@@ -105,7 +105,7 @@ Actual changelog:
 | You can support maintenance and feature development through one of the following services:
 | |Donate via Patreon| |Donate via Liberapay| |Donate via PayPal|
 | Note that PayPal processing fees are relatively high for small donations.
-| For SEPA wire transfer and **commerical support**, please `contact Christian directly <https://cschwarz.com>`_.
+| For SEPA wire transfer and **commercial support**, please `contact Christian directly <https://cschwarz.com>`_.
 0.1.1

@@ -202,7 +202,7 @@ The latter is particularly useful in combination with log aggregation services.
 .. WARNING::
 zrepl drops log messages to the TCP outlet if the underlying connection is not fast enough.
-Note that TCP buffering in the kernel must first run full becfore messages are dropped.
+Note that TCP buffering in the kernel must first run full before messages are dropped.
 Make sure to always configure a ``stdout`` outlet as the special error outlet to be informed about problems
 with the TCP outlet (see :ref:`above <logging-error-outlet>` ).

@@ -15,7 +15,7 @@ Prometheus & Grafana
 zrepl can expose `Prometheus metrics <https://prometheus.io/docs/instrumenting/exposition_formats/>`_ via HTTP.
 The ``listen`` attribute is a `net.Listen <https://golang.org/pkg/net/#Listen>`_ string for tcp, e.g. ``:9091`` or ``127.0.0.1:9091``.
 The ``listen_freebind`` attribute is :ref:`explained here <listen-freebind-explanation>`.
-The Prometheues monitoring job appears in the ``zrepl control`` job list and may be specified **at most once**.
+The Prometheus monitoring job appears in the ``zrepl control`` job list and may be specified **at most once**.
 zrepl also ships with an importable `Grafana <https://grafana.com>`_ dashboard that consumes the Prometheus metrics:
 see :repomasterlink:`dist/grafana`.

@@ -26,9 +26,9 @@ Config File Structure
 type: push
 - ...
-zrepl is confgured using a single YAML configuration file with two main sections: ``global`` and ``jobs``.
+zrepl is configured using a single YAML configuration file with two main sections: ``global`` and ``jobs``.
 The ``global`` section is filled with sensible defaults and is covered later in this chapter.
-The ``jobs`` section is a list of jobs which we are goind to explain now.
+The ``jobs`` section is a list of jobs which we are going to explain now.
 .. _job-overview:

@@ -41,7 +41,7 @@ Jobs are identified by their ``name``, both in log files and the ``zrepl status`
 Replication always happens between a pair of jobs: one is the **active side**, and one the **passive side**.
 The active side connects to the passive side using a :ref:`transport <transport>` and starts executing the replication logic.
-The passive side responds to requests from the active side after checking its persmissions.
+The passive side responds to requests from the active side after checking its permissions.
 The following table shows how different job types can be combined to achieve **both push and pull mode setups**.
 Note that snapshot-creation denoted by "(snap)" is orthogonal to whether a job is active or passive.

@@ -120,7 +120,7 @@ The following steps take place during replication and can be monitored using the
 * Perform replication steps in the following order:
 Among all filesystems with pending replication steps, pick the filesystem whose next replication step's snapshot is the oldest.
 * Create placeholder filesystems on the receiving side to mirror the dataset paths on the sender to ``root_fs/${client_identity}``.
-* Aquire send-side step-holds on the step's `from` and `to` snapshots.
+* Acquire send-side step-holds on the step's `from` and `to` snapshots.
 * Perform the replication step.
 * Move the **replication cursor** bookmark on the sending side (see below).
 * Move the **last-received-hold** on the receiving side (see below).
@@ -141,7 +141,7 @@ The ``zrepl holds list`` provides a listing of all bookmarks and holds managed b
 .. _replication-placeholder-property:
 **Placeholder filesystems** on the receiving side are regular ZFS filesystems with the placeholder property ``zrepl:placeholder=on``.
-Placeholders allow the receiving side to mirror the sender's ZFS dataset hierachy without replicating every filesystem at every intermediary dataset path component.
+Placeholders allow the receiving side to mirror the sender's ZFS dataset hierarchy without replicating every filesystem at every intermediary dataset path component.
 Consider the following example: ``S/H/J`` shall be replicated to ``R/sink/job/S/H/J``, but neither ``S/H`` nor ``S`` shall be replicated.
 ZFS requires the existence of ``R/sink/job/S`` and ``R/sink/job/S/H`` in order to receive into ``R/sink/job/S/H/J``.
 Thus, zrepl creates the parent filesystems as placeholders on the receiving side.

@@ -181,7 +181,7 @@ No Overlapping
 Jobs run independently of each other.
 If two jobs match the same filesystem with their ``filesystems`` filter, they will operate on that filesystem independently and potentially in parallel.
-For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B asssumed the snapshot to still be present.
+For example, if job A prunes snapshots that job B is planning to replicate, the replication will fail because B assumed the snapshot to still be present.
 However, the next replication attempt will re-examine the situation from scratch and should work.
 N push jobs to 1 sink

@@ -198,5 +198,5 @@ Multiple pull jobs pulling from the same source have potential for race conditio
 each pull job prunes the source side independently, causing replication-prune and prune-prune races.
 There is currently no way for a pull job to filter which snapshots it should attempt to replicate.
-Thus, it is not possibe to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.
+Thus, it is not possible to just manually assert that the prune rules of all pull jobs are disjoint to avoid replication-prune and prune-prune races.

@@ -154,7 +154,7 @@ Policy ``regex``
 negate: true
 regex: "^zrepl_.*"
-``regex`` keeps all snapshots whose names are matched by the regular expressionin ``regex``.
+``regex`` keeps all snapshots whose names are matched by the regular expression in ``regex``.
 Like all other regular expression fields in prune policies, zrepl uses Go's `regexp.Regexp <https://golang.org/pkg/regexp/#Compile>`_ Perl-compatible regular expressions (`Syntax <https://golang.org/pkg/regexp/syntax>`_).
 The optional `negate` boolean field inverts the semantics: Use it if you want to keep all snapshots that *do not* match the given regex.

@@ -24,7 +24,7 @@ Send Options
 ---------------------
 The ``encryption`` variable controls whether the matched filesystems are sent as `OpenZFS native encryption <http://open-zfs.org/wiki/ZFS-Native_Encryption>`_ raw sends.
-More specificially, if ``encryption=true``, zrepl
+More specifically, if ``encryption=true``, zrepl
 * checks for any of the filesystems matched by ``filesystems`` whether the ZFS ``encryption`` property indicates that the filesystem is actually encrypted with ZFS native encryption and
 * invokes the ``zfs send`` subcommand with the ``-w`` option (raw sends) and

@@ -127,7 +127,7 @@ Either way, all build results are located in the ``artifacts/`` directory.
 .. NOTE::
-It is your job to install the apropriate binary in the zrepl users's ``$PATH``, e.g. ``/usr/local/bin/zrepl``.
+It is your job to install the appropriate binary in the zrepl users's ``$PATH``, e.g. ``/usr/local/bin/zrepl``.
 Otherwise, the examples in the :ref:`tutorial` may need to be adjusted.
 What next?
@@ -6,7 +6,7 @@
 zrepl is a spare-time project primarily developed by `Christian Schwarz <https://cschwarz.com>`_.
 You can support maintenance and feature development through one of the services listed above.
-For SEPA wire transfer and **commerical support**, please `contact Christian directly <https://cschwarz.com>`_.
+For SEPA wire transfer and **commercial support**, please `contact Christian directly <https://cschwarz.com>`_.
 **Thanks for your support!**

@@ -48,7 +48,7 @@ zrepl daemon
 All actual work zrepl does is performed by a daemon process.
 The daemon supports structured :ref:`logging <logging>` and provides :ref:`monitoring endpoints <monitoring>`.
-When installating from a package, the package maintainer should have provided an init script / systemd.service file.
+When installing from a package, the package maintainer should have provided an init script / systemd.service file.
 You should thus be able to start zrepl daemon using your init system.
 Alternatively, or for running zrepl in the foreground, simply execute ``zrepl daemon``.

@@ -73,6 +73,6 @@ The daemon exits as soon as all jobs have reported shut down.
 Systemd Unit File
 ~~~~~~~~~~~~~~~~~
-A systemd service defintion template is available in :repomasterlink:`dist/systemd`.
+A systemd service definition template is available in :repomasterlink:`dist/systemd`.
 Note that some of the options only work on recent versions of systemd.
 Any help & improvements are very welcome, see :issue:`145`.

@@ -170,7 +170,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
 // ok, fallthrough outer
 case pdu.Tri_False:
 if s.encrypt.B {
-return nil, nil, errors.New("only encrytped sends allowed (send -w + encryption!= off), but unencrytped send requested")
+return nil, nil, errors.New("only encrypted sends allowed (send -w + encryption!= off), but unencrypted send requested")
 }
 // fallthrough outer
 case pdu.Tri_True:

@@ -206,7 +206,7 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, zfs.St
 }
 // From now on, assume that sendArgs has been validated by ZFSSendDry
-// (because validation invovles shelling out, it's actually a little expensive)
+// (because validation involves shelling out, it's actually a little expensive)
 var expSize int64 = 0 // protocol says 0 means no estimate
 if si.SizeEstimate != -1 { // but si returns -1 for no size estimate

@@ -625,9 +625,9 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive zfs
 // ZFS dataset hierarchy subtrees.
 var visitErr error
 func() {
-getLogger(ctx).Debug("begin aquire recvParentCreationMtx")
+getLogger(ctx).Debug("begin acquire recvParentCreationMtx")
 defer s.recvParentCreationMtx.Lock().Unlock()
-getLogger(ctx).Debug("end aquire recvParentCreationMtx")
+getLogger(ctx).Debug("end acquire recvParentCreationMtx")
 defer getLogger(ctx).Debug("release recvParentCreationMtx")
 f := zfs.NewDatasetPathForest()
@@ -227,7 +227,7 @@ func HoldStep(ctx context.Context, fs string, v *zfs.ZFSSendArgVersion, jobID Jo
 // TODO we could actually try to find a local snapshot that has the requested GUID
 // however, the replication algorithm prefers snapshots anyways, so this quest
 // is most likely not going to be successful. Also, there's the possibility that
-// the caller might want to filter what snapshots are eligibile, and this would
+// the caller might want to filter what snapshots are eligible, and this would
 // complicate things even further.
 return err // TODO go1.13 use wrapping
 }

@@ -393,7 +393,7 @@ type ListHoldsAndBookmarksOutputHold struct {
 // List all holds and bookmarks managed by endpoint
 func ListZFSHoldsAndBookmarks(ctx context.Context, fsfilter zfs.DatasetFilter) (*ListHoldsAndBookmarksOutput, error) {
-// initialize all fields so that JSON serializion of output looks pretty (see client/holds.go)
+// initialize all fields so that JSON serialization of output looks pretty (see client/holds.go)
 // however, listZFSHoldsAndBookmarksImplFS shouldn't rely on it
 out := &ListHoldsAndBookmarksOutput{
 StepBookmarks: make([]*ListHoldsAndBookmarksOutputBookmark, 0),

@@ -172,21 +172,21 @@ type testCaseResult struct {
 func runTestCase(ctx *platformtest.Context, ex platformtest.Execer, c tests.Case) *testCaseResult {
 // run case
-var paniced = false
+var panicked = false
 var panicValue interface{} = nil
 var panicStack error
 func() {
 defer func() {
 if item := recover(); item != nil {
 panicValue = item
-paniced = true
+panicked = true
 panicStack = errors.Errorf("panic while running test: %v", panicValue)
 }
 }()
 c(ctx)
 }()
-if paniced {
+if panicked {
 switch panicValue {
 case platformtest.SkipNowSentinel:
 return &testCaseResult{skipped: true}
@@ -180,8 +180,8 @@ func splitQuotedWords(data []byte, atEOF bool) (advance int, token []byte, err e
 // unescaped quote, end of this string
 // remove backslash-escapes
 withBackslash := data[begin+1 : end]
-withoutBaskslash := bytes.Replace(withBackslash, []byte("\\\""), []byte("\""), -1)
-return end + 1, withoutBaskslash, nil
+withoutBackslash := bytes.Replace(withBackslash, []byte("\\\""), []byte("\""), -1)
+return end + 1, withoutBackslash, nil
 } else {
 // continue to next quote
 end += 1

@@ -34,7 +34,7 @@ func (a ZpoolCreateArgs) Validate() error {
 return errors.Errorf("Mountpoint must be an absolute path to a directory")
 }
 if a.PoolName == "" {
-return errors.Errorf("PoolName must not be emtpy")
+return errors.Errorf("PoolName must not be empty")
 }
 return nil
 }

@@ -113,7 +113,7 @@ func makeResumeSituation(ctx *platformtest.Context, src dummySnapshotSituation,
 situation.sendArgs = sendArgs
 situation.recvOpts = recvOptions
-require.True(ctx, recvOptions.SavePartialRecvState, "this method would be pointeless otherwise")
+require.True(ctx, recvOptions.SavePartialRecvState, "this method would be pointless otherwise")
 require.Equal(ctx, sendArgs.FS, src.sendFS)
 copier, err := zfs.ZFSSend(ctx, sendArgs)

@@ -40,7 +40,7 @@ func ResumableRecvAndTokenHandling(ctx *platformtest.Context) {
 require.True(ctx, ok)
 // we know that support on sendFS implies support on recvFS
-// => asser that if we don't support resumed recv, the method returns ""
+// => assert that if we don't support resumed recv, the method returns ""
 tok, err := zfs.ZFSGetReceiveResumeTokenOrEmptyStringIfNotSupported(ctx, mustDatasetPath(recvFS))
 check(err)
 require.Equal(ctx, "", tok)

@@ -117,7 +117,7 @@ func SendArgsValidationResumeTokenEncryptionMismatchForbidden(ctx *platformtest.
 require.Equal(ctx, mismatchError.What, zfs.ZFSSendArgsResumeTokenMismatchEncryptionNotSet)
 }
-// threat model: use of a crafted resume token that requests an encryped send
+// threat model: use of a crafted resume token that requests an encrypted send
 // but send args require unencrypted send
 {
 var maliciousSend zfs.ZFSSendArgs = unencS.sendArgs

@@ -35,7 +35,7 @@ The algorithm **ensures resumability** of the replication step in presence of
 * network failures at any time
 * other instances of this algorithm executing the same step in parallel (e.g. concurrent replication to different destinations)
-To accomplish this goal, the algorithm **assumes ownersip of parts of the ZFS hold tag namespace and the bookmark namespace**:
+To accomplish this goal, the algorithm **assumes ownership of parts of the ZFS hold tag namespace and the bookmark namespace**:
 * holds with prefix `zrepl_STEP` on any snapshot are reserved for zrepl
 * bookmarks with prefix `zrepl_STEP` are reserved for zrepl
@@ -55,7 +55,7 @@ The replication step (full `to` send or `from => to` send) is *complete* iff the
 Specifically, the algorithm may be invoked with the same `from` and `to` arguments, and potentially a `resume_token`, after a temporary (like network-related) failure:
 **Unless permanent errors occur, repeated invocations of the algorithm with updated resume token will converge monotonically (but not strictly monotonically) toward completion.**
-Note that the mere existence of `to` on the receiving side does not constitue completion, since there may still be post-recv actions to be performed on sender and receiver.
+Note that the mere existence of `to` on the receiving side does not constitute completion, since there may still be post-recv actions to be performed on sender and receiver.
 #### Job and Job ID
 This algorithm supports that *multiple* instance of it run in parallel on the *same* step (full `to` / `from => to` pair).

@@ -117,7 +117,7 @@ Recv-side: no-op
 # => doesn't work, because zfs recv is implemented as a `clone` internally, that's exactly what we want
 ```
-- if recv-side `to` exists, goto cleaup-phase (no replication to do)
+- if recv-side `to` exists, goto cleanup-phase (no replication to do)
 - `to` cannot be destroyed while being received, because it isn't visible as a snapshot yet (it isn't yet one after all)

@@ -142,7 +142,7 @@ Network failures during replication can be recovered from using resumable send &
 - Network failure during the replication
 - send-side `from` and `to` are still present due to zfs holds
 - recv-side `from` is still present because the partial receive state prevents its destruction (see prepare-phase)
-- if recv-side hasa resume token, the resume token will continue to work on the sender because `from`s and `to` are still present
+- if recv-side has a resume token, the resume token will continue to work on the sender because `from`s and `to` are still present
 - Network failure at the end of the replication step stream transmission
 - Variant A: failure from the sender's perspective, success from the receiver's perspective
 - receive-side `to` doesn't have a hold and could be destroyed anytime

@@ -221,7 +221,7 @@ It builds a diff between the sender and receiver filesystem bookmarks+snapshots
 In case of conflict, the algorithm errors out with a conflict description that can be used to manually or automatically resolve the conflict.
 Otherwise, the algorithm builds a list of replication steps that are then worked on sequentially by the "Algorithm for a Single Replication Step".
-The algorithm ensures that a plan can be executed exactly as planned by aquiring appropriate zfs holds.
+The algorithm ensures that a plan can be executed exactly as planned by acquiring appropriate zfs holds.
 The algorithm can be configured to retry a plan when encountering non-permanent errors (e.g. network errors).
 However, permanent errors result in the plan being cancelled.
@@ -262,22 +262,22 @@ If fast-forward is not possible, produce a conflict description and ERROR OUT.<b
 TODOs:
 - make it configurable what snapshots are included in the list (i.e. every one we see, only most recent, at least one every X hours, ...)
-**Ensure that we will be able to carry out all steps** by aquiring holds or fsstep bookmarks on the sending side
+**Ensure that we will be able to carry out all steps** by acquiring holds or fsstep bookmarks on the sending side
 - `idempotent_hold([s.to for s in STEPS], zrepl_FS_J_${jobid})`
 - `if STEPS[0].from != nil: idempotent_FSSTEP_bookmark(STEPS[0].from, zrepl_FSSTEP_bm_G_${STEPS[0].from.guid}_J_${jobid})`
 **Determine which steps have not been completed (`uncompleted_steps`)** (we might be in an second invocation of this algorithm after a network failure and some steps might already be done):
 - `res_tok := receiver.ResumeToken(fs)`
 - `rmrfsv := receiver.MostRecentFilesystemVersion(fs)`
-- if `res_tok != nil`: ensure that `res_tok` has a correspondinng step in `STEPS`, otherwise ERROR OUT
-- if `rmrfsv != nil`: ensure that `res_tok` has a correspondinng step in `STEPS`, otherwise ERROR OUT
+- if `res_tok != nil`: ensure that `res_tok` has a corresponding step in `STEPS`, otherwise ERROR OUT
+- if `rmrfsv != nil`: ensure that `res_tok` has a corresponding step in `STEPS`, otherwise ERROR OUT
 - if `(res_token != nil && rmrfsv != nil)`: ensure that `res_tok` is the subsequent step to the one we found for `rmrfsv`
 - if both are nil, we are at the beginning, `uncompleted_steps = STEPS` and goto next block
 - `rstep := if res_tok != nil { res_tok } else { rmrfsv }`
 - `uncompleted_steps := STEPS[find_step_idx(STEPS, rstep).expect("must exist, checked above"):]`
 - Note that we do not explicitly check for the completion of prior replication steps.
 All we care about is what needs to be done from `rstep`.
-- This is intentional and necessary because we cummutatively release all holds and step bookmarks made for steps that preceed a just-completed step (see next paragraph)
+- This is intentional and necessary because we cumulatively release all holds and step bookmarks made for steps that precede a just-completed step (see next paragraph)
 **Execute uncompleted steps**<br/>
 Invoke the "Algorithm for a Single Replication Step" for each step in `uncompleted_steps`.
@@ -352,7 +352,7 @@ func (a *attempt) do(ctx context.Context, prev *attempt) {
 }
 }
 }
-// invariant: prevs contains an entry for each unambigious correspondence
+// invariant: prevs contains an entry for each unambiguous correspondence
 stepQueue := newStepQueue()
 defer stepQueue.Start(1)() // TODO parallel replication

@@ -399,7 +399,7 @@ func (fs *fs) do(ctx context.Context, pq *stepQueue, prev *fs) {
 }
 fs.planned.steps = append(fs.planned.steps, step)
 }
-debug("iniital len(fs.planned.steps) = %d", len(fs.planned.steps))
+debug("initial len(fs.planned.steps) = %d", len(fs.planned.steps))
 // for not-first attempts, only allow fs.planned.steps
 // up to including the originally planned target snapshot

@@ -8,12 +8,12 @@ import (
 type Logger = logger.Logger
-type contexKey int
+type contextKey int
-const contexKeyLogger contexKey = iota + 1
+const contextKeyLogger contextKey = iota + 1
 func getLog(ctx context.Context) Logger {
-l, ok := ctx.Value(contexKeyLogger).(Logger)
+l, ok := ctx.Value(contextKeyLogger).(Logger)
 if !ok {
 l = logger.NewNullLogger()
 }

@@ -21,5 +21,5 @@ func getLog(ctx context.Context) Logger {
 }
 func WithLogger(ctx context.Context, log Logger) context.Context {
-return context.WithValue(ctx, contexKeyLogger, log)
+return context.WithValue(ctx, contextKeyLogger, log)
 }

@@ -174,7 +174,7 @@ func TestReplication(t *testing.T) {
 waitBegin := time.Now()
 wait(true)
 waitDuration := time.Since(waitBegin)
-assert.True(t, waitDuration < 10*time.Millisecond, "%v", waitDuration) // and that's gratious
+assert.True(t, waitDuration < 10*time.Millisecond, "%v", waitDuration) // and that's gracious
 prev, err := json.Marshal(reports[0])
 require.NoError(t, err)

@@ -14,7 +14,7 @@ import (
 )
 // FIXME: this test relies on timing and is thus rather flaky
-// (relies on scheduler responsivity of < 500ms)
+// (relies on scheduler responsiveness of < 500ms)
 func TestPqNotconcurrent(t *testing.T) {
 var ctr uint32
 q := newStepQueue()
@@ -17,7 +17,7 @@ func fsvlist(fsv ...string) (r []*FilesystemVersion) {
 r = make([]*FilesystemVersion, len(fsv))
 for i, f := range fsv {
-// parse the id from fsvlist. it is used to derivce Guid,CreateTXG and Creation attrs
+// parse the id from fsvlist. it is used to derive Guid,CreateTXG and Creation attrs
 split := strings.Split(f, ",")
 if len(split) != 2 {
 panic("invalid fsv spec")

@@ -114,7 +114,7 @@ func TestIncrementalPath_BookmarkSupport(t *testing.T) {
 assert.Equal(t, l("#a,1", "@b,2"), path)
 })
-// boomarks are stripped from IncrementalPath (cannot send incrementally)
+// bookmarks are stripped from IncrementalPath (cannot send incrementally)
 doTest(l("@a,1"), l("#a,1", "#b,2", "@c,3"), func(path []*FilesystemVersion, conflict error) {
 assert.Equal(t, l("#a,1", "@c,3"), path)
 })

@@ -91,7 +91,7 @@ message ReceiveReq {
 string Filesystem = 1;
 FilesystemVersion To = 2;
-// If true, the receiver should clear the resume token before perfoming the
+// If true, the receiver should clear the resume token before performing the
 // zfs recv of the stream in the request
 bool ClearResumeToken = 3;
 }

@@ -352,7 +352,7 @@ func (fs *Filesystem) doPlanning(ctx context.Context) ([]*Step, error) {
 }
 // give both sides a hint about how far the replication got
-// This serves as a cummulative variant of SendCompleted and can be useful
+// This serves as a cumulative variant of SendCompleted and can be useful
 // for example to release stale holds from an earlier (interrupted) replication.
 // TODO FIXME: enqueue this as a replication step instead of doing it here during planning
 // then again, the step should run regardless of planning success

@@ -52,7 +52,7 @@ func (s *streamCopier) WriteStreamTo(w io.Writer) zfs.StreamCopierError {
 s.mtx.Lock()
 defer s.mtx.Unlock()
 if s.used {
-panic("streamCopier used mulitple times")
+panic("streamCopier used multiple times")
 }
 s.used = true
 return s.streamConn.ReadStreamInto(w, ZFSStream)

@@ -100,7 +100,7 @@ func (c *Conn) ReadFrame() (Frame, error) {
 return Frame{}, ErrShutdown
 }
-// only aquire readMtx now to prioritize the draining in Shutdown()
+// only acquire readMtx now to prioritize the draining in Shutdown()
 // over external callers (= drain public callers)
 c.readMtx.Lock()

@@ -148,7 +148,7 @@ func (c *Conn) readFrame() (Frame, error) {
 // | | | |
 // | | | F3
 // | | |
-// | F2 |signficant time between frames because
+// | F2 |significant time between frames because
 // F1 the peer has nothing to say to us
 //
 // Assume we're at the point were F2's header is in c.readNext.
@@ -246,7 +246,7 @@ func (c *Conn) Shutdown(deadline time.Time) error {
 //
 // 1. Naive Option: We just call Close() right after CloseWrite.
 // This yields the same race condition as explained above (DIF, first
-// paragraph): The situation just becomae a little more unlikely because
+// paragraph): The situation just became a little more unlikely because
 // our rstFrameType + CloseWrite dance gave the client a full RTT worth of
 // time to read the data from its TCP recv buffer.
 //

@@ -295,7 +295,7 @@ func (c *Conn) Shutdown(deadline time.Time) error {
 defer prometheus.NewTimer(prom.ShutdownSeconds).ObserveDuration()
 closeWire := func(step string) error {
-// TODO SetLinger(0) or similiar (we want RST frames here, not FINS)
+// TODO SetLinger(0) or similar (we want RST frames here, not FINS)
 closeErr := c.nc.Close()
 if closeErr == nil {
 return nil

@@ -321,10 +321,10 @@ func (c *Conn) Shutdown(deadline time.Time) error {
 c.shutdown.Begin()
 // new calls to c.ReadFrame and c.WriteFrame will now return ErrShutdown
-// Aquiring writeMtx and readMtx afterwards ensures that already-running calls exit successfully
+// Acquiring writeMtx and readMtx afterwards ensures that already-running calls exit successfully
 // disable renewing timeouts now, enforce the requested deadline instead
-// we need to do this before aquiring locks to enforce the timeout on slow
+// we need to do this before acquiring locks to enforce the timeout on slow
 // clients / if something hangs (DoS mitigation)
 if err := c.nc.DisableTimeouts(); err != nil {
 return hardclose(err, "disable_timeouts")

@@ -1,4 +1,4 @@
-// microbenchmark to manually test rpc/dataconn perforamnce
+// microbenchmark to manually test rpc/dataconn performance
 //
 // With stdin / stdout on client and server, simulating zfs send|recv piping
 //

@@ -6,7 +6,7 @@
 // ./microbenchmark -appmode client -direction recv < /dev/zero
 //
 //
-// Without the overhead of pipes (just protocol perforamnce, mostly useful with perf bc no bw measurement)
+// Without the overhead of pipes (just protocol performance, mostly useful with perf bc no bw measurement)
 //
 // ./microbenchmark -appmode client -direction recv -devnoopWriter -devnoopReader
 // ./microbenchmark -appmode server -devnoopReader -devnoopWriter

@@ -119,7 +119,7 @@ restart:
 return n, err
 }
-// Writes the given buffers to Conn, following the sematincs of io.Copy,
+// Writes the given buffers to Conn, following the semantics of io.Copy,
 // but is guaranteed to use the writev system call if the wrapped Wire
 // support it.
 // Note the Conn does not support writev through io.Copy(aConn, aNetBuffers).
@@ -158,9 +158,9 @@ var _ SyscallConner = (*net.TCPConn)(nil)
 // Think of io.ReadvFull, but for net.Buffers + using the readv syscall.
 //
 // If the underlying Wire is not a SyscallConner, a fallback
-// ipmlementation based on repeated Conn.Read invocations is used.
+// implementation based on repeated Conn.Read invocations is used.
 //
-// If the connection returned io.EOF, the number of bytes up ritten until
+// If the connection returned io.EOF, the number of bytes written until
 // then + io.EOF is returned. This behavior is different to io.ReadFull
 // which returns io.ErrUnexpectedEOF.
 func (c Conn) ReadvFull(buffers net.Buffers) (n int64, err error) {

@@ -99,7 +99,7 @@ func (c Conn) doOneReadv(rawConn syscall.RawConn, iovecs *[]syscall.Iovec) (n in
 // Update left, cannot go below 0 due to
 // a) definition of thisIovecConsumedCompletely
 // b) left > 0 due to loop invariant
-// Convertion .Len to int64 is thus also safe now, because it is < left < INT_MAX
+// Converting .Len to int64 is thus also safe now, because it is < left < INT_MAX
 left -= int((*iovecs)[0].Len)
 *iovecs = (*iovecs)[1:]
 } else {

@@ -56,7 +56,7 @@ func NewClientAuthListener(
 }
 // Accept() accepts a connection from the *net.TCPListener passed to the constructor
-// and sets up the TLS connection, including handshake and peer CommmonName validation
+// and sets up the TLS connection, including handshake and peer CommonName validation
 // within the specified handshakeTimeout.
 //
 // It returns both the raw TCP connection (tcpConn) and the TLS connection (tlsConn) on top of it.

@@ -100,7 +100,7 @@ func (l *LocalListener) Accept(ctx context.Context) (*transport.AuthConn, error)
 WithField("res.conn", res.conn).WithField("res.err", res.err).
 Debug("responding to client request")
-// contract bewteen Connect and Accept is that Connect sends a req.callback
+// contract between Connect and Accept is that Connect sends a req.callback
 // into which we can send one result non-blockingly.
 // We want to panic if that contract is violated (impl error)
 //

@@ -112,7 +112,7 @@ func (l *LocalListener) Accept(ctx context.Context) (*transport.AuthConn, error)
 defer func() {
 errv := recover()
 if errv == clientCallbackBlocked {
-// this would be a violation of contract betwee Connect and Accept, see above
+// this would be a violation of contract between Connect and Accept, see above
 panic(clientCallbackBlocked)
 } else {
 transport.GetLogger(ctx).WithField("recover_err", errv).

@@ -116,7 +116,7 @@ func (m *MultiStdinserverListener) Close() error {
 return oneErr
 }
-// a single stdinserverListener (part of multiStinserverListener)
+// a single stdinserverListener (part of multiStdinserverListener)
 type stdinserverListener struct {
 l *netssh.Listener
 clientIdentity string

@@ -61,7 +61,7 @@ func ValidateClientIdentity(in string) (err error) {
 return err
 }
 if path.Length() != 1 {
-return errors.New("client identity must be a single path comonent (not empty, no '/')")
+return errors.New("client identity must be a single path component (not empty, no '/')")
 }
 return nil
 }
@@ -7,7 +7,7 @@ import (
 "github.com/zrepl/zrepl/zfs"
 )
-// StreamCopier wraps a zfs.StreamCopier, reimplemening
+// StreamCopier wraps a zfs.StreamCopier, reimplementing
 // its interface and counting the bytes written to during copying.
 type StreamCopier interface {
 zfs.StreamCopier

@@ -20,7 +20,7 @@ func TestSemaphore(t *testing.T) {
 sem := New(concurrentSemaphore)
-var aquisitions struct {
+var acquisitions struct {
 beforeT, afterT uint32
 }

@@ -33,9 +33,9 @@ func TestSemaphore(t *testing.T) {
 require.NoError(t, err)
 defer res.Release()
 if time.Since(begin) > sleepTime {
-atomic.AddUint32(&aquisitions.beforeT, 1)
+atomic.AddUint32(&acquisitions.beforeT, 1)
 } else {
-atomic.AddUint32(&aquisitions.afterT, 1)
+atomic.AddUint32(&acquisitions.afterT, 1)
 }
 time.Sleep(sleepTime)
 }()

@@ -43,7 +43,7 @@ func TestSemaphore(t *testing.T) {
 wg.Wait()
-assert.True(t, aquisitions.beforeT == concurrentSemaphore)
-assert.True(t, aquisitions.afterT == numGoroutines-concurrentSemaphore)
+assert.True(t, acquisitions.beforeT == concurrentSemaphore)
+assert.True(t, acquisitions.afterT == numGoroutines-concurrentSemaphore)
 }

@@ -39,7 +39,7 @@ type DatasetPathsVisitor func(v DatasetPathVisit) (visitChildTree bool)
 // Traverse a list of DatasetPaths top down, i.e. given a set of datasets with same
 // path prefix, those with shorter prefix are traversed first.
-// If there are gaps, i.e. the intermediary component a/b bewtween a and a/b/c,
+// If there are gaps, i.e. the intermediary component a/b between a and a/b/c,
 // those gaps are still visited but the FilledIn property of the visit is set to true.
 func (f *DatasetPathForest) WalkTopDown(visitor DatasetPathsVisitor) {

@@ -56,7 +56,7 @@ func TestDatasetPathForestWalkTopDown(t *testing.T) {
 buildForest(paths).WalkTopDown(v)
-expectedVisists := []DatasetPathVisit{
+expectedVisits := []DatasetPathVisit{
 {toDatasetPath("pool1"), false},
 {toDatasetPath("pool1/foo"), true},
 {toDatasetPath("pool1/foo/bar"), false},

@@ -65,7 +65,7 @@ func TestDatasetPathForestWalkTopDown(t *testing.T) {
 {toDatasetPath("pool2/test"), true},
 {toDatasetPath("pool2/test/bar"), false},
 }
-assert.Equal(t, expectedVisists, rec.visits)
+assert.Equal(t, expectedVisits, rec.visits)
 }
@@ -82,7 +82,7 @@ func TestDatasetPathWalkTopDownWorksUnordered(t *testing.T) {
 buildForest(paths).WalkTopDown(v)
-expectedVisists := []DatasetPathVisit{
+expectedVisits := []DatasetPathVisit{
 {toDatasetPath("pool1"), false},
 {toDatasetPath("pool1/foo"), true},
 {toDatasetPath("pool1/foo/bar"), false},

@@ -91,6 +91,6 @@ func TestDatasetPathWalkTopDownWorksUnordered(t *testing.T) {
 {toDatasetPath("pool1/bang/baz"), false},
 }
-assert.Equal(t, expectedVisists, rec.visits)
+assert.Equal(t, expectedVisits, rec.visits)
 }

@@ -266,7 +266,7 @@ func doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid u
 case EntityTypeBookmark:
 return 1
 default:
-panic("unepxected entity type " + t.String())
+panic("unexpected entity type " + t.String())
 }
 }
 return iET(lines[i].entityType) < iET(lines[j].entityType)

@@ -290,7 +290,7 @@ func doZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(snapOrBookmarkGuid u
 }
 if foundGuid {
 // The secondary key in sorting (snap < bookmark) guarantees that we
-// A) either found the snapshot with snapOrBoomkarkGuid
+// A) either found the snapshot with snapOrBookmarkGuid
 // B) or no snapshot with snapGuid exists, but one or more bookmarks of it exists
 // In the case of A, we already added the snapshot to releaseSnaps if includeGuid requests it,
 // and can ignore possible subsequent bookmarks of the snapshot.

@@ -13,7 +13,7 @@ func TestDoZFSReleaseAllOlderAndIncOrExcludingGUIDFindSnapshots(t *testing.T) {
 // what we test here: sort bookmark #3 before @3
 // => assert that the function doesn't stop at the first guid match
 // (which might be a bookmark, depending on zfs list ordering)
-// but instead considers the entire stride of boomarks and snapshots with that guid
+// but instead considers the entire stride of bookmarks and snapshots with that guid
 //
 // also, throw in unordered createtxg for good measure
 list, err := doZFSReleaseAllOlderAndIncOrExcludingGUIDParseListOutput(

@@ -15,7 +15,7 @@ import (
 "github.com/zrepl/zrepl/util/envconst"
 )
-// NOTE: Update ZFSSendARgs.Validate when changning fields (potentially SECURITY SENSITIVE)
+// NOTE: Update ZFSSendARgs.Validate when changing fields (potentially SECURITY SENSITIVE)
 type ResumeToken struct {
 HasFromGUID, HasToGUID bool
 FromGUID, ToGUID uint64

@@ -25,6 +25,7 @@ type ResumeToken struct {
 }
 var resumeTokenNVListRE = regexp.MustCompile(`\t(\S+) = (.*)`)
 var resumeTokenContentsRE = regexp.MustCompile(`resume token contents:\nnvlist version: 0`)
 var resumeTokenIsCorruptRE = regexp.MustCompile(`resume token is corrupt`)
@@ -188,7 +188,7 @@ func doDestroyBatchedRec(ctx context.Context, fsbatch []*DestroySnapOp, d destro
 err := tryBatch(ctx, strippedBatch, d)
 if err != nil {
 // run entire batch sequentially if the stripped one fails
-// (it shouldn't because we stripped erronous datasets)
+// (it shouldn't because we stripped erroneous datasets)
 singleRun = fsbatch // shadow
 } else {
 setDestroySnapOpErr(strippedBatch, nil) // these ones worked

zfs/zfs.go (16 changed lines)

@@ -127,7 +127,7 @@ func NewDatasetPath(s string) (p *DatasetPath, err error) {
 return p, nil // the empty dataset path
 }
 const FORBIDDEN = "@#|\t<>*"
-/* Documenation of allowed characters in zfs names:
+/* Documentation of allowed characters in zfs names:
 https://docs.oracle.com/cd/E19253-01/819-5461/gbcpt/index.html
 Space is missing in the oracle list, but according to
 https://github.com/zfsonlinux/zfs/issues/439

@@ -511,7 +511,7 @@ func (s *sendStream) killAndWait(precedingReadErr error) error {
 // detect the edge where we're called from s.Read
 // after the pipe EOFed and zfs send exited without errors
-// this is actullay the "hot" / nice path
+// this is actually the "hot" / nice path
 if exitErr == nil && precedingReadErr == io.EOF {
 return precedingReadErr
 }

@@ -618,13 +618,13 @@ func (n *NilBool) String() string {
 return fmt.Sprintf("%v", n.B)
 }
-// When updating this struct, check Validate and ValidateCorrespondsToResumeToken (Potentiall SECURITY SENSITIVE)
+// When updating this struct, check Validate and ValidateCorrespondsToResumeToken (POTENTIALLY SECURITY SENSITIVE)
 type ZFSSendArgs struct {
 FS string
 From, To *ZFSSendArgVersion // From may be nil
 Encrypted *NilBool
-// Prefereed if not empty
+// Preferred if not empty
 ResumeToken string // if not nil, must match what is specified in From, To (covered by ValidateCorrespondsToResumeToken)
 }
@@ -660,7 +660,7 @@ func (e ZFSSendArgsValidationError) Error() string {
 }
 // - Recursively call Validate on each field.
-// - Make sure that if ResumeToken != "", it reflects the same operation as the other paramters would.
+// - Make sure that if ResumeToken != "", it reflects the same operation as the other parameters would.
 //
 // This function is not pure because GUIDs are checked against the local host's datasets.
 func (a ZFSSendArgs) Validate(ctx context.Context) error {

@@ -679,7 +679,7 @@ func (a ZFSSendArgs) Validate(ctx context.Context) error {
 if err := a.From.ValidateExists(ctx, a.FS); err != nil {
 return newGenericValidationError(a, errors.Wrap(err, "`From` invalid"))
 }
-// falthrough
+// fallthrough
 }
 if err := a.Encrypted.Validate(); err != nil {

@@ -743,7 +743,7 @@ func (a ZFSSendArgs) validateCorrespondsToResumeToken(ctx context.Context, valCt
 debug("decoding resume token %q", a.ResumeToken)
 t, err := ParseResumeToken(ctx, a.ResumeToken)
-debug("decode resumee token result: %#v %T %v", t, err, err)
+debug("decode resume token result: %#v %T %v", t, err, err)
 if err != nil {
 return err
 }

@@ -894,7 +894,7 @@ type DrySendInfo struct {
 }
 var (
-// keep same number of capture groups for unmarshalInfoLine homogenity
+// keep same number of capture groups for unmarshalInfoLine homogeneity
 sendDryRunInfoLineRegexFull = regexp.MustCompile(`^(full)\t()([^\t]+@[^\t]+)\t([0-9]+)$`)
 // cannot enforce '[#@]' in incremental source, see test cases

@@ -104,7 +104,7 @@ nvlist version: 0
 incremental zroot/test/a@1 zroot/test/a@2 5383936
 `
-// # incremental send with token + bookmarmk
+// # incremental send with token + bookmark
 // $ zfs send -nvP -t 1-ef01e717e-e0-789c636064000310a501c49c50360710a715e5e7a69766a63040c1d904b9e342877e062900d9ec48eaf293b252934b181898a0ea30e4d3d28a534b40323e70793624f9a4ca92d46220fdc1ce0fabfe927c882bc46c8a0a9f71ad3baf8124cf0996cf4bcc4d6560a82acacf2fd1079a55a29fe86004710b00d8ae1f93
 incSendBookmark := `
 resume token contents: