Fix forwarder log displaying (#3411)

This commit is contained in:
Viktor Liu 2025-02-28 20:53:01 +01:00 committed by GitHub
parent 6ead0ff95e
commit 36e36414d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 33 deletions

View File

@ -1,6 +1,8 @@
package forwarder package forwarder
import ( import (
"fmt"
wgdevice "golang.zx2c4.com/wireguard/device" wgdevice "golang.zx2c4.com/wireguard/device"
"gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/header" "gvisor.dev/gvisor/pkg/tcpip/header"
@ -79,3 +81,9 @@ func (e *endpoint) AddHeader(*stack.PacketBuffer) {
func (e *endpoint) ParseHeader(*stack.PacketBuffer) bool { func (e *endpoint) ParseHeader(*stack.PacketBuffer) bool {
return true return true
} }
type epID stack.TransportEndpointID
func (i epID) String() string {
return fmt.Sprintf("%s:%d -> %s:%d", i.LocalAddress, i.LocalPort, i.RemoteAddress, i.RemotePort)
}

View File

@ -25,7 +25,7 @@ func (f *Forwarder) handleICMP(id stack.TransportEndpointID, pkt stack.PacketBuf
// TODO: support non-root // TODO: support non-root
conn, err := lc.ListenPacket(ctx, "ip4:icmp", "0.0.0.0") conn, err := lc.ListenPacket(ctx, "ip4:icmp", "0.0.0.0")
if err != nil { if err != nil {
f.logger.Error("Failed to create ICMP socket for %v: %v", id, err) f.logger.Error("Failed to create ICMP socket for %v: %v", epID(id), err)
f.sendICMPEvent(nftypes.TypeEnd, flowID, id) f.sendICMPEvent(nftypes.TypeEnd, flowID, id)
@ -52,7 +52,7 @@ func (f *Forwarder) handleICMP(id stack.TransportEndpointID, pkt stack.PacketBuf
// For Echo Requests, send and handle response // For Echo Requests, send and handle response
switch icmpHdr.Type() { switch icmpHdr.Type() {
case header.ICMPv4Echo: case header.ICMPv4Echo:
return f.handleEchoResponse(icmpHdr, payload, dst, conn, id) return f.handleEchoResponse(icmpHdr, payload, dst, conn, id, flowID)
case header.ICMPv4EchoReply: case header.ICMPv4EchoReply:
// dont process our own replies // dont process our own replies
return true return true
@ -62,24 +62,24 @@ func (f *Forwarder) handleICMP(id stack.TransportEndpointID, pkt stack.PacketBuf
// For other ICMP types (Time Exceeded, Destination Unreachable, etc) // For other ICMP types (Time Exceeded, Destination Unreachable, etc)
_, err = conn.WriteTo(payload, dst) _, err = conn.WriteTo(payload, dst)
if err != nil { if err != nil {
f.logger.Error("Failed to write ICMP packet for %v: %v", id, err) f.logger.Error("Failed to write ICMP packet for %v: %v", epID(id), err)
return true return true
} }
f.logger.Trace("Forwarded ICMP packet %v type=%v code=%v", f.logger.Trace("Forwarded ICMP packet %v type %v code %v",
id, icmpHdr.Type(), icmpHdr.Code()) epID(id), icmpHdr.Type(), icmpHdr.Code())
return true return true
} }
func (f *Forwarder) handleEchoResponse(icmpHdr header.ICMPv4, payload []byte, dst *net.IPAddr, conn net.PacketConn, id stack.TransportEndpointID) bool { func (f *Forwarder) handleEchoResponse(icmpHdr header.ICMPv4, payload []byte, dst *net.IPAddr, conn net.PacketConn, id stack.TransportEndpointID, flowID uuid.UUID) bool {
if _, err := conn.WriteTo(payload, dst); err != nil { if _, err := conn.WriteTo(payload, dst); err != nil {
f.logger.Error("Failed to write ICMP packet for %v: %v", id, err) f.logger.Error("Failed to write ICMP packet for %v: %v", epID(id), err)
return true return true
} }
f.logger.Trace("Forwarded ICMP packet %v type=%v code=%v", f.logger.Trace("Forwarded ICMP packet %v type %v code %v",
id, icmpHdr.Type(), icmpHdr.Code()) epID(id), icmpHdr.Type(), icmpHdr.Code())
if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
f.logger.Error("Failed to set read deadline for ICMP response: %v", err) f.logger.Error("Failed to set read deadline for ICMP response: %v", err)
@ -116,7 +116,9 @@ func (f *Forwarder) handleEchoResponse(icmpHdr header.ICMPv4, payload []byte, ds
return true return true
} }
f.logger.Trace("Forwarded ICMP echo reply for %v", id) f.logger.Trace("Forwarded ICMP echo reply for %v type %v code %v",
epID(id), icmpHdr.Type(), icmpHdr.Code())
return true return true
} }
@ -130,7 +132,6 @@ func (f *Forwarder) sendICMPEvent(typ nftypes.Type, flowID uuid.UUID, id stack.T
// TODO: handle ipv6 // TODO: handle ipv6
SourceIP: netip.AddrFrom4(id.LocalAddress.As4()), SourceIP: netip.AddrFrom4(id.LocalAddress.As4()),
DestIP: netip.AddrFrom4(id.RemoteAddress.As4()), DestIP: netip.AddrFrom4(id.RemoteAddress.As4()),
SourcePort: id.LocalPort, // TODO: handle type and code
DestPort: id.RemotePort,
}) })
} }

View File

@ -29,7 +29,7 @@ func (f *Forwarder) handleTCP(r *tcp.ForwarderRequest) {
outConn, err := (&net.Dialer{}).DialContext(f.ctx, "tcp", dialAddr) outConn, err := (&net.Dialer{}).DialContext(f.ctx, "tcp", dialAddr)
if err != nil { if err != nil {
r.Complete(true) r.Complete(true)
f.logger.Trace("forwarder: dial error for %v: %v", id, err) f.logger.Trace("forwarder: dial error for %v: %v", epID(id), err)
return return
} }
@ -51,7 +51,7 @@ func (f *Forwarder) handleTCP(r *tcp.ForwarderRequest) {
inConn := gonet.NewTCPConn(&wq, ep) inConn := gonet.NewTCPConn(&wq, ep)
f.logger.Trace("forwarder: established TCP connection %v", id) f.logger.Trace("forwarder: established TCP connection %v", epID(id))
go f.proxyTCP(id, inConn, outConn, ep, flowID) go f.proxyTCP(id, inConn, outConn, ep, flowID)
} }
@ -87,13 +87,13 @@ func (f *Forwarder) proxyTCP(id stack.TransportEndpointID, inConn *gonet.TCPConn
select { select {
case <-ctx.Done(): case <-ctx.Done():
f.logger.Trace("forwarder: tearing down TCP connection %v due to context done", id) f.logger.Trace("forwarder: tearing down TCP connection %v due to context done", epID(id))
return return
case err := <-errChan: case err := <-errChan:
if err != nil && !isClosedError(err) { if err != nil && !isClosedError(err) {
f.logger.Error("proxyTCP: copy error: %v", err) f.logger.Error("proxyTCP: copy error: %v", err)
} }
f.logger.Trace("forwarder: tearing down TCP connection %v", id) f.logger.Trace("forwarder: tearing down TCP connection %v", epID(id))
return return
} }
} }

View File

@ -78,10 +78,10 @@ func (f *udpForwarder) Stop() {
for id, conn := range f.conns { for id, conn := range f.conns {
conn.cancel() conn.cancel()
if err := conn.conn.Close(); err != nil { if err := conn.conn.Close(); err != nil {
f.logger.Debug("forwarder: UDP conn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP conn close error for %v: %v", epID(id), err)
} }
if err := conn.outConn.Close(); err != nil { if err := conn.outConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP outConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP outConn close error for %v: %v", epID(id), err)
} }
conn.ep.Close() conn.ep.Close()
@ -127,10 +127,10 @@ func (f *udpForwarder) cleanup() {
for _, idle := range idleConns { for _, idle := range idleConns {
idle.conn.cancel() idle.conn.cancel()
if err := idle.conn.conn.Close(); err != nil { if err := idle.conn.conn.Close(); err != nil {
f.logger.Debug("forwarder: UDP conn close error for %v: %v", idle.id, err) f.logger.Debug("forwarder: UDP conn close error for %v: %v", epID(idle.id), err)
} }
if err := idle.conn.outConn.Close(); err != nil { if err := idle.conn.outConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP outConn close error for %v: %v", idle.id, err) f.logger.Debug("forwarder: UDP outConn close error for %v: %v", epID(idle.id), err)
} }
idle.conn.ep.Close() idle.conn.ep.Close()
@ -139,7 +139,7 @@ func (f *udpForwarder) cleanup() {
delete(f.conns, idle.id) delete(f.conns, idle.id)
f.Unlock() f.Unlock()
f.logger.Trace("forwarder: cleaned up idle UDP connection %v", idle.id) f.logger.Trace("forwarder: cleaned up idle UDP connection %v", epID(idle.id))
f.sendUDPEvent(nftypes.TypeEnd, idle.conn.flowID, idle.id) f.sendUDPEvent(nftypes.TypeEnd, idle.conn.flowID, idle.id)
} }
@ -160,7 +160,7 @@ func (f *Forwarder) handleUDP(r *udp.ForwarderRequest) {
_, exists := f.udpForwarder.conns[id] _, exists := f.udpForwarder.conns[id]
f.udpForwarder.RUnlock() f.udpForwarder.RUnlock()
if exists { if exists {
f.logger.Trace("forwarder: existing UDP connection for %v", id) f.logger.Trace("forwarder: existing UDP connection for %v", epID(id))
return return
} }
@ -170,7 +170,7 @@ func (f *Forwarder) handleUDP(r *udp.ForwarderRequest) {
dstAddr := fmt.Sprintf("%s:%d", f.determineDialAddr(id.LocalAddress), id.LocalPort) dstAddr := fmt.Sprintf("%s:%d", f.determineDialAddr(id.LocalAddress), id.LocalPort)
outConn, err := (&net.Dialer{}).DialContext(f.ctx, "udp", dstAddr) outConn, err := (&net.Dialer{}).DialContext(f.ctx, "udp", dstAddr)
if err != nil { if err != nil {
f.logger.Debug("forwarder: UDP dial error for %v: %v", id, err) f.logger.Debug("forwarder: UDP dial error for %v: %v", epID(id), err)
f.sendUDPEvent(nftypes.TypeEnd, flowID, id) f.sendUDPEvent(nftypes.TypeEnd, flowID, id)
// TODO: Send ICMP error message // TODO: Send ICMP error message
return return
@ -182,7 +182,7 @@ func (f *Forwarder) handleUDP(r *udp.ForwarderRequest) {
if epErr != nil { if epErr != nil {
f.logger.Debug("forwarder: failed to create UDP endpoint: %v", epErr) f.logger.Debug("forwarder: failed to create UDP endpoint: %v", epErr)
if err := outConn.Close(); err != nil { if err := outConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP outConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP outConn close error for %v: %v", epID(id), err)
} }
f.sendUDPEvent(nftypes.TypeEnd, flowID, id) f.sendUDPEvent(nftypes.TypeEnd, flowID, id)
return return
@ -206,10 +206,10 @@ func (f *Forwarder) handleUDP(r *udp.ForwarderRequest) {
f.udpForwarder.Unlock() f.udpForwarder.Unlock()
pConn.cancel() pConn.cancel()
if err := inConn.Close(); err != nil { if err := inConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP inConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP inConn close error for %v: %v", epID(id), err)
} }
if err := outConn.Close(); err != nil { if err := outConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP outConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP outConn close error for %v: %v", epID(id), err)
} }
f.sendUDPEvent(nftypes.TypeEnd, flowID, id) f.sendUDPEvent(nftypes.TypeEnd, flowID, id)
@ -218,7 +218,7 @@ func (f *Forwarder) handleUDP(r *udp.ForwarderRequest) {
f.udpForwarder.conns[id] = pConn f.udpForwarder.conns[id] = pConn
f.udpForwarder.Unlock() f.udpForwarder.Unlock()
f.logger.Trace("forwarder: established UDP connection to %v", id) f.logger.Trace("forwarder: established UDP connection %v", epID(id))
go f.proxyUDP(connCtx, pConn, id, ep) go f.proxyUDP(connCtx, pConn, id, ep)
} }
@ -226,10 +226,10 @@ func (f *Forwarder) proxyUDP(ctx context.Context, pConn *udpPacketConn, id stack
defer func() { defer func() {
pConn.cancel() pConn.cancel()
if err := pConn.conn.Close(); err != nil { if err := pConn.conn.Close(); err != nil {
f.logger.Debug("forwarder: UDP inConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP inConn close error for %v: %v", epID(id), err)
} }
if err := pConn.outConn.Close(); err != nil { if err := pConn.outConn.Close(); err != nil {
f.logger.Debug("forwarder: UDP outConn close error for %v: %v", id, err) f.logger.Debug("forwarder: UDP outConn close error for %v: %v", epID(id), err)
} }
ep.Close() ep.Close()
@ -253,13 +253,13 @@ func (f *Forwarder) proxyUDP(ctx context.Context, pConn *udpPacketConn, id stack
select { select {
case <-ctx.Done(): case <-ctx.Done():
f.logger.Trace("forwarder: tearing down UDP connection %v due to context done", id) f.logger.Trace("forwarder: tearing down UDP connection %v due to context done", epID(id))
return return
case err := <-errChan: case err := <-errChan:
if err != nil && !isClosedError(err) { if err != nil && !isClosedError(err) {
f.logger.Error("proxyUDP: copy error: %v", err) f.logger.Error("proxyUDP: copy error: %v", err)
} }
f.logger.Trace("forwarder: tearing down UDP connection %v", id) f.logger.Trace("forwarder: tearing down UDP connection %v", epID(id))
return return
} }
} }