Как запустить синхронную функцию параллельно телеграм боту на aiogram?
Есть телеграм бот, который написан на aiogram - асинхронной библиотеке. Как мне одновременно запустить следующую функцию, чтобы и она выполнялась и телеграм бот продолжал отвечать.
def background_func():
while True:
print('Hi')
time.sleep(60) # Именно time.sleep, синхронное засыпание, так как это имитация синхронного кода
Для запуска синхронной функции background_func() параллельно с асинхронным выполнением телеграм-бота на aiogram, вам потребуется использовать многопоточность.
Вот пример кода, который объясняет, как это можно сделать:
import asyncio
import time
import threading
from aiogram import Bot, Dispatcher, types
# Создание бота и диспетчера aiogram
bot = Bot(token='YOUR_TELEGRAM_TOKEN')
dp = Dispatcher(bot)
# Функция-обработчик команды /start
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
await message.reply("Привет! Я бот!")
# Асинхронная функция для запуска aiogram
async def main():
await dp.start_polling()
# Синхронная функция для выполнения фоновых задач
def background_func():
while True:
print('Hi')
time.sleep(60)
# Запуск фоновой функции в отдельном потоке
def run_background_func():
threading.Thread(target=background_func).start()
# Запуск асинхронной функции main() в отдельном потоке
def run_aiogram():
asyncio.run(main())
# Запуск обоих функций параллельно
if __name__ == '__main__':
run_background_func()
run_aiogram()
В этом примере run_background_func() и run_aiogram() запускаются одновременно в отдельных потоках. run_background_func() запускает синхронную функцию background_func(), которая выводит "Hi" каждые 60 секунд. run_aiogram() запускает асинхронную функцию main(), которая обрабатывает сообщения и команды в телеграм-боте.
Обратите внимание, что потоки работают параллельно и могут взаимодействовать с общими ресурсами. Поэтому, если ваша синхронная функция background_func() взаимодействует с ресурсами, которые используются aiogram (например, базой данных), вам может потребоваться синхронизация доступа к этим ресурсам.
я не смог Ж) в итоге у меня бот работает на telebot и обычных тредах без какой либо асинхронщины
вот он с примером одной из функций. а твою функцию можно запустить так же но при запуске указать ее как onstartup
#!/usr/bin/env python3
import threading
import telebot
import cfg
bot = telebot.TeleBot(cfg.token, skip_pending=True)
# до 40 одновременных потоков для чата с гпт и бингом
semaphore_talks = threading.Semaphore(40)
class ShowAction(threading.Thread):
"""Поток который можно остановить. Беспрерывно отправляет в чат уведомление об активности.
Телеграм автоматически гасит уведомление через 5 секунд, по-этому его надо повторять.
Использовать в коде надо как то так
with ShowAction(chat_id, 'typing'):
делаем что-нибудь и пока делаем уведомление не гаснет
"""
def __init__(self, chat_id, action):
"""_summary_
Args:
chat_id (_type_): id чата в котором будет отображаться уведомление
action (_type_): "typing", "upload_photo", "record_video", "upload_video", "record_audio",
"upload_audio", "upload_document", "find_location", "record_video_note", "upload_video_note"
"""
super().__init__()
self.actions = [ "typing", "upload_photo", "record_video", "upload_video", "record_audio",
"upload_audio", "upload_document", "find_location", "record_video_note", "upload_video_note"]
assert action in self.actions, f'Допустимые actions = {self.actions}'
self.chat_id = chat_id
self.action = action
self.is_running = True
#self.start()
self.timerseconds = 1
def run(self):
while self.is_running:
bot.send_chat_action(self.chat_id, self.action)
n = 50
while n > 0:
time.sleep(0.1)
n = n - self.timerseconds
def stop(self):
self.timerseconds = 50
self.is_running = False
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
@bot.message_handler(content_types = ['voice'])
def handle_voice(message: telebot.types.Message):
"""Автоматическое распознавание текст из голосовых сообщений"""
thread = threading.Thread(target=handle_voice_thread, args=(message,))
thread.start()
def handle_voice_thread(message: telebot.types.Message):
"""Автоматическое распознавание текст из голосовых сообщений"""
my_log.log_media(message)
with semaphore_talks:
# Создание временного файла
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
file_path = temp_file.name
# Скачиваем аудиофайл во временный файл
file_info = bot.get_file(message.voice.file_id)
downloaded_file = bot.download_file(file_info.file_path)
with open(file_path, 'wb') as new_file:
new_file.write(downloaded_file)
# Распознаем текст из аудио
# если мы не в привате и в этом чате нет блокировки автораспознавания то показываем активность
if not (message.chat.id in BLOCKS and BLOCKS[message.chat.id] == 1) or message.chat.type == 'private':
with ShowAction(message.chat.id, 'typing'):
if cfg.stt == 'vosk':
text = my_stt.stt(file_path)
elif cfg.stt == 'whisper':
text = my_whisper.get_text(file_path)
else:
if cfg.stt == 'vosk':
text = my_stt.stt(file_path)
elif cfg.stt == 'whisper':
text = my_whisper.get_text(file_path)
os.remove(file_path)
# если мы не в привате и в этом чате нет блокировки автораспознавания
if not (message.chat.id in BLOCKS and BLOCKS[message.chat.id] == 1) or message.chat.type == 'private':
# Отправляем распознанный текст
if text.strip() != '':
bot.reply_to(message, text, reply_markup=get_keyboard('hide'))
my_log.log_echo(message, f'[ASR] {text}')
else:
bot.reply_to(message, 'Очень интересно, но ничего не понятно.', reply_markup=get_keyboard('hide'))
my_log.log_echo(message, '[ASR] no results')
# и при любом раскладе отправляем текст в обработчик текстовых сообщений, возможно бот отреагирует на него если там есть кодовые слова
if text:
message.text = text
echo_all(message)
def set_default_commands():
"""
Reads a file containing a list of commands and their descriptions,
and sets the default commands for the bot.
"""
commands = []
with open('commands.txt', encoding='utf-8') as file:
for line in file:
try:
command, description = line[1:].strip().split(' - ', 1)
if command and description:
commands.append(telebot.types.BotCommand(command, description))
except Exception as error:
print(error)
bot.set_my_commands(commands)
def main():
"""
Runs the main function, which sets default commands and starts polling the bot.
"""
set_default_commands()
bot.polling()
if __name__ == '__main__':
main()