<a href="tg://user?id={message.from_user.id}">{message.from_user.full_name}</a>
@router.message(CommandStart())
async def cmd_start(message:Message):
# user_in_bd = тут из функции проверки пользователя в бд получаешь True или False
if user_in_bd:
await message.answer('Привет!)
else:
await message.answer('Привет! Для использования бота вам нужно оставить заявку.')
def sub_verification(chat_member):
if chat_member["status"] != "left":
return True
else:
return False
1. Нужно ли мне создавать для этого отдельный класс
2. Нужно ли мне использовать state.clear()
если да, то как мне получить state
state: FSMContext
global acceptable_time
eval(await message.text)
callback_query.message.answer
class AlbumMiddleware(BaseMiddleware):
"""This middleware is for capturing media groups."""
album_data: dict = {}
def __init__(self, latency = 0.6):
"""
You can provide custom latency to make sure
albums are handled properly in highload.
"""
self.latency = latency
super().__init__()
async def on_process_message(self, message: types.Message, data: dict):
if not message.media_group_id:
if message.photo :
data["one_photo"] = message
elif message.animation or message.video or message.video_note or message.voice or message.audio:
data["album"] = [message]
return
try:
self.album_data[message.media_group_id].append(message)
raise CancelHandler() # Tell aiogram to cancel handler for this group element
except KeyError:
self.album_data[message.media_group_id] = [message]
await asyncio.sleep(self.latency)
message.conf["is_last"] = True
data["album"] = self.album_data[message.media_group_id]
async def on_post_process_message(self, message: types.Message, result: dict, data: dict):
"""Clean up after handling our album."""
if message.media_group_id and message.conf.get("is_last"):
del self.album_data[message.media_group_id]
async def func(
message: types.Message,
album: Optional[List[types.Message]] = None,
one_photo: Optional[types.Message] = None
):
from fastapi import FastAPI, Request
from tgbot.main import tgbot
app = FastAPI()
@app.post('/api/bot')
async def tgbot_webhook_route(request: Request):
update_dict = await request.json()
await tgbot.update_bot(update_dict)
return ''
import asyncio
from aiogram import Bot, Dispatcher, Router
from tgbot.handlers import router
class TGBot:
def __init__(self, router: Router) -> None:
token = config('TOKEN')
self.bot = Bot(token)
self.dp = Dispatcher()
self.dp.include_router(router)
if not config('DEBUG', default=False):
loop = asyncio.get_event_loop()
loop.run_until_complete(self.set_commands())
loop.run_until_complete(self.set_webhook())
async def update_bot(self, update: dict) -> None:
await self.dp.feed_raw_update(self.bot, update)
await self.bot.session.close()
async def set_webhook(self):
webhook_url = config('WEBHOOK_URL')
# WEBHOOK_URL = адрес сайта/api/bot
await self.bot.set_webhook(webhook_url)
await self.bot.session.close()
tgbot = TGBot(router)
{
"rewrites": [
{ "source": "/(.*)", "destination": "/api/index" }
]
}
The Executor has been entirely removed; you can now use the Dispatcher directly to start poll the API or handle webhooks from it.
bot.send_message
global x
x = message.message_id