@HooinKema
Разработчик BackEnd на Python и всякой мелочёвки

Как устранить зависания в запросах к БД Postgres?

Использую скрипт на python для обработки сообщений, поступающих по протоколу MQTT, брокер Mosquitto. Некоторое время скрипт работает корректно, но потом зависает, не выдавая никаких ошибок. Запросы начинают выполняться медленнее, чем поступают новые. Код:
import paho.mqtt.client as mqtt
import json
from contextlib import closing
import psycopg2
from datetime import datetime
import pytz
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import requests


base_url = 'someurl.com'


def on_connect(client, userdata, rc, sdf):
    print("Connected with result code " + str(rc))


# The callback for when a PUBLISH message is received from the server.

with closing(psycopg2.connect(dbname='db1', user='admin', password='pas', host="localhost", options='-c statement_timeout=0')) as conn:
    with conn.cursor() as cursor:

        def on_message(client, userdata, msg):
            modes = [0, 10, 15, 25]
            postgre_bool = ['false', 'true']
            data = json.loads(msg.payload.decode())

            if 'id' in data:
                        tr_id = data['id']
                        status = "OK"
                        cursor.execute("INSERT INTO api_telelog (imei, log) VALUES (%s, %s)", (tr_id, json.dumps(data)))
                        conn.commit()
                        cursor.execute("SELECT owner_id, latitude, longitude, id FROM api_scooter WHERE tracker_id = %s;", (tr_id, ))
                        is_in_base = False
                        for _ in cursor:
                            is_in_base = True
                        cursor.execute("SELECT owner_id, latitude, longitude, id, status FROM api_scooter WHERE tracker_id = %s;",
                                       (tr_id,))
                        scooter_data = cursor.fetchone()
                        finally_command = "UPDATE api_scooter SET "
                        vars = []
                        if is_in_base:
                            cursor.execute(
                                "SELECT email FROM api_employee WHERE id = %s;",
                                (scooter_data[0],))
                            email = cursor.fetchone()[0]
                            if 'lat' in data:
                                lat = data['lat']
                                lon = data['lon']
                                vars += [lat, lon]
                                finally_command += "latitude = %s, longitude = %s, "
                            else:
                                status = "NG"
                            if 'lamp' in data:
                                lamp = postgre_bool[int(data['lamp'])]
                                vars.append(lamp)
                                finally_command += "lamp = %s, "
                            if 'slock' in data:
                                lock = postgre_bool[int(data['slock'])]
                                vars.append(lock)
                                finally_command += "lock = %s, "
                            if 'enb' in data:
                                engine = postgre_bool[int(data['enb'])]
                                vars.append(engine)
                                finally_command += "engine = %s, "
                            if 'gear' in data:
                                mode = modes[int(data['gear'])]
                                vars.append(mode)
                                finally_command += "speed_limit = %s, "
                            if 'ubat' in data:
                                battery = data['ubat']
                                vars.append(battery)
                                finally_command += "battery = %s, "
                                command = """SELECT min_voltage, max_voltage FROM api_scooter WHERE tracker_id = %s;"""
                                cursor.execute(command, (tr_id, ))
                                voltages = cursor.fetchone()
                                percent = ((battery/1000) - voltages[0])/(voltages[1] - voltages[0])
                                if ((percent*100) < 20) and scooter_data[4] != "LB":
                                    send_alert_to_mail("Заряд самоката id " + str(scooter_data[3]) + " ниже 20% ", email)
                                    status = "LB"
                            if 'sat' in data:
                                gps = data['sat']
                                vars.append(gps)
                                finally_command += "gps = %s, "
                                if int(data['sat']) == 0:
                                    status = 'NG'
                                    send_alert_to_mail("Потеряно GPS соединение с самокатом id " + str(scooter_data[3]), email)
                            if 'csq' in data:
                                gsm = data['csq']
                                vars.append(gsm)
                                finally_command += "gsm = %s, "
                            if 'alarm' in data:
                                if data['alarm'] != 0:
                                    send_alert_to_mail("Поптыка угона самоката " + str(scooter_data[3]) + " " +
                                                       str(scooter_data[1]) + ", " + str(scooter_data[2]), email)
                                    status = "HJ"
                            if 'ver' in data:
                                vars.append(data['ver'])
                                finally_command += "firmware_version = %s, "
                            finally_command += "last_ping = %s, alert_status = %s WHERE tracker_id = %s;"
                            vars += [pytz.utc.localize(datetime.utcnow()), status, tr_id]
                            cursor.execute(finally_command, vars)
                            conn.commit()


        client = mqtt.Client()
        client.on_connect = on_connect
        client.on_message = on_message
        client.tls_set()
        client.tls_insecure_set(True)

        client.connect("www.someurl.ru", 8883, 60)

        client.subscribe("scooter")

        client.loop_forever()

Пожалуйста, объясните причину такого поведения и способы устранить ошибку. Заранее спасибо!
  • Вопрос задан
  • 83 просмотра
Решения вопроса 1
Timtaran
@Timtaran
Начинающий программист.
Причина скорее всего в пинге к базе данных(если она удаленная), решение - поднять свой сервер PSQL
Ответ написан
Пригласить эксперта
Ответы на вопрос 2
romesses
@romesses
Backend инженер
К коду есть замечания:
Свалено все в кучу (требуется рефактринг)
Риск инъекций SQL. Используйте SQL query builder/ORM.
Нет проверок результата работы с СУБД.

Как решить:

send_alert_to_mail синхронна и вызывает блокировку - сделать асинхронным вызовом и даже отправлять в очередь специально для отправки писем.

Обвернуть все вызовы БД декоратором с замером времени исполнения. Обычно, проблема не в СУБД, а во взаимодействии с ней и всякими блокировками, как с send_alert_to_mail.

Поптыка
очепятка :-)
Ответ написан
dimonchik2013
@dimonchik2013
совет, который уже дан - ничего не стоит
маловато кода

но в любом случае
https://habr.com/ru/post/486710/
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы