Между сервером и клиентом идет общение посредством классов-коммуникаторов.
Базовый коммуникатор
public abstract class TcpCommunicatorBase : ICommunicator
{
private readonly byte[] _bufer = new byte[1024];
protected TcpClient _client;
protected Encoding _encoding;
private object _streamLock;
private object _clientLock;
/// <summary>
/// Переменная необходимая для остановки прослушивания
/// </summary>
private bool _stopListen;
protected int? expectedMessageLenght;
protected string Message = string.Empty;
protected TcpCommunicatorBase(TcpClient client, Encoding encoding)
{
_streamLock = new object();
_clientLock = new object();
lock (_clientLock)
{
_encoding = encoding;
_client = client;
MessageReaded = (s, a) => { };
ConnectionBroken = (s, a) =>
{
_client.Close();
EndListenForMessage();
};
}
}
/// <summary>
/// Получение потока
/// </summary>
protected NetworkStream Stream
{
get
{
lock (_clientLock)
{
return _client.GetStream();
}
}
}
/// <summary>
/// Отправка сообщения
/// </summary>
/// <param name="text"></param>
public virtual void SendMessage(string text)
{
var MessageLenght = _encoding.GetByteCount(text).ToString()+":";//В начало сообщения добавляется его длинна в байтах
var enumerable = _encoding.GetBytes(MessageLenght+text);
lock (_streamLock)
{
try
{
Stream.Write(enumerable, 0, enumerable.Length);
}
catch (Exception e)
{
Debug.Print(e.Message);
}
}
}
/// <summary>
/// Комманда к началу ожидания сообщений
/// </summary>
public virtual void BeginListenForMessage()
{
_stopListen = false;
lock (_streamLock)
{
try
{
Stream.BeginRead(_bufer, 0, _bufer.Length, AsyncCallback, null);
}
catch (Exception e)
{
Debug.Print(e.Message);
}
}
}
/// <summary>
/// Комманда к окончанию ожидания сообщений
/// </summary>
public virtual void EndListenForMessage()
{
_stopListen = true;
}
/// <summary>
/// Событие к прочтению сообщения
/// </summary>
public event EventHandler<MessageReadedEventArgs> MessageReaded;
/// <summary>
/// Событие к разрыву связи
/// </summary>
public event EventHandler ConnectionBroken;
/// <summary>
/// Подключение к IP
/// </summary>
/// <param name="adres"></param>
/// <param name="port"></param>
/// <returns></returns>
public virtual bool Connect(string adres, int port)
{
lock (_clientLock)
{
_client?.Close();
try
{
_client = new TcpClient(adres, port);
return true;
}
catch
{
return false;
}
}
}
/// <summary>
/// Отключение коммуникатора
/// </summary>
public virtual void Disconect()
{
lock (_clientLock)
{
_client?.Close();
}
}
/// <summary>
/// Прасер для получения длинны сообщения
/// </summary>
protected Parser<int> MessageLenght =
from Lenght in Parse.Decimal
select int.Parse(Lenght);
/// <summary>
/// Парсер для получения тела сообщения
/// </summary>
protected Parser<string> MessageBody =
from Lenght in Parse.Digit.Many()
from spliter in Parse.Char(':').Once()
from body in Parse.CharExcept('\0').Many().Text()
select body;
/// <summary>
/// Проверка сообщения на корректность
/// (соответствие фактической длинны сообщения заявленной)
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
protected virtual bool IsCorrect(string message)
{
int Lenght = MessageLenght.Parse(message);
string Body = MessageBody.Parse(message);
try
{
if (Lenght == _encoding.GetByteCount(Body))
return true;
}
catch
{
// ignored
}
return false;
}
~TcpCommunicatorBase()
{
_client.Close();
}
/// <summary>
/// Вызывается при получение сообщения
/// </summary>
/// <param name="ar"></param>
protected virtual void AsyncCallback(IAsyncResult ar)
{
Message += _encoding.GetString(_bufer);
while (Stream.DataAvailable)//В случае если сообщение больше длинны буфера, то считываем его несколько раз
{
for (int i = 0; i < _bufer.Length; i++)
_bufer[i] = 0;
Stream.Read(_bufer, 0, _bufer.Length);
Message += _encoding.GetString(_bufer);
}
if (IsCorrect(Message))
{
//Проверка сообщения на корректность
if (!_stopListen)//Если прослушку не остановили
{
MessageReaded(this, new MessageReadedEventArgs
{
Message = MessageBody.Parse(Message)
});
Message = String.Empty;
BeginListenForMessage();
}
Message = string.Empty;
expectedMessageLenght = null;
}
}
}
В какой то момент, сервер отправляет сообщение, а клиент не может его поймать (AsyncCallback не запускается), притом, что прослушку никто не отключал.
В чем может быть проблема?
Достаточно ли код потокобезопасен?