@Warington

"Pika.exceptions.StreamLostError: Transport indicated EOF" rabbitmq — python3 — pgsql куда копать?

В принципе, сам код рабочий (создавался с целью эксперимента) и выполняет нужные функции. Рандомно (на мой взгляд) по времени выпадает данное исключение, что прерывает работу программы. Возможно ли как-то доработать программу или что-то в конфигурации RabbitMQ?
Правильно ли я понимаю, что код обрабатывается слишком долго и кролик рвет соединение или причина в чем-то другом?

[root@localhost opt]# python3 receive_rabbit.py
Traceback (most recent call last):
File "receive_rabbit.py", line 62, in
channel.start_consuming()
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
self._process_data_events(time_limit=None)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF
Лог RabbitMQ

=ERROR REPORT==== 6-Jun-2021::23:56:03 ===
closing AMQP connection <0.588.0> (172.17.0.1:38714 -> 172.17.0.2:5672):
{writer,send_failed,{error,timeout}}

Сам код

import pika
import psycopg2
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost', port=5672, credentials=pika.PlainCredentials('user', 'passwd'), heartbeat=0))
channel = connection.channel()

channel.queue_declare(queue='telegraf')


def callback(ch, method, properties, body):
        body=str(body).split('\\n')
        body[0]=body[0].replace("b'",'')
        i = 0
        for i in range(len(body)):
                conn = psycopg2.connect(dbname='dbname', user='user', password='password', host='host')
                cursor = conn.cursor()
                cursor.execute("SET application_name = 'TransferFromRabbitToPG'")
                if "'"not in body[i]:
                        str_body = body[i].split(' ')
                        tel_value = str_body[1]
                        tel_timestamp = str_body[2]
                        str_type_body = str_body[0].split('.')
                        tel_host = str_type_body[0]
                        tel_type = str_type_body[1]
                        counter = 2
                        tel_column = ''
                        for counter in range (len(str_type_body)):
                                if counter > 1:
                                        tel_column = tel_column + str_type_body[counter] + '_'
                        tel_column = tel_column.replace('-','_')
                        #print('host: '+tel_host)
                        #print('type: '+tel_type)
                        #print('column: '+tel_column)
                        #print('value: '+tel_value)
                        #print('time: '+tel_timestamp)
                        #print(len(str_type_body))
                        cursor.execute('BEGIN')
                        cursor.execute('CREATE SCHEMA IF NOT EXISTS '+tel_host)
                        cursor.execute('CREATE TABLE IF NOT EXISTS '+tel_host+'."'+tel_type+'" (time bigserial PRIMARY KEY)')
                        try:
                                cursor.execute('CREATE INDEX IF NOT EXISTS index_'+tel_host+'_'+tel_type.replace('-','_')+' ON '+tel_host+'."'+tel_type+'" (time DESC)')
                                cursor.execute('ALTER TABLE '+tel_host+'."'+tel_type+'" ADD COLUMN IF NOT EXISTS '+tel_column+' real')
                        except psycopg2.errors.DeadlockDetected:
                                per_null=""
                        cursor.execute('COMMIT')
                        try:
                                cursor.execute('INSERT INTO '+tel_host+'."'+tel_type+'" (time) VALUES (\''+tel_timestamp+'\')')
                        except psycopg2.errors.UniqueViolation:
                                per_null=""
                        cursor.execute('BEGIN')
                        cursor.execute('UPDATE '+tel_host+'."'+tel_type+'" SET '+tel_column+' = \''+tel_value+'\' WHERE time = \''+tel_timestamp+'\'')
                        cursor.execute('COMMIT')
                        cursor.close()
                        conn.close()
channel.basic_consume(
                        "telegraf",
                        callback,
                        auto_ack=True,
                        exclusive=False,
                        consumer_tag=None,
                        arguments=None)
channel.start_consuming()

  • Вопрос задан
  • 1365 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы