@fishlover12345

Как создать connection_pool асинхронно, но в отдельном модуле, чтобы затем использовать его в dp.message_handler для работы с БД асинхронно?

Вот код модуля с асинхронной функцией для создания connection_pool:
import asyncpg

from src.telegram.data import config

# Получение данных для подключения к DB PostgreSQL из файла config.py
host = config.POSTGRES_HOST
user = config.POSTGRES_USERNAME
password = config.POSTGRES_PASSWORD
db_name = config.POSTGRES_DB_NAME

async def create_asyncpg_connection_pool():
    pool = await asyncpg.create_pool(
        user=user,
        password=password,
        database=db_name,
        host=host,
        port='5432',
        min_size=1,
        max_size=20
    )
    return pool


Вот код из другого модуля, содержащего dp.message_handler:

import json

import openai
import asyncpg
from aiogram import types

from src.db_tools.connection_pools import create_asyncpg_connection_pool
from src.gpt_utils.gpt_core import create_advanced_dialog_w_gpt
from src.langchain_data.data_formulator import system, db_faiss
from src.telegram.data import config
from src.telegram.loader import dp

# Получение ключа openai из файла config.py
openai.api_key = config.OPENAI_TOKEN


@dp.message_handler(content_types=['text'])
async def main(message: types.message):
    # Создание переменной для хранения user_id и вопроса пользователя, и username (person_id, user_question,
    # person_name).
    person_id = message.from_user.id
    user_question = message.text
    person_name = message.from_user.full_name
    # <b>Хочу убрать вот этот кусок кода.</b>
    connection_pool = await create_asyncpg_connection_pool

    # Цикл if для того, чтобы chatgpt не отвечала на текстовые команды.
    if not message.text.startswith(('/help', '/website', '/start', '/menu')):
        # Подключение к БД PostgreSQL.
        try:
            # Получение асинхронного соединения из пула <b>(Вот с этим пунктом нужна помощь).</b>
            async with сonnection_pool().acquire() as connection:
                # Начало асинхронной транзакции.
                async with connection.transaction():
                    async with connection.cursor() as cursor:
                        # Проверка наличия user_id в таблице user_data, запись результата запроса в переменную.
                        await cursor.execute("SELECT personal_id FROM user_data WHERE user_id = $1", person_id)
                        user_data_result = await cursor.fetchone()

                        # Если user_data_result пуст, то происходит запись в БД из переменной person_id, person_name.
                        if not user_data_result:
                            await cursor.execute("INSERT INTO user_data (user_id, username) VALUES ($1, $2)",
                                                 person_id, person_name)
                            await connection.commit()

                # Получение данных из (пока что) тестовой таблицы.
                await cursor.execute("SELECT dialog_history FROM telegram_test WHERE user_id = $1", person_id)
                result = await cursor.fetchone()

                # Если user_id уже существует.
                if result:
                    # Извлечение dialog_history для user_id текущего пользователя (конвертация из json -> dict).
                    dialog_history = result[0]
                # Если нет:

                else:
                    # Если пользователя нет в таблице, переменной присваивается пустой словарь.
                    dialog_history = {}

                reply = await create_advanced_dialog_w_gpt(system_prompt=system, data_base=db_faiss,
                                                           user_question=user_question,
                                                           dialog_history=dialog_history)

                # Записывается dialog_history в таблицу БД. (перед этим dialog_history преобразуется в json-формат).

                if result:
                    # Если пользователь уже существует, обновить dialog_history.
                    await cursor.execute("UPDATE telegram_test SET dialog_history = $1 WHERE user_id = $2",
                                         dialog_history, person_id)
                else:
                    # Если пользователь не существует, добавить его в таблицу.
                    await cursor.execute("INSERT INTO telegram_test (user_id, dialog_history) VALUES ($1, $2)",
                                         person_id, dialog_history)

                # Сохранить изменения в базе данных
                await connection.commit()

        except Exception as error:
            # Блок except необходим для корректного отображения ошибки.
            print(error)
            reply = (
                "Ошибка при обращении к Базе Данных, попробуйте позже. Также вы можете сообщить о проблеме с помощью "
                "команды /report")

        # Закрыть соединение с БД.
        finally:
            if connection:
                await connection.close()

        await message.reply(reply)

Я хочу создать connection pool асинхронно, но в другом модуле, а затем использовать его для работы с БД.
Проблема в том, что я не хочу инициировать процесс создания пула соединений каждый раз, внутри хендлера, когда от пользователя поступает новое сообщение (хочу убрать вот эту строчку: connection_pool = await create_asyncpg_connection_pool).
Могу ли я, например, в другом модуле присвоить Dispatcher (aiogram) connection_pool: dp.connection_pool = create_asyncpg_connection_pool(), а затем импортировать его в модуль, содержащий обработчик сообщений:
вместо from src.telegram.loader import dp импортировать DP, уже содержащий в себе connection_pool: from src.telegram.new_module1 import dp из другого модуля? Помогите, пожалуйста :) Я новичок.
  • Вопрос задан
  • 110 просмотров
Пригласить эксперта
Ответы на вопрос 3
Vindicar
@Vindicar
RTFM!
Так не получится, потому что кто кого импортировать будет? Поймаешь циклический импорт.
Можно проще.
# handlers.py
def setup(dp, pool):
    # внутри setup уже объявляешь обработчики
    @dp.message_handler(content_types=['text'])
    def some_handler(message: types.message):
        ...

# main.py
connection_pool = await create_asyncpg_connection_pool()
dp = ...

import handlers
handlers.setup(dp, connection_pool)  # вызывать строго один раз!
Ответ написан
@Everything_is_bad
1. коннект к базе в aiogram принято пробрасывать через middleware, открой гитхаб и сделай поиск по "aiogram DbMiddleware"
2. чтобы не было такого говнокода
if not message.text.startswith(('/help', '/website', '/start', '/menu')):
объяви/зарегестрируй эти команды до этого обработчика.

А тут походу вообще 2я версия, надо про нее забыть и перейти на 3
Ответ написан
nmkru
@nmkru
beginner python programmer
В файле, которым ты запускаешь бота, типа main.py в главной функции открываешь пул и передает конект боту:
# Создаем пул соединений
    pool = await asyncpg.create_pool(
        user=db_user,
        password=db_password,
        database=db_name,
        host=db_host,
        port=db_port
    )
    dp.bot['db_pool'] = pool

spoiler
Это работает только с Aiogram 2, на 3 версии изменилась типизация, поэтому надо делать через наследование.


Далее в функции, в которой тебе нужно подрубить к БД прописываешь pool = message.bot['db_pool'] # Получаешь пул из объекта бота
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы