Задать вопрос
@Maxwell012

Как создать пул соединений asyncpg?

У меня есть бот в котором создается поток под selenium, раньше я работал с бд mysql, для работы с ней использовал библиотеку pymysql-pool с помощью которой я создавал пул соединений, потом создавал одно соединение для основного потока и одно для selenium`а. Возникла потребность перейти на postgresql, немного погуглив я нашел библиотеку asyncpg в которой можно создавать пул, но этот пул у меня не получается реализовать.
Создание пула:
async def create_pool():
    while True:
        try:
            pool = await asyncpg.create_pool(**config)
            logger_db.info('--- Database connection successful ---')
            return pool
        except Exception as ex:
            print(ex)
            logger_db.warning('Connection refused...', exc_info=True)

Создаю соединение для основного потока, которое я потом передаю в каждую функцию которая работает с бд:
import asyncio
from data_base.postgresql import create_pool

loop = asyncio.get_event_loop()
pool = loop.run_until_complete(create_pool())
main_connection = pool.acquire()

Также еще надо сказать что я не закрываю это соединение, второе подключение создается уже не посредственно в втором потоке (но про него нету смысла говорить так как у меня не работает соединение в основном потоке).
Также пример функции которая работает с бд:
async def db_get_data(id, connection):
    try:
        sql = 'SELECT surname, email, time, incorrect_answer ' \
              f'FROM users WHERE id={id}'
        async with connection as cursor:
            data = await cursor.fetch(sql)

        if data:
            return data[0]
        else:
            return 0
    except:
        logger_db.critical('db_get_data', exc_info=True)

Тут хочу остановится на строчке async with connection as cursor, эта строчка прописана в каждой функции, из-за нее происходит ошибка: 'Соединение уже было установлено', по идеи это строчка лишняя, но если код написать так:
async def db_get_data(id, connection):
    try:
        sql = 'SELECT surname, email, time, incorrect_answer ' \
              f'FROM users WHERE id={id}'
        data = await connection.fetch(sql)

        if data:
            return data[0]
        else:
            return 0
    except:
        logger_db.critical('db_get_data', exc_info=True)

То появится ошибка что connection не имеет метода fetch().
В документации эти моменты не расписаны, просмотрев другие примеры не нашел того что мне надо.
У меня все функции по работе с бд в одном файле и единственный рабочий метод это сделать переменную pool глобальной и использовать место async with connection as cursor ------ async with pool.acquire() as cursor, но тогда проблема возникает в другом, как создать разные соединения для потоков

Также еще один вопрос, я знаю что вписывать переменные в запрос как я это сделал, не правильно, но я пробовал через список передавать переменные, а в sql запрос писать %s, но компилятор не распознает %s, выдает ошибку
  • Вопрос задан
  • 1497 просмотров
Подписаться 1 Простой Комментировать
Решения вопроса 2
@Maxwell012 Автор вопроса
Мой бот написан на библиотеки aiogram, то есть весь код является асинхронным, я хотел чтобы и бд работала с асинхронной библиотекой, но в итоге так и не получилось разобраться и я сменил asyncpg на синхр библиотеку psycopg2, так как бд у меня не большая по размерам и запросы к бд у меня короткие из-за этого у меня не возникает проблем с ожиданием.
Ответ написан
Комментировать
@MYATAKZ
Попробуйте так

class DB:

    def __init__(self, config: dict):
        self.config = config
        self._pool = None


    async def get_pool(self):
        return self._pool

    async def create_pool(self):
        self._pool = await asyncpg.create_pool(**self.config)

    async def select(self):
        async with self._pool.acquire() as conn:
            print(await conn.fetch('''SELECT * FROM names'''))


async def select(pool):
    async with pool.acquire() as conn:
        print(await conn.fetch('''SELECT * FROM names'''))


db = DB(data)


async def main():
    await db.create_pool()
    await db.select()

    pool = await db.get_pool()

    await select(pool)


asyncio.run(main())
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@Bobrikk
Попробуй db = await asyncpg.create_pool(**credentials)
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы