From c69ebd3806715477b7980e0d15938b503db7b0df Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 27 Aug 2018 22:21:45 +0200 Subject: [PATCH] WIP rewrite the daemon cmd subdir does not build on purpose, it's only left in tree to grab old code and move it to github.com/zrepl/zrepl/daemon --- cmd/config_connect.go | 2 +- cmd/config_job_local.go | 4 +- cmd/config_job_pull.go | 4 +- cmd/config_job_source.go | 4 +- cmd/config_logging.go | 2 +- cmd/config_mapfilter.go | 2 +- cmd/config_parse.go | 2 +- cmd/config_serve_stdinserver.go | 2 +- cmd/config_serve_tcp.go | 5 +- cmd/config_serve_tls.go | 2 +- cmd/{daemon => daemon.deact}/control.go | 0 cmd/{daemon => daemon.deact}/daemon.go | 13 + cmd/{daemon => daemon.deact}/job/job.go | 0 cmd/{daemon => daemon.deact}/pprof.go | 0 cmd/{daemon => daemon.deact}/prometheus.go | 0 cmd/daemon.go | 2 +- .../retentiongrid/config_prune_grid.go | 2 +- {cmd/config => config}/config.go | 19 +- {cmd/config => config}/config_test.go | 0 {cmd/config => config}/retentiongrid.go | 0 {cmd/config => config}/samples/local.yml | 0 {cmd/config => config}/samples/pull.yml | 0 {cmd/config => config}/samples/pull_ssh.yml | 0 {cmd/config => config}/samples/push.yml | 0 {cmd/config => config}/samples/sink.yml | 0 {cmd/config => config}/samples/source.yml | 0 {cmd/config => config}/samples/source_ssh.yml | 0 daemon/connecter/connect_ssh.go | 66 +++++ daemon/connecter/connect_tcp.go | 24 ++ daemon/connecter/connect_tls.go | 43 +++ daemon/connecter/connecter.go | 20 ++ daemon/control.go | 81 +++++- daemon/daemon.go | 22 +- daemon/filters/fsmapfilter.go | 273 ++++++++++++++++++ daemon/filters/fsvfilter.go | 15 + daemon/job/build_jobs.go | 10 +- daemon/job/job.go | 40 ++- daemon/job/push.go | 86 ++++++ daemon/job/sink.go | 115 ++++++++ daemon/logging/adaptors.go | 32 ++ daemon/logging/build_logging.go | 39 ++- daemon/logging/logging_formatters.go | 11 +- daemon/logging/logging_outlets.go | 1 - daemon/main.go | 1 - {cmd/helpers => 
daemon/nethelpers}/helpers.go | 2 +- daemon/prometheus.go | 2 +- daemon/serve/serve.go | 26 ++ daemon/serve/serve_stdinserver.go | 79 +++++ daemon/serve/serve_tcp.go | 21 ++ daemon/serve/serve_tls.go | 52 ++++ {cmd/endpoint => endpoint}/context.go | 0 {cmd/endpoint => endpoint}/endpoint.go | 0 main.go | 23 +- {cmd/tlsconf => tlsconf}/tlsconf.go | 0 wakeup.go | 68 +++++ 55 files changed, 1133 insertions(+), 84 deletions(-) rename cmd/{daemon => daemon.deact}/control.go (100%) rename cmd/{daemon => daemon.deact}/daemon.go (95%) rename cmd/{daemon => daemon.deact}/job/job.go (100%) rename cmd/{daemon => daemon.deact}/pprof.go (100%) rename cmd/{daemon => daemon.deact}/prometheus.go (100%) rename {cmd/config => config}/config.go (95%) rename {cmd/config => config}/config_test.go (100%) rename {cmd/config => config}/retentiongrid.go (100%) rename {cmd/config => config}/samples/local.yml (100%) rename {cmd/config => config}/samples/pull.yml (100%) rename {cmd/config => config}/samples/pull_ssh.yml (100%) rename {cmd/config => config}/samples/push.yml (100%) rename {cmd/config => config}/samples/sink.yml (100%) rename {cmd/config => config}/samples/source.yml (100%) rename {cmd/config => config}/samples/source_ssh.yml (100%) create mode 100644 daemon/connecter/connect_ssh.go create mode 100644 daemon/connecter/connect_tcp.go create mode 100644 daemon/connecter/connect_tls.go create mode 100644 daemon/connecter/connecter.go create mode 100644 daemon/filters/fsmapfilter.go create mode 100644 daemon/filters/fsvfilter.go create mode 100644 daemon/job/push.go create mode 100644 daemon/job/sink.go create mode 100644 daemon/logging/adaptors.go rename {cmd/helpers => daemon/nethelpers}/helpers.go (98%) create mode 100644 daemon/serve/serve.go create mode 100644 daemon/serve/serve_stdinserver.go create mode 100644 daemon/serve/serve_tcp.go create mode 100644 daemon/serve/serve_tls.go rename {cmd/endpoint => endpoint}/context.go (100%) rename {cmd/endpoint => endpoint}/endpoint.go 
(100%) rename {cmd/tlsconf => tlsconf}/tlsconf.go (100%) create mode 100644 wakeup.go diff --git a/cmd/config_connect.go b/cmd/config_connect.go index efb84d4..81e9803 100644 --- a/cmd/config_connect.go +++ b/cmd/config_connect.go @@ -9,7 +9,7 @@ import ( "github.com/pkg/errors" "github.com/problame/go-netssh" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/tlsconf" "time" ) diff --git a/cmd/config_job_local.go b/cmd/config_job_local.go index c8b8c00..8bf4fa3 100644 --- a/cmd/config_job_local.go +++ b/cmd/config_job_local.go @@ -6,8 +6,8 @@ import ( "context" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/config" - "github.com/zrepl/zrepl/cmd/endpoint" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/replication" "github.com/zrepl/zrepl/zfs" "sync" diff --git a/cmd/config_job_pull.go b/cmd/config_job_pull.go index b20db21..f3167c5 100644 --- a/cmd/config_job_pull.go +++ b/cmd/config_job_pull.go @@ -11,8 +11,8 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/config" - "github.com/zrepl/zrepl/cmd/endpoint" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/replication" ) diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 76d2be5..8e1c283 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -6,8 +6,8 @@ import ( "github.com/pkg/errors" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/config" - "github.com/zrepl/zrepl/cmd/endpoint" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/endpoint" "net" ) diff --git a/cmd/config_logging.go b/cmd/config_logging.go index 4f18f0a..2d3fe26 100644 --- a/cmd/config_logging.go +++ b/cmd/config_logging.go @@ -5,7 +5,7 @@ import ( "crypto/x509" 
"github.com/mattn/go-isatty" "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/tlsconf" "github.com/zrepl/zrepl/logger" "os" diff --git a/cmd/config_mapfilter.go b/cmd/config_mapfilter.go index f1182b7..4c8f0a7 100644 --- a/cmd/config_mapfilter.go +++ b/cmd/config_mapfilter.go @@ -5,7 +5,7 @@ import ( "strings" "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/endpoint" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/zfs" ) diff --git a/cmd/config_parse.go b/cmd/config_parse.go index 0419b9e..f30a9a6 100644 --- a/cmd/config_parse.go +++ b/cmd/config_parse.go @@ -8,7 +8,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/problame/go-streamrpc" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/pruning/retentiongrid" "os" ) diff --git a/cmd/config_serve_stdinserver.go b/cmd/config_serve_stdinserver.go index 5fb19b9..63ef839 100644 --- a/cmd/config_serve_stdinserver.go +++ b/cmd/config_serve_stdinserver.go @@ -2,7 +2,7 @@ package cmd import ( "github.com/problame/go-netssh" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/helpers" "net" "path" diff --git a/cmd/config_serve_tcp.go b/cmd/config_serve_tcp.go index 9757b00..351578b 100644 --- a/cmd/config_serve_tcp.go +++ b/cmd/config_serve_tcp.go @@ -1,10 +1,9 @@ package cmd import ( - "net" + "github.com/zrepl/zrepl/config" "time" - - "github.com/zrepl/zrepl/cmd/config" + "net" ) type TCPListenerFactory struct { diff --git a/cmd/config_serve_tls.go b/cmd/config_serve_tls.go index 54ab53d..565bc07 100644 --- a/cmd/config_serve_tls.go +++ b/cmd/config_serve_tls.go @@ -8,7 +8,7 @@ import ( "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/tlsconf" ) diff --git a/cmd/daemon/control.go 
b/cmd/daemon.deact/control.go similarity index 100% rename from cmd/daemon/control.go rename to cmd/daemon.deact/control.go diff --git a/cmd/daemon/daemon.go b/cmd/daemon.deact/daemon.go similarity index 95% rename from cmd/daemon/daemon.go rename to cmd/daemon.deact/daemon.go index b3d54b5..070d4fe 100644 --- a/cmd/daemon/daemon.go +++ b/cmd/daemon.deact/daemon.go @@ -1,5 +1,18 @@ package daemon +import ( + "context" + "github.com/zrepl/zrepl/logger" + "os" + "os/signal" + "syscall" + "time" + "github.com/zrepl/zrepl/version" + "fmt" +) + +.daesdfadsfsafjlsjfda + import ( "context" "fmt" diff --git a/cmd/daemon/job/job.go b/cmd/daemon.deact/job/job.go similarity index 100% rename from cmd/daemon/job/job.go rename to cmd/daemon.deact/job/job.go diff --git a/cmd/daemon/pprof.go b/cmd/daemon.deact/pprof.go similarity index 100% rename from cmd/daemon/pprof.go rename to cmd/daemon.deact/pprof.go diff --git a/cmd/daemon/prometheus.go b/cmd/daemon.deact/prometheus.go similarity index 100% rename from cmd/daemon/prometheus.go rename to cmd/daemon.deact/prometheus.go diff --git a/cmd/daemon.go b/cmd/daemon.go index dd45234..621c591 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/spf13/cobra" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/cmd/daemon" "github.com/zrepl/zrepl/cmd/daemon/job" "github.com/zrepl/zrepl/logger" diff --git a/cmd/pruning/retentiongrid/config_prune_grid.go b/cmd/pruning/retentiongrid/config_prune_grid.go index 5223b9d..de1c6ba 100644 --- a/cmd/pruning/retentiongrid/config_prune_grid.go +++ b/cmd/pruning/retentiongrid/config_prune_grid.go @@ -2,7 +2,7 @@ package retentiongrid import ( "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/config" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/zfs" "math" "sort" diff --git a/cmd/config/config.go b/config/config.go similarity index 95% rename from cmd/config/config.go rename to 
config/config.go index a999732..57bb701 100644 --- a/cmd/config/config.go +++ b/config/config.go @@ -152,12 +152,13 @@ type TCPServe struct { } type TLSServe struct { - Type string `yaml:"type"` - Listen string `yaml:"listen"` - Ca string `yaml:"ca"` - Cert string `yaml:"cert"` - Key string `yaml:"key"` - ClientCN string `yaml:"client_cn"` + Type string `yaml:"type"` + Listen string `yaml:"listen"` + Ca string `yaml:"ca"` + Cert string `yaml:"cert"` + Key string `yaml:"key"` + ClientCN string `yaml:"client_cn"` + HandshakeTimeout time.Duration `yaml:"handshake_timeout,positive,default=10s"` } type StdinserverServer struct { @@ -195,19 +196,19 @@ type LoggingOutletCommon struct { type StdoutLoggingOutlet struct { LoggingOutletCommon `yaml:",inline"` - Time bool `yaml:"time"` + Time bool `yaml:"time,default=true"` } type SyslogLoggingOutlet struct { LoggingOutletCommon `yaml:",inline"` - RetryInterval time.Duration `yaml:"retry_interval,positive"` + RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"` } type TCPLoggingOutlet struct { LoggingOutletCommon `yaml:",inline"` Address string `yaml:"address"` Net string `yaml:"net,default=tcp"` - RetryInterval time.Duration `yaml:"retry_interval,positive"` + RetryInterval time.Duration `yaml:"retry_interval,positive,default=10s"` TLS *TCPLoggingOutletTLS `yaml:"tls,optional"` } diff --git a/cmd/config/config_test.go b/config/config_test.go similarity index 100% rename from cmd/config/config_test.go rename to config/config_test.go diff --git a/cmd/config/retentiongrid.go b/config/retentiongrid.go similarity index 100% rename from cmd/config/retentiongrid.go rename to config/retentiongrid.go diff --git a/cmd/config/samples/local.yml b/config/samples/local.yml similarity index 100% rename from cmd/config/samples/local.yml rename to config/samples/local.yml diff --git a/cmd/config/samples/pull.yml b/config/samples/pull.yml similarity index 100% rename from cmd/config/samples/pull.yml rename to 
config/samples/pull.yml diff --git a/cmd/config/samples/pull_ssh.yml b/config/samples/pull_ssh.yml similarity index 100% rename from cmd/config/samples/pull_ssh.yml rename to config/samples/pull_ssh.yml diff --git a/cmd/config/samples/push.yml b/config/samples/push.yml similarity index 100% rename from cmd/config/samples/push.yml rename to config/samples/push.yml diff --git a/cmd/config/samples/sink.yml b/config/samples/sink.yml similarity index 100% rename from cmd/config/samples/sink.yml rename to config/samples/sink.yml diff --git a/cmd/config/samples/source.yml b/config/samples/source.yml similarity index 100% rename from cmd/config/samples/source.yml rename to config/samples/source.yml diff --git a/cmd/config/samples/source_ssh.yml b/config/samples/source_ssh.yml similarity index 100% rename from cmd/config/samples/source_ssh.yml rename to config/samples/source_ssh.yml diff --git a/daemon/connecter/connect_ssh.go b/daemon/connecter/connect_ssh.go new file mode 100644 index 0000000..7efeec5 --- /dev/null +++ b/daemon/connecter/connect_ssh.go @@ -0,0 +1,66 @@ +package connecter + +import ( + "context" + "github.com/jinzhu/copier" + "github.com/pkg/errors" + "github.com/problame/go-netssh" + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/config" + "net" + "time" +) + +type SSHStdinserverConnecter struct { + Host string + User string + Port uint16 + IdentityFile string + TransportOpenCommand []string + SSHCommand string + Options []string + dialTimeout time.Duration +} + +var _ streamrpc.Connecter = &SSHStdinserverConnecter{} + +func SSHStdinserverConnecterFromConfig(in *config.SSHStdinserverConnect) (c *SSHStdinserverConnecter, err error) { + + c = &SSHStdinserverConnecter{ + Host: in.Host, + User: in.User, + Port: in.Port, + IdentityFile: in.IdentityFile, + SSHCommand: in.SSHCommand, + Options: in.Options, + dialTimeout: in.DialTimeout, + } + return + +} + +type netsshConnToConn struct{ *netssh.SSHConn } + +var _ net.Conn = netsshConnToConn{} + 
+func (netsshConnToConn) SetDeadline(dl time.Time) error { return nil } +func (netsshConnToConn) SetReadDeadline(dl time.Time) error { return nil } +func (netsshConnToConn) SetWriteDeadline(dl time.Time) error { return nil } + +func (c *SSHStdinserverConnecter) Connect(dialCtx context.Context) (net.Conn, error) { + + var endpoint netssh.Endpoint + if err := copier.Copy(&endpoint, c); err != nil { + return nil, errors.WithStack(err) + } + dialCtx, dialCancel := context.WithTimeout(dialCtx, c.dialTimeout) // context.TODO tied to error handling below + defer dialCancel() + nconn, err := netssh.Dial(dialCtx, endpoint) + if err != nil { + if err == context.DeadlineExceeded { + err = errors.Errorf("dial_timeout of %s exceeded", c.dialTimeout) + } + return nil, err + } + return netsshConnToConn{nconn}, nil +} diff --git a/daemon/connecter/connect_tcp.go b/daemon/connecter/connect_tcp.go new file mode 100644 index 0000000..3d8b77e --- /dev/null +++ b/daemon/connecter/connect_tcp.go @@ -0,0 +1,24 @@ +package connecter + +import ( + "context" + "github.com/zrepl/zrepl/config" + "net" +) + +type TCPConnecter struct { + Address string + dialer net.Dialer +} + +func TCPConnecterFromConfig(in *config.TCPConnect) (*TCPConnecter, error) { + dialer := net.Dialer{ + Timeout: in.DialTimeout, + } + + return &TCPConnecter{in.Address, dialer}, nil +} + +func (c *TCPConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) { + return c.dialer.DialContext(dialCtx, "tcp", c.Address) +} diff --git a/daemon/connecter/connect_tls.go b/daemon/connecter/connect_tls.go new file mode 100644 index 0000000..28f3440 --- /dev/null +++ b/daemon/connecter/connect_tls.go @@ -0,0 +1,43 @@ +package connecter + +import ( + "context" + "crypto/tls" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/tlsconf" + "net" +) + +type TLSConnecter struct { + Address string + dialer net.Dialer + tlsConfig *tls.Config +} + +func TLSConnecterFromConfig(in 
*config.TLSConnect) (*TLSConnecter, error) { + dialer := net.Dialer{ + Timeout: in.DialTimeout, + } + + ca, err := tlsconf.ParseCAFile(in.Ca) + if err != nil { + return nil, errors.Wrap(err, "cannot parse ca file") + } + + cert, err := tls.LoadX509KeyPair(in.Cert, in.Key) + if err != nil { + return nil, errors.Wrap(err, "cannot parse cert/key pair") + } + + tlsConfig, err := tlsconf.ClientAuthClient(in.ServerCN, ca, cert) + if err != nil { + return nil, errors.Wrap(err, "cannot build tls config") + } + + return &TLSConnecter{in.Address, dialer, tlsConfig}, nil +} + +func (c *TLSConnecter) Connect(dialCtx context.Context) (conn net.Conn, err error) { + return tls.DialWithDialer(&c.dialer, "tcp", c.Address, c.tlsConfig) +} diff --git a/daemon/connecter/connecter.go b/daemon/connecter/connecter.go new file mode 100644 index 0000000..a27ba7d --- /dev/null +++ b/daemon/connecter/connecter.go @@ -0,0 +1,20 @@ +package connecter + +import ( + "fmt" + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/config" +) + +func FromConfig(g config.Global, in config.ConnectEnum) (streamrpc.Connecter, error) { + switch v := in.Ret.(type) { + case *config.SSHStdinserverConnect: + return SSHStdinserverConnecterFromConfig(v) + case *config.TCPConnect: + return TCPConnecterFromConfig(v) + case *config.TLSConnect: + return TLSConnecterFromConfig(v) + default: + panic(fmt.Sprintf("implementation error: unknown connecter type %T", v)) + } +} diff --git a/daemon/control.go b/daemon/control.go index 0aff1ce..c7e1307 100644 --- a/daemon/control.go +++ b/daemon/control.go @@ -5,8 +5,8 @@ import ( "context" "encoding/json" "github.com/pkg/errors" - "github.com/zrepl/zrepl/cmd/daemon/job" - "github.com/zrepl/zrepl/cmd/helpers" + "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/nethelpers" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" "io" @@ -39,6 +39,7 @@ const ( ControlJobEndpointPProf string = "/debug/pprof" ControlJobEndpointVersion string = 
"/version" ControlJobEndpointStatus string = "/status" + ControlJobEndpointWakeup string = "/wakeup" ) func (j *controlJob) Run(ctx context.Context) { @@ -46,7 +47,7 @@ func (j *controlJob) Run(ctx context.Context) { log := job.GetLogger(ctx) defer log.Info("control job finished") - l, err := helpers.ListenUnixPrivate(j.sockaddr) + l, err := nethelpers.ListenUnixPrivate(j.sockaddr) if err != nil { log.WithError(err).Error("error listening") return @@ -55,25 +56,42 @@ func (j *controlJob) Run(ctx context.Context) { pprofServer := NewPProfServer(ctx) mux := http.NewServeMux() - mux.Handle(ControlJobEndpointPProf, requestLogger{log: log, handlerFunc: func(w http.ResponseWriter, r *http.Request) { - var msg PprofServerControlMsg - err := json.NewDecoder(r.Body).Decode(&msg) - if err != nil { - log.WithError(err).Error("bad pprof request from client") - w.WriteHeader(http.StatusBadRequest) - } - pprofServer.Control(msg) - w.WriteHeader(200) - }}) + mux.Handle(ControlJobEndpointPProf, + requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { + var msg PprofServerControlMsg + err := decoder(&msg) + if err != nil { + return nil, errors.Errorf("decode failed") + } + pprofServer.Control(msg) + return struct{}{}, nil + }}}) + mux.Handle(ControlJobEndpointVersion, requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { return version.NewZreplVersionInformation(), nil }}}) + mux.Handle(ControlJobEndpointStatus, requestLogger{log: log, handler: jsonResponder{func() (interface{}, error) { s := j.jobs.status() return s, nil }}}) + + mux.Handle(ControlJobEndpointWakeup, + requestLogger{log: log, handler: jsonRequestResponder{func(decoder jsonDecoder) (interface{}, error) { + type reqT struct { + Name string + } + var req reqT + if decoder(&req) != nil { + return nil, errors.Errorf("decode failed") + } + + err := j.jobs.wakeup(req.Name) + + return struct{}{}, err + }}}) server := http.Server{Handler: mux} outer: @@ 
-122,6 +140,43 @@ func (j jsonResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +type jsonDecoder = func(interface{}) error + +type jsonRequestResponder struct { + producer func(decoder jsonDecoder) (interface{}, error) +} + +func (j jsonRequestResponder) ServeHTTP(w http.ResponseWriter, r *http.Request) { + var decodeError error + decoder := func(i interface{}) error { + err := json.NewDecoder(r.Body).Decode(&i) + decodeError = err + return err + } + res, producerErr := j.producer(decoder) + + //If we had a decode error ignore output of producer and return error + if decodeError != nil { + w.WriteHeader(http.StatusBadRequest) + io.WriteString(w, decodeError.Error()) + return + } + if producerErr != nil { + w.WriteHeader(http.StatusInternalServerError) + io.WriteString(w, producerErr.Error()) + return + } + + var buf bytes.Buffer + encodeErr := json.NewEncoder(&buf).Encode(res) + if encodeErr != nil { + w.WriteHeader(http.StatusInternalServerError) + io.WriteString(w, encodeErr.Error()) + } else { + io.Copy(w, &buf) + } +} + type requestLogger struct { log logger.Logger handler http.Handler diff --git a/daemon/daemon.go b/daemon/daemon.go index a408aa3..e231d72 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -3,7 +3,10 @@ package daemon import ( "context" "fmt" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" "github.com/zrepl/zrepl/daemon/job" + "github.com/zrepl/zrepl/daemon/logging" "github.com/zrepl/zrepl/logger" "github.com/zrepl/zrepl/version" "os" @@ -12,12 +15,8 @@ import ( "sync" "syscall" "time" - "github.com/zrepl/zrepl/cmd/config" - "github.com/zrepl/zrepl/daemon/logging" - "github.com/pkg/errors" ) - func Run(conf config.Config) error { ctx, cancel := context.WithCancel(context.Background()) @@ -86,13 +85,13 @@ type jobs struct { // m protects all fields below it m sync.RWMutex - wakeups map[string]job.WakeupChan // by JobName + wakeups map[string]job.WakeupFunc // by Job.Name jobs map[string]job.Job } func newJobs() 
*jobs { return &jobs{ - wakeups: make(map[string]job.WakeupChan), + wakeups: make(map[string]job.WakeupFunc), jobs: make(map[string]job.Job), } } @@ -137,6 +136,17 @@ func (s *jobs) status() map[string]interface{} { return ret } +func (s *jobs) wakeup(job string) error { + s.m.RLock() + defer s.m.RUnlock() + + wu, ok := s.wakeups[job] + if !ok { + return errors.Errorf("Job %s does not exist", job) + } + return wu() +} + const ( jobNamePrometheus = "_prometheus" jobNameControl = "_control" diff --git a/daemon/filters/fsmapfilter.go b/daemon/filters/fsmapfilter.go new file mode 100644 index 0000000..7fc6bbd --- /dev/null +++ b/daemon/filters/fsmapfilter.go @@ -0,0 +1,273 @@ +package filters + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/zfs" + "strings" +) + +type DatasetMapFilter struct { + entries []datasetMapFilterEntry + + // if set, only valid filter entries can be added using Add() + // and Map() will always return an error + filterMode bool +} + +type datasetMapFilterEntry struct { + path *zfs.DatasetPath + // the mapping. 
since this datastructure acts as both mapping and filter + // we have to convert it to the desired rep dynamically + mapping string + subtreeMatch bool +} + +func NewDatasetMapFilter(capacity int, filterMode bool) *DatasetMapFilter { + return &DatasetMapFilter{ + entries: make([]datasetMapFilterEntry, 0, capacity), + filterMode: filterMode, + } +} + +func (m *DatasetMapFilter) Add(pathPattern, mapping string) (err error) { + + if m.filterMode { + if _, err = m.parseDatasetFilterResult(mapping); err != nil { + return + } + } + + // assert path glob adheres to spec + const SUBTREE_PATTERN string = "<" + patternCount := strings.Count(pathPattern, SUBTREE_PATTERN) + switch { + case patternCount > 1: + case patternCount == 1 && !strings.HasSuffix(pathPattern, SUBTREE_PATTERN): + err = fmt.Errorf("pattern invalid: only one '<' at end of string allowed") + return + } + + pathStr := strings.TrimSuffix(pathPattern, SUBTREE_PATTERN) + path, err := zfs.NewDatasetPath(pathStr) + if err != nil { + return fmt.Errorf("pattern is not a dataset path: %s", err) + } + + entry := datasetMapFilterEntry{ + path: path, + mapping: mapping, + subtreeMatch: patternCount > 0, + } + m.entries = append(m.entries, entry) + return + +} + +// find the most specific prefix mapping we have +// +// longer prefix wins over shorter prefix, direct wins over glob +func (m DatasetMapFilter) mostSpecificPrefixMapping(path *zfs.DatasetPath) (idx int, found bool) { + lcp, lcp_entry_idx := -1, -1 + direct_idx := -1 + for e := range m.entries { + entry := m.entries[e] + ep := m.entries[e].path + lep := ep.Length() + + switch { + case !entry.subtreeMatch && ep.Equal(path): + direct_idx = e + continue + case entry.subtreeMatch && path.HasPrefix(ep) && lep > lcp: + lcp = lep + lcp_entry_idx = e + default: + continue + } + } + + if lcp_entry_idx >= 0 || direct_idx >= 0 { + found = true + switch { + case direct_idx >= 0: + idx = direct_idx + case lcp_entry_idx >= 0: + idx = lcp_entry_idx + } + } + return +} + +// 
Returns target == nil if there is no mapping +func (m DatasetMapFilter) Map(source *zfs.DatasetPath) (target *zfs.DatasetPath, err error) { + + if m.filterMode { + err = fmt.Errorf("using a filter for mapping simply does not work") + return + } + + mi, hasMapping := m.mostSpecificPrefixMapping(source) + if !hasMapping { + return nil, nil + } + me := m.entries[mi] + + if me.mapping == "" { + // Special case treatment: 'foo/bar<' => '' + if !me.subtreeMatch { + return nil, fmt.Errorf("mapping to '' must be a subtree match") + } + // ok... + } else { + if strings.HasPrefix("!", me.mapping) { + // reject mapping + return nil, nil + } + } + + target, err = zfs.NewDatasetPath(me.mapping) + if err != nil { + err = fmt.Errorf("mapping target is not a dataset path: %s", err) + return + } + if me.subtreeMatch { + // strip common prefix ('<' wildcards are no special case here) + extendComps := source.Copy() + extendComps.TrimPrefix(me.path) + target.Extend(extendComps) + } + return +} + +func (m DatasetMapFilter) Filter(p *zfs.DatasetPath) (pass bool, err error) { + + if !m.filterMode { + err = fmt.Errorf("using a mapping as a filter does not work") + return + } + + mi, hasMapping := m.mostSpecificPrefixMapping(p) + if !hasMapping { + pass = false + return + } + me := m.entries[mi] + pass, err = m.parseDatasetFilterResult(me.mapping) + return +} + +// Construct a new filter-only DatasetMapFilter from a mapping +// The new filter allows excactly those paths that were not forbidden by the mapping. 
+func (m DatasetMapFilter) InvertedFilter() (inv *DatasetMapFilter, err error) { + + if m.filterMode { + err = errors.Errorf("can only invert mappings") + return + } + + inv = &DatasetMapFilter{ + make([]datasetMapFilterEntry, len(m.entries)), + true, + } + + for i, e := range m.entries { + inv.entries[i].path, err = zfs.NewDatasetPath(e.mapping) + if err != nil { + err = errors.Wrapf(err, "mapping cannot be inverted: '%s' is not a dataset path: %s", e.mapping) + return + } + inv.entries[i].mapping = MapFilterResultOk + inv.entries[i].subtreeMatch = e.subtreeMatch + } + + return inv, nil +} + +// FIXME investigate whether we can support more... +func (m DatasetMapFilter) Invert() (endpoint.FSMap, error) { + + if m.filterMode { + return nil, errors.Errorf("can only invert mappings") + } + + if len(m.entries) != 1 { + return nil, errors.Errorf("inversion of complicated mappings is not implemented") // FIXME + } + + e := m.entries[0] + + inv := &DatasetMapFilter{ + make([]datasetMapFilterEntry, len(m.entries)), + false, + } + mp, err := zfs.NewDatasetPath(e.mapping) + if err != nil { + return nil, err + } + + inv.entries[0] = datasetMapFilterEntry{ + path: mp, + mapping: e.path.ToString(), + subtreeMatch: e.subtreeMatch, + } + + return inv, nil +} + +// Creates a new DatasetMapFilter in filter mode from a mapping +// All accepting mapping results are mapped to accepting filter results +// All rejecting mapping results are mapped to rejecting filter results +func (m DatasetMapFilter) AsFilter() endpoint.FSFilter { + + f := &DatasetMapFilter{ + make([]datasetMapFilterEntry, len(m.entries)), + true, + } + + for i, e := range m.entries { + var newe datasetMapFilterEntry = e + if strings.HasPrefix(newe.mapping, "!") { + newe.mapping = MapFilterResultOmit + } else { + newe.mapping = MapFilterResultOk + } + f.entries[i] = newe + } + + return f +} + +const ( + MapFilterResultOk string = "ok" + MapFilterResultOmit string = "!" 
+) + +// Parse a dataset filter result +func (m DatasetMapFilter) parseDatasetFilterResult(result string) (pass bool, err error) { + l := strings.ToLower(result) + if l == MapFilterResultOk { + return true, nil + } + if l == MapFilterResultOmit { + return false, nil + } + return false, fmt.Errorf("'%s' is not a valid filter result", result) +} + +func DatasetMapFilterFromConfig(in map[string]bool) (f *DatasetMapFilter, err error) { + + f = NewDatasetMapFilter(len(in), true) + for pathPattern, accept := range in { + mapping := MapFilterResultOmit + if accept { + mapping = MapFilterResultOk + } + if err = f.Add(pathPattern, mapping); err != nil { + err = fmt.Errorf("invalid mapping entry ['%s':'%s']: %s", pathPattern, mapping, err) + return + } + } + return +} diff --git a/daemon/filters/fsvfilter.go b/daemon/filters/fsvfilter.go new file mode 100644 index 0000000..0ec6225 --- /dev/null +++ b/daemon/filters/fsvfilter.go @@ -0,0 +1,15 @@ +package filters + +import "github.com/zrepl/zrepl/zfs" + +type AnyFSVFilter struct{} + +func NewAnyFSVFilter() AnyFSVFilter { + return AnyFSVFilter{} +} + +var _ zfs.FilesystemVersionFilter = AnyFSVFilter{} + +func (AnyFSVFilter) Filter(t zfs.VersionType, name string) (accept bool, err error) { + return true, nil +} diff --git a/daemon/job/build_jobs.go b/daemon/job/build_jobs.go index b36fc90..f062918 100644 --- a/daemon/job/build_jobs.go +++ b/daemon/job/build_jobs.go @@ -1,8 +1,8 @@ package job import ( - "github.com/zrepl/zrepl/cmd/config" "fmt" + "github.com/zrepl/zrepl/config" ) func JobsFromConfig(c config.Config) ([]Job, error) { @@ -20,8 +20,12 @@ func JobsFromConfig(c config.Config) ([]Job, error) { func buildJob(c config.Global, in config.JobEnum) (j Job, err error) { switch v := in.Ret.(type) { + case *config.SinkJob: + return SinkFromConfig(c, v) + case *config.PushJob: + return PushFromConfig(c, v) default: - panic(fmt.Sprintf("implementation error: unknown job type %s", v)) + panic(fmt.Sprintf("implementation error: 
unknown job type %T", v)) } -} \ No newline at end of file +} diff --git a/daemon/job/job.go b/daemon/job/job.go index 56e25af..4ea199a 100644 --- a/daemon/job/job.go +++ b/daemon/job/job.go @@ -2,7 +2,10 @@ package job import ( "context" + "errors" + "github.com/problame/go-streamrpc" "github.com/zrepl/zrepl/logger" + "time" ) type Logger = logger.Logger @@ -25,9 +28,21 @@ func WithLogger(ctx context.Context, l Logger) context.Context { return context.WithValue(ctx, contextKeyLog, l) } -func WithWakeup(ctx context.Context) (context.Context, WakeupChan) { - wc := make(chan struct{}, 1) - return context.WithValue(ctx, contextKeyWakeup, wc), wc +type WakeupFunc func() error + +var AlreadyWokenUp = errors.New("already woken up") + +func WithWakeup(ctx context.Context) (context.Context, WakeupFunc) { + wc := make(chan struct{}) + wuf := func() error { + select { + case wc <- struct{}{}: + return nil + default: + return AlreadyWokenUp + } + } + return context.WithValue(ctx, contextKeyWakeup, wc), wuf } type Job interface { @@ -36,12 +51,23 @@ type Job interface { Status() interface{} } -type WakeupChan <-chan struct{} - -func WaitWakeup(ctx context.Context) WakeupChan { - wc, ok := ctx.Value(contextKeyWakeup).(WakeupChan) +func WaitWakeup(ctx context.Context) <-chan struct{} { + wc, ok := ctx.Value(contextKeyWakeup).(chan struct{}) if !ok { wc = make(chan struct{}) } return wc } + +var STREAMRPC_CONFIG = &streamrpc.ConnConfig{ // FIXME oversight and configurability + RxHeaderMaxLen: 4096, + RxStructuredMaxLen: 4096 * 4096, + RxStreamMaxChunkSize: 4096 * 4096, + TxChunkSize: 4096 * 4096, + RxTimeout: streamrpc.Timeout{ + Progress: 10 * time.Second, + }, + TxTimeout: streamrpc.Timeout{ + Progress: 10 * time.Second, + }, +} diff --git a/daemon/job/push.go b/daemon/job/push.go new file mode 100644 index 0000000..f234981 --- /dev/null +++ b/daemon/job/push.go @@ -0,0 +1,86 @@ +package job + +import ( + "context" + "github.com/pkg/errors" + "github.com/problame/go-streamrpc" 
+ "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/connecter" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/logging" + "github.com/zrepl/zrepl/endpoint" + "github.com/zrepl/zrepl/replication" + "sync" +) + +type Push struct { + name string + connecter streamrpc.Connecter + fsfilter endpoint.FSFilter + + mtx sync.Mutex + replication *replication.Replication +} + +func PushFromConfig(g config.Global, in *config.PushJob) (j *Push, err error) { + + j = &Push{} + j.name = in.Name + + if j.connecter, err = connecter.FromConfig(g, in.Replication.Connect); err != nil { return nil, errors.Wrap(err, "cannot build connecter") } + + if j.fsfilter, err = filters.DatasetMapFilterFromConfig(in.Replication.Filesystems); err != nil { + return nil, errors.Wrap(err, "cannot build filesystem filter") + } + + return j, nil +} + +func (j *Push) Name() string { return j.name } + +func (j *Push) Status() interface{} { + return nil // FIXME +} + +func (j *Push) Run(ctx context.Context) { + log := GetLogger(ctx) + + defer log.Info("job exiting") + + log.Debug("wait for wakeups") + + invocationCount := 0 +outer: + for { + select { + case <-ctx.Done(): + log.WithError(ctx.Err()).Info("context") + break outer + case <-WaitWakeup(ctx): + invocationCount++ + invLog := log.WithField("invocation", invocationCount) + j.do(WithLogger(ctx, invLog)) + } + } +} + +func (j *Push) do(ctx context.Context) { + + log := GetLogger(ctx) + + client, err := streamrpc.NewClient(j.connecter, &streamrpc.ClientConfig{ConnConfig: STREAMRPC_CONFIG}) + if err != nil { + log.WithError(err).Error("cannot create streamrpc client"); return // client is nil, must not fall through to Close + } + defer client.Close() + + sender := endpoint.NewSender(j.fsfilter, filters.NewAnyFSVFilter()) + receiver := endpoint.NewRemote(client) + + j.mtx.Lock() + rep := replication.NewReplication(); j.replication = rep // publish under mtx so Status can read it + j.mtx.Unlock() + + ctx = logging.WithSubsystemLoggers(ctx, log) + rep.Drive(ctx, sender, receiver) +} diff --git a/daemon/job/sink.go b/daemon/job/sink.go new file mode 100644 index 0000000..618ced7 --- /dev/null +++ b/daemon/job/sink.go @@
-0,0 +1,115 @@ +package job + +import ( + "context" + "github.com/pkg/errors" + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/filters" + "github.com/zrepl/zrepl/daemon/logging" + "github.com/zrepl/zrepl/daemon/serve" + "github.com/zrepl/zrepl/endpoint" + "net" +) + +type Sink struct { + name string + l serve.ListenerFactory + fsmap endpoint.FSMap + fsmapInv endpoint.FSFilter +} + +func SinkFromConfig(g config.Global, in *config.SinkJob) (s *Sink, err error) { + + // FIXME multi client support + + s = &Sink{name: in.Name} + if s.l, err = serve.FromConfig(g, in.Replication.Serve); err != nil { + return nil, errors.Wrap(err, "cannot build server") + } + + fsmap := filters.NewDatasetMapFilter(1, false) // FIXME multi-client support + if err := fsmap.Add("<", in.Replication.RootDataset); err != nil { + return nil, errors.Wrap(err, "unexpected error: cannot build filesystem mapping") + } + s.fsmap = fsmap + + return s, nil +} + +func (j *Sink) Name() string { return j.name } + +func (*Sink) Status() interface{} { + // FIXME + return nil +} + +func (j *Sink) Run(ctx context.Context) { + + log := GetLogger(ctx) + defer log.Info("job exiting") + + l, err := j.l.Listen() + if err != nil { + log.WithError(err).Error("cannot listen") + return + } + + log.WithField("addr", l.Addr()).Debug("accepting connections") + + var connId int + +outer: + for { + + select { + case res := <-accept(l): + if res.err != nil { + log.WithError(res.err).Info("accept error") + break outer + } + connId++ + connLog := log.
+ WithField("connID", connId) + j.handleConnection(WithLogger(ctx, connLog), res.conn) + + case <-ctx.Done(): + break outer + } + + } + +} + +func (j *Sink) handleConnection(ctx context.Context, conn net.Conn) { + log := GetLogger(ctx) + log.WithField("addr", conn.RemoteAddr()).Info("handling connection") + defer log.Info("finished handling connection") + + ctx = logging.WithSubsystemLoggers(ctx, log) // contexts are immutable; must use the returned ctx + + local, err := endpoint.NewReceiver(j.fsmap, filters.NewAnyFSVFilter()) + if err != nil { + log.WithError(err).Error("unexpected error: cannot convert mapping to filter") + return + } + + handler := endpoint.NewHandler(local) + if err := streamrpc.ServeConn(ctx, conn, STREAMRPC_CONFIG, handler.Handle); err != nil { + log.WithError(err).Error("error serving client") + } +} + +type acceptResult struct { + conn net.Conn + err error +} + +func accept(listener net.Listener) <-chan acceptResult { + c := make(chan acceptResult, 1) + go func() { + conn, err := listener.Accept() + c <- acceptResult{conn, err} + }() + return c +} diff --git a/daemon/logging/adaptors.go b/daemon/logging/adaptors.go new file mode 100644 index 0000000..7f0b21b --- /dev/null +++ b/daemon/logging/adaptors.go @@ -0,0 +1,32 @@ +package logging + +import ( + "fmt" + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/logger" + "strings" +) + +type streamrpcLogAdaptor = twoClassLogAdaptor + +type twoClassLogAdaptor struct { + logger.Logger +} + +var _ streamrpc.Logger = twoClassLogAdaptor{} + +func (a twoClassLogAdaptor) Errorf(fmtStr string, args ...interface{}) { + const errorSuffix = ": %s" + if len(args) == 1 { + if err, ok := args[0].(error); ok && strings.HasSuffix(fmtStr, errorSuffix) { + msg := strings.TrimSuffix(fmtStr, errorSuffix) + a.WithError(err).Error(msg) + return + } + } + a.Logger.Error(fmt.Sprintf(fmtStr, args...)) +} + +func (a twoClassLogAdaptor) Infof(fmtStr string, args ...interface{}) { + a.Logger.Info(fmt.Sprintf(fmtStr, args...)) +} diff --git
a/daemon/logging/build_logging.go b/daemon/logging/build_logging.go index 6e2b4d6..f673033 100644 --- a/daemon/logging/build_logging.go +++ b/daemon/logging/build_logging.go @@ -1,14 +1,18 @@ package logging import ( - "github.com/zrepl/zrepl/cmd/config" - "os" - "github.com/mattn/go-isatty" + "context" "crypto/tls" - "github.com/pkg/errors" "crypto/x509" - "github.com/zrepl/zrepl/cmd/tlsconf" + "github.com/mattn/go-isatty" + "github.com/pkg/errors" + "github.com/problame/go-streamrpc" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/endpoint" "github.com/zrepl/zrepl/logger" + "github.com/zrepl/zrepl/replication" + "github.com/zrepl/zrepl/tlsconf" + "os" ) func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) { @@ -53,6 +57,18 @@ func OutletsFromConfig(in []config.LoggingOutletEnum) (*logger.Outlets, error) { } +const ( + SubsysReplication = "repl" + SubsysStreamrpc = "rpc" + SubsysEndpoint = "endpoint" +) + +func WithSubsystemLoggers(ctx context.Context, log logger.Logger) context.Context { + ctx = replication.WithLogger(ctx, log.WithField(SubsysField, SubsysReplication)) + ctx = streamrpc.ContextWithLogger(ctx, streamrpcLogAdaptor{log.WithField(SubsysField, SubsysStreamrpc)}) + ctx = endpoint.WithLogger(ctx, log.WithField(SubsysField, SubsysEndpoint)) + return ctx +} func parseLogFormat(i interface{}) (f EntryFormatter, err error) { var is string @@ -97,19 +113,19 @@ func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Lev var f EntryFormatter switch v := in.Ret.(type) { - case config.StdoutLoggingOutlet: + case *config.StdoutLoggingOutlet: level, f, err = parseCommon(v.LoggingOutletCommon) if err != nil { break } o, err = parseStdoutOutlet(v, f) - case config.TCPLoggingOutlet: + case *config.TCPLoggingOutlet: level, f, err = parseCommon(v.LoggingOutletCommon) if err != nil { break } o, err = parseTCPOutlet(v, f) - case config.SyslogLoggingOutlet: + case *config.SyslogLoggingOutlet: level, f, err =
parseCommon(v.LoggingOutletCommon) if err != nil { break @@ -121,7 +137,7 @@ func parseOutlet(in config.LoggingOutletEnum) (o logger.Outlet, level logger.Lev return o, level, err } -func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) { +func parseStdoutOutlet(in *config.StdoutLoggingOutlet, formatter EntryFormatter) (WriterOutlet, error) { flags := MetadataAll writer := os.Stdout if !isatty.IsTerminal(writer.Fd()) && !in.Time { @@ -135,7 +151,7 @@ func parseStdoutOutlet(in config.StdoutLoggingOutlet, formatter EntryFormatter) }, nil } -func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) { +func parseTCPOutlet(in *config.TCPLoggingOutlet, formatter EntryFormatter) (out *TCPOutlet, err error) { var tlsConfig *tls.Config if in.TLS != nil { tlsConfig, err = func(m *config.TCPLoggingOutletTLS, host string) (*tls.Config, error) { @@ -171,11 +187,10 @@ func parseTCPOutlet(in config.TCPLoggingOutlet, formatter EntryFormatter) (out * } -func parseSyslogOutlet(in config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) { +func parseSyslogOutlet(in *config.SyslogLoggingOutlet, formatter EntryFormatter) (out *SyslogOutlet, err error) { out = &SyslogOutlet{} out.Formatter = formatter out.Formatter.SetMetadataFlags(MetadataNone) out.RetryInterval = in.RetryInterval return out, nil } - diff --git a/daemon/logging/logging_formatters.go b/daemon/logging/logging_formatters.go index b968531..de79ee0 100644 --- a/daemon/logging/logging_formatters.go +++ b/daemon/logging/logging_formatters.go @@ -17,12 +17,10 @@ const ( ) const ( - logJobField string = "job" - logTaskField string = "task" - logSubsysField string = "subsystem" + JobField string = "job" + SubsysField string = "subsystem" ) - type MetadataFlags int64 const ( @@ -33,7 +31,6 @@ const ( MetadataAll MetadataFlags = ^0 ) - type NoFormatter struct{} func (f NoFormatter) SetMetadataFlags(flags 
MetadataFlags) {} @@ -80,7 +77,7 @@ func (f *HumanFormatter) Format(e *logger.Entry) (out []byte, err error) { fmt.Fprintf(&line, "[%s]", e.Level.Short()) } - prefixFields := []string{logJobField, logTaskField, logSubsysField} + prefixFields := []string{JobField, SubsysField} prefixed := make(map[string]bool, len(prefixFields)+2) for _, field := range prefixFields { val, ok := e.Fields[field].(string) @@ -168,7 +165,7 @@ func (f *LogfmtFormatter) Format(e *logger.Entry) ([]byte, error) { // at least try and put job and task in front prefixed := make(map[string]bool, 2) - prefix := []string{logJobField, logTaskField, logSubsysField} + prefix := []string{JobField, SubsysField} for _, pf := range prefix { v, ok := e.Fields[pf] if !ok { diff --git a/daemon/logging/logging_outlets.go b/daemon/logging/logging_outlets.go index 4a0fd7a..5a00d42 100644 --- a/daemon/logging/logging_outlets.go +++ b/daemon/logging/logging_outlets.go @@ -12,7 +12,6 @@ import ( "time" ) - type EntryFormatter interface { SetMetadataFlags(flags MetadataFlags) Format(e *logger.Entry) ([]byte, error) diff --git a/daemon/main.go b/daemon/main.go index 488b020..2dd2590 100644 --- a/daemon/main.go +++ b/daemon/main.go @@ -5,4 +5,3 @@ import ( ) type Logger = logger.Logger - diff --git a/cmd/helpers/helpers.go b/daemon/nethelpers/helpers.go similarity index 98% rename from cmd/helpers/helpers.go rename to daemon/nethelpers/helpers.go index bcf2cf5..994b9d2 100644 --- a/cmd/helpers/helpers.go +++ b/daemon/nethelpers/helpers.go @@ -1,4 +1,4 @@ -package helpers +package nethelpers import ( "github.com/pkg/errors" diff --git a/daemon/prometheus.go b/daemon/prometheus.go index 1cef3d0..3836624 100644 --- a/daemon/prometheus.go +++ b/daemon/prometheus.go @@ -4,7 +4,7 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/zrepl/zrepl/cmd/daemon/job" + "github.com/zrepl/zrepl/daemon/job" "github.com/zrepl/zrepl/zfs" "net" 
"net/http" diff --git a/daemon/serve/serve.go b/daemon/serve/serve.go new file mode 100644 index 0000000..8ab7945 --- /dev/null +++ b/daemon/serve/serve.go @@ -0,0 +1,26 @@ +package serve + +import ( + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" + "net" +) + +type ListenerFactory interface { + Listen() (net.Listener, error) +} + +func FromConfig(g config.Global, in config.ServeEnum) (ListenerFactory, error) { + + switch v := in.Ret.(type) { + case *config.TCPServe: + return TCPListenerFactoryFromConfig(g, v) + case *config.TLSServe: + return TLSListenerFactoryFromConfig(g, v) + case *config.StdinserverServer: + return StdinserverListenerFactoryFromConfig(g, v) + default: + return nil, errors.Errorf("internal error: unknown serve type %T", v) + } + +} diff --git a/daemon/serve/serve_stdinserver.go b/daemon/serve/serve_stdinserver.go new file mode 100644 index 0000000..4048d65 --- /dev/null +++ b/daemon/serve/serve_stdinserver.go @@ -0,0 +1,79 @@ +package serve + +import ( + "github.com/problame/go-netssh" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon/nethelpers" + "io" + "net" + "path" + "time" +) + +type StdinserverListenerFactory struct { + ClientIdentity string + sockpath string +} + +func StdinserverListenerFactoryFromConfig(g config.Global, in *config.StdinserverServer) (f *StdinserverListenerFactory, err error) { + + f = &StdinserverListenerFactory{ + ClientIdentity: in.ClientIdentity, + } + + f.sockpath = path.Join(g.Serve.StdinServer.SockDir, f.ClientIdentity) + + return +} + +func (f *StdinserverListenerFactory) Listen() (net.Listener, error) { + + if err := nethelpers.PreparePrivateSockpath(f.sockpath); err != nil { + return nil, err + } + + l, err := netssh.Listen(f.sockpath) + if err != nil { + return nil, err + } + return StdinserverListener{l}, nil +} + +type StdinserverListener struct { + l *netssh.Listener +} + +func (l StdinserverListener) Addr() net.Addr { + return netsshAddr{} +} + +func (l StdinserverListener) 
Accept() (net.Conn, error) { + c, err := l.l.Accept() + if err != nil { + return nil, err + } + return netsshConnToNetConnAdatper{c}, nil +} + +func (l StdinserverListener) Close() (err error) { + return l.l.Close() +} + +type netsshAddr struct{} + +func (netsshAddr) Network() string { return "netssh" } +func (netsshAddr) String() string { return "???" } + +type netsshConnToNetConnAdatper struct { + io.ReadWriteCloser // works for both netssh.SSHConn and netssh.ServeConn +} + +func (netsshConnToNetConnAdatper) LocalAddr() net.Addr { return netsshAddr{} } + +func (netsshConnToNetConnAdatper) RemoteAddr() net.Addr { return netsshAddr{} } + +func (netsshConnToNetConnAdatper) SetDeadline(t time.Time) error { return nil } + +func (netsshConnToNetConnAdatper) SetReadDeadline(t time.Time) error { return nil } + +func (netsshConnToNetConnAdatper) SetWriteDeadline(t time.Time) error { return nil } diff --git a/daemon/serve/serve_tcp.go b/daemon/serve/serve_tcp.go new file mode 100644 index 0000000..a5bad28 --- /dev/null +++ b/daemon/serve/serve_tcp.go @@ -0,0 +1,21 @@ +package serve + +import ( + "github.com/zrepl/zrepl/config" + "net" +) + +type TCPListenerFactory struct { + Address string +} + +func TCPListenerFactoryFromConfig(c config.Global, in *config.TCPServe) (*TCPListenerFactory, error) { + lf := &TCPListenerFactory{ + Address: in.Listen, + } + return lf, nil +} + +func (f *TCPListenerFactory) Listen() (net.Listener, error) { + return net.Listen("tcp", f.Address) +} diff --git a/daemon/serve/serve_tls.go b/daemon/serve/serve_tls.go new file mode 100644 index 0000000..f24f5ad --- /dev/null +++ b/daemon/serve/serve_tls.go @@ -0,0 +1,52 @@ +package serve + +import ( + "crypto/tls" + "crypto/x509" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/tlsconf" + "net" + "time" +) + +type TLSListenerFactory struct { + address string + clientCA *x509.CertPool + serverCert tls.Certificate + clientCommonName string + handshakeTimeout 
time.Duration +} + +func TLSListenerFactoryFromConfig(c config.Global, in *config.TLSServe) (lf *TLSListenerFactory, err error) { + lf = &TLSListenerFactory{ + address: in.Listen, + } + + if in.Ca == "" || in.Cert == "" || in.Key == "" || in.ClientCN == "" { + return nil, errors.New("fields 'ca', 'cert', 'key' and 'client_cn' must be specified") + } + + lf.clientCommonName = in.ClientCN + + lf.clientCA, err = tlsconf.ParseCAFile(in.Ca) + if err != nil { + return nil, errors.Wrap(err, "cannot parse ca file") + } + + lf.serverCert, err = tls.LoadX509KeyPair(in.Cert, in.Key) + if err != nil { + return nil, errors.Wrap(err, "cannot parse cer/key pair") + } + + return lf, nil +} + +func (f *TLSListenerFactory) Listen() (net.Listener, error) { + l, err := net.Listen("tcp", f.address) + if err != nil { + return nil, err + } + tl := tlsconf.NewClientAuthListener(l, f.clientCA, f.serverCert, f.clientCommonName, f.handshakeTimeout) + return tl, nil +} diff --git a/cmd/endpoint/context.go b/endpoint/context.go similarity index 100% rename from cmd/endpoint/context.go rename to endpoint/context.go diff --git a/cmd/endpoint/endpoint.go b/endpoint/endpoint.go similarity index 100% rename from cmd/endpoint/endpoint.go rename to endpoint/endpoint.go diff --git a/main.go b/main.go index 452b02d..251ffe2 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,11 @@ package main import ( + "github.com/spf13/cobra" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon" "log" "os" - "github.com/spf13/cobra" - "github.com/zrepl/zrepl/daemon" - "github.com/zrepl/zrepl/cmd/config" ) var rootCmd = &cobra.Command{ @@ -20,9 +20,8 @@ var rootCmd = &cobra.Command{ - ACLs instead of blank SSH access`, } - var daemonCmd = &cobra.Command{ - Use: "daemon", + Use: "daemon", Short: "daemon", RunE: func(cmd *cobra.Command, args []string) error { conf, err := config.ParseConfig(rootArgs.configFile) @@ -33,6 +32,18 @@ var daemonCmd = &cobra.Command{ }, } +var wakeupCmd = &cobra.Command{ + Use: 
"wakeup", + Short: "wake up jobs", + RunE: func(cmd *cobra.Command, args []string) error { + conf, err := config.ParseConfig(rootArgs.configFile) + if err != nil { + return err + } + return RunWakeup(conf, args) + }, +} + var rootArgs struct { configFile string } @@ -41,11 +52,11 @@ func init() { //cobra.OnInitialize(initConfig) rootCmd.PersistentFlags().StringVar(&rootArgs.configFile, "config", "", "config file path") rootCmd.AddCommand(daemonCmd) + rootCmd.AddCommand(wakeupCmd) } func main() { - if err := rootCmd.Execute(); err != nil { log.Printf("error executing root command: %s", err) os.Exit(1) diff --git a/cmd/tlsconf/tlsconf.go b/tlsconf/tlsconf.go similarity index 100% rename from cmd/tlsconf/tlsconf.go rename to tlsconf/tlsconf.go diff --git a/wakeup.go b/wakeup.go new file mode 100644 index 0000000..df389de --- /dev/null +++ b/wakeup.go @@ -0,0 +1,68 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "github.com/pkg/errors" + "github.com/zrepl/zrepl/config" + "github.com/zrepl/zrepl/daemon" + "io" + "net" + "net/http" +) + +func RunWakeup(config config.Config, args []string) error { + if len(args) != 1 { + return errors.Errorf("Expected 1 argument: job") + } + + httpc, err := controlHttpClient(config.Global.Control.SockPath) + if err != nil { + return err + } + + err = jsonRequestResponse(httpc, daemon.ControlJobEndpointWakeup, + struct { + Name string + }{ + Name: args[0], + }, + struct{}{}, + ) + return err +} + +func controlHttpClient(sockpath string) (client http.Client, err error) { + return http.Client{ + Transport: &http.Transport{ + DialContext: func(_ context.Context, _, _ string) (net.Conn, error) { + return net.Dial("unix", sockpath) + }, + }, + }, nil +} + +func jsonRequestResponse(c http.Client, endpoint string, req interface{}, res interface{}) error { + var buf bytes.Buffer + encodeErr := json.NewEncoder(&buf).Encode(req) + if encodeErr != nil { + return encodeErr + } + + resp, err := c.Post("http://unix"+endpoint, 
"application/json", &buf) + if err != nil { + return err + } + defer resp.Body.Close() // body must be closed on every path so the unix-socket conn is released + if resp.StatusCode != http.StatusOK { + var msg bytes.Buffer + io.CopyN(&msg, resp.Body, 4096) // best-effort read of the error text + return errors.Errorf("%s", msg.String()) + } + + decodeError := json.NewDecoder(resp.Body).Decode(&res) + if decodeError != nil { + return decodeError + } + + return nil +}