zfs send/recv: Support raw sends (-w/-Lce) and property handling (-p, -b, -o, -x)

Signed-off-by: InsanePrawn <insane.prawny@gmail.com>
This commit is contained in:
InsanePrawn 2020-08-30 18:16:31 +02:00
parent 8e1937fe75
commit 893d686eef
8 changed files with 448 additions and 25 deletions

View File

@ -12,6 +12,8 @@ import (
"github.com/pkg/errors"
"github.com/zrepl/yaml-config"
zfsprop "github.com/zrepl/zrepl/zfs/property"
)
type Config struct {
@ -76,8 +78,15 @@ type SnapJob struct {
}
type SendOptions struct {
Encrypted bool `yaml:"encrypted"`
StepHolds SendOptionsStepHolds `yaml:"step_holds,optional"`
Encrypted bool `yaml:"encrypted,optional"`
Raw bool `yaml:"raw,optional"`
SendProperties bool `yaml:"send_properties,optional"`
BackupProperties bool `yaml:"backup_properties,optional"`
LargeBlocks bool `yaml:"large_blocks,optional"`
Compressed bool `yaml:"compressed,optional"`
EmbeddedData bool `yaml:"embbeded_data,optional"`
}
type SendOptionsStepHolds struct {
@ -87,21 +96,43 @@ type SendOptionsStepHolds struct {
var _ yaml.Defaulter = (*SendOptions)(nil)
func (l *SendOptions) SetDefault() {
*l = SendOptions{Encrypted: false}
*l = SendOptions{
Encrypted: false,
Raw: false,
SendProperties: false,
BackupProperties: false,
LargeBlocks: false,
Compressed: false,
EmbeddedData: false,
}
}
type RecvOptions struct {
// Note: we cannot enforce encrypted recv as the ZFS cli doesn't provide a mechanism for it
// Encrypted bool `yaml:"may_encrypted"`
// Future:
// Reencrypt bool `yaml:"reencrypt"`
Properties *PropertyRecvOptions `yaml:"properties,fromdefaults"`
}
var _ yaml.Defaulter = (*RecvOptions)(nil)
func (l *RecvOptions) SetDefault() {
*l = RecvOptions{}
*l = RecvOptions{Properties: &PropertyRecvOptions{}}
}
type PropertyRecvOptions struct {
Inherit []zfsprop.Property `yaml:"inherit,optional"`
Override map[zfsprop.Property]string `yaml:"override,optional"`
}
var _ yaml.Defaulter = (*PropertyRecvOptions)(nil)
func (l *PropertyRecvOptions) SetDefault() {
//*l = PropertyRecvOptions{}
//TODO: is below necessary?
*l = PropertyRecvOptions{Inherit: make([]zfsprop.Property, 0), Override: make(map[zfsprop.Property]string)}
}
type PushJob struct {

132
config/config_recv_test.go Normal file
View File

@ -0,0 +1,132 @@
package config
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestRecvOptions(t *testing.T) {
tmpl := `
jobs:
- name: foo
type: pull
connect:
type: local
listener_name: foo
client_identity: bar
root_fs: "zreplplatformtest"
%s
interval: manual
pruning:
keep_sender:
- type: last_n
count: 10
keep_receiver:
- type: last_n
count: 10
`
recv_properties_empty := `
recv:
properties:
`
recv_inherit_empty := `
recv:
properties:
inherit:
`
recv_inherit := `
recv:
properties:
inherit:
- testprop
`
recv_override_empty := `
recv:
properties:
override:
`
recv_override := `
recv:
properties:
override:
testprop2: "test123"
`
recv_override_and_inherit := `
recv:
properties:
inherit:
- testprop
override:
testprop2: "test123"
`
recv_empty := `
recv: {}
`
recv_not_specified := `
`
fill := func(s string) string { return fmt.Sprintf(tmpl, s) }
var c *Config
t.Run("recv_inherit_empty", func(t *testing.T) {
c, err := testConfig(t, fill(recv_inherit_empty))
assert.NoError(t, err)
assert.NotNil(t, c)
})
t.Run("recv_inherit", func(t *testing.T) {
c = testValidConfig(t, fill(recv_inherit))
inherit := c.Jobs[0].Ret.(*PullJob).Recv.Properties.Inherit
assert.NotEmpty(t, inherit)
})
t.Run("recv_override_empty", func(t *testing.T) {
c, err := testConfig(t, fill(recv_override_empty))
assert.NoError(t, err)
assert.NotNil(t, c)
})
t.Run("recv_override", func(t *testing.T) {
c = testValidConfig(t, fill(recv_override))
override := c.Jobs[0].Ret.(*PullJob).Recv.Properties.Override
assert.NotEmpty(t, override)
})
t.Run("recv_override_and_inherit", func(t *testing.T) {
c = testValidConfig(t, fill(recv_override_and_inherit))
inherit := c.Jobs[0].Ret.(*PullJob).Recv.Properties.Inherit
override := c.Jobs[0].Ret.(*PullJob).Recv.Properties.Override
assert.NotEmpty(t, inherit)
assert.NotEmpty(t, override)
})
t.Run("recv_properties_empty", func(t *testing.T) {
c, err := testConfig(t, fill(recv_properties_empty))
assert.NoError(t, err)
assert.NotNil(t, c)
})
t.Run("recv_empty", func(t *testing.T) {
c, err := testConfig(t, fill(recv_empty))
assert.NoError(t, err)
assert.NotNil(t, c)
})
t.Run("send_not_specified", func(t *testing.T) {
c, err := testConfig(t, fill(recv_not_specified))
assert.NoError(t, err)
assert.NotNil(t, c)
})
}

View File

@ -38,7 +38,41 @@ jobs:
encrypted: true
`
encrypted_unspecified := `
raw_true := `
send:
raw: true
`
raw_false := `
send:
raw: false
`
raw_and_encrypted := `
send:
encrypted: true
raw: true
`
properties_and_encrypted := `
send:
encrypted: true
send_properties: true
`
properties_true := `
send:
send_properties: true
`
properties_false := `
send:
send_properties: false
`
send_empty := `
send: {}
`
@ -59,10 +93,50 @@ jobs:
assert.Equal(t, true, encrypted)
})
t.Run("encrypted_unspecified", func(t *testing.T) {
c, err := testConfig(t, fill(encrypted_unspecified))
assert.Error(t, err)
assert.Nil(t, c)
t.Run("send_empty", func(t *testing.T) {
c := testValidConfig(t, fill(send_empty))
encrypted := c.Jobs[0].Ret.(*PushJob).Send.Encrypted
assert.Equal(t, false, encrypted)
})
t.Run("properties_and_encrypted", func(t *testing.T) {
c := testValidConfig(t, fill(properties_and_encrypted))
encrypted := c.Jobs[0].Ret.(*PushJob).Send.Encrypted
properties := c.Jobs[0].Ret.(*PushJob).Send.SendProperties
assert.Equal(t, true, encrypted)
assert.Equal(t, true, properties)
})
t.Run("properties_false", func(t *testing.T) {
c := testValidConfig(t, fill(properties_false))
properties := c.Jobs[0].Ret.(*PushJob).Send.SendProperties
assert.Equal(t, false, properties)
})
t.Run("properties_true", func(t *testing.T) {
c := testValidConfig(t, fill(properties_true))
properties := c.Jobs[0].Ret.(*PushJob).Send.SendProperties
assert.Equal(t, true, properties)
})
t.Run("raw_true", func(t *testing.T) {
c := testValidConfig(t, fill(raw_true))
raw := c.Jobs[0].Ret.(*PushJob).Send.Raw
assert.Equal(t, true, raw)
})
t.Run("raw_false", func(t *testing.T) {
c := testValidConfig(t, fill(raw_false))
raw := c.Jobs[0].Ret.(*PushJob).Send.Raw
assert.Equal(t, false, raw)
})
t.Run("raw_and_encrypted", func(t *testing.T) {
c := testValidConfig(t, fill(raw_and_encrypted))
raw := c.Jobs[0].Ret.(*PushJob).Send.Raw
encrypted := c.Jobs[0].Ret.(*PushJob).Send.Encrypted
assert.Equal(t, true, raw)
assert.Equal(t, true, encrypted)
})
t.Run("send_not_specified", func(t *testing.T) {

View File

@ -153,9 +153,16 @@ func modePushFromConfig(g *config.Global, in *config.PushJob, jobID endpoint.Job
m.senderConfig = &endpoint.SenderConfig{
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
JobID: jobID,
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
SendRaw: in.Send.Raw,
SendProperties: in.Send.SendProperties,
SendBackupProperties: in.Send.BackupProperties,
SendLargeBlocks: in.Send.LargeBlocks,
SendCompressed: in.Send.Compressed,
SendEmbeddedData: in.Send.EmbeddedData,
}
m.plannerPolicy = &logic.PlannerPolicy{
EncryptedSend: logic.TriFromBool(in.Send.Encrypted),
@ -264,6 +271,9 @@ func modePullFromConfig(g *config.Global, in *config.PullJob, jobID endpoint.Job
RootWithoutClientComponent: m.rootFS,
AppendClientIdentity: false, // !
UpdateLastReceivedHold: true,
InheritProperties: in.Recv.Properties.Inherit,
OverrideProperties: in.Recv.Properties.Override,
}
if err := m.receiverConfig.Validate(); err != nil {
return nil, errors.Wrap(err, "cannot build receiver config")

View File

@ -58,6 +58,9 @@ func modeSinkFromConfig(g *config.Global, in *config.SinkJob, jobID endpoint.Job
RootWithoutClientComponent: rootDataset,
AppendClientIdentity: true, // !
UpdateLastReceivedHold: true,
InheritProperties: in.Recv.Properties.Inherit,
OverrideProperties: in.Recv.Properties.Override,
}
if err := m.receiverConfig.Validate(); err != nil {
return nil, errors.Wrap(err, "cannot build receiver config")
@ -80,9 +83,16 @@ func modeSourceFromConfig(g *config.Global, in *config.SourceJob, jobID endpoint
}
m.senderConfig = &endpoint.SenderConfig{
FSF: fsf,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
JobID: jobID,
DisableIncrementalStepHolds: in.Send.StepHolds.DisableIncremental,
Encrypt: &zfs.NilBool{B: in.Send.Encrypted},
SendRaw: in.Send.Raw,
SendProperties: in.Send.SendProperties,
SendBackupProperties: in.Send.BackupProperties,
SendLargeBlocks: in.Send.LargeBlocks,
SendCompressed: in.Send.Compressed,
SendEmbeddedData: in.Send.EmbeddedData,
}
if m.snapper, err = snapper.FromConfig(g, fsf, in.Snapshotting); err != nil {

View File

@ -18,13 +18,21 @@ import (
"github.com/zrepl/zrepl/util/envconst"
"github.com/zrepl/zrepl/util/semaphore"
"github.com/zrepl/zrepl/zfs"
zfsprop "github.com/zrepl/zrepl/zfs/property"
)
type SenderConfig struct {
FSF zfs.DatasetFilter
Encrypt *zfs.NilBool
DisableIncrementalStepHolds bool
JobID JobID
Encrypt *zfs.NilBool
SendRaw bool
SendProperties bool
SendBackupProperties bool
SendLargeBlocks bool
SendCompressed bool
SendEmbeddedData bool
}
func (c *SenderConfig) Validate() error {
@ -41,9 +49,16 @@ func (c *SenderConfig) Validate() error {
// Sender implements replication.ReplicationEndpoint for a sending side
type Sender struct {
FSFilter zfs.DatasetFilter
encrypt *zfs.NilBool
disableIncrementalStepHolds bool
jobId JobID
encrypt *zfs.NilBool
sendRaw bool
sendProperties bool
sendBackupProperties bool
sendLargeBlocks bool
sendCompressed bool
sendEmbeddedData bool
}
func NewSender(conf SenderConfig) *Sender {
@ -52,9 +67,16 @@ func NewSender(conf SenderConfig) *Sender {
}
return &Sender{
FSFilter: conf.FSF,
encrypt: conf.Encrypt,
disableIncrementalStepHolds: conf.DisableIncrementalStepHolds,
jobId: conf.JobID,
encrypt: conf.Encrypt,
sendRaw: conf.SendRaw,
sendProperties: conf.SendProperties,
sendBackupProperties: conf.SendBackupProperties,
sendLargeBlocks: conf.SendLargeBlocks,
sendCompressed: conf.SendCompressed,
sendEmbeddedData: conf.SendEmbeddedData,
}
}
@ -171,8 +193,15 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
FS: r.Filesystem,
From: uncheckedSendArgsFromPDU(r.GetFrom()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
To: uncheckedSendArgsFromPDU(r.GetTo()), // validated by zfs.ZFSSendDry / zfs.ZFSSend
Encrypted: s.encrypt,
ResumeToken: r.ResumeToken, // nil or not nil, depending on decoding success
Encrypted: s.encrypt,
Properties: s.sendProperties,
BackupProperties: s.sendBackupProperties,
Raw: s.sendRaw,
LargeBlocks: s.sendLargeBlocks,
Compressed: s.sendCompressed,
EmbeddedData: s.sendEmbeddedData,
}
sendArgs, err := sendArgsUnvalidated.Validate(ctx)
@ -180,6 +209,21 @@ func (s *Sender) Send(ctx context.Context, r *pdu.SendReq) (*pdu.SendRes, io.Rea
return nil, nil, errors.Wrap(err, "validate send arguments")
}
if s.sendRaw {
encryptionSupported, err := zfs.EncryptionCLISupported(ctx)
if err != nil {
return nil, nil, errors.Wrap(err, "cannot determine CLI native encryption support")
}
if !encryptionSupported {
// zfs-send(8) about `send -w`: "For unencrypted datasets, this flag will be equivalent to -Lec."
sendArgs.Raw = false
sendArgs.LargeBlocks = true // -L
sendArgs.Compressed = true // -c
sendArgs.EmbeddedData = true // -e
}
}
getLogger(ctx).Debug("acquire concurrent send semaphore")
// TODO use try-acquire and fail with resource-exhaustion rpc status
// => would require handling on the client-side
@ -432,6 +476,8 @@ type FSMap interface { // FIXME unused
AsFilter() FSFilter
}
// NOTE: when adding members to this struct, remember
// to add them to `ReceiverConfig.copyIn()`
type ReceiverConfig struct {
JobID JobID
@ -439,14 +485,46 @@ type ReceiverConfig struct {
AppendClientIdentity bool
UpdateLastReceivedHold bool
InheritProperties []zfsprop.Property
OverrideProperties map[zfsprop.Property]string
}
func (c *ReceiverConfig) copyIn() {
c.RootWithoutClientComponent = c.RootWithoutClientComponent.Copy()
pInherit := make([]zfsprop.Property, len(c.InheritProperties))
copy(pInherit, c.InheritProperties)
c.InheritProperties = pInherit
pOverride := make(map[zfsprop.Property]string, len(c.OverrideProperties))
for key, value := range c.OverrideProperties {
pOverride[key] = value
}
c.OverrideProperties = pOverride
}
func (c *ReceiverConfig) Validate() error {
c.JobID.MustValidate()
if c.InheritProperties != nil {
for _, prop := range c.InheritProperties {
err := prop.Validate()
if err != nil {
return errors.Wrapf(err, "inherit property %q", prop)
}
}
}
if c.OverrideProperties != nil {
for prop := range c.OverrideProperties {
err := prop.Validate()
if err != nil {
return errors.Wrapf(err, "override property %q", prop)
}
}
}
if c.RootWithoutClientComponent.Length() <= 0 {
return errors.New("RootWithoutClientComponent must not be an empty dataset path")
}
@ -734,6 +812,10 @@ func (s *Receiver) Receive(ctx context.Context, req *pdu.ReceiveReq, receive io.
return nil, errors.Wrap(err, "cannot get placeholder state")
}
log.WithField("placeholder_state", fmt.Sprintf("%#v", ph)).Debug("placeholder state")
recvOpts.InheritProperties = s.conf.InheritProperties
recvOpts.OverrideProperties = s.conf.OverrideProperties
if ph.FSExists && ph.IsPlaceholder {
recvOpts.RollbackAndForceRecv = true
clearPlaceholderProperty = true

33
zfs/property/property.go Normal file
View File

@ -0,0 +1,33 @@
package property
import (
"fmt"
"regexp"
)
type Property string
// Check property name conforms to zfsprops(8), section "User Properties"
// Keep regex and error message in sync!
var (
propertyValidNameChars = regexp.MustCompile(`^[0-9a-zA-Z-_\.:]+$`)
propertyValidNameCharsErr = fmt.Errorf("property name must only contain alphanumeric chars and any in %q", "-_.:")
)
func (p Property) Validate() error {
const PROPERTYNAMEMAXLEN int = 256
if len(p) < 1 {
return fmt.Errorf("property name cannot be empty")
}
if len(p) > PROPERTYNAMEMAXLEN {
return fmt.Errorf("property name longer than %d characters", PROPERTYNAMEMAXLEN)
}
if p[0] == '-' {
return fmt.Errorf("property name cannot start with '-'")
}
if !propertyValidNameChars.MatchString(string(p)) {
return propertyValidNameCharsErr
}
return nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/zrepl/zrepl/util/circlog"
"github.com/zrepl/zrepl/util/envconst"
zfsprop "github.com/zrepl/zrepl/zfs/property"
"github.com/zrepl/zrepl/zfs/zfscmd"
)
@ -329,10 +330,30 @@ func (a ZFSSendArgsUnvalidated) buildCommonSendArgs() ([]string, error) {
return args, nil
}
if a.Encrypted.B {
if a.Encrypted.B || a.Raw {
args = append(args, "-w")
}
if a.Properties {
args = append(args, "-p")
}
if a.BackupProperties {
args = append(args, "-b")
}
if a.LargeBlocks {
args = append(args, "-L")
}
if a.Compressed {
args = append(args, "-c")
}
if a.EmbeddedData {
args = append(args, "-e")
}
toV, err := absVersion(a.FS, a.To)
if err != nil {
return nil, err
@ -573,7 +594,14 @@ func (n *NilBool) String() string {
type ZFSSendArgsUnvalidated struct {
FS string
From, To *ZFSSendArgVersion // From may be nil
Encrypted *NilBool
Properties bool
BackupProperties bool
Raw bool
LargeBlocks bool
Compressed bool
EmbeddedData bool
// Preferred if not empty
ResumeToken string // if not nil, must match what is specified in From, To (covered by ValidateCorrespondsToResumeToken)
@ -779,17 +807,27 @@ func ZFSSend(ctx context.Context, sendArgs ZFSSendArgsValidated) (*SendStream, e
args = append(args, "send")
// pre-validation of sendArgs for plain ErrEncryptedSendNotSupported error
// we tie BackupProperties (send -b) and SendRaw (-w, same as with Encrypted) to this
// since these were released together.
// TODO go1.13: push this down to sendArgs.Validate
if encryptedSendValid := sendArgs.Encrypted.Validate(); encryptedSendValid == nil && sendArgs.Encrypted.B {
supported, err := EncryptionCLISupported(ctx)
if sendArgs.Encrypted.B || sendArgs.Raw || sendArgs.BackupProperties {
encryptionSupported, err := EncryptionCLISupported(ctx)
if err != nil {
return nil, errors.Wrap(err, "cannot determine CLI native encryption support")
}
if !supported {
if !encryptionSupported {
return nil, ErrEncryptedSendNotSupported
}
}
// TODO: Add similar tests for -L, -c and -e if applicable.
if _, err := sendArgs.Validate(ctx); err != nil {
return nil, err // do not wrap, part of API, tested by platformtest
}
sargs, err := sendArgs.buildCommonSendArgs()
if err != nil {
return nil, err
@ -974,6 +1012,8 @@ type RecvOptions struct {
RollbackAndForceRecv bool
// Set -s flag used for resumable send & recv
SavePartialRecvState bool
InheritProperties []zfsprop.Property
OverrideProperties map[zfsprop.Property]string
}
type ErrRecvResumeNotSupported struct {
@ -1051,6 +1091,17 @@ func ZFSRecv(ctx context.Context, fs string, v *ZFSSendArgVersion, stream io.Rea
}
args = append(args, "-s")
}
if opts.InheritProperties != nil {
for _, prop := range opts.InheritProperties {
args = append(args, "-x", string(prop))
}
}
if opts.OverrideProperties != nil {
for prop, value := range opts.OverrideProperties {
args = append(args, "-o", fmt.Sprintf("%s=%s", prop, value))
}
}
args = append(args, v.FullPath(fs))
ctx, cancelCmd := context.WithCancel(ctx)