From 0a53b2415f6981bdeb0b080ac91678ad19bbce2e Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 11 Sep 2017 13:50:35 +0200 Subject: [PATCH] signal handling for source job --- cmd/config_job_source.go | 86 +++++++++++++++++++++++++++++----------- 1 file changed, 62 insertions(+), 24 deletions(-) diff --git a/cmd/config_job_source.go b/cmd/config_job_source.go index 7f4ac23..fb1dccb 100644 --- a/cmd/config_job_source.go +++ b/cmd/config_job_source.go @@ -3,16 +3,20 @@ package cmd import ( "time" - "github.com/zrepl/zrepl/rpc" mapstructure "github.com/mitchellh/mapstructure" "github.com/pkg/errors" + "github.com/zrepl/zrepl/rpc" + "io" + "os" + "os/signal" + "syscall" ) type SourceJob struct { Name string Serve AuthenticatedChannelListenerFactory Datasets *DatasetMapFilter - SnapshotFilter *PrefixSnapshotFilter + SnapshotFilter *PrefixSnapshotFilter Interval time.Duration Prune PrunePolicy } @@ -64,37 +68,71 @@ func (j *SourceJob) JobName() string { func (j *SourceJob) JobDo(log Logger) (err error) { - // Setup automatic snapshotting - listener, err := j.Serve.Listen() if err != nil { return err } + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + rwcChan := make(chan io.ReadWriteCloser) + + // Serve connections until interrupted or error +outer: for { - // listener does auth for us - rwc, err := listener.Accept() - if err != nil { - // if err != AuthError... - panic(err) // TODO + go func() { + rwc, err := listener.Accept() + if err != nil { + log.Printf("error accepting connection: %s", err) + close(rwcChan) + return + } + rwcChan <- rwc + }() + + select { + + case rwc, notClosed := <-rwcChan: + + if !notClosed { + break outer // closed because of accept error + } + + // construct connection handler + handler := Handler{ + Logger: log, + PullACL: j.Datasets, + // TODO should set SinkMapping here? no, but check Handler impl + } + + // handle connection + rpcServer := rpc.NewServer(rwc) + registerEndpoints(rpcServer, handler) + if err = rpcServer.Serve(); err != nil { + log.Printf("error serving connection: %s", err) + } + rwc.Close() + + case sig := <-sigChan: + + log.Printf("%s received", sig) + break outer + } - // construct connection handler - handler := Handler{ - Logger: log, - PullACL: j.Datasets, - // TODO should set SinkMapping here? no, but check Handler impl - } - - // handle connection - server := rpc.NewServer(rwc) - registerEndpoints(server, handler) - if err = server.Serve(); err != nil { - log.Printf("error serving connection: %s", err) - } - - rwc.Close() } + signal.Stop(sigChan) + close(sigChan) + + log.Printf("closing listener") + err = listener.Close() + if err != nil { + log.Printf("error closing listener: %s", err) + } + + return nil + }