import sqlite3
import secrets
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from typing import Dict, Union
from datetime import datetime, timedelta
from aiogram import Bot, Dispatcher, types
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import Text
from aiogram.dispatcher.filters.state import State, StatesGroup
from aiogram.types import ParseMode, ChatActions
from aiogram.utils import executor
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from Keyboards.Keyboard import Registration, main_menu, daily_book, Kyrs1, Kyrs2, Kyrs3, Kyrs4
# Connect to the database
conn = sqlite3.connect("database.db")
cursor = conn.cursor()
# Set the bot token and initialize the bot, dispatcher and FSM storage
TOKEN = "token"
bot = Bot(token=TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
#Структура бази данних
# Створення таблиці Login Token
cursor.execute("""
CREATE TABLE IF NOT EXISTS Login_token (
id INTEGER PRIMARY KEY,
value TEXT,
student_id INTEGER,
sent INTEGER,
used INTEGER,
due_date TEXT,
FOREIGN KEY(student_id) REFERENCES Student(id)
);
""")
conn.commit()
class Student:
def __init__(self, id: int, email: str, status: str):
self.id = id
self.email = email
self.status = status
async def process_email_handler(message: types.Message, state: FSMContext) -> None:
# Check if a user exists with this email
email = message.text.lower().strip()
cursor.execute(f"SELECT * FROM student WHERE LOWER(email)='{email}'")
student_data = cursor.fetchone()
if not student_data:
await message.answer("Користувача з такою електронною поштою не знайдено. Спробуйте ще раз.")
return
# Generate a new token and send it to the email
student_id = student_data[0]
token = generate_token(student_id)
await send_token(token, email)
# Store the user information in the FSM
await state.update_data(student_id=student_id, token=token)
await AuthStates.waiting_for_token.set()
# Generate a new token and store it in the database
def generate_token(student_id: int) -> str:
token = secrets.token_urlsafe(4)
cursor.execute(f"INSERT INTO login_token (value, student_id, sent, used, due_date) VALUES ('{token}', {student_id}, 0, 0, '{(datetime.now() + timedelta(days=1)).isoformat()}')")
conn.commit()
return token
# Send a token to an email address
async def send_token(token: str, email: str) -> None:
username = "d.maksim4ik69@gmail.com"
password = "password"
mail_from = "d.maksim4ik69@gmail.com"
mail_to = email
mail_subject = "Verefication"
mail_body = f"Ваш код для авторизації в телеграм бот : {token} "
mimemsg = MIMEMultipart()
mimemsg['From'] = mail_from
mimemsg['To'] = mail_to
mimemsg['Subject'] = mail_subject
mimemsg.attach(MIMEText(mail_body))
try:
smtpObj = smtplib.SMTP('smtp.gmail.com', 587)
smtpObj.starttls()
smtpObj.login(username, password)
smtpObj.sendmail(mail_from, mail_to, mimemsg.as_string())
smtpObj.quit()
print(f"Email sent to {email} with token {token}")
except smtplib.SMTPException as e:
print(f"Error: {e}")
class AuthStates(StatesGroup):
waiting_for_email = State()
waiting_for_token = State()
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
await message.answer("Вітаємо! Введіть свою електронну пошту для отримання коду авторизації.")
await AuthStates.waiting_for_email.set()
@dp.message_handler(state=AuthStates.waiting_for_email)
async def process_email(message: types.Message, state: FSMContext):
await process_email_handler(message, state)
@dp.message_handler(state=AuthStates.waiting_for_token)
async def process_token(message: types.Message, state: FSMContext):
data = await state.get_data()
student_id = data.get("student_id")
token = data.get("token")
if message.text == token:
cursor.execute(f"UPDATE login_token SET used=1 WHERE value='{token}'")
conn.commit()
student = get_student(student_id)
await message.answer(f"Вітаємо, {student.email}! Ви успішно авторизувались.")
await state.finish()
await message.answer("Ось меню, яке ви можете використовувати : ", reply_markup=main_menu)
else:
await message.answer("Код авторизації недійсний. Спробуйте ще раз.")
def get_student(student_id: int) -> Student:
cursor.execute(f"SELECT * FROM student WHERE id={student_id}")
student_data = cursor.fetchone()
student = Student(*student_data)
return student
if __name__ == '__main__':
executor.start_polling(dp)
# asyncio.get_event_loop().run_forever()