import os
from ldap3 import Server, Connection, ALL_ATTRIBUTES, ALL
# Установка параметров подключения к Active Directory
def get_emails():
server = Server('172.XXXX', get_info=ALL)
# Указываем учетные данные для подключения
username = os.getenv("user_adm")
password = os.getenv("user_pw")
# Устанавливаем соединение с Active Directory
conn = Connection(server, user=username, password=password)
conn.bind()
# Выполняем поиск пользователей по полю email
email = '*'
filter_str = f'(&(mail={email})(userAccountControl:1.2.840.113556.1.4.803:=512)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))'
conn.search('DC=smmretail,DC=ru', filter_str, attributes=ALL_ATTRIBUTES)
emails = [
entry.mail for entry in conn.entries
]
#
# if conn.result:
# print('Ошибка при выполнении поиска:', conn.result)
#
# if conn.bind():
# print('Успешная привязка к серверу')
# else:
# print('Ошибка привязки к серверу')
print(emails)
print('Количество найденных записей:', len(conn.entries))
# Закрываем соединение
conn.unbind()
# exit()
#return emails
import os
from aiogram import Bot, Dispatcher, types
from aiogram.utils import executor
from validators import email
from ad import get_emails
from db import Db
bot = Bot(token=os.getenv("bot_token"))
dp = Dispatcher(bot)
@dp.message_handler(lambda message: '@' in message.text and message.text.endswith('.ru'))
async def echo_send(message: types.Message):
if email(message.text.strip()):
emails = await get_emails()
print(emails)
if message.text.strip() in emails:
# Проверка email в ад. Вызываю функцию генерации кода для авторизации.
unicode = Db.generation_unicode()
await Db().add_to_db(
mail_sender='telegram_bot@XXXXX',
mail_receiver=message.text.strip(),
mail_text=f'Ваш код для авторизации {unicode}',
tg_id=message.from_user.id,
msg_code=unicode
)
await bot.send_message(message.chat.id, 'Введите полученый код с почты.')
else:
await bot.send_message(message.chat.id, 'Вашей почты нету в базе данных.')
else:
await bot.send_message(message.chat.id, 'Введеная почта некорректна.')
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)