logger.Outlet: WriteEntry must not block

- make TCPOutlet fully asynchronous, dropping messages if connection is
  not fast enough
- syslog is just fine for now, local anyways
- stdout same thing

refs #26
This commit is contained in:
Christian Schwarz 2017-12-29 17:19:07 +01:00
parent 9a19615fd4
commit 839eccf513
6 changed files with 125 additions and 80 deletions

View File

@ -160,10 +160,6 @@ func parseStdoutOutlet(i interface{}, formatter EntryFormatter) (WriterOutlet, e
func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, err error) { func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, err error) {
out = &TCPOutlet{}
out.Formatter = formatter
out.Formatter.SetMetadataFlags(MetadataAll)
var in struct { var in struct {
Net string Net string
Address string Address string
@ -178,13 +174,19 @@ func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, er
return nil, errors.Wrap(err, "mapstructure error") return nil, errors.Wrap(err, "mapstructure error")
} }
out.RetryInterval, err = time.ParseDuration(in.RetryInterval) retryInterval, err := time.ParseDuration(in.RetryInterval)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "cannot parse 'retry_interval'") return nil, errors.Wrap(err, "cannot parse 'retry_interval'")
} }
out.Net, out.Address = in.Net, in.Address if len(in.Net) == 0 {
return nil, errors.New("field 'net' must not be empty")
}
if len(in.Address) == 0 {
return nil, errors.New("field 'address' must not be empty")
}
var tlsConfig *tls.Config
if in.TLS != nil { if in.TLS != nil {
cert, err := tls.LoadX509KeyPair(in.TLS.Cert, in.TLS.Key) cert, err := tls.LoadX509KeyPair(in.TLS.Cert, in.TLS.Key)
@ -211,15 +213,16 @@ func parseTCPOutlet(i interface{}, formatter EntryFormatter) (out *TCPOutlet, er
return nil, errors.Wrap(err, "cannot load root ca pool") return nil, errors.Wrap(err, "cannot load root ca pool")
} }
out.TLS = &tls.Config{ tlsConfig = &tls.Config{
Certificates: []tls.Certificate{cert}, Certificates: []tls.Certificate{cert},
RootCAs: rootCAs, RootCAs: rootCAs,
} }
out.TLS.BuildNameToCertificate() tlsConfig.BuildNameToCertificate()
} }
return formatter.SetMetadataFlags(MetadataAll)
return NewTCPOutlet(formatter, in.Net, in.Address, tlsConfig, retryInterval), nil
} }

View File

