signal handling for source job

This commit is contained in:
Christian Schwarz 2017-09-11 13:50:35 +02:00
parent ce25c01c7e
commit 0a53b2415f

View File

@ -3,16 +3,20 @@ package cmd
import ( import (
"time" "time"
"github.com/zrepl/zrepl/rpc"
mapstructure "github.com/mitchellh/mapstructure" mapstructure "github.com/mitchellh/mapstructure"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/zrepl/zrepl/rpc"
"io"
"os"
"os/signal"
"syscall"
) )
type SourceJob struct { type SourceJob struct {
Name string Name string
Serve AuthenticatedChannelListenerFactory Serve AuthenticatedChannelListenerFactory
Datasets *DatasetMapFilter Datasets *DatasetMapFilter
SnapshotFilter *PrefixSnapshotFilter SnapshotFilter *PrefixSnapshotFilter
Interval time.Duration Interval time.Duration
Prune PrunePolicy Prune PrunePolicy
} }
@ -64,37 +68,71 @@ func (j *SourceJob) JobName() string {
func (j *SourceJob) JobDo(log Logger) (err error) { func (j *SourceJob) JobDo(log Logger) (err error) {
// Setup automatic snapshotting
listener, err := j.Serve.Listen() listener, err := j.Serve.Listen()
if err != nil { if err != nil {
return err return err
} }
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
rwcChan := make(chan io.ReadWriteCloser)
// Serve connections until interrupted or error
outer:
for { for {
// listener does auth for us go func() {
rwc, err := listener.Accept() rwc, err := listener.Accept()
if err != nil { if err != nil {
// if err != AuthError... log.Printf("error accepting connection: %s", err)
panic(err) // TODO close(rwcChan)
return
}
rwcChan <- rwc
}()
select {
case rwc, notClosed := <-rwcChan:
if !notClosed {
break outer // closed because of accept error
}
// construct connection handler
handler := Handler{
Logger: log,
PullACL: j.Datasets,
// TODO should set SinkMapping here? no, but check Handler impl
}
// handle connection
rpcServer := rpc.NewServer(rwc)
registerEndpoints(rpcServer, handler)
if err = rpcServer.Serve(); err != nil {
log.Printf("error serving connection: %s", err)
}
rwc.Close()
case sig := <-sigChan:
log.Printf("%s received", sig)
break outer
} }
// construct connection handler
handler := Handler{
Logger: log,
PullACL: j.Datasets,
// TODO should set SinkMapping here? no, but check Handler impl
}
// handle connection
server := rpc.NewServer(rwc)
registerEndpoints(server, handler)
if err = server.Serve(); err != nil {
log.Printf("error serving connection: %s", err)
}
rwc.Close()
} }
signal.Stop(sigChan)
close(sigChan)
log.Printf("closing listener")
err = listener.Close()
if err != nil {
log.Printf("error closing listener: %s", err)
}
return nil
} }