diff --git a/relay/client/client_test.go b/relay/client/client_test.go index b9068ccd9..42cd5bc67 100644 --- a/relay/client/client_test.go +++ b/relay/client/client_test.go @@ -2,8 +2,11 @@ package client import ( "context" + "crypto/rand" + "fmt" "net" "os" + "sync" "testing" "time" @@ -108,6 +111,122 @@ func TestClient(t *testing.T) { } } +func TestDataTransfer(t *testing.T) { + ctx := context.Background() + + testData, err := seedRandomData() + if err != nil { + t.Fatalf("failed to seed random data: %s", err) + } + + srv := server.NewServer(serverURL, false, av) + errChan := make(chan error, 1) + go func() { + listenCfg := server.ListenerConfig{Address: serverListenAddr} + err := srv.Listen(listenCfg) + if err != nil { + errChan <- err + } + }() + + defer func() { + err := srv.Close() + if err != nil { + t.Errorf("failed to close server: %s", err) + } + }() + + // wait for server to start + if err := waitForServerToStart(errChan); err != nil { + t.Fatalf("failed to start server: %s", err) + } + + peerPairs := 50 + clientsSender := make([]*Client, peerPairs) + for i := 0; i < cap(clientsSender); i++ { + c := NewClient(ctx, serverURL, hmacTokenStore, "sender-"+fmt.Sprint(i)) + err := c.Connect() + if err != nil { + t.Fatalf("failed to connect to server: %s", err) + } + clientsSender[i] = c + } + + clientsReceiver := make([]*Client, peerPairs) + for i := 0; i < cap(clientsSender); i++ { + c := NewClient(ctx, serverURL, hmacTokenStore, "receiver-"+fmt.Sprint(i)) + err := c.Connect() + if err != nil { + t.Fatalf("failed to connect to server: %s", err) + } + clientsReceiver[i] = c + } + + connsSender := make([]net.Conn, 0, peerPairs) + connsReceiver := make([]net.Conn, 0, peerPairs) + for i := 0; i < len(clientsSender); i++ { + conn, err := clientsSender[i].OpenConn("receiver-" + fmt.Sprint(i)) + if err != nil { + t.Fatalf("failed to bind channel: %s", err) + } + connsSender = append(connsSender, conn) + + conn, err = clientsReceiver[i].OpenConn("sender-" + fmt.Sprint(i)) + if err != nil { + t.Fatalf("failed to bind channel: %s", err) + } + connsReceiver = append(connsReceiver, conn) + } + + wg := sync.WaitGroup{} + for i := 0; i < len(connsSender); i++ { + wg.Add(2) + go func(i int) { + pieceSize := 1024 + testDataLen := len(testData) + + for j := 0; j < testDataLen; j += pieceSize { + end := j + pieceSize + if end > testDataLen { + end = testDataLen + } + _, err := connsSender[i].Write(testData[j:end]) + if err != nil { + t.Fatalf("failed to write to channel: %s", err) + } + } + wg.Done() + }(i) + + go func(i int) { + for receivedSize := 0; receivedSize < len(testData); { + buf := make([]byte, 8192) + n, err := connsReceiver[i].Read(buf) + if err != nil { + t.Fatalf("failed to read from channel: %s", err) + } + + receivedSize += n + } + wg.Done() + }(i) + } + + wg.Wait() + + for i := 0; i < len(connsSender); i++ { + err := connsSender[i].Close() + if err != nil { + t.Errorf("failed to close connection: %s", err) + } + + err = connsReceiver[i].Close() + if err != nil { + t.Errorf("failed to close connection: %s", err) + } + } +} + func TestRegistration(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} @@ -601,3 +720,12 @@ func waitForServerToStart(errChan chan error) error { } return nil } + +func seedRandomData() ([]byte, error) { + token := make([]byte, 1024*1024*10) + _, err := rand.Read(token) + if err != nil { + return nil, err + } + return token, nil +}