Задать вопрос
  • Как прикрутить авторизация запросов по API key или Token в Django?

    Модель
    from django.db import models
    from django.contrib.auth.models import User
    
    class APIKey(models.Model):
        user = models.OneToOneField(User, on_delete=models.CASCADE)
        key = models.CharField(max_length=64, unique=True)


    Проверка ключа API
    from django.http import JsonResponse
    from your_app.models import APIKey
    
    def api_key_required(view_func):
        @wraps(view_func)
        def _wrapped_view(request, *args, **kwargs):
            api_key = request.headers.get("X-API-Key")
            if not api_key or not APIKey.objects.filter(key=api_key).exists():
                return JsonResponse({"error": "Unauthorized"}, status=403)
            return view_func(request, *args, **kwargs)
        
        return _wrapped_view


    Вью для API
    from django.http import JsonResponse
    from django.views.decorators.csrf import csrf_exempt
    
    @csrf_exempt
    @api_key_required
    def my_api_view(request):
        return JsonResponse({"message": "Success"})
    Ответ написан
    1 комментарий
  • AIOGRAM TypeError: State.__init__() got multiple values for argument 'state' в чем может быть ошибка?

    class Question(StatesGroup):
        #Имя юзера
        name=State()
        #Вопрос юзера
        userAsk=State()
    Ответ написан
    Комментировать
  • Можно ли скрапить телеграмм?

    Код
    from pyrogram import Client
    from pyrogram.errors import FloodWait
    import asyncio
    
    
    async def get_post_stats(client: Client, channel_id: int, message_id: int):
        """
        Получает статистику поста: просмотры, реакции и количество комментариев.
    
        Args:
            client: Экземпляр клиента Pyrogram
            channel_id: ID канала
            message_id: ID сообщения
    
        Returns:
            dict: Словарь со статистикой
        """
        try:
            # Получаем сообщение
            message = await client.get_messages(channel_id, message_id, replies=-1)
    
            # Получаем статистику просмотров
            views = message.views if message.views else 0
    
            # Получаем реакции
            reactions = {}
            if message.reactions:
                for reaction in message.reactions.reactions:
                    reaction_id = reaction.custom_emoji_id if reaction.custom_emoji_id else reaction.emoji
                    reactions[reaction_id] = reaction.count
    
            comments_count = await client.get_discussion_replies_count(channel_id, message_id)
            stats = {
                "views": views,
                "reactions": reactions,
                "forwards": message.forwards or 0,
                "comments": comments_count
            }
    
            return stats
    
        except FloodWait as e:
            print(f"Превышен лимит запросов, ждем {e.value} секунд")
            await asyncio.sleep(e.value)
            return await get_post_stats(client, channel_id, message_id)
        except Exception as e:
            print(f"Произошла ошибка: {e}")
            return None
    
    
    # Пример использования
    async def main():
        # Замените на свои данные
        api_id = "0"
        api_hash = "0"
        channel_id = -1 # ID канала
        message_id = 1 # ID сообщения
    
        async with Client("my_account", api_id=api_id, api_hash=api_hash) as app:
            stats = await get_post_stats(app, channel_id, message_id)
            if stats:
                print(f"Просмотры: {stats['views']}")
                print(f"Переслали: {stats['forwards']}")
                print(f"Реакции: {stats['reactions']}")
                print(f"Комментарии: {stats['comments']}")
    
    
    if __name__ == "__main__":
        asyncio.run(main())


    Вывод:
    Просмотры: 1
    Реакции: {'': 1, '❤': 1}
    Комментарии: 1
    Ответ написан
    2 комментария
  • Как получить callback_data из апдейта в aiogram?

    Используйте Callback Data Factory
    Ответ написан
    Комментировать
  • Aiogram 3, FSMContext/State: почему бот отвечает из всех фото только на последнее?

    Следует почитать на счет того как работает медиагруппа в ТГ, 5 фото в одном сообщении = 5 апдейтов.

    Я бы ушел от FSM в сторону Callback Data Factory

    # ...
    from aiogram.filters.callback_data import CallbackData
    
    
    class AnswerCallback(CallbackData, prefix='answer'):
        message_id: int
        answer_type: str
    
    
    @router.message(F.photo)
    async def photo_handle(message: Message):
        await hadnle_text(message)
    
    async def hadnle_text(message: Message):
        builder = InlineKeyboardBuilder()
        builder.add(InlineKeyboardButton(
            text="Answer 1",
            callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 1').pack()),
        InlineKeyboardButton(
            text="Answer 2",
            callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 2').pack()),
        )
        # Или так
        # builder.button(text="Answer 1", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 1'))
        # builder.button(text="Answer 2", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 2'))
        await message.answer(
            f"<b>Выберите кнопку</b>",
            reply_markup=builder.as_markup()
        )
    
    @router.callback_query(AnswerCallback.filter())
    async def text_state_callback(callback: CallbackQuery, callback_data: AnswerCallback):
        await bot.send_message(
            chat_id=callback.message.chat.id,
            text=callback_data.answer_type,
            reply_to_message_id=callback_data.message_id,
        )
    Ответ написан
    4 комментария
  • Ошибка при скачивании aiohtpps?

    aiogram==2.25.1 и aiohttp>=3.8.0,<3.9.0 указывают, что они поддерживаются максимально до Python 3.10.
    Вы пытаетесь использовать Python 3.12

    Можете скачать и почитать исходники на PyPI
    Ответ написан
    Комментировать
  • Автоматизация запроса по API в звуковую нейросеть из Excel через python?

    import os
    # import time
    
    import requests
    from openpyxl import load_workbook
    from dotenv import load_dotenv
    
    API_URL = 'https://api.edenai.run/v2/audio/text_to_speech'
    HEADERS = {"Authorization": f"Bearer {os.getenv('API_KEY')}"}
    DEFAULT_LANGUAGE = 'ru-RU'
    DEFAULT_OPTION = 'MALE'
    DEFAULT_VOICE = 'ru-RU_Alexei Syomin'
    
    
    def convert_text_to_speech(row_number: int, _id: int, text: str):
        payload = {
            'providers': 'lovoai',
            'language': DEFAULT_LANGUAGE,
            'option': DEFAULT_OPTION,
            'lovoai': DEFAULT_VOICE,
            'text': text
        }
    
        response = requests.post(API_URL, json=payload, headers=HEADERS)
        result = response.json()
    
        audio_url = result.get('lovoai', {}).get('audio_resource_url')
        if audio_url:
            audio_content = requests.get(audio_url).content
            with open(f'row_{row_number}_ID_{_id}.ogg', 'wb') as audio_file:
                audio_file.write(audio_content)
    
    
    def main(file_name: str):
        book = load_workbook(file_name)
        sheet = book.active
        for row in range(2, sheet.max_row + 1):
            _id = sheet[row][0].value
            text = sheet[row][1].value
            convert_text_to_speech(row, _id, text)
    
            # Установите задержку, если имеется ограничение на кол-во запросов в секунду
            # time.sleep(3)
    
    
    if __name__ == '__main__':
        load_dotenv()
        main('name.xlsx')
    Ответ написан
    3 комментария
  • Кому отправил криптовалюту отправитель?

    Как осуществляются биткойн-транзакции

    Почитайте о вводах и выводах
    Ответ написан
    Комментировать
  • Как правильно регистрировать хендлеры для импорта?

    1. Не нашел в вашем тексте упоминаемого файла update.py
    2. В файле main.py вы пытаетесь зарегистрировать хендлеры в хендлере
    main.py
    import logging
    
    
    async def on_startup(dispatcher):
        logging.basicConfig(
            level=logging.DEBUG,
            format=u'%(filename)s:%(lineno)d #%(levelname)-8s [%(asctime)s] - %(name)s - %(message)s',
        )
        logger.info("Starting bot")
    
        # Импортируем функции регистрации хендлеров
        from update_photo import register_new_photo
        # Регистрируем хендлеры
        register_new_photo(dispatcher)
    
    
    async def on_shutdown(dispatcher):
        logging.warning('Shutting down..')
        await dispatcher.storage.close()
        await dispatcher.storage.wait_closed()
        logging.warning('Bye!')
    
    
    if __name__ == '__main__':
        from aiogram.utils import executor
        from loader import dp
    
        executor.start_polling(dp,
                               skip_updates=True,  # Если нужно пропускаем апдейты
                               on_startup=on_startup,
                               on_shutdown=on_shutdown)

    loader.py

    import os
    
    from aiogram import Bot, Dispatcher
    from aiogram.contrib.fsm_storage.memory import MemoryStorage
    
    bot = Bot(token=os.getenv("TOKEN"))
    storage = MemoryStorage()
    dp = Dispatcher(bot=bot, storage=storage)

    Ответ написан
  • Не могу понять в чем проблема, сохраняет только последние полученные данные, в чем дело?

    Вы создали 3 цикла, в которых перезаписываете значение переменной до добавления в лист, именно поэтому у вас сохраняется последнее значение.
    Также вместо создания трех циклов можно обойтись одним. (Вывод сделан на основании одинаковых селекторов переменных cards_hrefs, titles, prices).

    Изначальный ответ
    for items in data:    
        cards = data.find_all("div", class_="card-body")
        for card in cards:
            catalog.append({
                "link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
                "title": card.find("h4").text.strip(),
                "price": card.find("h5").text.strip()
            })

    С верным замечанием от AUser0
    Обновленный ответ
    data = soup.find("div", class_="row my-4")
    
    catalog = []
    cards = data.find_all("div", class_="card-body")
    for card in cards:
        catalog.append({
            "link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
            "title": card.find("h4").text.strip(),
            "price": card.find("h5").text.strip()
        })
    Ответ написан
    2 комментария
  • Как показать пользователю, что бот отправляет видео или фото?

    Вы не указали какую библиотеку используете, вот реф на документацию Bot API.
    sendChatAction
    Ищите данный метод в используемой библиотеке
    Ответ написан
    2 комментария
  • FSMStorageWarning?

    При инициализации dispatcher нужно передать в него storage
    from aiogram import Bot, Dispatcher, types
    # from aiogram.contrib.fsm_storage.memory import MemoryStorage
    from aiogram.contrib.fsm_storage.redis import RedisStorage2
    
    storage = RedisStorage2()
    # storage = MemoryStorage()
    
    bot = Bot(token=c.tg_bot.token, parse_mode=types.ParseMode.HTML)
    dp = Dispatcher(bot, storage=storage)

    UPD. При использовании redis на windows рекомендую устанавливать Memurai, так как есть поддержка redis 6, в некоторых случаях это избавит вас от проблем при переносе кода на Unix системы
    Ответ написан
    Комментировать
  • Как подключить приватный прокси к яндекс музыке?

    Вы не указали протокол прокси. На гитхабе указаны следующие примеры
    Примеры proxy url:
    
    socks5://user:password@host:port
    http://host:port
    https://host:port
    http://user:password@host
    Ответ написан
    Комментировать
  • Telegram bot Как назначить время и уведомить через заданное время?

    По правилам ресурса код стоит выкладывать текстом, оборачивая тэгом code.
    Вам подойдут очереди сообщений (rq, Celery, queue, asyncio.Queue).
    Примитивно ставить задачу в цикл событий.
    P.S. Рекомендую очереди
    @dp.message_handler(commands=['some_command'])
    async def some_handler(message: Message):
        text = 'Текст для отправки '
        DELAY = 10
        _loop = asyncio.get_running_loop()
        _loop.call_later(DELAY, func, message.from_user.id, text)
    
    async def func(chat_id, text):
        await dp.bot.send_message(chat_id, text)

    UPD Рекомендации
    1. Избавьтесь от n = int(input()). Вызов input блокирует выполнение кода, как и time.sleep
    2. Ознакомьтесь с встроенной в aiogram машиной состояний (FSM). Меняя state отлавливайте введенное пользователем число.
    3. Избавьтесь от конструкции try: except: проверяя является ли message.text числом
    if message.text.isdigit():
        print('ok')
    else:
        print('Это не число')
    Ответ написан
    4 комментария
  • Как преобразовать такой список словарей в Python?

    Умножив d_list на 100.000 у меня выполнилось за 0.6868....
    Не знаю на сколько "быстрым" такой результат можно назвать
    Первый вариант
    res = []
    for dictionary in d_list:  # * 100_000:
        result_dict = {}
        for key in dictionary:
            if dictionary[key] is not None:
                result_dict[key] = dictionary[key]
        result_dict = {**result_dict, **{'qty1': False, 'fp1': False,
                                         'qty2': False, 'fp2': False,
                                         'qty3': False, 'fp3': False}}
        if isinstance(dictionary['b'], list):
            for z in range(1, len(dictionary['b']) - 1):
                result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
                result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
    
        res.append(result_dict)

    Если все таки вы не ожидаете в результате значение ключа b в исходном виде.
    Выполнилось за 0.3226
    UPD
    res = []
    for dictionary in d_list:  # * 100_000:
        result_dict = {'a': dictionary['a'],
                       'c': dictionary['c'],
                       'qty1': False, 'fp1': False,
                       'qty2': False, 'fp2': False,
                       'qty3': False, 'fp3': False
                       }
        if isinstance(dictionary['b'], list):
            for z in range(1, len(dictionary['b']) - 1):
                result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
                result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
    
        res.append(result_dict)
    Ответ написан
    1 комментарий
  • Как запустить асинхронную функцию по расписанию python?

    if __name__ == '__main__':
        loop = asyncio.new_event_loop()
        scheduler = AsyncIOScheduler(event_loop=loop)
        scheduler.add_job(main, 'interval', seconds=3)
        scheduler.start()
        try:
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            pass

    Описание проблемы. (Как я это понял)
    Пример из github репозитория библиотеки ошибочен по двум причинам
    вызывается функция start(), которая сама выполняет get_event_loop()
    После этого в текущем потоке создается цикл событий.
    Далее в примере эти строки
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass
    в результате мы получаем второй запущенный цикл событий из-за чего мы получаем DeprecationWarning: There is no current event loop
    Ответ написан
    4 комментария
  • Как соединиться с mysql, используя SSL файл и флаг verify identity?

    import asyncio
    import ssl
    
    
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # the root CA path is dependent on your system. 
    ctx.load_verify_locations(cafile='/etc/ssl/cert.pem')
    
    async def test_example(loop):
        pool = await aiomysql.create_pool(host='', port=3306, user='redst', password='',
                                      db='aiodb', loop=loop, ssl=ctx)
    Ответ написан
    Комментировать
  • Почему следующий код может иногда падать с runtime error?

    Код [запускался на Python 3.8, 3.9, 3.10]
    import asyncio
    import datetime
    import random
    import colorama
    
    
    async def main():
        t0 = datetime.datetime.now()
        print(colorama.Fore.WHITE + "App started.", flush=True)
    
        data = asyncio.Queue()
    
        task1 = asyncio.ensure_future(generate_data(20, data))
        task2 = asyncio.ensure_future(generate_data(20, data))
        task3 = asyncio.ensure_future(process_data(40, data))
        # Переименовано из "final_task" в "gather_result" т.к. gather возвращает результат (Список значений объектов)
        # В данном коде ничего не возвращается и можно удалить print и саму переменную
        gather_result = await asyncio.gather(task1, task2, task3)
        print(gather_result)
        dt = datetime.datetime.now() - t0
        print(colorama.Fore.WHITE + f"App exiting, total time: {dt.total_seconds():,.2f} sec.", flush=True)
    
    
    async def generate_data(num: int, data: asyncio.Queue):
        for idx in range(1, num + 1):
            item = idx * idx  # equal idx ** 2 
            await data.put((item, datetime.datetime.now()))
    
            print(colorama.Fore.YELLOW + f" -- generated item {idx}", flush=True)
            await asyncio.sleep(random.random() + 0.5)
    
    
    async def process_data(num: int, data: asyncio.Queue):
        processed = 0
        while processed < num:
            item = await data.get()
    
            processed += 1
            value = item[0]
            t = item[1]
            dt = datetime.datetime.now() - t
    
            print(colorama.Fore.CYAN + f" +++ Processed value {value} after {dt.total_seconds():,.2f} sec.", flush=True)
            await asyncio.sleep(0.5)
    
    
    if __name__ == '__main__':
        asyncio.run(main())

    Asyncio "Running Tasks Concurrently" gather
    asyncio.gather на русском
    Приведенный вами код актуален для Python 3.6 (Пункт 18.5.3.5.1. документации asyncio Python 3.6.15), но не для 3.7+
    Ответ написан
    1 комментарий
  • Почему возникает ошибка?

    Вы можете создать подобный класс.
    Пример реализации
    import asyncio
    import ssl
    from typing import Optional, Type
    
    import aiohttp
    import certifi
    import ujson as json
    
    
    class Scrapper:
        def __init__(self, connections_limit: int = None) -> None:
            self._session: Optional[aiohttp.ClientSession] = None
            self._connector_class: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
            ssl_context = ssl.create_default_context(cafile=certifi.where())
            self._connector_init = dict(limit=connections_limit, ssl=ssl_context)
    
        async def get_new_session(self) -> aiohttp.ClientSession:
            return aiohttp.ClientSession(
                connector=self._connector_class(**self._connector_init),
                json_serialize=json.dumps
            )
    
        async def get_session(self) -> Optional[aiohttp.ClientSession]:
            if self._session is None or self._session.closed:
                self._session = await self.get_new_session()
    
            if not self._session._loop.is_running():
                await self._session.close()
                self._session = await self.get_new_session()
            return self._session
    
        async def make_request(self, session, url, post, **kwargs):
            try:
                if post:
                    async with session.post(url, data=None, headers=None, **kwargs) as response:
                        try:
                            return await response.json()
                        except:
                            return response.text
                else:
                    async with session.get(url, params=None, headers=None, **kwargs) as response:
                        try:
                            return await response.json()
                        except:
                            return response.text
            except aiohttp.ClientError as e:
                print(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
    
        async def request(self, url: str, post: bool = False, **kwargs):
            return await self.make_request(await self.get_session(), url, post, **kwargs)
    
        async def get_access(self) -> None:
            return await self.request("https://google.com", False)
    
    
    scrapper = Scrapper()
    
    
    async def main():
        await scrapper.get_access()
    
    
    asyncio.run(main())
    Ответ написан
    Комментировать