@ -373,7 +373,7 @@ func (t *Task) Log() *logger.Logger {
} }
// implement logger.Outlet interface // implement logger.Outlet interface
func (t *Task) WriteEntry(ctx context.Context, entry logger.Entry) error { func (t *Task) WriteEntry(entry logger.Entry) error {
t.rwl.RLock() t.rwl.RLock()
defer t.rwl.RUnlock() defer t.rwl.RUnlock()
t.cur().progress.UpdateLogEntry(entry) t.cur().progress.UpdateLogEntry(entry)

View File

@ -1,6 +1,7 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"crypto/tls" "crypto/tls"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -16,7 +17,7 @@ type WriterOutlet struct {
Writer io.Writer Writer io.Writer
} }
func (h WriterOutlet) WriteEntry(ctx context.Context, entry logger.Entry) error { func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := h.Formatter.Format(&entry) bytes, err := h.Formatter.Format(&entry)
if err != nil { if err != nil {
return err return err
@ -27,56 +28,92 @@ func (h WriterOutlet) WriteEntry(ctx context.Context, entry logger.Entry) error
} }
type TCPOutlet struct { type TCPOutlet struct {
Formatter EntryFormatter formatter EntryFormatter
Net, Address string
Dialer net.Dialer
TLS *tls.Config
// Specifies how much time must pass between a connection error and a reconnection attempt // Specifies how much time must pass between a connection error and a reconnection attempt
// Log entries written to the outlet during this time interval are silently dropped. // Log entries written to the outlet during this time interval are silently dropped.
RetryInterval time.Duration connect func(ctx context.Context) (net.Conn, error)
// nil if there was an error sending / connecting to remote server entryChan chan *bytes.Buffer
conn net.Conn
// Last time an error occurred when sending / connecting to remote server
retry time.Time
} }
func (h *TCPOutlet) WriteEntry(ctx context.Context, e logger.Entry) error { func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {
b, err := h.Formatter.Format(&e) connect := func(ctx context.Context) (conn net.Conn, err error) {
deadl, ok := ctx.Deadline()
if !ok {
deadl = time.Time{}
}
dialer := net.Dialer{
Deadline: deadl,
}
if tlsConfig != nil {
conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
} else {
conn, err = dialer.DialContext(ctx, network, address)
}
return
}
entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previos is in io.Copy()
o := &TCPOutlet{
formatter: formatter,
connect: connect,
entryChan: entryChan,
}
go o.outLoop(retryInterval)
return o
}
// FIXME: use this method
func (h *TCPOutlet) Close() {
close(h.entryChan)
}
func (h *TCPOutlet) outLoop(retryInterval time.Duration) {
var retry time.Time
var conn net.Conn
for msg := range h.entryChan {
var err error
for conn == nil {
time.Sleep(time.Until(retry))
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
conn, err = h.connect(ctx)
cancel()
if err != nil {
retry = time.Now().Add(retryInterval)
conn = nil
}
}
conn.SetWriteDeadline(time.Now().Add(retryInterval))
_, err = io.Copy(conn, msg)
if err != nil {
retry = time.Now().Add(retryInterval)
conn.Close()
conn = nil
}
}
}
func (h *TCPOutlet) WriteEntry(e logger.Entry) error {
ebytes, err := h.formatter.Format(&e)
if err != nil { if err != nil {
return err return err
} }
if h.conn == nil { buf := new(bytes.Buffer)
if time.Now().Sub(h.retry) < h.RetryInterval { buf.Write(ebytes)
// cool-down phase, drop the log entry buf.WriteString("\n")
return nil
}
if h.TLS != nil { select {
h.conn, err = tls.DialWithDialer(&h.Dialer, h.Net, h.Address, h.TLS) case h.entryChan <- buf:
} else { return nil
h.conn, err = h.Dialer.DialContext(ctx, h.Net, h.Address) default:
} return errors.New("connection broken or not fast enough")
if err != nil {
h.conn = nil
h.retry = time.Now()
return errors.Wrap(err, "cannot dial")
}
} }
_, err = h.conn.Write(b)
if err == nil {
_, err = h.conn.Write([]byte("\n"))
}
if err != nil {
h.conn.Close()
h.conn = nil
h.retry = time.Now()
return errors.Wrap(err, "cannot write")
}
return nil
} }
type SyslogOutlet struct { type SyslogOutlet struct {
@ -86,7 +123,7 @@ type SyslogOutlet struct {
lastConnectAttempt time.Time lastConnectAttempt time.Time
} }
func (o *SyslogOutlet) WriteEntry(ctx context.Context, entry logger.Entry) error { func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {
bytes, err := o.Formatter.Format(&entry) bytes, err := o.Formatter.Format(&entry)
if err != nil { if err != nil {

View File

@ -27,6 +27,8 @@ Check out :sampleconf:`random/logging.yml` for an example on how to configure mu
jobs: ... jobs: ...
.. _logging-error-outlet:
.. ATTENTION:: .. ATTENTION::
The **first outlet is special**: if an error writing to any outlet occurs, the first outlet receives the error and can print it. The **first outlet is special**: if an error writing to any outlet occurs, the first outlet receives the error and can print it.
Thus, the first outlet must be the one that always works and does not block, e.g. ``stdout``, which is the default. Thus, the first outlet must be the one that always works and does not block, e.g. ``stdout``, which is the default.
@ -100,6 +102,11 @@ Formats
``encoding/json.Marshal()``, which is particularly useful for processing in ``encoding/json.Marshal()``, which is particularly useful for processing in
log aggregation or when processing state dumps. log aggregation or when processing state dumps.
Outlets
~~~~~~~
Outlets are the destination for log entries.
.. _logging-outlet-stdout: .. _logging-outlet-stdout:
``stdout`` Outlet ``stdout`` Outlet
@ -150,12 +157,6 @@ Can only be specified once.
``tcp`` Outlet ``tcp`` Outlet
-------------- --------------
.. WARNING::
The TCP outlet is not fully asynchronous and blocks the calling goroutine when it cannot connect.
Currently it should only be used for local connections that are guaranteed to not fail / be slow.
This issue is tracked in :issue:`26`
.. list-table:: .. list-table::
:widths: 10 90 :widths: 10 90
:header-rows: 1 :header-rows: 1
@ -197,6 +198,15 @@ This is particularly useful in combination with log aggregation services that ru
* - ``key`` * - ``key``
- PEM-encoded, unencrypted client private key identifying this zrepl daemon toward the remote server - PEM-encoded, unencrypted client private key identifying this zrepl daemon toward the remote server
.. WARNING::
zrepl drops log messages to the TCP outlet if the underlying connection is not fast enough.
Note that TCP buffering in the kernel must first run full becfore messages are dropped.
Make sure to always configure a ``stdout`` outlet as the special error outlet to be informed about problems
with the TCP outlet (see :ref:`above <logging-error-outlet>` ).
.. NOTE:: .. NOTE::
zrepl uses Go's ``crypto/tls`` and ``crypto/x509`` packages and leaves all but the required fields in ``tls.Config`` at their default values. zrepl uses Go's ``crypto/tls`` and ``crypto/x509`` packages and leaves all but the required fields in ``tls.Config`` at their default values.

View File

@ -1,7 +1,6 @@
package logger package logger
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -82,10 +81,17 @@ type Entry struct {
Fields Fields Fields Fields
} }
// An outlet receives log entries produced by the Logger and writes them to some destination.
type Outlet interface { type Outlet interface {
// Write the entry to the destination.
//
// Logger waits for all outlets to return from WriteEntry() before returning from the log call.
// An implementation of Outlet must assert that it does not block in WriteEntry.
// Otherwise, it will slow down the program.
//
// Note: os.Stderr is also used by logger.Logger for reporting errors returned by outlets // Note: os.Stderr is also used by logger.Logger for reporting errors returned by outlets
// => you probably don't want to log there // => you probably don't want to log there
WriteEntry(ctx context.Context, entry Entry) error WriteEntry(entry Entry) error
} }
type Outlets struct { type Outlets struct {
@ -138,4 +144,4 @@ func (os *Outlets) GetLoggerErrorOutlet() Outlet {
type nullOutlet struct{} type nullOutlet struct{}
func (nullOutlet) WriteEntry(ctx context.Context, entry Entry) error { return nil } func (nullOutlet) WriteEntry(entry Entry) error { return nil }

View File

@ -1,7 +1,6 @@
package logger package logger
import ( import (
"context"
"fmt" "fmt"
"runtime/debug" "runtime/debug"
"sync" "sync"
@ -52,7 +51,7 @@ func (l *Logger) logInternalError(outlet Outlet, err string) {
time.Now(), time.Now(),
fields, fields,
} }
l.outlets.GetLoggerErrorOutlet().WriteEntry(context.Background(), entry) l.outlets.GetLoggerErrorOutlet().WriteEntry(entry)
} }
func (l *Logger) log(level Level, msg string) { func (l *Logger) log(level Level, msg string) {
@ -62,30 +61,20 @@ func (l *Logger) log(level Level, msg string) {
entry := Entry{level, msg, time.Now(), l.fields} entry := Entry{level, msg, time.Now(), l.fields}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(l.outletTimeout))
ech := make(chan outletResult)
louts := l.outlets.Get(level) louts := l.outlets.Get(level)
ech := make(chan outletResult, len(louts))
for i := range louts { for i := range louts {
go func(ctx context.Context, outlet Outlet, entry Entry) { go func(outlet Outlet, entry Entry) {
ech <- outletResult{outlet, outlet.WriteEntry(ctx, entry)} ech <- outletResult{outlet, outlet.WriteEntry(entry)}
}(ctx, louts[i], entry) }(louts[i], entry)
} }
for fin := 0; fin < len(louts); fin++ { for fin := 0; fin < len(louts); fin++ {
select { res := <-ech
case res := <-ech: if res.Error != nil {
if res.Error != nil { l.logInternalError(res.Outlet, res.Error.Error())
l.logInternalError(res.Outlet, res.Error.Error())
}
case <-ctx.Done():
if ctx.Err() == context.DeadlineExceeded {
l.logInternalError(nil, "one or more outlets exceeded timeout but will keep waiting anyways")
}
} }
} }
close(ech)
cancel() // make go vet happy
} }