Задать вопрос
@vilix

Какие минусы могут вылезти в дальнейшем при такой примитивной реализации очереди задач?

Всем привет, есть множество клиентов, есть WCF, есть устройства. Use case связки следующий, клиенты вызывают wcf-методы, wcf-методы выполняют различные команды устройств (connect, do1, do2, dissconnect), однако устройства поддерживают только один коннект в одно время, поэтому нужна очередь команд на устройств.

Например пусть 3 клиента отослали в примерно одно и то же время 3 пачки команд на одно и то же устройство, эти команды должны прийти на устройство по очереди. При этом на клиентах эти вызовы должны быть async (wcf сам их генерирует) и возвращать результат, только когда пачка команд (call) от этого клиента выполнена.

Я решил это реализовать следующим способом:

Сделать класс DeviceCommands, в котором реализованы внутренности общения с устройством.

Например в DeviceCommands 3 метода: void Connect(); void Dissconnect(); void sendData(string data);

Далее реализую статический класс, для хранения очереди команд, а так же класс для информации статуса набора команд от клиента:

UPD2 Я обновил код, после ответа @basrach
namespace testSuite.CallsManager
{

    public class CallInfo
    {
        public string CallId { get; set; }

        public bool Completed { get; set; }

        public bool Failed { get; set; }

        public Nullable<DateTime> FinishedTime { get; set; }

        public ManualResetEvent CompletedResetEvent { get; set; }

        public CallInfo()
        {
            Failed = false;
            Completed = false;
            CompletedResetEvent = new ManualResetEvent(false);
        }

        public CallInfo(string callId, bool completed = false)
        {
            CompletedResetEvent = new ManualResetEvent(false);
            CallId = callId;
            Completed = completed;
        }
    }

    public static class DeviceCallsManager
    {
        static Random random = new Random();
        
        static string randomString(int length)
        {
            const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
            return new string(Enumerable.Repeat(chars, length)
            .Select(s => s[random.Next(s.Length)]).ToArray());
        }

        static ConcurrentDictionary<string, ConcurrentQueue<Tuple<string, Action[]>>> calls = new ConcurrentDictionary<string, ConcurrentQueue<Tuple<string, Action[]>>>();
        static ConcurrentDictionary<string, CallInfo> callsInfo = new ConcurrentDictionary<string, CallInfo>();

        public static CallInfo getInfo(string callId)
        {
            return callsInfo[callId];
        }

        public static string addCall(string queueId, params Action[] cmds )
        {
            string callId = randomString(6);
            
            bool addCallInfoStatus = callsInfo.TryAdd(callId, new CallInfo(callId));

            if (!addCallInfoStatus)
                throw new Exception("dublicate callId");
            
            bool addStatus = false;


            if (!calls.ContainsKey(queueId))
            {
                var q = new ConcurrentQueue<Tuple<string, Action[]>>();
                q.Enqueue(new Tuple<string, Action[]>(callId, cmds));
                addStatus = calls.TryAdd(queueId, q);
                
            }
            else if (!addStatus)
            {
                calls[queueId].Enqueue(new Tuple<string, Action[]>(callId, cmds));
            }

            return callId;
        }

        public static void invokeAll()
        {
            Parallel.ForEach<string>(calls.Keys, s =>
            {
                Tuple<string, Action[]> tuple = null;

                while (calls[s].TryDequeue(out tuple))
                {
                    bool ifError = false;

                    foreach (var a in tuple.Item2)
                    {
                        try
                        {
                            a.Invoke();
                        }
                        catch (Exception)
                        {
                            ifError = true;
                            callsInfo[tuple.Item1].Failed = true;
                            callsInfo[tuple.Item1].FinishedTime = DateTime.Now;
                            callsInfo[tuple.Item1].CompletedResetEvent.Set();
                            break;
                        }
                    }

                    if (ifError)
                        continue;

                    callsInfo[tuple.Item1].Completed = true;
                    callsInfo[tuple.Item1].FinishedTime = DateTime.Now;
                    callsInfo[tuple.Item1].CompletedResetEvent.Set();
                }
            });
        }
    }
}


Функция `public static string addCall(string queqeId, params Action[] cmds )` будет вызываться через WCF клиентами и в ответ давать callId для дальнейшего мониторинга завершения всех задач этой очереди.
Пример wcf methoda:
void AddDataToDevice(string deviceId, string data)
{
  var deviceCmds = new DeviceCmds(deviceId);

  var callId = DeviceCallsManager.addCall(deviceId,
                () => deviceCmds.Connect(),
                () => deviceCmds.SendData(data),
                () => deviceCmds.Dissconnect());

  DeviceCallsManager.getInfo(id).CompletedResetEvent.WaitOne();
}


Т.е просто добавляет в очередь и ждет пока call завершится, сразу замечу, что пока, я не реализовывал отлов ошибок.

DeviceCallsManager.invokeAll(); - будет вызываться в backgroundJob через hangfire каждую минуту c ограничением, что одновременно может быть запущен только один метод invokeAll.

Как это по моему мнению должно работать, клиент вызывает async метод wcf -> команды добавляется в очередь -> когда команды выполнены, в UI клиента выводится нотификация, мол addData(args) #callId выполнена.

Понимаю, что такое можно реализовать на различных MQ системах, но в данном случае стоит ли?

Клиентов одновременно работающих будет относительно мало до 10.

Устойчивость очереди не так важна, если упадет сервер, то ничего страшного если очередь потеряется, клиенты получат ошибки и соответственно в ручную сами выполнят заново эти задачи. Если упадет клиент, он потеряет статус завершения задачи - это можно решить, сохраняя id call-ов на клиенте на диск, и лонг пулить их статус (через отдельный wcf method CallInfo[] getCallInfo(int[] callsIds)); при работающем приложении или сделать отдельное окно , со статусом всех задач клиента.

Вообщем мне кажется решение гибким, все необходимое по мере потребности я смогу добавить в эту реализацию, но я опасаюсь, что я чего то не вижу, поэтому обращаюсь к вашему взгляду со стороны :)
  • Вопрос задан
  • 453 просмотра
Подписаться 2 Оценить Комментировать
Решения вопроса 2
@basrach
При такой реализации проблемы вылезут сразу, как только случится несколько одновременных запросов к вашей службе из-за отсутствия синхронизации потоков, если только вы не используете Single режим для службы. Но тогда непонятны заморочки c Parallel.ForEach и прочие очереди.
1. в addCall несколько потоков могут попытаться добавить один и тот же ключ в словарь calls и это будет exception
2. в invokeAll перед выходом очищается словарь calls, при этом на момент выходы из этого метода в нем вполне могут оказаться новые команды, которые будут также удалены
3. Wait() перед выходом из AddDataToDevice заблокирует поток, который будет ждать окончания выполнения другого потока, который был запущен в Parallel.ForEach - странно выглядит
4. В методе getInfo не обрабатывается ситуация отсутствия ключа в словаре, в таком случае будет выброшено исключение
5. перед вызовом invokeAll не проверяется был ли завершен предыдущий вызов. Если первый вызов invokeAll не успел завершиться за минуту и придет второй, то тут вообще начнется каша от исключений при переборе ключей словаря, до отправки одних и тех же команд несколько раз
6. Данные из словаря callsInfo не удаляются, что при длительной работе приведет к утечкам со всеми вытекающими
Но если вы используете Single mode для WCF service-а, либо в один момент времени может быть только один запрос на сервис, то наверное будет работать.
Ответ написан
@huwesu
Ну послушайте - это же зависит от конкретной нагрузки.
Количества клиентов и скорости обработки одной команды (и стабильности скорости этой обработки, то есть real-time есть или нет).

Если не так все требовательно (как вы и описали), то я бы ориентировался на удобство и понятность и простату для разработчика.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы