Всем привет, есть множество клиентов, есть WCF, есть устройства. Use case связки следующий, клиенты вызывают wcf-методы, wcf-методы выполняют различные команды устройств (connect, do1, do2, dissconnect), однако устройства поддерживают только один коннект в одно время, поэтому нужна очередь команд на устройств.
Например пусть 3 клиента отослали в примерно одно и то же время 3 пачки команд на одно и то же устройство, эти команды должны прийти на устройство по очереди. При этом на клиентах эти вызовы должны быть async (wcf сам их генерирует) и возвращать результат, только когда пачка команд (call) от этого клиента выполнена.
Я решил это реализовать следующим способом:
Сделать класс DeviceCommands, в котором реализованы внутренности общения с устройством.
Например в DeviceCommands 3 метода: void Connect(); void Dissconnect(); void sendData(string data);
Далее реализую статический класс, для хранения очереди команд, а так же класс для информации статуса набора команд от клиента:
UPD2 Я обновил код, после ответа @basrach
namespace testSuite.CallsManager
{
public class CallInfo
{
public string CallId { get; set; }
public bool Completed { get; set; }
public bool Failed { get; set; }
public Nullable<DateTime> FinishedTime { get; set; }
public ManualResetEvent CompletedResetEvent { get; set; }
public CallInfo()
{
Failed = false;
Completed = false;
CompletedResetEvent = new ManualResetEvent(false);
}
public CallInfo(string callId, bool completed = false)
{
CompletedResetEvent = new ManualResetEvent(false);
CallId = callId;
Completed = completed;
}
}
public static class DeviceCallsManager
{
static Random random = new Random();
static string randomString(int length)
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
return new string(Enumerable.Repeat(chars, length)
.Select(s => s[random.Next(s.Length)]).ToArray());
}
static ConcurrentDictionary<string, ConcurrentQueue<Tuple<string, Action[]>>> calls = new ConcurrentDictionary<string, ConcurrentQueue<Tuple<string, Action[]>>>();
static ConcurrentDictionary<string, CallInfo> callsInfo = new ConcurrentDictionary<string, CallInfo>();
public static CallInfo getInfo(string callId)
{
return callsInfo[callId];
}
public static string addCall(string queueId, params Action[] cmds )
{
string callId = randomString(6);
bool addCallInfoStatus = callsInfo.TryAdd(callId, new CallInfo(callId));
if (!addCallInfoStatus)
throw new Exception("dublicate callId");
bool addStatus = false;
if (!calls.ContainsKey(queueId))
{
var q = new ConcurrentQueue<Tuple<string, Action[]>>();
q.Enqueue(new Tuple<string, Action[]>(callId, cmds));
addStatus = calls.TryAdd(queueId, q);
}
else if (!addStatus)
{
calls[queueId].Enqueue(new Tuple<string, Action[]>(callId, cmds));
}
return callId;
}
public static void invokeAll()
{
Parallel.ForEach<string>(calls.Keys, s =>
{
Tuple<string, Action[]> tuple = null;
while (calls[s].TryDequeue(out tuple))
{
bool ifError = false;
foreach (var a in tuple.Item2)
{
try
{
a.Invoke();
}
catch (Exception)
{
ifError = true;
callsInfo[tuple.Item1].Failed = true;
callsInfo[tuple.Item1].FinishedTime = DateTime.Now;
callsInfo[tuple.Item1].CompletedResetEvent.Set();
break;
}
}
if (ifError)
continue;
callsInfo[tuple.Item1].Completed = true;
callsInfo[tuple.Item1].FinishedTime = DateTime.Now;
callsInfo[tuple.Item1].CompletedResetEvent.Set();
}
});
}
}
}
Функция `public static string addCall(string queqeId, params Action[] cmds )` будет вызываться через WCF клиентами и в ответ давать callId для дальнейшего мониторинга завершения всех задач этой очереди.
Пример wcf methoda:
void AddDataToDevice(string deviceId, string data)
{
var deviceCmds = new DeviceCmds(deviceId);
var callId = DeviceCallsManager.addCall(deviceId,
() => deviceCmds.Connect(),
() => deviceCmds.SendData(data),
() => deviceCmds.Dissconnect());
DeviceCallsManager.getInfo(id).CompletedResetEvent.WaitOne();
}
Т.е просто добавляет в очередь и ждет пока call завершится, сразу замечу, что пока, я не реализовывал отлов ошибок.
DeviceCallsManager.invokeAll(); - будет вызываться в backgroundJob через hangfire каждую минуту c ограничением, что одновременно может быть запущен только один метод invokeAll.
Как это по моему мнению должно работать, клиент вызывает async метод wcf -> команды добавляется в очередь -> когда команды выполнены, в UI клиента выводится нотификация, мол addData(args) #callId выполнена.
Понимаю, что такое можно реализовать на различных MQ системах, но в данном случае стоит ли?
Клиентов одновременно работающих будет относительно мало до 10.
Устойчивость очереди не так важна, если упадет сервер, то ничего страшного если очередь потеряется, клиенты получат ошибки и соответственно в ручную сами выполнят заново эти задачи. Если упадет клиент, он потеряет статус завершения задачи - это можно решить, сохраняя id call-ов на клиенте на диск, и лонг пулить их статус (через отдельный wcf method CallInfo[] getCallInfo(int[] callsIds)); при работающем приложении или сделать отдельное окно , со статусом всех задач клиента.
Вообщем мне кажется решение гибким, все необходимое по мере потребности я смогу добавить в эту реализацию, но я опасаюсь, что я чего то не вижу, поэтому обращаюсь к вашему взгляду со стороны :)