Задать вопрос
@Maxwell012

Как сделать асинхронность в процессах?

Мне нужно вызывать 2 функции в разных процессах и каждый из этих процессов должен быть асинхронным. Первый процесс это бот телеграм, второй это класс который будет запускаться раз в 5 минут. Как лучше это сделать и какие библиотеки использовать?
  • Вопрос задан
  • 159 просмотров
Подписаться 1 Простой 2 комментария
Решения вопроса 1
@Everything_is_bad
Просто раздели это на два сервиса/программы/скрипта, а "класс который будет запускаться раз в 5 минут", звучит как скрипт запущенный через cron/systemd timer

Хотя асинхронность позволяет делать это всё в одном event loop'е, главное не блокировать его.
async def background_task():
    while True:
        await run_второй()
        await asyncio.sleep(5_минут)

# а в самом боте
asyncio.create_task(background_task())
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@codingoleg
Нужны асихронные библиотеки aiogram / pyrogram + schedule / APScheduler. Вот пример на aiogram 2.25 + schedule . Возможно, слегка костыль, но работает. Между выводом 'Старт' и 'Финиш' бот продолжает отзываться на команду /start.
import asyncio
import os
import schedule
from aiogram import Bot, Dispatcher
from aiogram.types import Message
from aiogram.utils import executor

bot = Bot(token=os.environ['token'])
dp = Dispatcher(bot)

@dp.message_handler(commands=['start'])
async def dp_start(message: Message):
    await message.answer('Test')

async def run_schedule():
    while True:
        schedule.run_pending()
        await asyncio.sleep(1)  # Раз в секунду проверяем, не настал ли момент выполнять задание

async def long_task():
    print('Старт')
    await asyncio.sleep(10)  # Функция, которая долго исполняется
    print('Финиш')

def do_task():
    asyncio.ensure_future(long_task())

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(run_schedule())  # Добавляем планировщик в петлю
    # schedule.every(15).seconds.do(do_task)  # Тестовый вариант каждые 15 секунд
    schedule.every(5).minutes.do(do_task)
    executor.start_polling(dp, loop=loop, skip_updates=True)  # Добавляем петлю к боту и запускаем его
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы