from ServerGet.ConfigRead import read_all_port, read_ip
import concurrent.futures, socket, a2s
QUERY_TIMEOUT = 4
import socket
class servergetinfo:
def __init__(self):
self.ip = read_ip()
self.ports = read_all_port()
def get_server_info(self):
"""Функция для получения информации о серверах из конфигурационного файла
Raises:
info_except: Информация о серверах
Returns:
_type_: Список словарей с информацией о серверах
"""
info = []
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(5) # Устанавливаем таймаут на 5 секунд
for port in self.ports:
ip, port = self.ip, int(port)
# Запрос подключения к серверу
try:
socket.getaddrinfo(ip, port)
except socket.gaierror:
print("Неверный адрес сервера")
# Получение информации о сервере
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
info_future = pool.submit(
a2s.info, (ip, port), timeout=QUERY_TIMEOUT)
# Инфомарция сервера (временная)
info_except = info_future.exception()
# Проверка ответа сервера
if isinstance(info_except, socket.timeout):
print("Сервер не ответил")
elif isinstance(info_except, a2s.BrokenMessageError):
print("Сервер отправил неверный ответ")
elif info_except is not None:
raise info_except
# Инфомария сервера
info_res = info_future.result()
info.append(info_res)
return info
except Exception as e:
print(f"Ошибка при получении информации о сервере: {e}")
return None