resp, err := srv.RPCClient.Call(topic, payload)
if err != nil {
srv.logger.Error("RPC call error. ", err)
return err
}
func (c *AMQPClient) Call(topic string, body []byte) (response []byte, err error) {
// ...
// publish message with CorrelationId
// ...
// Consume messages in loop
for msg := range msgs {
if corrID == msg.CorrelationId {
return msg.Body, nil
}
}
err = errors.New("lost CorrelationId=" + corrID)
c.logger.Error(err)
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
defer cancel()
resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // должен вернуть ошибку, если таймаут
srv.logger.Error("RPC call error. ", err)
return err
}
func (c *AMQPClient) Call(ctx context.Context, topic string, body []byte) (response []byte, err error) {
// ...
// publish message with CorrelationId
// ...
// Consume messages in loop
/*
for msg := range msgs {
if corrID == msg.CorrelationId {
return msg.Body, nil
}
}
*/
for {
select {
case msg := <-msgs:
if corrID == msg.CorrelationId {
return msg.Body, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
err = errors.New("lost CorrelationId=" + corrID) // на этой строчке теперь сообщает: unreachable code
c.logger.Error(err)
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // тут вернет ошибку "context cancelled" при таймауте
srv.logger.Error("RPC call error. ", err)
return err
}
case <-ctx.Done():
return nil, ctx.Err()
}
case <-ctx.Done():
err = fmt.Errorf("lost CorrelationId=%s %s", corrID, err.Error())
c.logger.Error(err)
return nil, ctx.Err()
}