mirror of
https://github.com/zrepl/zrepl.git
synced 2025-01-03 12:58:57 +01:00
839eccf513
- make TCPOutlet fully asynchronous, dropping messages if connection is not fast enough - syslog is just fine for now, local anyways - stdout same thing refs #26
162 lines
3.3 KiB
Go
162 lines
3.3 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/tls"
|
|
"github.com/pkg/errors"
|
|
"github.com/zrepl/zrepl/logger"
|
|
"io"
|
|
"log/syslog"
|
|
"net"
|
|
"time"
|
|
)
|
|
|
|
type WriterOutlet struct {
|
|
Formatter EntryFormatter
|
|
Writer io.Writer
|
|
}
|
|
|
|
func (h WriterOutlet) WriteEntry(entry logger.Entry) error {
|
|
bytes, err := h.Formatter.Format(&entry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = h.Writer.Write(bytes)
|
|
h.Writer.Write([]byte("\n"))
|
|
return err
|
|
}
|
|
|
|
type TCPOutlet struct {
|
|
formatter EntryFormatter
|
|
// Specifies how much time must pass between a connection error and a reconnection attempt
|
|
// Log entries written to the outlet during this time interval are silently dropped.
|
|
connect func(ctx context.Context) (net.Conn, error)
|
|
entryChan chan *bytes.Buffer
|
|
}
|
|
|
|
func NewTCPOutlet(formatter EntryFormatter, network, address string, tlsConfig *tls.Config, retryInterval time.Duration) *TCPOutlet {
|
|
|
|
connect := func(ctx context.Context) (conn net.Conn, err error) {
|
|
deadl, ok := ctx.Deadline()
|
|
if !ok {
|
|
deadl = time.Time{}
|
|
}
|
|
dialer := net.Dialer{
|
|
Deadline: deadl,
|
|
}
|
|
if tlsConfig != nil {
|
|
conn, err = tls.DialWithDialer(&dialer, network, address, tlsConfig)
|
|
} else {
|
|
conn, err = dialer.DialContext(ctx, network, address)
|
|
}
|
|
return
|
|
}
|
|
|
|
entryChan := make(chan *bytes.Buffer, 1) // allow one message in flight while previos is in io.Copy()
|
|
|
|
o := &TCPOutlet{
|
|
formatter: formatter,
|
|
connect: connect,
|
|
entryChan: entryChan,
|
|
}
|
|
|
|
go o.outLoop(retryInterval)
|
|
|
|
return o
|
|
}
|
|
|
|
// FIXME: use this method
|
|
func (h *TCPOutlet) Close() {
|
|
close(h.entryChan)
|
|
}
|
|
|
|
func (h *TCPOutlet) outLoop(retryInterval time.Duration) {
|
|
|
|
var retry time.Time
|
|
var conn net.Conn
|
|
for msg := range h.entryChan {
|
|
var err error
|
|
for conn == nil {
|
|
time.Sleep(time.Until(retry))
|
|
ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(retryInterval))
|
|
conn, err = h.connect(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
retry = time.Now().Add(retryInterval)
|
|
conn = nil
|
|
}
|
|
}
|
|
conn.SetWriteDeadline(time.Now().Add(retryInterval))
|
|
_, err = io.Copy(conn, msg)
|
|
if err != nil {
|
|
retry = time.Now().Add(retryInterval)
|
|
conn.Close()
|
|
conn = nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *TCPOutlet) WriteEntry(e logger.Entry) error {
|
|
|
|
ebytes, err := h.formatter.Format(&e)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
buf.Write(ebytes)
|
|
buf.WriteString("\n")
|
|
|
|
select {
|
|
case h.entryChan <- buf:
|
|
return nil
|
|
default:
|
|
return errors.New("connection broken or not fast enough")
|
|
}
|
|
}
|
|
|
|
type SyslogOutlet struct {
|
|
Formatter EntryFormatter
|
|
RetryInterval time.Duration
|
|
writer *syslog.Writer
|
|
lastConnectAttempt time.Time
|
|
}
|
|
|
|
func (o *SyslogOutlet) WriteEntry(entry logger.Entry) error {
|
|
|
|
bytes, err := o.Formatter.Format(&entry)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s := string(bytes)
|
|
|
|
if o.writer == nil {
|
|
now := time.Now()
|
|
if now.Sub(o.lastConnectAttempt) < o.RetryInterval {
|
|
return nil // not an error toward logger
|
|
}
|
|
o.writer, err = syslog.New(syslog.LOG_LOCAL0, "zrepl")
|
|
o.lastConnectAttempt = time.Now()
|
|
if err != nil {
|
|
o.writer = nil
|
|
return err
|
|
}
|
|
}
|
|
|
|
switch entry.Level {
|
|
case logger.Debug:
|
|
return o.writer.Debug(s)
|
|
case logger.Info:
|
|
return o.writer.Info(s)
|
|
case logger.Warn:
|
|
return o.writer.Warning(s)
|
|
case logger.Error:
|
|
return o.writer.Err(s)
|
|
default:
|
|
return o.writer.Err(s) // write as error as reaching this case is in fact an error
|
|
}
|
|
|
|
}
|