@desbiger
разработчик

RabbitMQ RPC отрабатывает только 3 — 4 раза после чего отваливаются консьюмеры?

Доброго времен суток. Столкнулся с проблемой, и не могу найти ни какого логического объяснения.
Реализую RPC через RabbitMQ на Go. Схема следующая:

Клиент:
1. Формирует запрос в постоянную очередь "RPC"
2. В сообщении задает уникальный CorrelationId
3. Слушает постоянную очередь "RPC Answers"
4. При получении сообщения проверяет соответствие CorrelationId заданному ранее
5. При соответствии CorrelationId отсылает ACK, закрывает канал, и соединение.

Сервер
1. Слушает очередь "RPC"
2. При получении сообщения, делает ACK
3. Формирует ответ на сообщение и посылает в очередь RPC Answers с установкой CorrelationId равным, CorrelationId в принятом сообщении.

Схема полностью рабочая, все данные проходят корректно, но при тесте, таких циклов проходит не более 3 - 4, после чего клиент зависает в ожидании ответа, а сервер через какое то время отваливается как консьюмер.
Есть какие то мысли?

Ниже верхнейровенвый код основной логики

Клиент:
func main() {


	for {
		mq, err := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))

		if err != nil {
			log.Fatal(err)
		}

		command := Types.PRCCommand{Action: "LoadIpLists", Params: ""}

		result, err := mq.RPC(command.ToJson(), "agent")
		mq.Close()
		if err != nil {
			log.Println(err)
		}
		Res := struct {
			Text Types.IpListsDump
		}{}
		err = json.Unmarshal(result, &Res)
		if err != nil {
			log.Println(err)
		}

		_, _ = pp.Println(Res)
		time.Sleep(1 * time.Second)

	}

}

func (engine Engine) RPC(body []byte, agent string) ([]byte, error) {

	rpcID := time.Now().String()
	ch, err := engine.Connection.Channel()

	if err != nil {
		return nil, err
	}

	ReplayTo, err := ch.QueueDeclare("RPC_ANSWERS", false, false, false, false, nil)
	if err != nil {
		return nil, err
	}

	err = ch.Publish("", "RPC", false, false, amqp.Publishing{
		ContentType:   "application/json",
		Body:          body,
		ReplyTo:       ReplayTo.Name,
		CorrelationId: rpcID,
	})

	if err != nil {

		return nil, err
	}

	res, err := ch.Consume(ReplayTo.Name, agent, false, false, false, false, nil)

	if err != nil {
		log.Println(err)
	}
	for msg := range res {
		if msg.CorrelationId == rpcID {
			err := msg.Ack(true)
			if err != nil {
				log.Println(err)
			}

			err = ch.Close()
			if err != nil {
				log.Println(err)
			}

			return msg.Body, err
		}
	}
	err = ch.Close()
	if err != nil {
		log.Println(err)
	}

	return nil, nil
}


Сервер
func main{
       mq, error := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))
	if error != nil {
		log.Fatal(error)
	}
  go mq.ListenSourceMessage("RPC", false, app.HandlePRCRequest)
	for {
		_ = true
	}
}

func (engine *Engine) ListenSourceMessage(s string, exclusive bool, Func func(msg amqp.Delivery, connection *amqp.Connection)) {

	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", engine.User, engine.Pass, engine.Host, engine.Port))
	fatalOnError(err)

	defer conn.Close()

	ch, err := conn.Channel()
	fatalOnError(err)
	_, err = ch.QueueDeclare(s, true, false, false, false, nil)
	fatalOnError(err)

	msgs, err := ch.Consume(s, "RootServer", true, exclusive, false, false, nil)

	if err != nil {
		log.Println(err)
	}
	for d := range msgs {
		Func(d, conn)
	}
}

func (command *PRCCommand) SendAnswer(msg amqp.Delivery, conn *amqp.Connection) {

	channel, e := conn.Channel()
	if e != nil {
		log.Println(e)
	}
	defer channel.Close()

	jsonbleCommand, e := command.Execute(msg)
	if e != nil {
		log.Println(e.Error())
		bytes, _ := json.Marshal(e)
		err := channel.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
			Body:          bytes,
			Type:          "application/json",
			CorrelationId: msg.CorrelationId,
		})
		if err != nil {
			log.Println(err)
		}
		return
	}
	e = channel.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
		Body: jsonbleCommand.ToJson(),
		Type: "application/json",
		CorrelationId: msg.CorrelationId,
	})
	if e != nil {
		log.Println(e)
	}
}
  • Вопрос задан
  • 407 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы