@SomeOneThere
.NET Developer

Что происходит с BrokeredMessage после await вызова метода доступа к БД?

Имеется некоторая Worker Role, которая крутится в Azure. Её задачей является брать сообщения из одной очереди ServiceBus, как-то обрабатывать их и помечать в БД, а затем сформировать новое сообщение на основе данных из первого и отправить в другую очередь.
Пусть первая очередь называется task, а вторая - command.
Создаём клиентов:
_taskQueueClient = QueueClient.CreateFromConnectionString(CONNECTION_STRING, TASK_QUEUE_NAME);
_commandQueueClient = QueueClient.CreateFromConnectionString(CONNECTION_STRING, СOMMAND_QUEUE_NAME);


Для первой очереди в методе Run подписываемся на события OnMessage:
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false; // Indicates if the message-pump should call complete on messages after the callback has completed processing.
options.MaxConcurrentCalls = 4; // Indicates the maximum number of concurrent calls to the callback the pump should initiate 

_taskQueueClient.OnMessage(Callback, options);


И создаём следующий метод Callback:
private async void Callback(BrokeredMessage receivedMessage)
{
   ///... некоторый код опущен
  
   await BusinessLayer.DoSomeWork1(arg);

   await BusinessLayer.DoSomeWork2(arg);

   await _commandQueueClient.SendAsync(new BrokeredMessage(data));  

   receivedMessage.Complete(); 
}


Что может такого происходить за кулисами во время await BusinessLayer.DoSomeWork2(arg);, что после него receivedMessage становится Disposed?
  • Вопрос задан
  • 132 просмотра
Решения вопроса 2
lasalas
@lasalas
.NET Architect
После выполнения первого await управление в потоке вызова Callback() вернется из него и BrokeredMessage прекратит активное существование. Следующий await будет выполнен в другом потоке, по завершении DoSomeWork1() и т.д.

Один из вариантов решения:
private async void Callback(BrokeredMessage receivedMessage)
{
CallbackPrim(receivedMessage).Wait(); // НЕ await!
}

private async void CallbackPrim(BrokeredMessage receivedMessage)
{
   ///... некоторый код опущен
  
   await BusinessLayer.DoSomeWork1(arg);

   await BusinessLayer.DoSomeWork2(arg);

   await _commandQueueClient.SendAsync(new BrokeredMessage(data));  

   receivedMessage.Complete(); 
}
Ответ написан
@tselofun
Использование метода OnMessageAsync также решит такую проблему. В вашей ситуации сигнатура обработчика async void Callback(...) . Это означает, что при первом встретившемся в нем await создается новый поток, в то время как вызывающий поток (поток в котором выполняется метод OnMessage) продолжит свое выполнение, не дожидаясь выполнения обработчика. Судя по поведению, в методе OnMessage используется конструкция using для объекта BrockeredMessage, в следствии чего он "диспознится" в другом потоке, нежели где вызывается receivedMessage.Complete() и сделает это, видимо, раньше. В тоже время OnMessageAsync принимает в качестве аргумента обработчик с сигнатурой Func, что позволяет вызывать его в асинхронном стиле с помощью ключевого слова await (опять же закулисами метода OnMessageAsync ). В результате метод OnMessageAsync "дожидается" выполнения обработчика, чтобы потом уничтожить объект BrockeredMessage.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы