test: fix management multiple concurrent peers test

This commit is contained in:
braginini 2021-07-22 15:53:15 +02:00
parent 2172d6f1b9
commit 83ac774264

View File

@ -5,7 +5,6 @@ import (
pb "github.com/golang/protobuf/proto" //nolint pb "github.com/golang/protobuf/proto" //nolint
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/encryption" "github.com/wiretrustee/wiretrustee/encryption"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net" "net"
@ -34,7 +33,6 @@ var _ = Describe("Management service", func() {
var ( var (
addr string addr string
server *grpc.Server server *grpc.Server
tmpDir string
dataDir string dataDir string
client mgmtProto.ManagementServiceClient client mgmtProto.ManagementServiceClient
serverPubKey wgtypes.Key serverPubKey wgtypes.Key
@ -67,7 +65,7 @@ var _ = Describe("Management service", func() {
server.Stop() server.Stop()
err := conn.Close() err := conn.Close()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
err = os.RemoveAll(tmpDir) err = os.RemoveAll(dataDir)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
}) })
@ -237,29 +235,30 @@ var _ = Describe("Management service", func() {
wg := sync2.WaitGroup{} wg := sync2.WaitGroup{}
wg.Add(initialPeers + initialPeers*additionalPeers) wg.Add(initialPeers + initialPeers*additionalPeers)
var clients []mgmtProto.ManagementService_SyncClient
for _, peer := range peers { for _, peer := range peers {
messageBytes, err := pb.Marshal(&mgmtProto.SyncRequest{}) messageBytes, err := pb.Marshal(&mgmtProto.SyncRequest{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
encryptedBytes, err := encryption.Encrypt(messageBytes, serverPubKey, peer) encryptedBytes, err := encryption.Encrypt(messageBytes, serverPubKey, peer)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// receive stream
peer := peer
go func() {
// open stream // open stream
sync, err := client.Sync(context.TODO(), &mgmtProto.EncryptedMessage{ sync, err := client.Sync(context.TODO(), &mgmtProto.EncryptedMessage{
WgPubKey: peer.PublicKey().String(), WgPubKey: peer.PublicKey().String(),
Body: encryptedBytes, Body: encryptedBytes,
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
clients = append(clients, sync)
// receive stream
peer := peer
go func() {
for { for {
encryptedResponse := &mgmtProto.EncryptedMessage{} encryptedResponse := &mgmtProto.EncryptedMessage{}
err = sync.RecvMsg(encryptedResponse) err = sync.RecvMsg(encryptedResponse)
if err == io.EOF { if err != nil {
break break
} else if err != nil {
Expect(err).NotTo(HaveOccurred())
} }
decryptedBytes, err := encryption.Decrypt(encryptedResponse.Body, serverPubKey, peer) decryptedBytes, err := encryption.Decrypt(encryptedResponse.Body, serverPubKey, peer)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
@ -268,7 +267,6 @@ var _ = Describe("Management service", func() {
err = pb.Unmarshal(decryptedBytes, resp) err = pb.Unmarshal(decryptedBytes, resp)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
wg.Done() wg.Done()
} }
}() }()
} }
@ -284,6 +282,11 @@ var _ = Describe("Management service", func() {
wg.Wait() wg.Wait()
for _, syncClient := range clients {
err := syncClient.CloseSend()
Expect(err).NotTo(HaveOccurred())
}
}) })
}) })
}) })