WIP rewrite the daemon

cmd subdir does not build on purpose, it's only left in tree to grab old
code and move it to github.com/zrepl/zrepl/daemon
This commit is contained in:
Christian Schwarz 2018-08-27 22:21:45 +02:00
parent df6e1bc64d
commit c69ebd3806
55 changed files with 1133 additions and 84 deletions

View File

@ -9,7 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/problame/go-netssh"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/tlsconf"
"time"
)

View File

@ -6,8 +6,8 @@ import (
"context"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/cmd/endpoint"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/zfs"
"sync"

View File

@ -11,8 +11,8 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/cmd/endpoint"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
)

View File

@ -6,8 +6,8 @@ import (
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/cmd/endpoint"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"net"
)

View File

@ -5,7 +5,7 @@ import (
"crypto/x509"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/tlsconf"
"github.com/zrepl/zrepl/logger"
"os"

View File

@ -5,7 +5,7 @@ import (
"strings"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/endpoint"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
)

View File

@ -8,7 +8,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/pruning/retentiongrid"
"os"
)

View File

@ -2,7 +2,7 @@ package cmd
import (
"github.com/problame/go-netssh"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/helpers"
"net"
"path"

View File

@ -1,10 +1,9 @@
package cmd
import (
"net"
"github.com/zrepl/zrepl/config"
"time"
"github.com/zrepl/zrepl/cmd/config"
"net"
)
type TCPListenerFactory struct {

View File

@ -8,7 +8,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/tlsconf"
)

View File

@ -1,5 +1,18 @@
package daemon
import (
"context"
"github.com/zrepl/zrepl/logger"
"os"
"os/signal"
"syscall"
"time"
"github.com/zrepl/zrepl/version"
"fmt"
)
.daesdfadsfsafjlsjfda
import (
"context"
"fmt"

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/cmd/daemon"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/logger"

View File

@ -2,7 +2,7 @@ package retentiongrid
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/zfs"
"math"
"sort"

View File

@ -152,12 +152,13 @@ type TCPServe struct {
}
type TLSServe struct {
Type string `yaml:"type"`
Listen string `yaml:"listen"`
Ca string `yaml:"ca"`
Cert string `yaml:"cert"`
Key string `yaml:"key"`
ClientCN string `yaml:"client_cn"`
Type string `yaml:"type"`
Listen string `yaml:"listen"`
Ca string `yaml:"ca"`
Cert string `yaml:"cert"`
Key string `yaml:"key"`
ClientCN string `yaml:"client_cn"`
HandshakeTimeout time.Duration `yaml:"handshake_timeout,positive,default=10s"`
}
type StdinserverServer struct {
@ -195,19 +196,19 @@ type LoggingOutletCommon struct {
type StdoutLoggingOutlet struct {
LoggingOutletCommon `yaml:",inline"`
Time bool `yaml:"time"`
Time bool `yaml:"time,default=true"`
}
type SyslogLoggingOutlet struct {
LoggingOutletCommon `yaml:",inline"`
RetryInterval time.Duration `yaml:"retry_interval,positive"`
RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"`
}
type TCPLoggingOutlet struct {
LoggingOutletCommon `yaml:",inline"`
Address string `yaml:"address"`
Net string `yaml:"net,default=tcp"`
RetryInterval time.Duration `yaml:"retry_interval,positive"`
RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"`
TLS *TCPLoggingOutletTLS `yaml:"tls,optional"`
}

View File

@ -0,0 +1,66 @@
package connecter
import (
"context"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
"github.com/problame/go-netssh"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"net"
"time"
)
type SSHStdinserverConnecter struct {
Host string
User string
Port uint16
IdentityFile string
TransportOpenCommand []string
SSHCommand string
Options []string
dialTimeout time.Duration
}
var _ streamrpc.Connecter = &SSHStdinserverConnecter{}
func SSHStdinserverConnecterFromConfig(in *config.SSHStdinserverConnect) (c *SSHStdinserverConnecter, err error) {
c = &SSHStdinserverConnecter{
Host: in.Host,
User: in.User,
Port: in.Port,
IdentityFile: in.IdentityFile,
SSHCommand: in.SSHCommand,
Options: in.Options,
dialTimeout: in.DialTimeout,
}
return
}
type netsshConnToConn struct{ *netssh.SSHConn }
var _ net.Conn = netsshConnToConn{}
func (netsshConnToConn) SetDeadline(dl time.Time) error { return nil }
func (netsshConnToConn) SetReadDeadline(dl time.Time) error { return nil }
func (netsshConnToConn) SetWriteDeadline(dl time.Time) error { return nil }
func (c *SSHStdinserverConnecter) Connect(dialCtx context.Context) (net.Conn, error) {
var endpoint netssh.Endpoint
if err := copier.Copy(&endpoint, c); err != nil {
return nil, errors.WithStack(err)
}
dialCtx, dialCancel := context.WithTimeout(dialCtx, c.dialTimeout) // context.TODO tied to error handling below
defer dialCancel()
nconn, err := netssh.Dial(dialCtx, endpoint)
if err != nil {
if err == context.DeadlineExceeded {
err = errors.Errorf("dial_timeout of %s exceeded", c.dialTimeout)
}
return nil, err
}
return netsshConnToConn{nconn}, nil
}

View File

@ -0,0 +1,24 @@
package connecter
import (
"context"
"github.com/zrepl/zrepl/config"
"net"
)
type TCPConnecter struct {
Address string
dialer net.Dialer
}
func TCPConnecterFromConfig(in *config.TCPConnect) (*TCPConnecter, error) {
dialer := net.Dialer{
Timeout: in.DialTimeout,
}
return &TCPConnecter{in.Address, dialer}, nil
}
func (c *TCPConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) {
return c.dialer.DialContext(dialCtx, "tcp", c.Address)
}

View File

@ -0,0 +1,43 @@
package connecter
import (
"context"
"crypto/tls"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/tlsconf"
"net"
)
type TLSConnecter struct {
Address string
dialer net.Dialer
tlsConfig *tls.Config
}
func TLSConnecterFromConfig(in *config.TLSConnect) (*TLSConnecter, error) {
dialer := net.Dialer{
Timeout: in.DialTimeout,
}
ca, err := tlsconf.ParseCAFile(in.Ca)
if err != nil {
return nil, errors.Wrap(err, "cannot parse ca file")
}
cert, err := tls.LoadX509KeyPair(in.Cert, in.Key)
if err != nil {
return nil, errors.Wrap(err, "cannot parse cert/key pair")
}
tlsConfig, err := tlsconf.ClientAuthClient(in.ServerCN, ca, cert)
if err != nil {
return nil, errors.Wrap(err, "cannot build tls config")
}
return &TLSConnecter{in.Address, dialer, tlsConfig}, nil
}
func (c *TLSConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) {
return tls.DialWithDialer(&c.dialer, "tcp", c.Address, c.tlsConfig)
}

View File

@ -0,0 +1,20 @@
package connecter
import (
"fmt"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
)
func FromConfig(g config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) {
switch v := in.Ret.(type) {
case *config.SSHStdinserverConnect:
return SSHStdinserverConnecterFromConfig(v)
case *config.TCPConnect:
return TCPConnecterFromConfig(v)
case *config.TLSConnect:
return TLSConnecterFromConfig(v)
default:
panic(fmt.Sprintf("implementation error: unknown connecter type %T", v))
}
}

View File

@ -5,8 +5,8 @@ import (
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/cmd/helpers"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/nethelpers"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"io"
@ -39,6 +39,7 @@ const (
ControlJobEndpointPProf string = "/debug/pprof"
ControlJobEndpointVersion string = "/version"
ControlJobEndpointStatus string = "/status"
ControlJobEndpointWakeup string = "/wakeup"
)
func (j *controlJob) Run(ctx context.Context) {
@ -46,7 +47,7 @@ func (j *controlJob) Run(ctx context.Context) {
log := job.GetLogger(ctx)
defer log.Info("control job finished")
l, err := helpers.ListenUnixPrivate(j.sockaddr)
l, err := nethelpers.ListenUnixPrivate(j.sockaddr)
if err != nil {
log.WithError(err).Error("error listening")
return
@ -55,25 +56,42 @@ func (j *controlJob) Run(ctx context.Context) {
pprofServer := NewPProfServer(ctx)
mux := http.NewServeMux()
mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) {
var msg PprofServerControlMsg
err := json.NewDecoder(r.Body).Decode(&msg)
if err != nil {
log.WithError(err).Error("bad pprof request from client")
w.WriteHeader(http.StatusBadRequest)
}
pprofServer.Control(msg)
w.WriteHeader(200)
}})
mux.Handle(ControlJobEndpointPProf,
requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) {
var msg PprofServerControlMsg
err := decoder(&msg)
if err != nil {
return nil, errors.Errorf("decode failed")
}
pprofServer.Control(msg)
return struct{}{}, nil
}}})
mux.Handle(ControlJobEndpointVersion,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
return version.NewZreplVersionInformation(), nil
}}})
mux.Handle(ControlJobEndpointStatus,
requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) {
s := j.jobs.status()
return s, nil
}}})
mux.Handle(ControlJobEndpointWakeup,
requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) {
type reqT struct {
Name string
}
var req reqT
if decoder(&req) != nil {
return nil, errors.Errorf("decode failed")
}
err := j.jobs.wakeup(req.Name)
return struct{}{}, err
}}})
server := http.Server{Handler: mux}
outer:
@ -122,6 +140,43 @@ func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
type jsonDecoder = func(interface{}) error
type jsonRequestResponder struct {
producer func(decoder jsonDecoder) (interface{}, error)
}
func (j jsonRequestResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var decodeError error
decoder := func(i interface{}) error {
err := json.NewDecoder(r.Body).Decode(&i)
decodeError = err
return err
}
res, producerErr := j.producer(decoder)
//If we had a decode error ignore output of producer and return error
if decodeError != nil {
w.WriteHeader(http.StatusBadRequest)
io.WriteString(w, decodeError.Error())
return
}
if producerErr != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, producerErr.Error())
return
}
var buf bytes.Buffer
encodeErr := json.NewEncoder(&buf).Encode(res)
if encodeErr != nil {
w.WriteHeader(http.StatusInternalServerError)
io.WriteString(w, encodeErr.Error())
} else {
io.Copy(w, &buf)
}
}
type requestLogger struct {
log logger.Logger
handler http.Handler

View File

@ -3,7 +3,10 @@ package daemon
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/version"
"os"
@ -12,12 +15,8 @@ import (
"sync"
"syscall"
"time"
"github.com/zrepl/zrepl/cmd/config"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/pkg/errors"
)
func Run(conf config.Config) error {
ctx, cancel := context.WithCancel(context.Background())
@ -86,13 +85,13 @@ type jobs struct {
// m protects all fields below it
m sync.RWMutex
wakeups map[string]job.WakeupChan // by JobName
wakeups map[string]job.WakeupFunc // by Job.Name
jobs map[string]job.Job
}
func newJobs() *jobs {
return &jobs{
wakeups: make(map[string]job.WakeupChan),
wakeups: make(map[string]job.WakeupFunc),
jobs: make(map[string]job.Job),
}
}
@ -137,6 +136,17 @@ func (s *jobs) status() map[string]interface{} {
return ret
}
func (s *jobs) wakeup(job string) error {
s.m.RLock()
defer s.m.RUnlock()
wu, ok := s.wakeups[job]
if !ok {
return errors.Errorf("Job %s does not exist", job)
}
return wu()
}
const (
jobNamePrometheus = "_prometheus"
jobNameControl = "_control"

View File

@ -0,0 +1,273 @@
package filters
import (
"fmt"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/zfs"
"strings"
)
type DatasetMapFilter struct {
entries []datasetMapFilterEntry
// if set, only valid filter entries can be added using Add()
// and Map() will always return an error
filterMode bool
}
type datasetMapFilterEntry struct {
path *zfs.DatasetPath
// the mapping. since this datastructure acts as both mapping and filter
// we have to convert it to the desired rep dynamically
mapping string
subtreeMatch bool
}
func NewDatasetMapFilter(capacity int, filterMode bool) *DatasetMapFilter {
return &DatasetMapFilter{
entries: make([]datasetMapFilterEntry, 0, capacity),
filterMode: filterMode,
}
}
func (m *DatasetMapFilter) Add(pathPattern, mapping string) (err error) {
if m.filterMode {
if _, err = m.parseDatasetFilterResult(mapping); err != nil {
return
}
}
// assert path glob adheres to spec
const SUBTREE_PATTERN string = "<"
patternCount := strings.Count(pathPattern, SUBTREE_PATTERN)
switch {
case patternCount > 1:
case patternCount == 1 && !strings.HasSuffix(pathPattern, SUBTREE_PATTERN):
err = fmt.Errorf("pattern invalid: only one '<' at end of string allowed")
return
}
pathStr := strings.TrimSuffix(pathPattern, SUBTREE_PATTERN)
path, err := zfs.NewDatasetPath(pathStr)
if err != nil {
return fmt.Errorf("pattern is not a dataset path: %s", err)
}
entry := datasetMapFilterEntry{
path: path,
mapping: mapping,
subtreeMatch: patternCount > 0,
}
m.entries = append(m.entries, entry)
return
}
// find the most specific prefix mapping we have
//
// longer prefix wins over shorter prefix, direct wins over glob
func (m DatasetMapFilter) mostSpecificPrefixMapping(path *zfs.DatasetPath) (idx int, found bool) {
lcp, lcp_entry_idx := -1, -1
direct_idx := -1
for e := range m.entries {
entry := m.entries[e]
ep := m.entries[e].path
lep := ep.Length()
switch {
case !entry.subtreeMatch && ep.Equal(path):
direct_idx = e
continue
case entry.subtreeMatch && path.HasPrefix(ep) && lep > lcp:
lcp = lep
lcp_entry_idx = e
default:
continue
}
}
if lcp_entry_idx >= 0 || direct_idx >= 0 {
found = true
switch {
case direct_idx >= 0:
idx = direct_idx
case lcp_entry_idx >= 0:
idx = lcp_entry_idx
}
}
return
}
// Returns target == nil if there is no mapping
func (m DatasetMapFilter) Map(source *zfs.DatasetPath) (target *zfs.DatasetPath, err error) {
if m.filterMode {
err = fmt.Errorf("using a filter for mapping simply does not work")
return
}
mi, hasMapping := m.mostSpecificPrefixMapping(source)
if !hasMapping {
return nil, nil
}
me := m.entries[mi]
if me.mapping == "" {
// Special case treatment: 'foo/bar<' => ''
if !me.subtreeMatch {
return nil, fmt.Errorf("mapping to '' must be a subtree match")
}
// ok...
} else {
if strings.HasPrefix("!", me.mapping) {
// reject mapping
return nil, nil
}
}
target, err = zfs.NewDatasetPath(me.mapping)
if err != nil {
err = fmt.Errorf("mapping target is not a dataset path: %s", err)
return
}
if me.subtreeMatch {
// strip common prefix ('<' wildcards are no special case here)
extendComps := source.Copy()
extendComps.TrimPrefix(me.path)
target.Extend(extendComps)
}
return
}
func (m DatasetMapFilter) Filter(p *zfs.DatasetPath) (pass bool, err error) {
if !m.filterMode {
err = fmt.Errorf("using a mapping as a filter does not work")
return
}
mi, hasMapping := m.mostSpecificPrefixMapping(p)
if !hasMapping {
pass = false
return
}
me := m.entries[mi]
pass, err = m.parseDatasetFilterResult(me.mapping)
return
}
// Construct a new filter-only DatasetMapFilter from a mapping
// The new filter allows excactly those paths that were not forbidden by the mapping.
func (m DatasetMapFilter) InvertedFilter() (inv *DatasetMapFilter, err error) {
if m.filterMode {
err = errors.Errorf("can only invert mappings")
return
}
inv = &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
true,
}
for i, e := range m.entries {
inv.entries[i].path, err = zfs.NewDatasetPath(e.mapping)
if err != nil {
err = errors.Wrapf(err, "mapping cannot be inverted: '%s' is not a dataset path: %s", e.mapping)
return
}
inv.entries[i].mapping = MapFilterResultOk
inv.entries[i].subtreeMatch = e.subtreeMatch
}
return inv, nil
}
// FIXME investigate whether we can support more...
func (m DatasetMapFilter) Invert() (endpoint.FSMap, error) {
if m.filterMode {
return nil, errors.Errorf("can only invert mappings")
}
if len(m.entries) != 1 {
return nil, errors.Errorf("inversion of complicated mappings is not implemented") // FIXME
}
e := m.entries[0]
inv := &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
false,
}
mp, err := zfs.NewDatasetPath(e.mapping)
if err != nil {
return nil, err
}
inv.entries[0] = datasetMapFilterEntry{
path: mp,
mapping: e.path.ToString(),
subtreeMatch: e.subtreeMatch,
}
return inv, nil
}
// Creates a new DatasetMapFilter in filter mode from a mapping
// All accepting mapping results are mapped to accepting filter results
// All rejecting mapping results are mapped to rejecting filter results
func (m DatasetMapFilter) AsFilter() endpoint.FSFilter {
f := &DatasetMapFilter{
make([]datasetMapFilterEntry, len(m.entries)),
true,
}
for i, e := range m.entries {
var newe datasetMapFilterEntry = e
if strings.HasPrefix(newe.mapping, "!") {
newe.mapping = MapFilterResultOmit
} else {
newe.mapping = MapFilterResultOk
}
f.entries[i] = newe
}
return f
}
const (
MapFilterResultOk string = "ok"
MapFilterResultOmit string = "!"
)
// Parse a dataset filter result
func (m DatasetMapFilter) parseDatasetFilterResult(result string) (pass bool, err error) {
l := strings.ToLower(result)
if l == MapFilterResultOk {
return true, nil
}
if l == MapFilterResultOmit {
return false, nil
}
return false, fmt.Errorf("'%s' is not a valid filter result", result)
}
func DatasetMapFilterFromConfig(in map[string]bool) (f *DatasetMapFilter, err error) {
f = NewDatasetMapFilter(len(in), true)
for pathPattern, accept := range in {
mapping := MapFilterResultOmit
if accept {
mapping = MapFilterResultOk
}
if err = f.Add(pathPattern, mapping); err != nil {
err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err)
return
}
}
return
}

View File

@ -0,0 +1,15 @@
package filters
import "github.com/zrepl/zrepl/zfs"
type AnyFSVFilter struct{}
func NewAnyFSVFilter() AnyFSVFilter {
return AnyFSVFilter{}
}
var _ zfs.FilesystemVersionFilter = AnyFSVFilter{}
func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) {
return true, nil
}

View File

@ -1,8 +1,8 @@
package job
import (
"github.com/zrepl/zrepl/cmd/config"
"fmt"
"github.com/zrepl/zrepl/config"
)
func JobsFromConfig(c config.Config) ([]Job, error) {
@ -20,8 +20,12 @@ func JobsFromConfig(c config.Config) ([]Job, error) {
func buildJob(c config.Global, in config.JobEnum) (j Job, err error) {
switch v := in.Ret.(type) {
case *config.SinkJob:
return SinkFromConfig(c, v)
case *config.PushJob:
return PushFromConfig(c, v)
default:
panic(fmt.Sprintf("implementation error: unknown job type %s", v))
panic(fmt.Sprintf("implementation error: unknown job type %T", v))
}
}

View File

@ -2,7 +2,10 @@ package job
import (
"context"
"errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/logger"
"time"
)
type Logger = logger.Logger
@ -25,9 +28,21 @@ func WithLogger(ctx context.Context, l Logger) context.Context {
return context.WithValue(ctx, contextKeyLog, l)
}
func WithWakeup(ctx context.Context) (context.Context, WakeupChan) {
wc := make(chan struct{}, 1)
return context.WithValue(ctx, contextKeyWakeup, wc), wc
type WakeupFunc func() error
var AlreadyWokenUp = errors.New("already woken up")
func WithWakeup(ctx context.Context) (context.Context, WakeupFunc) {
wc := make(chan struct{})
wuf := func() error {
select {
case wc <- struct{}{}:
return nil
default:
return AlreadyWokenUp
}
}
return context.WithValue(ctx, contextKeyWakeup, wc), wuf
}
type Job interface {
@ -36,12 +51,23 @@ type Job interface {
Status() interface{}
}
type WakeupChan <-chan struct{}
func WaitWakeup(ctx context.Context) WakeupChan {
wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan)
func WaitWakeup(ctx context.Context) <-chan struct{} {
wc, ok := ctx.Value(contextKeyWakeup).(chan struct{})
if !ok {
wc = make(chan struct{})
}
return wc
}
var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability
RxHeaderMaxLen: 4096,
RxStructuredMaxLen: 4096 * 4096,
RxStreamMaxChunkSize: 4096 * 4096,
TxChunkSize: 4096 * 4096,
RxTimeout: streamrpc.Timeout{
Progress: 10 * time.Second,
},
TxTimeout: streamrpc.Timeout{
Progress: 10 * time.Second,
},
}

86
daemon/job/push.go Normal file
View File

@ -0,0 +1,86 @@
package job
import (
"context"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/connecter"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/replication"
"sync"
)
type Push struct {
name string
connecter streamrpc.Connecter
fsfilter endpoint.FSFilter
mtx sync.Mutex
replication *replication.Replication
}
func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) {
j = &Push{}
j.name = in.Name
j.connecter, err = connecter.FromConfig(g, in.Replication.Connect)
if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Replication.Filesystems); err != nil {
return nil, errors.Wrap(err, "cannnot build filesystem filter")
}
return j, nil
}
func (j *Push) Name() string { return j.name }
func (j *Push) Status() interface{} {
return nil // FIXME
}
func (j *Push) Run(ctx context.Context) {
log := GetLogger(ctx)
defer log.Info("job exiting")
log.Debug("wait for wakeups")
invocationCount := 0
outer:
for {
select {
case <-ctx.Done():
log.WithError(ctx.Err()).Info("context")
break outer
case <-WaitWakeup(ctx):
invocationCount++
invLog := log.WithField("invocation", invocationCount)
j.do(WithLogger(ctx, invLog))
}
}
}
func (j *Push) do(ctx context.Context) {
log := GetLogger(ctx)
client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{STREAMRPC_CONFIG})
if err != nil {
log.WithError(err).Error("cannot create streamrpc client")
}
defer client.Close()
sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter())
receiver := endpoint.NewRemote(client)
j.mtx.Lock()
rep := replication.NewReplication()
j.mtx.Unlock()
ctx = logging.WithSubsystemLoggers(ctx, log)
rep.Drive(ctx, sender, receiver)
}

115
daemon/job/sink.go Normal file
View File

@ -0,0 +1,115 @@
package job
import (
"context"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/filters"
"github.com/zrepl/zrepl/daemon/logging"
"github.com/zrepl/zrepl/daemon/serve"
"github.com/zrepl/zrepl/endpoint"
"net"
)
type Sink struct {
name string
l serve.ListenerFactory
fsmap endpoint.FSMap
fsmapInv endpoint.FSFilter
}
func SinkFromConfig(g config.Global, in *config.SinkJob) (s *Sink, err error) {
// FIXME multi client support
s = &Sink{name: in.Name}
if s.l, err = serve.FromConfig(g, in.Replication.Serve); err != nil {
return nil, errors.Wrap(err, "cannot build server")
}
fsmap := filters.NewDatasetMapFilter(1, false) // FIXME multi-client support
if err := fsmap.Add("<", in.Replication.RootDataset); err != nil {
return nil, errors.Wrap(err, "unexpected error: cannot build filesystem mapping")
}
s.fsmap = fsmap
return s, nil
}
func (j *Sink) Name() string { return j.name }
func (*Sink) Status() interface{} {
// FIXME
return nil
}
func (j *Sink) Run(ctx context.Context) {
log := GetLogger(ctx)
defer log.Info("job exiting")
l, err := j.l.Listen()
if err != nil {
log.WithError(err).Error("cannot listen")
return
}
log.WithField("addr", l.Addr()).Debug("accepting connections")
var connId int
outer:
for {
select {
case res := <-accept(l):
if res.err != nil {
log.WithError(err).Info("accept error")
break outer
}
connId++
connLog := log.
WithField("connID", connId)
j.handleConnection(WithLogger(ctx, connLog), res.conn)
case <-ctx.Done():
break outer
}
}
}
func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) {
log := GetLogger(ctx)
log.WithField("addr", conn.RemoteAddr()).Info("handling connection")
defer log.Info("finished handling connection")
logging.WithSubsystemLoggers(ctx, log)
local, err := endpoint.NewReceiver(j.fsmap, filters.NewAnyFSVFilter())
if err != nil {
log.WithError(err).Error("unexpected error: cannot convert mapping to filter")
return
}
handler := endpoint.NewHandler(local)
if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil {
log.WithError(err).Error("error serving client")
}
}
type acceptResult struct {
conn net.Conn
err error
}
func accept(listener net.Listener) <-chan acceptResult {
c := make(chan acceptResult, 1)
go func() {
conn, err := listener.Accept()
c <- acceptResult{conn, err}
}()
return c
}

View File

@ -0,0 +1,32 @@
package logging
import (
"fmt"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/logger"
"strings"
)
type streamrpcLogAdaptor = twoClassLogAdaptor
type twoClassLogAdaptor struct {
logger.Logger
}
var _ streamrpc.Logger = twoClassLogAdaptor{}
func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) {
const errorSuffix = ": %s"
if len(args) == 1 {
if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) {
msg := strings.TrimSuffix(fmtStr, errorSuffix)
a.WithError(err).Error(msg)
return
}
}
a.Logger.Error(fmt.Sprintf(fmtStr, args...))
}
func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) {
a.Logger.Info(fmt.Sprintf(fmtStr, args...))
}

View File

@ -1,14 +1,18 @@
package logging
import (
"github.com/zrepl/zrepl/cmd/config"
"os"
"github.com/mattn/go-isatty"
"context"
"crypto/tls"
"github.com/pkg/errors"
"crypto/x509"
"github.com/zrepl/zrepl/cmd/tlsconf"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/problame/go-streamrpc"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/endpoint"
"github.com/zrepl/zrepl/logger"
"github.com/zrepl/zrepl/replication"
"github.com/zrepl/zrepl/tlsconf"
"os"
)
func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) {
@ -53,6 +57,18 @@ func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) {
}
const (
SubsysReplication = "repl"
SubsysStreamrpc = "rpc"
SubsyEndpoint = "endpoint"
)
func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context {
ctx = replication.WithLogger(ctx, log.WithField(SubsysField, "repl"))
ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, "rpc")})
ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, "endpoint"))
return ctx
}
func parseLogFormat(i interface{}) (f EntryFormatter, err error) {
var is string
@ -97,19 +113,19 @@ func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Lev
var f EntryFormatter
switch v := in.Ret.(type) {
case config.StdoutLoggingOutlet:
case *config.StdoutLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
}
o, err = parseStdoutOutlet(v, f)
case config.TCPLoggingOutlet:
case *config.TCPLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
}
o, err = parseTCPOutlet(v, f)
case config.SyslogLoggingOutlet:
case *config.SyslogLoggingOutlet:
level, f, err = parseCommon(v.LoggingOutletCommon)
if err != nil {
break
@ -121,7 +137,7 @@ func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Lev
return o, level, err
}
func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
func parseStdoutOutlet(in *config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) {
flags := MetadataAll
writer := os.Stdout
if !isatty.IsTerminal(writer.Fd()) && !in.Time {
@ -135,7 +151,7 @@ func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter)
}, nil
}
func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
func parseTCPOutlet(in *config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) {
var tlsConfig *tls.Config
if in.TLS != nil {
tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) {
@ -171,11 +187,10 @@ func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *
}
func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
func parseSyslogOutlet(in *config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) {
out = &SyslogOutlet{}
out.Formatter = formatter
out.Formatter.SetMetadataFlags(MetadataNone)
out.RetryInterval = in.RetryInterval
return out, nil
}

View File

@ -17,12 +17,10 @@ const (
)
const (
logJobField string = "job"
logTaskField string = "task"
logSubsysField string = "subsystem"
JobField string = "job"
SubsysField string = "subsystem"
)
type MetadataFlags int64
const (
@ -33,7 +31,6 @@ const (
MetadataAll MetadataFlags = ^0
)
type NoFormatter struct{}
func (f NoFormatter) SetMetadataFlags(flags MetadataFlags) {}
@ -80,7 +77,7 @@ func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) {
fmt.Fprintf(&line, "[%s]", e.Level.Short())
}
prefixFields := []string{logJobField, logTaskField, logSubsysField}
prefixFields := []string{JobField, SubsysField}
prefixed := make(map[string]bool, len(prefixFields)+2)
for _, field := range prefixFields {
val, ok := e.Fields[field].(string)
@ -168,7 +165,7 @@ func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) {
// at least try and put job and task in front
prefixed := make(map[string]bool, 2)
prefix := []string{logJobField, logTaskField, logSubsysField}
prefix := []string{JobField, SubsysField}
for _, pf := range prefix {
v, ok := e.Fields[pf]
if !ok {

View File

@ -12,7 +12,6 @@ import (
"time"
)
type EntryFormatter interface {
SetMetadataFlags(flags MetadataFlags)
Format(e *logger.Entry) ([]byte, error)

View File

@ -5,4 +5,3 @@ import (
)
type Logger = logger.Logger

View File

@ -1,4 +1,4 @@
package helpers
package nethelpers
import (
"github.com/pkg/errors"

View File

@ -4,7 +4,7 @@ import (
"context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/zrepl/zrepl/cmd/daemon/job"
"github.com/zrepl/zrepl/daemon/job"
"github.com/zrepl/zrepl/zfs"
"net"
"net/http"

26
daemon/serve/serve.go Normal file
View File

@ -0,0 +1,26 @@
package serve
import (
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"net"
)
type ListenerFactory interface {
Listen() (net.Listener, error)
}
func FromConfig(g config.Global, in config.ServeEnum) (ListenerFactory, error) {
switch v := in.Ret.(type) {
case *config.TCPServe:
return TCPListenerFactoryFromConfig(g, v)
case *config.TLSServe:
return TLSListenerFactoryFromConfig(g, v)
case *config.StdinserverServer:
return StdinserverListenerFactoryFromConfig(g, v)
default:
return nil, errors.Errorf("internal error: unknown serve type %T", v)
}
}

View File

@ -0,0 +1,79 @@
package serve
import (
"github.com/problame/go-netssh"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon/nethelpers"
"io"
"net"
"path"
"time"
)
type StdinserverListenerFactory struct {
ClientIdentity string
sockpath string
}
func StdinserverListenerFactoryFromConfig(g config.Global, in *config.StdinserverServer) (f *StdinserverListenerFactory, err error) {
f = &StdinserverListenerFactory{
ClientIdentity: in.ClientIdentity,
}
f.sockpath = path.Join(g.Serve.StdinServer.SockDir, f.ClientIdentity)
return
}
func (f *StdinserverListenerFactory) Listen() (net.Listener, error) {
if err := nethelpers.PreparePrivateSockpath(f.sockpath); err != nil {
return nil, err
}
l, err := netssh.Listen(f.sockpath)
if err != nil {
return nil, err
}
return StdinserverListener{l}, nil
}
type StdinserverListener struct {
l *netssh.Listener
}
func (l StdinserverListener) Addr() net.Addr {
return netsshAddr{}
}
func (l StdinserverListener) Accept() (net.Conn, error) {
c, err := l.l.Accept()
if err != nil {
return nil, err
}
return netsshConnToNetConnAdatper{c}, nil
}
func (l StdinserverListener) Close() (err error) {
return l.l.Close()
}
type netsshAddr struct{}
func (netsshAddr) Network() string { return "netssh" }
func (netsshAddr) String() string { return "???" }
type netsshConnToNetConnAdatper struct {
io.ReadWriteCloser // works for both netssh.SSHConn and netssh.ServeConn
}
func (netsshConnToNetConnAdatper) LocalAddr() net.Addr { return netsshAddr{} }
func (netsshConnToNetConnAdatper) RemoteAddr() net.Addr { return netsshAddr{} }
func (netsshConnToNetConnAdatper) SetDeadline(t time.Time) error { return nil }
func (netsshConnToNetConnAdatper) SetReadDeadline(t time.Time) error { return nil }
func (netsshConnToNetConnAdatper) SetWriteDeadline(t time.Time) error { return nil }

21
daemon/serve/serve_tcp.go Normal file
View File

@ -0,0 +1,21 @@
package serve
import (
"github.com/zrepl/zrepl/config"
"net"
)
type TCPListenerFactory struct {
Address string
}
func TCPListenerFactoryFromConfig(c config.Global, in *config.TCPServe) (*TCPListenerFactory, error) {
lf := &TCPListenerFactory{
Address: in.Listen,
}
return lf, nil
}
func (f *TCPListenerFactory) Listen() (net.Listener, error) {
return net.Listen("tcp", f.Address)
}

52
daemon/serve/serve_tls.go Normal file
View File

@ -0,0 +1,52 @@
package serve
import (
"crypto/tls"
"crypto/x509"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/tlsconf"
"net"
"time"
)
type TLSListenerFactory struct {
address string
clientCA *x509.CertPool
serverCert tls.Certificate
clientCommonName string
handshakeTimeout time.Duration
}
func TLSListenerFactoryFromConfig(c config.Global, in *config.TLSServe) (lf *TLSListenerFactory, err error) {
lf = &TLSListenerFactory{
address: in.Listen,
}
if in.Ca == "" || in.Cert == "" || in.Key == "" || in.ClientCN == "" {
return nil, errors.New("fields 'ca', 'cert', 'key' and 'client_cn' must be specified")
}
lf.clientCommonName = in.ClientCN
lf.clientCA, err = tlsconf.ParseCAFile(in.Ca)
if err != nil {
return nil, errors.Wrap(err, "cannot parse ca file")
}
lf.serverCert, err = tls.LoadX509KeyPair(in.Cert, in.Key)
if err != nil {
return nil, errors.Wrap(err, "cannot parse cer/key pair")
}
return lf, nil
}
func (f *TLSListenerFactory) Listen() (net.Listener, error) {
l, err := net.Listen("tcp", f.address)
if err != nil {
return nil, err
}
tl := tlsconf.NewClientAuthListener(l, f.clientCA, f.serverCert, f.clientCommonName, f.handshakeTimeout)
return tl, nil
}

23
main.go
View File

@ -2,11 +2,11 @@
package main
import (
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon"
"log"
"os"
"github.com/spf13/cobra"
"github.com/zrepl/zrepl/daemon"
"github.com/zrepl/zrepl/cmd/config"
)
var rootCmd = &cobra.Command{
@ -20,9 +20,8 @@ var rootCmd = &cobra.Command{
- ACLs instead of blank SSH access`,
}
var daemonCmd = &cobra.Command{
Use: "daemon",
Use: "daemon",
Short: "daemon",
RunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.ParseConfig(rootArgs.configFile)
@ -33,6 +32,18 @@ var daemonCmd = &cobra.Command{
},
}
var wakeupCmd = &cobra.Command{
Use: "wakeup",
Short: "wake up jobs",
RunE: func(cmd *cobra.Command, args []string) error {
conf, err := config.ParseConfig(rootArgs.configFile)
if err != nil {
return err
}
return RunWakeup(conf, args)
},
}
var rootArgs struct {
configFile string
}
@ -41,11 +52,11 @@ func init() {
//cobra.OnInitialize(initConfig)
rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path")
rootCmd.AddCommand(daemonCmd)
rootCmd.AddCommand(wakeupCmd)
}
func main() {
if err := rootCmd.Execute(); err != nil {
log.Printf("error executing root command: %s", err)
os.Exit(1)

68
wakeup.go Normal file
View File

@ -0,0 +1,68 @@
package main
import (
"bytes"
"context"
"encoding/json"
"github.com/pkg/errors"
"github.com/zrepl/zrepl/config"
"github.com/zrepl/zrepl/daemon"
"io"
"net"
"net/http"
)
func RunWakeup(config config.Config, args []string) error {
if len(args) != 1 {
return errors.Errorf("Expected 1 argument: job")
}
httpc, err := controlHttpClient(config.Global.Control.SockPath)
if err != nil {
return err
}
err = jsonRequestResponse(httpc, daemon.ControlJobEndpointWakeup,
struct {
Name string
}{
Name: args[0],
},
struct{}{},
)
return err
}
func controlHttpClient(sockpath string) (client http.Client, err error) {
return http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", sockpath)
},
},
}, nil
}
func jsonRequestResponse(c http.Client, endpoint string, req interface{}, res interface{}) error {
var buf bytes.Buffer
encodeErr := json.NewEncoder(&buf).Encode(req)
if encodeErr != nil {
return encodeErr
}
resp, err := c.Post("http://unix"+endpoint, "application/json", &buf)
if err != nil {
return err
} else if resp.StatusCode != http.StatusOK {
var msg bytes.Buffer
io.CopyN(&msg, resp.Body, 4096)
return errors.Errorf("%s", msg.String())
}
decodeError := json.NewDecoder(resp.Body).Decode(&res)
if decodeError != nil {
return decodeError
}
return nil
}