from pyrogram import Client
from pyrogram.errors import FloodWait
import asyncio
async def get_post_stats(client: Client, channel_id: int, message_id: int):
"""
Получает статистику поста: просмотры, реакции и количество комментариев.
Args:
client: Экземпляр клиента Pyrogram
channel_id: ID канала
message_id: ID сообщения
Returns:
dict: Словарь со статистикой
"""
try:
# Получаем сообщение
message = await client.get_messages(channel_id, message_id, replies=-1)
# Получаем статистику просмотров
views = message.views if message.views else 0
# Получаем реакции
reactions = {}
if message.reactions:
for reaction in message.reactions.reactions:
reaction_id = reaction.custom_emoji_id if reaction.custom_emoji_id else reaction.emoji
reactions[reaction_id] = reaction.count
comments_count = await client.get_discussion_replies_count(channel_id, message_id)
stats = {
"views": views,
"reactions": reactions,
"forwards": message.forwards or 0,
"comments": comments_count
}
return stats
except FloodWait as e:
print(f"Превышен лимит запросов, ждем {e.value} секунд")
await asyncio.sleep(e.value)
return await get_post_stats(client, channel_id, message_id)
except Exception as e:
print(f"Произошла ошибка: {e}")
return None
# Пример использования
async def main():
# Замените на свои данные
api_id = "0"
api_hash = "0"
channel_id = -1 # ID канала
message_id = 1 # ID сообщения
async with Client("my_account", api_id=api_id, api_hash=api_hash) as app:
stats = await get_post_stats(app, channel_id, message_id)
if stats:
print(f"Просмотры: {stats['views']}")
print(f"Переслали: {stats['forwards']}")
print(f"Реакции: {stats['reactions']}")
print(f"Комментарии: {stats['comments']}")
if __name__ == "__main__":
asyncio.run(main())
Просмотры: 1
Реакции: {'': 1, '❤': 1}
Комментарии: 1
# ...
from aiogram.filters.callback_data import CallbackData
class AnswerCallback(CallbackData, prefix='answer'):
message_id: int
answer_type: str
@router.message(F.photo)
async def photo_handle(message: Message):
await hadnle_text(message)
async def hadnle_text(message: Message):
builder = InlineKeyboardBuilder()
builder.add(InlineKeyboardButton(
text="Answer 1",
callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 1').pack()),
InlineKeyboardButton(
text="Answer 2",
callback_data=AnswerCallback(message_id=message.message_id, answer_type='Answer 2').pack()),
)
# Или так
# builder.button(text="Answer 1", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 1'))
# builder.button(text="Answer 2", callback_data=AnswerCallback(message_id=message.message_id, answer_type='answer 2'))
await message.answer(
f"<b>Выберите кнопку</b>",
reply_markup=builder.as_markup()
)
@router.callback_query(AnswerCallback.filter())
async def text_state_callback(callback: CallbackQuery, callback_data: AnswerCallback):
await bot.send_message(
chat_id=callback.message.chat.id,
text=callback_data.answer_type,
reply_to_message_id=callback_data.message_id,
)
aiogram==2.25.1
и aiohttp>=3.8.0,<3.9.0
указывают, что они поддерживаются максимально до Python 3.10.import os
# import time
import requests
from openpyxl import load_workbook
from dotenv import load_dotenv
API_URL = 'https://api.edenai.run/v2/audio/text_to_speech'
HEADERS = {"Authorization": f"Bearer {os.getenv('API_KEY')}"}
DEFAULT_LANGUAGE = 'ru-RU'
DEFAULT_OPTION = 'MALE'
DEFAULT_VOICE = 'ru-RU_Alexei Syomin'
def convert_text_to_speech(row_number: int, _id: int, text: str):
payload = {
'providers': 'lovoai',
'language': DEFAULT_LANGUAGE,
'option': DEFAULT_OPTION,
'lovoai': DEFAULT_VOICE,
'text': text
}
response = requests.post(API_URL, json=payload, headers=HEADERS)
result = response.json()
audio_url = result.get('lovoai', {}).get('audio_resource_url')
if audio_url:
audio_content = requests.get(audio_url).content
with open(f'row_{row_number}_ID_{_id}.ogg', 'wb') as audio_file:
audio_file.write(audio_content)
def main(file_name: str):
book = load_workbook(file_name)
sheet = book.active
for row in range(2, sheet.max_row + 1):
_id = sheet[row][0].value
text = sheet[row][1].value
convert_text_to_speech(row, _id, text)
# Установите задержку, если имеется ограничение на кол-во запросов в секунду
# time.sleep(3)
if __name__ == '__main__':
load_dotenv()
main('name.xlsx')
update.py
main.py
вы пытаетесь зарегистрировать хендлеры в хендлереimport logging
async def on_startup(dispatcher):
logging.basicConfig(
level=logging.DEBUG,
format=u'%(filename)s:%(lineno)d #%(levelname)-8s [%(asctime)s] - %(name)s - %(message)s',
)
logger.info("Starting bot")
# Импортируем функции регистрации хендлеров
from update_photo import register_new_photo
# Регистрируем хендлеры
register_new_photo(dispatcher)
async def on_shutdown(dispatcher):
logging.warning('Shutting down..')
await dispatcher.storage.close()
await dispatcher.storage.wait_closed()
logging.warning('Bye!')
if __name__ == '__main__':
from aiogram.utils import executor
from loader import dp
executor.start_polling(dp,
skip_updates=True, # Если нужно пропускаем апдейты
on_startup=on_startup,
on_shutdown=on_shutdown)
import os
from aiogram import Bot, Dispatcher
from aiogram.contrib.fsm_storage.memory import MemoryStorage
bot = Bot(token=os.getenv("TOKEN"))
storage = MemoryStorage()
dp = Dispatcher(bot=bot, storage=storage)
cards_hrefs
, titles
, prices
).for items in data:
cards = data.find_all("div", class_="card-body")
for card in cards:
catalog.append({
"link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
"title": card.find("h4").text.strip(),
"price": card.find("h5").text.strip()
})
data = soup.find("div", class_="row my-4")
catalog = []
cards = data.find_all("div", class_="card-body")
for card in cards:
catalog.append({
"link to the product": "https://scrapingclub.com" + card.find("a").get("href"),
"title": card.find("h4").text.strip(),
"price": card.find("h5").text.strip()
})
from aiogram import Bot, Dispatcher, types
# from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.contrib.fsm_storage.redis import RedisStorage2
storage = RedisStorage2()
# storage = MemoryStorage()
bot = Bot(token=c.tg_bot.token, parse_mode=types.ParseMode.HTML)
dp = Dispatcher(bot, storage=storage)
@dp.message_handler(commands=['some_command'])
async def some_handler(message: Message):
text = 'Текст для отправки '
DELAY = 10
_loop = asyncio.get_running_loop()
_loop.call_later(DELAY, func, message.from_user.id, text)
async def func(chat_id, text):
await dp.bot.send_message(chat_id, text)
n = int(input())
. Вызов input
блокирует выполнение кода, как и time.sleep
try: except:
проверяя является ли message.text числомif message.text.isdigit():
print('ok')
else:
print('Это не число')
d_list
на 100.000 у меня выполнилось за 0.6868....res = []
for dictionary in d_list: # * 100_000:
result_dict = {}
for key in dictionary:
if dictionary[key] is not None:
result_dict[key] = dictionary[key]
result_dict = {**result_dict, **{'qty1': False, 'fp1': False,
'qty2': False, 'fp2': False,
'qty3': False, 'fp3': False}}
if isinstance(dictionary['b'], list):
for z in range(1, len(dictionary['b']) - 1):
result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
res.append(result_dict)
b
в исходном виде.res = []
for dictionary in d_list: # * 100_000:
result_dict = {'a': dictionary['a'],
'c': dictionary['c'],
'qty1': False, 'fp1': False,
'qty2': False, 'fp2': False,
'qty3': False, 'fp3': False
}
if isinstance(dictionary['b'], list):
for z in range(1, len(dictionary['b']) - 1):
result_dict[f'qty{z}'] = dictionary['b'][z - 1]['qty']
result_dict[f'fp{z}'] = dictionary['b'][z - 1]['fp']
res.append(result_dict)
if __name__ == '__main__':
loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(event_loop=loop)
scheduler.add_job(main, 'interval', seconds=3)
scheduler.start()
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
pass
start()
, которая сама выполняет get_event_loop()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
в результате мы получаем второй запущенный цикл событий из-за чего мы получаем DeprecationWarning: There is no current event loop
import asyncio
import ssl
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# the root CA path is dependent on your system.
ctx.load_verify_locations(cafile='/etc/ssl/cert.pem')
async def test_example(loop):
pool = await aiomysql.create_pool(host='', port=3306, user='redst', password='',
db='aiodb', loop=loop, ssl=ctx)
import asyncio
import datetime
import random
import colorama
async def main():
t0 = datetime.datetime.now()
print(colorama.Fore.WHITE + "App started.", flush=True)
data = asyncio.Queue()
task1 = asyncio.ensure_future(generate_data(20, data))
task2 = asyncio.ensure_future(generate_data(20, data))
task3 = asyncio.ensure_future(process_data(40, data))
# Переименовано из "final_task" в "gather_result" т.к. gather возвращает результат (Список значений объектов)
# В данном коде ничего не возвращается и можно удалить print и саму переменную
gather_result = await asyncio.gather(task1, task2, task3)
print(gather_result)
dt = datetime.datetime.now() - t0
print(colorama.Fore.WHITE + f"App exiting, total time: {dt.total_seconds():,.2f} sec.", flush=True)
async def generate_data(num: int, data: asyncio.Queue):
for idx in range(1, num + 1):
item = idx * idx # equal idx ** 2
await data.put((item, datetime.datetime.now()))
print(colorama.Fore.YELLOW + f" -- generated item {idx}", flush=True)
await asyncio.sleep(random.random() + 0.5)
async def process_data(num: int, data: asyncio.Queue):
processed = 0
while processed < num:
item = await data.get()
processed += 1
value = item[0]
t = item[1]
dt = datetime.datetime.now() - t
print(colorama.Fore.CYAN + f" +++ Processed value {value} after {dt.total_seconds():,.2f} sec.", flush=True)
await asyncio.sleep(0.5)
if __name__ == '__main__':
asyncio.run(main())
import asyncio
import ssl
from typing import Optional, Type
import aiohttp
import certifi
import ujson as json
class Scrapper:
def __init__(self, connections_limit: int = None) -> None:
self._session: Optional[aiohttp.ClientSession] = None
self._connector_class: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
ssl_context = ssl.create_default_context(cafile=certifi.where())
self._connector_init = dict(limit=connections_limit, ssl=ssl_context)
async def get_new_session(self) -> aiohttp.ClientSession:
return aiohttp.ClientSession(
connector=self._connector_class(**self._connector_init),
json_serialize=json.dumps
)
async def get_session(self) -> Optional[aiohttp.ClientSession]:
if self._session is None or self._session.closed:
self._session = await self.get_new_session()
if not self._session._loop.is_running():
await self._session.close()
self._session = await self.get_new_session()
return self._session
async def make_request(self, session, url, post, **kwargs):
try:
if post:
async with session.post(url, data=None, headers=None, **kwargs) as response:
try:
return await response.json()
except:
return response.text
else:
async with session.get(url, params=None, headers=None, **kwargs) as response:
try:
return await response.json()
except:
return response.text
except aiohttp.ClientError as e:
print(f"aiohttp client throws an error: {e.__class__.__name__}: {e}")
async def request(self, url: str, post: bool = False, **kwargs):
return await self.make_request(await self.get_session(), url, post, **kwargs)
async def get_access(self) -> None:
return await self.request("https://google.com", False)
scrapper = Scrapper()
async def main():
await scrapper.get_access()
asyncio.run(main())
sleep() always suspends the current task, allowing other tasks to run.
import asyncio
import time
async def test(delay: int, number: int):
print(f"Задача: {number}\n"
f"Начато выполнение в {time.strftime('%X')}\n"
f"Ждем {delay} секунд")
await asyncio.sleep(delay)
print(f"Закончили задачу {number} в {time.strftime('%X')}")
async def startup():
delays = [10, 8, 14, 5]
for i, x in enumerate(delays):
await test(x, i + 1)
if __name__ == '__main__':
asyncio.run(startup())
await asyncio.sleep(delay)
является блокирующей функцией