Задать вопрос
  • AIOGRAM TypeError: State.__init__() got multiple values for argument 'state' в чем может быть ошибка?

    class Question(StatesGroup):
        #Имя юзера
        name=State()
        #Вопрос юзера
        userAsk=State()
    Ответ написан
    Комментировать
  • Можно ли скрапить телеграмм?

    Код
    from pyrogram import Client
    from pyrogram.errors import FloodWait
    import asyncio
    
    
    async def get_post_stats(client: Client, channel_id: int, message_id: int):
        """
        Получает статистику поста: просмотры, реакции и количество комментариев.
    
        Args:
            client: Экземпляр клиента Pyrogram
            channel_id: ID канала
            message_id: ID сообщения
    
        Returns:
            dict: Словарь со статистикой
        """
        try:
            # Получаем сообщение
            message = await client.get_messages(channel_id, message_id, replies=-1)
    
            # Получаем статистику просмотров
            views = message.views if message.views else 0
    
            # Получаем реакции
            reactions = {}
            if message.reactions:
                for reaction in message.reactions.reactions:
                    reaction_id = reaction.custom_emoji_id if reaction.custom_emoji_id else reaction.emoji
                    reactions[reaction_id] = reaction.count
    
            comments_count = await client.get_discussion_replies_count(channel_id, message_id)
            stats = {
                "views": views,
                "reactions": reactions,
                "forwards": message.forwards or 0,
                "comments": comments_count
            }
    
            return stats
    
        except FloodWait as e:
            print(f"Превышен лимит запросов, ждем {e.value} секунд")
            await asyncio.sleep(e.value)
            return await get_post_stats(client, channel_id, message_id)
        except Exception as e:
            print(f"Произошла ошибка: {e}")
            return None
    
    
    # Пример использования
    async def main():
        # Замените на свои данные
        api_id = "0"
        api_hash = "0"
        channel_id = -1 # ID канала
        message_id = 1 # ID сообщения
    
        async with Client("my_account", api_id=api_id, api_hash=api_hash) as app:
            stats = await get_post_stats(app, channel_id, message_id)
            if stats:
                print(f"Просмотры: {stats['views']}")
                print(f"Переслали: {stats['forwards']}")
                print(f"Реакции: {stats['reactions']}")
                print(f"Комментарии: {stats['comments']}")
    
    
    if __name__ == "__main__":
        asyncio.run(main())


    Вывод:
    Просмотры: 1
    Реакции: {'': 1, '❤': 1}
    Комментарии: 1
    Ответ написан
    2 комментария
  • Как получить callback_data из апдейта в aiogram?

    Используйте Callback Data Factory
    Ответ написан
    Комментировать
  • Aiogram 3, FSMContext/State: почему бот отвечает из всех фото только на последнее?

    Следует почитать на счет того как работает медиагруппа в ТГ, 5 фото в одном сообщении = 5 апдейтов.

    Я бы ушел от FSM в сторону Callback Data Factory

    # ...
    from aiogram.filters.callback_data import CallbackData
    
    
    class AnswerCallback(CallbackData, prefix='answer'):
        message_id: int
        answer_type: str
    
    
    @router.message(F.photo)
    async def photo_handle(message: Message):
        await hadnle_text(message)
    
    async def hadnle_text(message: Message):
        builder = InlineKeyboardBuilder()
        builder.add(InlineKeyboardButton(
            text="Answer 1",
            callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 1').pack()),
        InlineKeyboardButton(
            text="Answer 2",
            callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 2').pack()),
        )
        # Или так
        # builder.button(text="Answer 1", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 1'))
        # builder.button(text="Answer 2", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 2'))
        await message.answer(
            f"<b>Выберите кнопку</b>",
            reply_markup=builder.as_markup()
        )
    
    @router.callback_query(AnswerCallback.filter())
    async def text_state_callback(callback: CallbackQuery, callback_data: AnswerCallback):
        await bot.send_message(
            chat_id=callback.message.chat.id,
            text=callback_data.answer_type,
            reply_to_message_id=callback_data.message_id,
        )
    Ответ написан
    4 комментария
  • Ошибка при скачивании aiohtpps?

    aiogram==2.25.1 и aiohttp>=3.8.0,<3.9.0 указывают, что они поддерживаются максимально до Python 3.10.
    Вы пытаетесь использовать Python 3.12

    Можете скачать и почитать исходники на PyPI
    Ответ написан
    Комментировать
  • Автоматизация запроса по API в звуковую нейросеть из Excel через python?

    import os
    # import time
    
    import requests
    from openpyxl import load_workbook
    from dotenv import load_dotenv
    
    API_URL = 'https://api.edenai.run/v2/audio/text_to_speech'
    HEADERS = {"Authorization": f"Bearer {os.getenv('API_KEY')}"}
    DEFAULT_LANGUAGE = 'ru-RU'
    DEFAULT_OPTION = 'MALE'
    DEFAULT_VOICE = 'ru-RU_Alexei Syomin'
    
    
    def convert_text_to_speech(row_number: int, _id: int, text: str):
        payload = {
            'providers': 'lovoai',
            'language': DEFAULT_LANGUAGE,
            'option': DEFAULT_OPTION,
            'lovoai': DEFAULT_VOICE,
            'text': text
        }
    
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        result = response.json()
    
        audio_url = result.get('lovoai', {}).get('audio_resource_url')
        if audio_url:
            audio_content = requests.get(audio_url).content
            with open(f'row_{row_number}_ID_{_id}.ogg', 'wb') as audio_file:
                audio_file.write(audio_content)
    
    
    def main(file_name: str):
        book = load_workbook(file_name)
        sheet = book.active
        for row in range(2, sheet.max_row + 1):
            _id = sheet[row][0].value
            text = sheet[row][1].value
            convert_text_to_speech(row, _id, text)
    
            # Установите задержку, если имеется ограничение на кол-во запросов в секунду
            # time.sleep(3)
    
    
    if __name__ == '__main__':
        load_dotenv()
        main('name.xlsx')
    Ответ написан
    3 комментария
  • Кому отправил криптовалюту отправитель?

    Как осуществляются биткойн-транзакции

    Почитайте о вводах и выводах
    Ответ написан
    Комментировать
  • Как правильно регистрировать хендлеры для импорта?

    1. Не нашел в вашем тексте упоминаемого файла update.py
    2. В файле main.py вы пытаетесь зарегистрировать хендлеры в хендлере
    main.py
    import logging
    
    
    async def on_startup(dispatcher):
        logging.basicConfig(
            level=logging.DEBUG,
            format=u'%(filename)s:%(lineno)d #%(levelname)-8s [%(asctime)s] - %(name)s - %(message)s',
        )
        logger.info("Starting bot")
    
        # Импортируем функции регистрации хендлеров
        from update_photo import register_new_photo
        # Регистрируем хендлеры
        register_new_photo(dispatcher)
    
    
    async def on_shutdown(dispatcher):
        logging.warning('Shutting down..')
        await dispatcher.storage.close()
        await dispatcher.storage.wait_closed()
        logging.warning('Bye!')
    
    
    if __name__ == '__main__':
        from aiogram.utils import executor
        from loader import dp
    
        executor.start_polling(dp,
                               skip_updates=True,  # Если нужно пропускаем апдейты
                               on_startup=on_startup,
                               on_shutdown=on_shutdown)

    loader.py

    import os
    
    from aiogram import Bot, Dispatcher
    from aiogram.contrib.fsm_storage.memory import MemoryStorage
    
    bot = Bot(token=os.getenv("TOKEN"))
    storage = MemoryStorage()
    dp = Dispatcher(bot=bot, storage=storage)

    Ответ написан
  • Не могу понять в чем проблема, сохраняет только последние полученные данные, в чем дело?

    Вы создали 3 цикла, в которых перезаписываете значение переменной до добавления в лист, именно поэтому у вас сохраняется последнее значение.
    Также вместо создания трех циклов можно обойтись одним. (Вывод сделан на основании одинаковых селекторов переменных cards_hrefs, titles, prices).

    Изначальный ответ
    for items in data:    
        cards = data.find_all("div", class_="card-body")
        for card in cards:
            catalog.append({
                "link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
                "title": card.find("h4").text.strip(),
                "price": card.find("h5").text.strip()
            })

    С верным замечанием от AUser0
    Обновленный ответ
    data = soup.find("div", class_="row my-4")
    
    catalog = []
    cards = data.find_all("div", class_="card-body")
    for card in cards:
        catalog.append({
            "link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
            "title": card.find("h4").text.strip(),
            "price": card.find("h5").text.strip()
        })
    Ответ написан
    2 комментария
  • Как показать пользователю, что бот отправляет видео или фото?

    Вы не указали какую библиотеку используете, вот реф на документацию Bot API.
    sendChatAction
    Ищите данный метод в используемой библиотеке
    Ответ написан
    2 комментария
  • FSMStorageWarning?

    При инициализации dispatcher нужно передать в него storage
    from aiogram import Bot, Dispatcher, types
    # from aiogram.contrib.fsm_storage.memory import MemoryStorage
    from aiogram.contrib.fsm_storage.redis import RedisStorage2
    
    storage = RedisStorage2()
    # storage = MemoryStorage()
    
    bot = Bot(token=c.tg_bot.token, parse_mode=types.ParseMode.HTML)
    dp = Dispatcher(bot, storage=storage)

    UPD. При использовании redis на windows рекомендую устанавливать Memurai, так как есть поддержка redis 6, в некоторых случаях это избавит вас от проблем при переносе кода на Unix системы
    Ответ написан
    Комментировать
  • Как подключить приватный прокси к яндекс музыке?

    Вы не указали протокол прокси. На гитхабе указаны следующие примеры
    Примеры proxy url:
    
    socks5://user:password@host:port
    http://host:port
    https://host:port
    http://user:password@host
    Ответ написан
    Комментировать
  • Telegram bot Как назначить время и уведомить через заданное время?

    По правилам ресурса код стоит выкладывать текстом, оборачивая тэгом code.
    Вам подойдут очереди сообщений (rq, Celery, queue, asyncio.Queue).
    Примитивно ставить задачу в цикл событий.
    P.S. Рекомендую очереди
    @dp.message_handler(commands=['some_command'])
    async def some_handler(message: Message):
        text = 'Текст для отправки '
        DELAY = 10
        _loop = asyncio.get_running_loop()
        _loop.call_later(DELAY, func, message.from_user.id, text)
    
    async def func(chat_id, text):
        await dp.bot.send_message(chat_id, text)

    UPD Рекомендации
    1. Избавьтесь от n = int(input()). Вызов input блокирует выполнение кода, как и time.sleep
    2. Ознакомьтесь с встроенной в aiogram машиной состояний (FSM). Меняя state отлавливайте введенное пользователем число.
    3. Избавьтесь от конструкции try: except: проверяя является ли message.text числом
    if message.text.isdigit():
        print('ok')
    else:
        print('Это не число')
    Ответ написан
    4 комментария
  • Как преобразовать такой список словарей в Python?

    Умножив d_list на 100.000 у меня выполнилось за 0.6868....
    Не знаю на сколько "быстрым" такой результат можно назвать
    Первый вариант
    res = []
    for dictionary in d_list:  # * 100_000:
        result_dict = {}
        for key in dictionary:
            if dictionary[key] is not None:
                result_dict[key] = dictionary[key]
        result_dict = {**result_dict, **{'qty1': False, 'fp1': False,
                                         'qty2': False, 'fp2': False,
                                         'qty3': False, 'fp3': False}}
        if isinstance(dictionary['b'], list):
            for z in range(1, len(dictionary['b']) - 1):
                result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
                result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
    
        res.append(result_dict)

    Если все таки вы не ожидаете в результате значение ключа b в исходном виде.
    Выполнилось за 0.3226
    UPD
    res = []
    for dictionary in d_list:  # * 100_000:
        result_dict = {'a': dictionary['a'],
                       'c': dictionary['c'],
                       'qty1': False, 'fp1': False,
                       'qty2': False, 'fp2': False,
                       'qty3': False, 'fp3': False
                       }
        if isinstance(dictionary['b'], list):
            for z in range(1, len(dictionary['b']) - 1):
                result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
                result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
    
        res.append(result_dict)
    Ответ написан
    1 комментарий
  • Как запустить асинхронную функцию по расписанию python?

    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        scheduler = AsyncIOScheduler(event_loop=loop)
        scheduler.add_job(main, 'interval', seconds=3)
        scheduler.start()
        try:
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            pass

    Описание проблемы. (Как я это понял)
    Пример из github репозитория библиотеки ошибочен по двум причинам
    вызывается функция start(), которая сама выполняет get_event_loop()
    После этого в текущем потоке создается цикл событий.
    Далее в примере эти строки
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    в результате мы получаем второй запущенный цикл событий из-за чего мы получаем DeprecationWarning: There is no current event loop
    Ответ написан
    4 комментария
  • Как соединиться с mysql, используя SSL файл и флаг verify identity?

    import asyncio
    import ssl
    
    
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # the root CA path is dependent on your system. 
    ctx.load_verify_locations(cafile='/etc/ssl/cert.pem')
    
    async def test_example(loop):
        pool = await aiomysql.create_pool(host='', port=3306, user='redst', password='',
                                      db='aiodb', loop=loop, ssl=ctx)
    Ответ написан
    Комментировать
  • Почему следующий код может иногда падать с runtime error?

    Код [запускался на Python 3.8, 3.9, 3.10]
    import asyncio
    import datetime
    import random
    import colorama
    
    
    async def main():
        t0 = datetime.datetime.now()
        print(colorama.Fore.WHITE + "App started.", flush=True)
    
        data = asyncio.Queue()
    
        task1 = asyncio.ensure_future(generate_data(20, data))
        task2 = asyncio.ensure_future(generate_data(20, data))
        task3 = asyncio.ensure_future(process_data(40, data))
        # Переименовано из "final_task" в "gather_result" т.к. gather возвращает результат (Список значений объектов)
        # В данном коде ничего не возвращается и можно удалить print и саму переменную
        gather_result = await asyncio.gather(task1, task2, task3)
        print(gather_result)
        dt = datetime.datetime.now() - t0
        print(colorama.Fore.WHITE + f"App exiting, total time: {dt.total_seconds():,.2f} sec.", flush=True)
    
    
    async def generate_data(num: int, data: asyncio.Queue):
        for idx in range(1, num + 1):
            item = idx * idx  # equal idx ** 2 
            await data.put((item, datetime.datetime.now()))
    
            print(colorama.Fore.YELLOW + f" -- generated item {idx}", flush=True)
            await asyncio.sleep(random.random() + 0.5)
    
    
    async def process_data(num: int, data: asyncio.Queue):
        processed = 0
        while processed < num:
            item = await data.get()
    
            processed += 1
            value = item[0]
            t = item[1]
            dt = datetime.datetime.now() - t
    
            print(colorama.Fore.CYAN + f" +++ Processed value {value} after {dt.total_seconds():,.2f} sec.", flush=True)
            await asyncio.sleep(0.5)
    
    
    if __name__ == '__main__':
        asyncio.run(main())

    Asyncio "Running Tasks Concurrently" gather
    asyncio.gather на русском
    Приведенный вами код актуален для Python 3.6 (Пункт 18.5.3.5.1. документации asyncio Python 3.6.15), но не для 3.7+
    Ответ написан
    1 комментарий
  • Почему возникает ошибка?

    Вы можете создать подобный класс.
    Пример реализации
    import asyncio
    import ssl
    from typing import Optional, Type
    
    import aiohttp
    import certifi
    import ujson as json
    
    
    class Scrapper:
        def __init__(self, connections_limit: int = None) -> None:
            self._session: Optional[aiohttp.ClientSession] = None
            self._connector_class: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
            ssl_context = ssl.create_default_context(cafile=certifi.where())
            self._connector_init = dict(limit=connections_limit, ssl=ssl_context)
    
        async def get_new_session(self) -> aiohttp.ClientSession:
            return aiohttp.ClientSession(
                connector=self._connector_class(**self._connector_init),
                json_serialize=json.dumps
            )
    
        async def get_session(self) -> Optional[aiohttp.ClientSession]:
            if self._session is None or self._session.closed:
                self._session = await self.get_new_session()
    
            if not self._session._loop.is_running():
                await self._session.close()
                self._session = await self.get_new_session()
            return self._session
    
        async def make_request(self, session, url, post, **kwargs):
            try:
                if post:
                    async with session.post(url, data=None, headers=None, **kwargs) as response:
                        try:
                            return await response.json()
                        except:
                            return response.text
                else:
                    async with session.get(url, params=None, headers=None, **kwargs) as response:
                        try:
                            return await response.json()
                        except:
                            return response.text
            except aiohttp.ClientError as e:
                print(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
    
        async def request(self, url: str, post: bool = False, **kwargs):
            return await self.make_request(await self.get_session(), url, post, **kwargs)
    
        async def get_access(self) -> None:
            return await self.request("https://google.com", False)
    
    
    scrapper = Scrapper()
    
    
    async def main():
        await scrapper.get_access()
    
    
    asyncio.run(main())
    Ответ написан
    Комментировать
  • Почему бот на aiogram молчит, пока выполняет действия?

    asyncio sleeping
    sleep() always suspends the current task, allowing other tasks to run.

    Наглядный пример выполнения
    Код
    import asyncio
    import time
    
    
    async def test(delay: int, number: int):
        print(f"Задача: {number}\n"
              f"Начато выполнение в {time.strftime('%X')}\n"
              f"Ждем {delay} секунд")
        await asyncio.sleep(delay)
        print(f"Закончили задачу {number} в {time.strftime('%X')}")
    
    
    async def startup():
        delays = [10, 8, 14, 5]
        for i, x in enumerate(delays):
            await test(x, i + 1)
    
    
    if __name__ == '__main__':
        asyncio.run(startup())

    Вывод
    Задача: 1
    Начато выполнение в 01:47:05
    Ждем 10 секунд
    Закончили задачу 1 в 01:47:15
    Задача: 2
    Начато выполнение в 01:47:15
    Ждем 8 секунд
    Закончили задачу 2 в 01:47:23
    Задача: 3
    Начато выполнение в 01:47:23
    Ждем 14 секунд
    Закончили задачу 3 в 01:47:37
    Задача: 4
    Начато выполнение в 01:47:37
    Ждем 5 секунд
    Закончили задачу 4 в 01:47:42

    Можете считать, что в данном случае await asyncio.sleep(delay) является блокирующей функцией
    Ответ написан