В принципе, сам код рабочий (создавался с целью эксперимента) и выполняет нужные функции. Рандомно (на мой взгляд) по времени выпадает данное исключение, что прерывает работу программы. Возможно ли как-то доработать программу или что-то в конфигурации RabbitMQ?
Правильно ли я понимаю, что код обрабатывается слишком долго и кролик рвет соединение или причина в чем-то другом?
[root@localhost opt]# python3 receive_rabbit.py
Traceback (most recent call last):
File "receive_rabbit.py", line 62, in
channel.start_consuming()
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 1865, in start_consuming
self._process_data_events(time_limit=None)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 2026, in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 824, in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.6/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF
Лог RabbitMQ
=ERROR REPORT==== 6-Jun-2021::23:56:03 ===
closing AMQP connection <0.588.0> (172.17.0.1:38714 -> 172.17.0.2:5672):
{writer,send_failed,{error,timeout}}
Сам код
import pika
import psycopg2
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost', port=5672, credentials=pika.PlainCredentials('user', 'passwd'), heartbeat=0))
channel = connection.channel()
channel.queue_declare(queue='telegraf')
def callback(ch, method, properties, body):
body=str(body).split('\\n')
body[0]=body[0].replace("b'",'')
i = 0
for i in range(len(body)):
conn = psycopg2.connect(dbname='dbname', user='user', password='password', host='host')
cursor = conn.cursor()
cursor.execute("SET application_name = 'TransferFromRabbitToPG'")
if "'"not in body[i]:
str_body = body[i].split(' ')
tel_value = str_body[1]
tel_timestamp = str_body[2]
str_type_body = str_body[0].split('.')
tel_host = str_type_body[0]
tel_type = str_type_body[1]
counter = 2
tel_column = ''
for counter in range (len(str_type_body)):
if counter > 1:
tel_column = tel_column + str_type_body[counter] + '_'
tel_column = tel_column.replace('-','_')
#print('host: '+tel_host)
#print('type: '+tel_type)
#print('column: '+tel_column)
#print('value: '+tel_value)
#print('time: '+tel_timestamp)
#print(len(str_type_body))
cursor.execute('BEGIN')
cursor.execute('CREATE SCHEMA IF NOT EXISTS '+tel_host)
cursor.execute('CREATE TABLE IF NOT EXISTS '+tel_host+'."'+tel_type+'" (time bigserial PRIMARY KEY)')
try:
cursor.execute('CREATE INDEX IF NOT EXISTS index_'+tel_host+'_'+tel_type.replace('-','_')+' ON '+tel_host+'."'+tel_type+'" (time DESC)')
cursor.execute('ALTER TABLE '+tel_host+'."'+tel_type+'" ADD COLUMN IF NOT EXISTS '+tel_column+' real')
except psycopg2.errors.DeadlockDetected:
per_null=""
cursor.execute('COMMIT')
try:
cursor.execute('INSERT INTO '+tel_host+'."'+tel_type+'" (time) VALUES (\''+tel_timestamp+'\')')
except psycopg2.errors.UniqueViolation:
per_null=""
cursor.execute('BEGIN')
cursor.execute('UPDATE '+tel_host+'."'+tel_type+'" SET '+tel_column+' = \''+tel_value+'\' WHERE time = \''+tel_timestamp+'\'')
cursor.execute('COMMIT')
cursor.close()
conn.close()
channel.basic_consume(
"telegraf",
callback,
auto_ack=True,
exclusive=False,
consumer_tag=None,
arguments=None)
channel.start_consuming()