test: add message exchange test timeout

This commit is contained in:
braginini 2021-06-15 18:58:47 +02:00
parent f247f9a2f8
commit 94c0091a7b
3 changed files with 25 additions and 6 deletions

View File

@ -29,7 +29,7 @@ type Client struct {
ctx context.Context ctx context.Context
stream proto.SignalExchange_ConnectStreamClient stream proto.SignalExchange_ConnectStreamClient
//waiting group to notify once stream is connected //waiting group to notify once stream is connected
connWg sync.WaitGroup //todo use a channel instead?? connWg *sync.WaitGroup //todo use a channel instead??
} }
// Close Closes underlying connections to the Signal Exchange // Close Closes underlying connections to the Signal Exchange
@ -55,11 +55,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key) (*Client, erro
return nil, err return nil, err
} }
var wg sync.WaitGroup
return &Client{ return &Client{
realClient: proto.NewSignalExchangeClient(conn), realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx, ctx: ctx,
signalConn: conn, signalConn: conn,
key: key, key: key,
connWg: &wg,
}, nil }, nil
} }
@ -107,8 +109,8 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
// add key fingerprint to the request header to be identified on the server side // add key fingerprint to the request header to be identified on the server side
md := metadata.New(map[string]string{proto.HeaderId: key}) md := metadata.New(map[string]string{proto.HeaderId: key})
ctx := metadata.NewOutgoingContext(c.ctx, md) ctx := metadata.NewOutgoingContext(c.ctx, md)
ctx, cancel := context.WithCancel(ctx) //ctx, cancel := context.WithCancel(ctx)
defer cancel() //defer cancel()
stream, err := c.realClient.ConnectStream(ctx) stream, err := c.realClient.ConnectStream(ctx)

View File

@ -53,7 +53,6 @@ func (s *SignalExchangeServer) ConnectStream(stream proto.SignalExchange_Connect
} }
log.Infof("peer [%s] has successfully connected", p.Id) log.Infof("peer [%s] has successfully connected", p.Id)
for { for {
msg, err := stream.Recv() msg, err := stream.Recv()
if err == io.EOF { if err == io.EOF {

View File

@ -66,7 +66,7 @@ var _ = Describe("Client", func() {
Body: &sigProto.Body{Payload: "pong"}, Body: &sigProto.Body{Payload: "pong"},
}) })
if err != nil { if err != nil {
Fail("failed sending a message to {PeerA}") Fail("failed sending a message to PeerA")
} }
msgReceived.Done() msgReceived.Done()
return nil return nil
@ -83,7 +83,9 @@ var _ = Describe("Client", func() {
Fail("failed sending a message to PeerB") Fail("failed sending a message to PeerB")
} }
msgReceived.Wait() if waitTimeout(&msgReceived, 3*time.Second) {
Fail("test timed out on waiting for peers to exchange messages")
}
Expect(receivedOnA).To(BeEquivalentTo("pong")) Expect(receivedOnA).To(BeEquivalentTo("pong"))
Expect(receivedOnB).To(BeEquivalentTo("ping")) Expect(receivedOnB).To(BeEquivalentTo("ping"))
@ -179,3 +181,19 @@ func startSignal() (*grpc.Server, net.Listener) {
return s, lis return s, lis
} }
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
select {
case <-c:
return false // completed normally
case <-time.After(timeout):
return true // timed out
}
}