Часть первая: Как прочитать текстовый файл по событию, которое получаем с помощью Observer.
Пример: В директории 'C:\ForTest' лежит файл text.txt С помощью нижеуказанного кода, если мы что-то напишем в в файл text.txt и сохраним его, в консоль будет выведен путь к файлу, который изменен. А как получить текст файла, который был изменен? Если пробовать открыть файл с помощью полученного пути - он его не видит по какой-то причине.
import time
from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler
class MyHandler(PatternMatchingEventHandler):
patterns = ["*.txt", "*.jpg"]
def process(self, event):
"""
event.event_type
'modified' | 'created' | 'moved' | 'deleted'
event.is_directory
True | False
event.src_path
path/to/observed/file
"""
# print(event.src_path, event.event_type)
out = str(event.src_path)
print(out)
def on_modified(self, event):
self.process(event)
def on_created(self, event):
self.process(event)
if __name__ == '__main__':
args = 'C:\\ForTest'
observer = Observer()
observer.schedule(MyHandler(), path=args if args else '.')
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
Часть вторая: как объединить поток Observer с потоком bot_polling Aiogram так, чтобы текст из файла test.txt, отправлялся в бот
import asyncio
from aiogram import Bot, Dispatcher, Router
from aiogram.types import Message, CallbackQuery
from aiogram.filters import CommandStart
router: Router = Router()
async def start():
bot: Bot = Bot(token=BOT_TOKEN)
dp: Dispatcher = Dispatcher()
dp.include_router(router=router)
try:
await dp.start_polling(bot)
finally:
await bot.session.close()
if __name__ == '__main__':
asyncio.run(start())