Доброго времен суток. Столкнулся с проблемой, и не могу найти ни какого логического объяснения.
Реализую RPC через RabbitMQ на Go. Схема следующая:
Клиент:
1. Формирует запрос в постоянную очередь "RPC"
2. В сообщении задает уникальный CorrelationId
3. Слушает постоянную очередь "RPC Answers"
4. При получении сообщения проверяет соответствие CorrelationId заданному ранее
5. При соответствии CorrelationId отсылает ACK, закрывает канал, и соединение.
Сервер
1. Слушает очередь "RPC"
2. При получении сообщения, делает ACK
3. Формирует ответ на сообщение и посылает в очередь RPC Answers с установкой CorrelationId равным, CorrelationId в принятом сообщении.
Схема полностью рабочая, все данные проходят корректно, но при тесте, таких циклов проходит не более 3 - 4, после чего клиент зависает в ожидании ответа, а сервер через какое то время отваливается как консьюмер.
Есть какие то мысли?
Ниже верхнейровенвый код основной логики
Клиент:
func main() {
for {
mq, err := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))
if err != nil {
log.Fatal(err)
}
command := Types.PRCCommand{Action: "LoadIpLists", Params: ""}
result, err := mq.RPC(command.ToJson(), "agent")
mq.Close()
if err != nil {
log.Println(err)
}
Res := struct {
Text Types.IpListsDump
}{}
err = json.Unmarshal(result, &Res)
if err != nil {
log.Println(err)
}
_, _ = pp.Println(Res)
time.Sleep(1 * time.Second)
}
}
func (engine Engine) RPC(body []byte, agent string) ([]byte, error) {
rpcID := time.Now().String()
ch, err := engine.Connection.Channel()
if err != nil {
return nil, err
}
ReplayTo, err := ch.QueueDeclare("RPC_ANSWERS", false, false, false, false, nil)
if err != nil {
return nil, err
}
err = ch.Publish("", "RPC", false, false, amqp.Publishing{
ContentType: "application/json",
Body: body,
ReplyTo: ReplayTo.Name,
CorrelationId: rpcID,
})
if err != nil {
return nil, err
}
res, err := ch.Consume(ReplayTo.Name, agent, false, false, false, false, nil)
if err != nil {
log.Println(err)
}
for msg := range res {
if msg.CorrelationId == rpcID {
err := msg.Ack(true)
if err != nil {
log.Println(err)
}
err = ch.Close()
if err != nil {
log.Println(err)
}
return msg.Body, err
}
}
err = ch.Close()
if err != nil {
log.Println(err)
}
return nil, nil
}
Сервер
func main{
mq, error := rMQ.NewEngine(os.Getenv("MQ_HOST"), os.Getenv("MQ_USER"), os.Getenv("MQ_PASS"), os.Getenv("MQ_PORT"))
if error != nil {
log.Fatal(error)
}
go mq.ListenSourceMessage("RPC", false, app.HandlePRCRequest)
for {
_ = true
}
}
func (engine *Engine) ListenSourceMessage(s string, exclusive bool, Func func(msg amqp.Delivery, connection *amqp.Connection)) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", engine.User, engine.Pass, engine.Host, engine.Port))
fatalOnError(err)
defer conn.Close()
ch, err := conn.Channel()
fatalOnError(err)
_, err = ch.QueueDeclare(s, true, false, false, false, nil)
fatalOnError(err)
msgs, err := ch.Consume(s, "RootServer", true, exclusive, false, false, nil)
if err != nil {
log.Println(err)
}
for d := range msgs {
Func(d, conn)
}
}
func (command *PRCCommand) SendAnswer(msg amqp.Delivery, conn *amqp.Connection) {
channel, e := conn.Channel()
if e != nil {
log.Println(e)
}
defer channel.Close()
jsonbleCommand, e := command.Execute(msg)
if e != nil {
log.Println(e.Error())
bytes, _ := json.Marshal(e)
err := channel.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
Body: bytes,
Type: "application/json",
CorrelationId: msg.CorrelationId,
})
if err != nil {
log.Println(err)
}
return
}
e = channel.Publish("", msg.ReplyTo, false, false, amqp.Publishing{
Body: jsonbleCommand.ToJson(),
Type: "application/json",
CorrelationId: msg.CorrelationId,
})
if e != nil {
log.Println(e)
}
}