Задать вопрос
@windf1n

Почему кириллица заменяется на вопросы?

import socket
import struct
import sys
import time
A2S_INFO = b'\xFF\xFF\xFF\xFFTSource Engine Query\x00'
A2S_PLAYERS = b'\xFF\xFF\xFF\xFF\x55'
A2S_RULES = b'\xFF\xFF\xFF\xFF\x56'
S2A_INFO_SOURCE = chr(0x49)
S2A_INFO_GOLDSRC = chr(0x6D)
class SourceQuery(object):
    is_third = False
    __sock = None
    __challenge = None
    def __init__(self, addr, port=27015, timeout=5.0):
        self.ip, self.port, self.timeout = socket.gethostbyname(addr), port, timeout
        if sys.version_info >= (3, 0):
            self.is_third = True
    def disconnect(self):
        """ Close socket """
        if self.__sock is not None:
            self.__sock.close()
            self.__sock = False
    def connect(self):
        """ Opens a new socket """
        self.disconnect()
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.__sock.settimeout(self.timeout)
        self.__sock.connect((self.ip, self.port))
    def get_ping(self):
        """ Get ping to server """
        return self.get_info()['Ping']
    def get_info(self):
        """ Retrieves information about the server including, but not limited to: its name, the map currently being played, and the number of players. """
        if self.__sock is None:
            self.connect()
        self.__sock.send(A2S_INFO)
        before = time.time()
        try:
            data = self.__sock.recv(4096)
        except:
            return False
        after = time.time()
        data = data[4:]
        result = {}
        header, data = self.__get_byte(data)
        result['Ping'] = int((after - before) * 1000)
        if chr(header) == S2A_INFO_SOURCE:
            result['Protocol'], data = self.__get_byte(data)
            result['Hostname'], data = self.__get_string(data)
            result['Map'], data = self.__get_string(data)
            result['GameDir'], data = self.__get_string(data)
            result['GameDesc'], data = self.__get_string(data)
            result['AppID'], data = self.__get_short(data)
            result['Players'], data = self.__get_byte(data)
            result['MaxPlayers'], data = self.__get_byte(data)
            result['Bots'], data = self.__get_byte(data)
            dedicated, data = self.__get_byte(data)
            if chr(dedicated) == 'd':
                result['Dedicated'] = 'Dedicated'
            elif dedicated == 'l':
                result['Dedicated'] = 'Listen'
            else:
                result['Dedicated'] = 'SourceTV'
            os, data = self.__get_byte(data)
            if chr(os) == 'w':
                result['OS'] = 'Windows'
            elif chr(os) in ('m', 'o'):
                result['OS'] = 'Mac'
            else:
                result['OS'] = 'Linux'
            result['Password'], data = self.__get_byte(data)
            result['Secure'], data = self.__get_byte(data)
            if result['AppID'] == 2400:  # The Ship server
                result['GameMode'], data = self.__get_byte(data)
                result['WitnessCount'], data = self.__get_byte(data)
                result['WitnessTime'], data = self.__get_byte(data)
            result['Version'], data = self.__get_string(data)
            edf, data = self.__get_byte(data)
            try:
                if edf & 0x80:
                    result['GamePort'], data = self.__get_short(data)
                if edf & 0x10:
                    result['SteamID'], data = self.__get_long_long(data)
                if edf & 0x40:
                    result['SpecPort'], data = self.__get_short(data)
                    result['SpecName'], data = self.__get_string(data)
                if edf & 0x10:
                    result['Tags'], data = self.__get_string(data)
            except:
                pass
        elif chr(header) == S2A_INFO_GOLDSRC:
            result['GameIP'], data = self.__get_string(data)
            result['Hostname'], data = self.__get_string(data)
            result['Map'], data = self.__get_string(data)
            result['GameDir'], data = self.__get_string(data)
            result['GameDesc'], data = self.__get_string(data)
            result['Players'], data = self.__get_byte(data)
            result['MaxPlayers'], data = self.__get_byte(data)
            result['Version'], data = self.__get_byte(data)
            dedicated, data = self.__get_byte(data)
            if chr(dedicated) == 'd':
                result['Dedicated'] = 'Dedicated'
            elif dedicated == 'l':
                result['Dedicated'] = 'Listen'
            else:
                result['Dedicated'] = 'HLTV'
            os, data = self.__get_byte(data)
            if chr(os) == 'w':
                result['OS'] = 'Windows'
            else:
                result['OS'] = 'Linux'
            result['Password'], data = self.__get_byte(data)
            result['IsMod'], data = self.__get_byte(data)
            if result['IsMod']:
                result['URLInfo'], data = self.__get_string(data)
                result['URLDownload'], data = self.__get_string(data)
                data = self.__get_byte(data)[1]  # NULL-Byte
                result['ModVersion'], data = self.__get_long(data)
                result['ModSize'], data = self.__get_long(data)
                result['ServerOnly'], data = self.__get_byte(data)
                result['ClientDLL'], data = self.__get_byte(data)
            result['Secure'], data = self.__get_byte(data)
            result['Bots'], data = self.__get_byte(data)
        return result
    # <------------------getInfo() End -------------------------->
    def get_challenge(self):
        """ Get challenge number for A2S_PLAYER and A2S_RULES queries. """
        if self.__sock is None:
            self.connect()
        self.__sock.send(A2S_PLAYERS + b'0xFFFFFFFF')
        try:
            data = self.__sock.recv(4096)
        except:
            return False
        self.__challenge = data[5:]
        return True
    # <-------------------getChallenge() End --------------------->
    def get_players(self):
        """ Retrieve information about the players currently on the server. """
        if self.__sock is None:
            self.connect()
        if self.__challenge is None:
            self.get_challenge()
        self.__sock.send(A2S_PLAYERS + self.__challenge)
        try:
            data = self.__sock.recv(4096)
        except:
            return False
        data = data[4:]
        header, data = self.__get_byte(data)
        num, data = self.__get_byte(data)
        result = []
        try:
            for i in range(num):
                player = {}
                data = self.__get_byte(data)[1]
                player['id'] = i + 1  # ID of All players is 0
                player['Name'], data = self.__get_string(data)
                player['Frags'], data = self.__get_long(data)
                player['Time'], data = self.__get_float(data)
                ftime = time.gmtime(int(player['Time']))
                player['FTime'] = ftime
                player['PrettyTime'] = time.strftime('%H:%M:%S', ftime)
                result.append(player)
        except Exception:
            pass
        return result
    # <-------------------getPlayers() End ----------------------->
    def get_rules(self):
        """ Returns the server rules, or configuration variables in name/value pairs. """
        if self.__sock is None:
            self.connect()
        if self.__challenge is None:
            self.get_challenge()
        self.__sock.send(A2S_RULES + self.__challenge)
        try:
            data = self.__sock.recv(4096)
            if data[0] == '\xFE':
                num_packets = ord(data[8]) & 15
                packets = [' ' for i in range(num_packets)]
                for i in range(num_packets):
                    if i != 0:
                        data = self.__sock.recv(4096)
                    index = ord(data[8]) >> 4
                    packets[index] = data[9:]
                data = ''
                for i, packet in enumerate(packets):
                    data += packet
        except:
            return False
        data = data[4:]
        header, data = self.__get_byte(data)
        num, data = self.__get_short(data)
        result = {}
        # Server sends incomplete packets. Ignore "NumPackets" value.
        while 1:
            try:
                rule_name, data = self.__get_string(data)
                rule_value, data = self.__get_string(data)
                if rule_value:
                    result[rule_value] = rule_name
            except:
                break
        return result
    # <-------------------getRules() End ------------------------->
    def __get_byte(self, data):
        if self.is_third:
            return data[0], data[1:]
        else:
            return ord(data[0]), data[1:]
    def __get_short(self, data):
        return struct.unpack('<h', data[0:2])[0], data[2:]
    def __get_long(self, data):
        return struct.unpack('<l', data[0:4])[0], data[4:]
    def __get_long_long(self, data):
        return struct.unpack('<Q', data[0:8])[0], data[8:]
    def __get_float(self, data):
        return struct.unpack('<f', data[0:4])[0], data[4:]
    def __get_string(self, data):
        s = ""
        i = 0
        if not self.is_third:
            while data[i] != '\x00':
                s += data[i]
                i += 1
        else:
            while chr(data[i]) != '\x00':
                s += chr(data[i])
                i += 1
        return s, data[i + 1:]
# Just for testing
if __name__ == '__main__':
    query = SourceQuery(str("185.195.25.53"), int(27016))
    players = query.get_players()
    for player in players:
        print("{id:<2} {Name:<35} {Frags:<5} {PrettyTime}".format(**player))
    res = query.get_info()
    print(res['Hostname'])
    print(res['Map'])
    print("%i/%i" % (res['Players'], res['MaxPlayers']))
    query.disconnect()
    query = False

Кириллица на вопросы(
  • Вопрос задан
  • 213 просмотров
Подписаться 1 Простой 4 комментария
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы