Вот код модуля с асинхронной функцией для создания connection_pool:
import asyncpg
from src.telegram.data import config
# Получение данных для подключения к DB PostgreSQL из файла config.py
host = config.POSTGRES_HOST
user = config.POSTGRES_USERNAME
password = config.POSTGRES_PASSWORD
db_name = config.POSTGRES_DB_NAME
async def create_asyncpg_connection_pool():
pool = await asyncpg.create_pool(
user=user,
password=password,
database=db_name,
host=host,
port='5432',
min_size=1,
max_size=20
)
return pool
Вот код из другого модуля, содержащего dp.message_handler:
import json
import openai
import asyncpg
from aiogram import types
from src.db_tools.connection_pools import create_asyncpg_connection_pool
from src.gpt_utils.gpt_core import create_advanced_dialog_w_gpt
from src.langchain_data.data_formulator import system, db_faiss
from src.telegram.data import config
from src.telegram.loader import dp
# Получение ключа openai из файла config.py
openai.api_key = config.OPENAI_TOKEN
@dp.message_handler(content_types=['text'])
async def main(message: types.message):
# Создание переменной для хранения user_id и вопроса пользователя, и username (person_id, user_question,
# person_name).
person_id = message.from_user.id
user_question = message.text
person_name = message.from_user.full_name
# <b>Хочу убрать вот этот кусок кода.</b>
connection_pool = await create_asyncpg_connection_pool
# Цикл if для того, чтобы chatgpt не отвечала на текстовые команды.
if not message.text.startswith(('/help', '/website', '/start', '/menu')):
# Подключение к БД PostgreSQL.
try:
# Получение асинхронного соединения из пула <b>(Вот с этим пунктом нужна помощь).</b>
async with сonnection_pool().acquire() as connection:
# Начало асинхронной транзакции.
async with connection.transaction():
async with connection.cursor() as cursor:
# Проверка наличия user_id в таблице user_data, запись результата запроса в переменную.
await cursor.execute("SELECT personal_id FROM user_data WHERE user_id = $1", person_id)
user_data_result = await cursor.fetchone()
# Если user_data_result пуст, то происходит запись в БД из переменной person_id, person_name.
if not user_data_result:
await cursor.execute("INSERT INTO user_data (user_id, username) VALUES ($1, $2)",
person_id, person_name)
await connection.commit()
# Получение данных из (пока что) тестовой таблицы.
await cursor.execute("SELECT dialog_history FROM telegram_test WHERE user_id = $1", person_id)
result = await cursor.fetchone()
# Если user_id уже существует.
if result:
# Извлечение dialog_history для user_id текущего пользователя (конвертация из json -> dict).
dialog_history = result[0]
# Если нет:
else:
# Если пользователя нет в таблице, переменной присваивается пустой словарь.
dialog_history = {}
reply = await create_advanced_dialog_w_gpt(system_prompt=system, data_base=db_faiss,
user_question=user_question,
dialog_history=dialog_history)
# Записывается dialog_history в таблицу БД. (перед этим dialog_history преобразуется в json-формат).
if result:
# Если пользователь уже существует, обновить dialog_history.
await cursor.execute("UPDATE telegram_test SET dialog_history = $1 WHERE user_id = $2",
dialog_history, person_id)
else:
# Если пользователь не существует, добавить его в таблицу.
await cursor.execute("INSERT INTO telegram_test (user_id, dialog_history) VALUES ($1, $2)",
person_id, dialog_history)
# Сохранить изменения в базе данных
await connection.commit()
except Exception as error:
# Блок except необходим для корректного отображения ошибки.
print(error)
reply = (
"Ошибка при обращении к Базе Данных, попробуйте позже. Также вы можете сообщить о проблеме с помощью "
"команды /report")
# Закрыть соединение с БД.
finally:
if connection:
await connection.close()
await message.reply(reply)
Я хочу создать connection pool
асинхронно, но в другом модуле, а затем использовать его для работы с БД.
Проблема в том, что я не хочу инициировать процесс создания пула соединений каждый раз, внутри хендлера, когда от пользователя поступает новое сообщение (хочу убрать вот эту строчку:
connection_pool = await create_asyncpg_connection_pool).
Могу ли я, например, в другом модуле присвоить Dispatcher (aiogram) connection_pool:
dp.connection_pool = create_asyncpg_connection_pool(), а затем импортировать его в модуль,
содержащий обработчик сообщений:
вместо
from src.telegram.loader import dp импортировать DP, уже содержащий в себе connection_pool:
from src.telegram.new_module1 import dp из другого модуля? Помогите, пожалуйста :) Я новичок.