romesses
@romesses
Backend инженер

Как корректно ограничить вызов RPC по таймауту?

Используется вызов RPC посредством AMQP (адаптирован с примера в rpc_client.go): после отправки сообщения слушаем, ожидая сообщение с заданным CorrelationId.
Исходный фрагмент кода

resp, err := srv.RPCClient.Call(topic, payload)
if err != nil {
	srv.logger.Error("RPC call error. ", err)
	return err
}


func (c *AMQPClient) Call(topic string, body []byte) (response []byte, err error) {
	// ...
	// publish message with CorrelationId
	// ...

	// Consume messages in loop
	for msg := range msgs {
		if corrID == msg.CorrelationId {
			return msg.Body, nil
		}
	}

	err = errors.New("lost CorrelationId=" + corrID)
	c.logger.Error(err)

	return nil, err
}



Теперь необходимо добавить таймаут операции вызова. Я решил, что необходимо добавить контекст внутрь Call.
Делаю так

ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
defer cancel()

resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // должен вернуть ошибку, если таймаут
	srv.logger.Error("RPC call error. ", err)
	return err
}


func (c *AMQPClient) Call(ctx context.Context, topic string, body []byte) (response []byte, err error) {
	// ...
	// publish message with CorrelationId
	// ...

	// Consume messages in loop
	/*
		for msg := range msgs {
			if corrID == msg.CorrelationId {
				return msg.Body, nil
			}
		}
	*/

	for {
		select {
		case msg := <-msgs:
			if corrID == msg.CorrelationId {
				return msg.Body, nil
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	err = errors.New("lost CorrelationId=" + corrID) //  на этой строчке теперь сообщает: unreachable code
	c.logger.Error(err)

	return nil, err
}


Как сделать правильно, избавившись от "unreachable code"?
  • Вопрос задан
  • 61 просмотр
Решения вопроса 1
unreachable code можно смело убрать
У вас же и так возвращается ошибка в случае отмены контекста:
case <-ctx.Done():
      return nil, ctx.Err()

Но нужно понимать, что контекст будет по таймауту отменяться только если этот таймаут контексту задан.
Например:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := srv.RPCClient.Call(ctx, topic, payload)
if err != nil { // тут вернет ошибку "context cancelled" при таймауте
  srv.logger.Error("RPC call error. ", err)
  return err
}
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
Можно изменить код, к примеру, таким образом. А все что после цикла - убрать. У вас сейчас едиснтвенный способ покинуть цикл - return с завершением текущей функции, поэтому весь код после цикла никогда не будет выполнен

case <-ctx.Done():
  return nil, ctx.Err()
}


case <-ctx.Done():
  err = fmt.Errorf("lost CorrelationId=%s %s", corrID, err.Error())
  c.logger.Error(err)
  return nil, ctx.Err()
}
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы