@pivodev

Как сделать чтобы я получил информацию о сервере CS2?

Я хочу получить информацию о сервере кс2 но получаю только информацию о сервере ксго вот строчки кода
def get_server_info(ip, port):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.settimeout(5)  # Устанавливаем таймаут на 5 секунд
            sock.connect((ip, port))
            sock.send(b'\xFF\xFF\xFF\xFF\x54Source Engine Query\x00')
            data = sock.recv(4096)
            return data.decode('latin1').split('\x00')[1:]
    except Exception as e:
        logging.error(f"Ошибка при получении информации о сервере: {e}")
        return None
  • Вопрос задан
  • 220 просмотров
Пригласить эксперта
Ответы на вопрос 1
@AsQ_QQ
from ServerGet.ConfigRead import read_all_port, read_ip
import concurrent.futures, socket, a2s
QUERY_TIMEOUT = 4
import socket


class servergetinfo:
    def __init__(self):
        self.ip = read_ip()
        self.ports = read_all_port()


    def get_server_info(self):
        """Функция для получения информации о серверах из конфигурационного файла

        Raises:
            info_except: Информация о серверах

        Returns:
            _type_: Список словарей с информацией о серверах
        """


        info = []
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                sock.settimeout(5)  # Устанавливаем таймаут на 5 секунд
                for port in self.ports:
                    ip, port = self.ip, int(port)

                    # Запрос подключения к серверу
                    try:
                        socket.getaddrinfo(ip, port)
                    except socket.gaierror:
                        print("Неверный адрес сервера")
                    
                    # Получение информации о сервере
                    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
                        info_future = pool.submit(
                            a2s.info, (ip, port), timeout=QUERY_TIMEOUT)

                    # Инфомарция сервера (временная)
                    info_except = info_future.exception()

                    # Проверка ответа сервера
                    if isinstance(info_except, socket.timeout):
                        print("Сервер не ответил")
                    elif isinstance(info_except, a2s.BrokenMessageError):
                        print("Сервер отправил неверный ответ")
                    elif info_except is not None:
                        raise info_except
                    
                    # Инфомария сервера
                    info_res = info_future.result()
                    info.append(info_res)

                return info
        except Exception as e:
            print(f"Ошибка при получении информации о сервере: {e}")
            return None
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы