[client] Add initiator field to ack (#3563)

added the new field and client handling
This commit is contained in:
Maycon Santos
2025-03-22 22:22:34 +01:00
committed by GitHub
parent 99b41543b8
commit 8b4c0c58e4
3 changed files with 80 additions and 60 deletions

View File

@ -83,7 +83,11 @@ func (c *GRPCClient) Close() error {
func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
backOff := defaultBackoff(ctx, interval)
operation := func() error {
return c.establishStreamAndReceive(ctx, msgHandler)
err := c.establishStreamAndReceive(ctx, msgHandler)
if err != nil {
log.Errorf("receive failed: %v", err)
}
return err
}
if err := backoff.Retry(operation, backOff); err != nil {
@ -126,6 +130,11 @@ func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler f
return fmt.Errorf("receive from stream: %w", err)
}
if msg.IsInitiator {
log.Tracef("received initiator message from flow receiver")
continue
}
if err := msgHandler(msg); err != nil {
return fmt.Errorf("handle message: %w", err)
}