netbird/relay/testec2/relay.go

177 lines
4.1 KiB
Go
Raw Normal View History

//go:build linux || darwin
package main
import (
"context"
"fmt"
"net"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/relay/auth/hmac"
"github.com/netbirdio/netbird/relay/client"
)
var (
hmacTokenStore = &hmac.TokenStore{}
)
func relayTransfer(serverConnURL string, testData []byte, peerPairs int) {
connsSender := prepareConnsSender(serverConnURL, peerPairs)
defer func() {
for i := 0; i < len(connsSender); i++ {
err := connsSender[i].Close()
if err != nil {
log.Errorf("failed to close connection: %s", err)
}
}
}()
wg := sync.WaitGroup{}
wg.Add(len(connsSender))
for _, conn := range connsSender {
go func(conn net.Conn) {
defer wg.Done()
runWriter(conn, testData)
}(conn)
}
wg.Wait()
}
func runWriter(conn net.Conn, testData []byte) {
si := NewStartInidication(time.Now(), len(testData))
_, err := conn.Write(si)
if err != nil {
log.Errorf("failed to write to channel: %s", err)
return
}
log.Infof("sent start indication")
pieceSize := 1024
testDataLen := len(testData)
for j := 0; j < testDataLen; j += pieceSize {
end := j + pieceSize
if end > testDataLen {
end = testDataLen
}
_, writeErr := conn.Write(testData[j:end])
if writeErr != nil {
log.Errorf("failed to write to channel: %s", writeErr)
return
}
}
}
func prepareConnsSender(serverConnURL string, peerPairs int) []net.Conn {
ctx := context.Background()
clientsSender := make([]*client.Client, peerPairs)
for i := 0; i < cap(clientsSender); i++ {
c := client.NewClient(ctx, serverConnURL, hmacTokenStore, "sender-"+fmt.Sprint(i))
if err := c.Connect(); err != nil {
log.Fatalf("failed to connect to server: %s", err)
}
clientsSender[i] = c
}
connsSender := make([]net.Conn, 0, peerPairs)
for i := 0; i < len(clientsSender); i++ {
conn, err := clientsSender[i].OpenConn("receiver-" + fmt.Sprint(i))
if err != nil {
log.Fatalf("failed to bind channel: %s", err)
}
connsSender = append(connsSender, conn)
}
return connsSender
}
func relayReceive(serverConnURL string, peerPairs int) []time.Duration {
connsReceiver := prepareConnsReceiver(serverConnURL, peerPairs)
defer func() {
for i := 0; i < len(connsReceiver); i++ {
if err := connsReceiver[i].Close(); err != nil {
log.Errorf("failed to close connection: %s", err)
}
}
}()
durations := make(chan time.Duration, len(connsReceiver))
wg := sync.WaitGroup{}
for _, conn := range connsReceiver {
wg.Add(1)
go func(conn net.Conn) {
defer wg.Done()
duration := runReader(conn)
durations <- duration
}(conn)
}
wg.Wait()
durationsList := make([]time.Duration, 0, len(connsReceiver))
for d := range durations {
durationsList = append(durationsList, d)
if len(durationsList) == len(connsReceiver) {
close(durations)
}
}
return durationsList
}
func runReader(conn net.Conn) time.Duration {
buf := make([]byte, 8192)
n, readErr := conn.Read(buf)
if readErr != nil {
log.Errorf("failed to read from channel: %s", readErr)
return 0
}
si := DecodeStartIndication(buf[:n])
log.Infof("received start indication: %v", si)
receivedSize, err := conn.Read(buf)
if err != nil {
log.Fatalf("failed to read from relay: %s", err)
}
now := time.Now()
rcv := 0
for receivedSize < si.TransferSize {
n, readErr = conn.Read(buf)
if readErr != nil {
log.Errorf("failed to read from channel: %s", readErr)
return 0
}
receivedSize += n
rcv += n
}
return time.Since(now)
}
func prepareConnsReceiver(serverConnURL string, peerPairs int) []net.Conn {
clientsReceiver := make([]*client.Client, peerPairs)
for i := 0; i < cap(clientsReceiver); i++ {
c := client.NewClient(context.Background(), serverConnURL, hmacTokenStore, "receiver-"+fmt.Sprint(i))
err := c.Connect()
if err != nil {
log.Fatalf("failed to connect to server: %s", err)
}
clientsReceiver[i] = c
}
connsReceiver := make([]net.Conn, 0, peerPairs)
for i := 0; i < len(clientsReceiver); i++ {
conn, err := clientsReceiver[i].OpenConn("sender-" + fmt.Sprint(i))
if err != nil {
log.Fatalf("failed to bind channel: %s", err)
}
connsReceiver = append(connsReceiver, conn)
}
return connsReceiver
}