public abstract class TcpCommunicatorBase : ICommunicator
{
private const string MessageRecievedResponce = "MessageRecieved";
private static readonly int _maxMessageLenght = 1024 * 10;
protected TcpClient _client;
protected Encoding _encoding;
private bool _isWaitResponce;
private readonly Queue<string> _messageQueue;
/// <summary>
/// Переменная необходимая для остановки прослушивания
/// </summary>
private bool _stopListen;
private object _streamLock;
private bool inRead;
protected TcpCommunicatorBase(TcpClient client, Encoding encoding)
{
_encoding = encoding;
_client = client;
_messageQueue = new Queue<string>();
MessageReaded = (s, a) => { };
ConnectionBroken = (s, a) =>
{
_client.Close();
EndListenForMessage();
};
var checkTimer = new Timer(s => CheckCommunicator(), null, 1000, 1000); //Проверка соединения(HeartBit)
ConnectionCreated = (s, a) => { };
}
/// <summary>
/// Получение потока
/// </summary>
protected NetworkStream Stream => _client.GetStream();
/// <summary>
/// Отправка сообщения
/// </summary>
/// <param name="text"></param>
public virtual void SendMessage(string text)
{
if (text == null)
return;
Debug.Print($"Add to sending queue {text}");
_messageQueue.Enqueue(text);
SendMessageFromQueue();
}
/// <summary>
/// Проверка соединения
/// </summary>
public virtual void CheckCommunicator()
{
if (!_client.Connected) OnConnectionBroken();
}
/// <summary>
/// Комманда к началу ожидания сообщений
/// </summary>
public virtual void BeginListenForMessage()
{
_stopListen = false;
var bufer = new byte[4];
if (inRead)
return;
try
{
inRead = true;
if (Stream.DataAvailable)
Read();
else
Stream.BeginRead(bufer, 0, bufer.Length, MessageRecievedAsyncCallback, bufer);
}
catch (Exception e)
{
Debug.Print(e.Message);
}
}
/// <summary>
/// Комманда к окончанию ожидания сообщений
/// </summary>
public virtual void EndListenForMessage()
{
_stopListen = true;
}
/// <summary>
/// Событие к прочтению сообщения
/// </summary>
public event EventHandler<MessageReadedEventArgs> MessageReaded;
/// <summary>
/// Событие к разрыву связи
/// </summary>
public event EventHandler ConnectionBroken;
public event EventHandler ConnectionCreated;
/// <summary>
/// Подключение к IP
/// </summary>
/// <param name="adres"></param>
/// <param name="port"></param>
/// <returns></returns>
public virtual bool Connect(string adres, int port)
{
_client?.Close();
try
{
_client = new TcpClient(adres, port);
return true;
}
catch
{
return false;
}
}
public abstract void ConnectAsync(string adres, int port);
/// <summary>
/// Отключение коммуникатора
/// </summary>
public virtual void Disconect()
{
_client?.Close();
}
public virtual void Dispose()
{
Dispose(true);
}
protected virtual void SendMessageFromQueue()
{
if (_messageQueue.Count == 0)
return;
if (!_isWaitResponce)
{
var text = _messageQueue.Dequeue();
Debug.Print($"Sending:{text}");
var list = new List<byte>();
var messageLenght =
(uint) _encoding.GetByteCount(text);
list.AddRange(
BitConverter.GetBytes(messageLenght)); //В начало сообщения добавляется его длинна в байтах
var enumerable = _encoding.GetBytes(text);
list.AddRange(enumerable);
var bytes = list.ToArray();
Stream.Write(bytes, 0, bytes.Length);
_isWaitResponce = true;
}
}
private void SendMessageRecievedResponce()
{
var text = MessageRecievedResponce;
var list = new List<byte>();
var messageLenght =
(uint) _encoding.GetByteCount(text);
list.AddRange(BitConverter.GetBytes(messageLenght)); //В начало сообщения добавляется его длинна в байтах
var enumerable = _encoding.GetBytes(text);
list.AddRange(enumerable);
var bytes = list.ToArray();
Stream.Write(bytes, 0, bytes.Length);
}
private void SendWithoutWaiting(string text)
{
var bytes = _encoding.GetBytes(text);
Stream.Write(bytes, 0, bytes.Count());
}
public virtual string Read()
{
try
{
var bufer = new byte[4];
Stream.Read(bufer, 0, bufer.Length);
string message;
var expectedMessageLenght = BitConverter.ToUInt32(bufer.ToArray(), 0);
var messageBytes = GetMessageBytes(expectedMessageLenght);
var i = messageBytes.Length;
inRead = false;
message = _encoding.GetString(messageBytes);
HandleMessage(message, i, expectedMessageLenght);
return message;
}
catch
{
return null;
//Ошибка при считывании
}
}
/// <summary>
/// Вызывается при получение сообщения
/// </summary>
/// <param name="ar"></param>
protected virtual void MessageRecievedAsyncCallback(IAsyncResult ar)
{
try
{
var _bufer = ar.AsyncState as byte[];
string message;
var expectedMessageLenght = BitConverter.ToUInt32(_bufer.ToArray(), 0);
try
{
Stream.EndRead(ar);
}
catch
{
message = string.Empty;
return;
//Связь разорванна
}
var messageBytes = GetMessageBytes(expectedMessageLenght);
var i = messageBytes.Length;
inRead = false;
message = _encoding.GetString(messageBytes);
HandleMessage(message, i, expectedMessageLenght);
}
catch
{
//Ошибка при считывании
}
}
private void HandleMessage(string Message, int ReadedLenght, uint ExpectedMessageLenght)
{
Debug.Print($"message:{Message}");
if (!_stopListen) //Если прослушку не остановили
{
if (ReadedLenght == ExpectedMessageLenght)
if (Message == MessageRecievedResponce)
{
_isWaitResponce = false;
SendMessageFromQueue();
}
else
{
//if (!_isMessageValid.TryParse(Message).WasSuccessful)
//{
// SendWithoutWaiting(Message); //В случае, если тип сообщения не поддерживается
//}
//else
//{
SendMessageRecievedResponce();
MessageReaded(this, new MessageReadedEventArgs
{
Message = Message
});
//}
}
BeginListenForMessage();
}
}
private byte[] GetMessageBytes(uint ExpectedLenght)
{
Debug.Print($"Message lenght:{ExpectedLenght}");
if (ExpectedLenght > _maxMessageLenght)
{
Debug.Print("Message ignored");
Stream.Flush();
return null;
}
var messageBytes = new byte[ExpectedLenght];
var i = 0;
while (i < ExpectedLenght)
if (Stream.DataAvailable)
{
messageBytes[i] = (byte) Stream.ReadByte();
i++;
}
return messageBytes;
}
protected virtual void OnConnectionBroken()
{
ConnectionBroken?.Invoke(this, EventArgs.Empty);
}
private void ReleaseUnmanagedResources()
{
}
private void Dispose(bool disposing)
{
ReleaseUnmanagedResources();
if (disposing)
{
_client?.Close();
_client?.Dispose();
}
}
protected virtual void OnConnectionCreated()
{
ConnectionCreated?.Invoke(this, EventArgs.Empty);
}
}
}