@Slonopotam44

Почему не работает коммуникация между клиентом и сервером?

Клиент и сервер общаются друг с другом посредством коммуникаторов
Базовый коммуникатор
spoiler
public abstract class TcpCommunicatorBase : ICommunicator
    {
        private const string MessageRecievedResponce = "MessageRecieved";
        private static readonly int _maxMessageLenght = 1024 * 10;
        protected TcpClient _client;
        protected Encoding _encoding;
        private bool _isWaitResponce;
        private readonly Queue<string> _messageQueue;
        /// <summary>
        ///     Переменная необходимая для остановки прослушивания
        /// </summary>
        private bool _stopListen;
        private object _streamLock;
        private bool inRead;
        protected TcpCommunicatorBase(TcpClient client, Encoding encoding)
        {
            _encoding = encoding;
            _client = client;
            _messageQueue = new Queue<string>();
            MessageReaded = (s, a) => { };
            ConnectionBroken = (s, a) =>
            {
                _client.Close();
                EndListenForMessage();
            };
            var checkTimer = new Timer(s => CheckCommunicator(), null, 1000, 1000); //Проверка соединения(HeartBit)
            ConnectionCreated = (s, a) => { };
        }
        /// <summary>
        ///     Получение потока
        /// </summary>
        protected NetworkStream Stream => _client.GetStream();
        /// <summary>
        ///     Отправка сообщения
        /// </summary>
        /// <param name="text"></param>
        public virtual void SendMessage(string text)
        {
            if (text == null)
                return;
            Debug.Print($"Add to sending queue {text}");
            _messageQueue.Enqueue(text);
            SendMessageFromQueue();
        }
        /// <summary>
        ///     Проверка соединения
        /// </summary>
        public virtual void CheckCommunicator()
        {
            if (!_client.Connected) OnConnectionBroken();
        }
        /// <summary>
        ///     Комманда к началу ожидания сообщений
        /// </summary>
        public virtual void BeginListenForMessage()
        {
            _stopListen = false;
            var bufer = new byte[4];
            if (inRead)
                return;
            try
            {
                inRead = true;
                if (Stream.DataAvailable)
                    Read();
                else
                    Stream.BeginRead(bufer, 0, bufer.Length, MessageRecievedAsyncCallback, bufer);
            }
            catch (Exception e)
            {
                Debug.Print(e.Message);
            }
        }
        /// <summary>
        ///     Комманда к окончанию ожидания сообщений
        /// </summary>
        public virtual void EndListenForMessage()
        {
            _stopListen = true;
        }
        /// <summary>
        ///     Событие к прочтению сообщения
        /// </summary>
        public event EventHandler<MessageReadedEventArgs> MessageReaded;
        /// <summary>
        ///     Событие к разрыву связи
        /// </summary>
        public event EventHandler ConnectionBroken;
        public event EventHandler ConnectionCreated;
        /// <summary>
        ///     Подключение к IP
        /// </summary>
        /// <param name="adres"></param>
        /// <param name="port"></param>
        /// <returns></returns>
        public virtual bool Connect(string adres, int port)
        {
            _client?.Close();
            try
            {
                _client = new TcpClient(adres, port);
                return true;
            }
            catch
            {
                return false;
            }
        }
        public abstract void ConnectAsync(string adres, int port);
        /// <summary>
        ///     Отключение коммуникатора
        /// </summary>
        public virtual void Disconect()
        {
            _client?.Close();
        }
        public virtual void Dispose()
        {
            Dispose(true);
        }
        protected virtual void SendMessageFromQueue()
        {
            if (_messageQueue.Count == 0)
                return;
            if (!_isWaitResponce)
            {
                var text = _messageQueue.Dequeue();
                Debug.Print($"Sending:{text}");
                var list = new List<byte>();
                var messageLenght =
                    (uint) _encoding.GetByteCount(text);
                list.AddRange(
                    BitConverter.GetBytes(messageLenght)); //В начало сообщения добавляется его длинна в байтах
                var enumerable = _encoding.GetBytes(text);
                list.AddRange(enumerable);
                var bytes = list.ToArray();
                Stream.Write(bytes, 0, bytes.Length);
                _isWaitResponce = true;
            }
        }
        private void SendMessageRecievedResponce()
        {
            var text = MessageRecievedResponce;
            var list = new List<byte>();
            var messageLenght =
                (uint) _encoding.GetByteCount(text);
            list.AddRange(BitConverter.GetBytes(messageLenght)); //В начало сообщения добавляется его длинна в байтах
            var enumerable = _encoding.GetBytes(text);
            list.AddRange(enumerable);
            var bytes = list.ToArray();
            Stream.Write(bytes, 0, bytes.Length);
        }
        private void SendWithoutWaiting(string text)
        {
            var bytes = _encoding.GetBytes(text);
            Stream.Write(bytes, 0, bytes.Count());
        }
        public virtual string Read()
        {
            try
            {
                var bufer = new byte[4];
                Stream.Read(bufer, 0, bufer.Length);
                string message;
                var expectedMessageLenght = BitConverter.ToUInt32(bufer.ToArray(), 0);
                var messageBytes = GetMessageBytes(expectedMessageLenght);
                var i = messageBytes.Length;
                inRead = false;
                message = _encoding.GetString(messageBytes);
                HandleMessage(message, i, expectedMessageLenght);
                return message;
            }
            catch
            {
                return null;
                //Ошибка при считывании
            }
        }
        /// <summary>
        ///     Вызывается при получение сообщения
        /// </summary>
        /// <param name="ar"></param>
        protected virtual void MessageRecievedAsyncCallback(IAsyncResult ar)
        {
            try
            {
                var _bufer = ar.AsyncState as byte[];
                string message;
                var expectedMessageLenght = BitConverter.ToUInt32(_bufer.ToArray(), 0);
                try
                {
                    Stream.EndRead(ar);
                }
                catch
                {
                    message = string.Empty;
                    return;
                    //Связь разорванна
                }
                var messageBytes = GetMessageBytes(expectedMessageLenght);
                var i = messageBytes.Length;
                inRead = false;
                message = _encoding.GetString(messageBytes);
                HandleMessage(message, i, expectedMessageLenght);
            }
            catch
            {
                //Ошибка при считывании
            }
        }
        private void HandleMessage(string Message, int ReadedLenght, uint ExpectedMessageLenght)
        {
            Debug.Print($"message:{Message}");
            if (!_stopListen) //Если прослушку не остановили
            {
                if (ReadedLenght == ExpectedMessageLenght)
                    if (Message == MessageRecievedResponce)
                    {
                        _isWaitResponce = false;
                        SendMessageFromQueue();
                    }
                    else
                    {
                        //if (!_isMessageValid.TryParse(Message).WasSuccessful)
                        //{
                        //    SendWithoutWaiting(Message); //В случае, если тип сообщения не поддерживается
                        //}
                        //else
                        //{
                        SendMessageRecievedResponce();
                        MessageReaded(this, new MessageReadedEventArgs
                        {
                            Message = Message
                        });
                        //}
                    }

                BeginListenForMessage();
            }
        }

        private byte[] GetMessageBytes(uint ExpectedLenght)
        {
            Debug.Print($"Message lenght:{ExpectedLenght}");

            if (ExpectedLenght > _maxMessageLenght)
            {
                Debug.Print("Message ignored");
                Stream.Flush();
                return null;
            }

            var messageBytes = new byte[ExpectedLenght];

            var i = 0;

            while (i < ExpectedLenght)
                if (Stream.DataAvailable)
                {
                    messageBytes[i] = (byte) Stream.ReadByte();
                    i++;
                }

            return messageBytes;
        }

        protected virtual void OnConnectionBroken()
        {
            ConnectionBroken?.Invoke(this, EventArgs.Empty);
        }

        private void ReleaseUnmanagedResources()
        {
        }

        private void Dispose(bool disposing)
        {
            ReleaseUnmanagedResources();
            if (disposing)
            {
                _client?.Close();
                _client?.Dispose();
            }
        }

        protected virtual void OnConnectionCreated()
        {
            ConnectionCreated?.Invoke(this, EventArgs.Empty);
        }
    }
}


В какой то момент, клиентский коммуникатор не отправляет сообщение на сервер (даже не добавляет его в очередь).
В чем может быть проблема?
  • Вопрос задан
  • 78 просмотров
Пригласить эксперта
Ответы на вопрос 1
@basrach
Навскидку есть несколько подозрительных мест:
  1. Флаг _isWaitResponce в методе SendMessageFromQueue класса TcpCommunicatorBase, который запрещает отправку если поднят (имеет значение true). При этом сбрасывается он только после получения ответа, и то не гарантированно, для этого нужно чтобы выполнился ряд условий, и он не сбросится в случае исключения. Таким образом этот флаг может быть причиной ошибки, если клиент не получил ответа после отправки, либо при получении ответа произошла ошибка.
  2. Флаг inRead в методе BeginListenForMessage того же класса. Этот флаг также запрещает чтение если поднят, и гарантированно не сбрасывается. Т.е. может остаться в состоянии true если произошло исключение про чтении сообщения. Соответственно сломанное чтение на клиенте может заблокировать отправку.
  3. В CustomSynchronizationContext с хабра есть баг, когда сообщение может висеть в очереди потока и не обрабатываться до тех пор, пока не придет еще одно.

PS.
Имхо, в целом код не выглядит надежным. Когда отправка завязана на получение, и все это в одном классе, и для обоих процессов используются те же переменные, всё это резко усложняет понимание и отладку, и снижает надежность кода.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы