Задать вопрос
  • Батчинг входящих запросов с неблокирующим ожиданием?

    AshBlade
    @AshBlade Куратор тега C#
    Просто хочу быть счастливым
    Когда-то решал уже похожую проблему. Только чтение батчей из кафки и сохранение в БД. Не знаю какой у тебя стек, но я решил так.

    Создаю Channel и BackgroundWorker, который из этой очереди читает. В эту очередь кладутся объекты по типу InsertContext. В ней хранится аргумент (данные для вставки), поле для результата и Task, который кончится по окончании.

    Работа bw организуется так (в вечном цикле):

    1. Читаем первую запись из Channel (infinite timeout)
    2. Создаем TaskCompletionSource с таймаутом и пока он не кончился читаем записи из Channel (без ожидания)
    3. Собираем все собранные данные
    4. Записываем их в БД
    5. В каждый элемент записываем результат и уведомляем о готовности


    Бесшовно добавить эту функциональность можно с помощью декоратора.

    Единственная проблема в данном случае - ошибки вставки. Если делать это единой транзакцией, то ошибка в 1 инвалидирует все. Но это можно обойти fallback'ом - если произошла ошибка, то вставляем по одному и ошибку возвращаем только тем, у кого они возникли
    Ответ написан
    1 комментарий
  • Батчинг входящих запросов с неблокирующим ожиданием?

    @mvv-rus
    Настоящий админ AD и ненастоящий программист
    А вам точно это нужно?
    Потому что тут у вас появляется нюанс, который не совсем ложится на схему работы веб-приложения: у ваших отдельных запросов появляется общее состояние. Как минимум, это - накапливаемый пакет запросов, а ещё, наверное, в это состояние входит нечто общее для всех запросов для работы с БД: подключение или, если используется Entity Framework, DbContext. Это общее состояние придется как-то хранить, регулировать доступ к нему (DbContext к примеру, параллельный доступ не поддерживает в принципе), и вовремя это состояние удалять. Если посмотреть на стандартные механизмы ASP.NET Core, то сессия (ISession) для этого, наверное, не подойдет - там можно хранить только сереализуемые в байты пассивные объекты, и насчет регулировки доступа там непросто. Подойдет концентратор (Hub) SignalR, у которого есть сохраняемый между вызовами контекст подкючения - но ради него, скорее всего, потребуется менять способ вызова API из браузера: у него там своя клиентская библиотека.
    Ну и, по-любому, как-то надо реализовывать активную часть - которая, собственно, отслеживает пакет изменений и вовремя отправляет его в БД.
    Ваша идея
    Шедуллер в фоне будет периодически читать коллекцю объектов на запись и устанавливать результат выполнения в соотвтвующий TaskComplitionSource.

    мне не совсем нравится. Зачем периодически? Kучше чтобы эта активная часть срабатывала по факту добавления запроса в пакет - например, асинхронно ожидала Task от TaskComplitionSource. который метод добавления запроса завершал бы по факту добавления завершающего пакет запроса. Но и завершение по таймауту тоже предусмотреть надо - по жизни оно всякое бывает: обычно для таких целей используется WaitAny для комбинации основной ожидаемой задачи с Delay по таймауту.
    Ну, а ещё требуется, наверное, чтобы для каждого пользователя состояние было свое. В принципе, это делается, но надо делать. Для SignalR для этого можно использовать Hub.Context.Items - это словарь, который может содержать произвольные объекты, и сохраняется на время действия всего подключения.

    А ещё у меня, в принципе, есть своя самодельная библиотека, которая решает ту же задачу - сохранение контекста сеанса, в том числе - активного, с выполняющимся кодом. Я описывал ее недавно в статьях (кроме основной статьи есть дополнительная) на Хабре. Можете попробовать её, если переделывать API не хочется: она вполне годится для работы с API на базе MVC API Controller. или Minimal API. В принципе, она заточена немного под другую задачу - получние и возврат дополнительных результатов в фоне, но для вашей задачи она тоже подойдет. Напишу тут сразу технические подробности как использовать: ссылку на активную часть, собирающую и отправляющую пакет, можно хранить в IActiveSession.Properties, точно так же, как если бы вы хранили ее в Hub.Context.Items, а обработчик завершения, который прибирает за собой (в SignalR его место в OnDisconnect) - привязать к IActiveSession.CompletionToken через его метод Register. В общее для всез запросов состояние входит свой контейнер сервисов со своей областью действия в течение всего существования состояния, так что, если для работы с БД требуется Scoped-сервис из DI-контейнера, то его можно получить оттуда (в дополнительной статье написано, как, а также написано, как защититься от нежелательного одновременного доступа к такому сервису).

    Только вот библиотека эта, естественно, ни разу не стадартная, use at your own risk. Но если попробуете ее, мне будет интересно, что получилось. В том числе - и обнаруженные ошибки, заодно я и для вас их исправлю :-) .

    PS И неплохо было бы IMHO, чтобы дальнейший диалог, если он будет, шел на русском языке, без "батч", "эвейтить", "шедулер" и прочих транслитераций, IMHO лучше уж по-английски писать, если перевод неизвестен. А то я человек старый, мне читать этот пиджин тяжеловато.

    PPS А ещё благодарю за идею, о том, в какую сторону мне развивать мою библиотеку.
    Ответ написан
    1 комментарий