Когда-то решал уже похожую проблему. Только чтение батчей из кафки и сохранение в БД. Не знаю какой у тебя стек, но я решил так.
Создаю Channel и BackgroundWorker, который из этой очереди читает. В эту очередь кладутся объекты по типу InsertContext. В ней хранится аргумент (данные для вставки), поле для результата и Task, который кончится по окончании.
Работа bw организуется так (в вечном цикле):
- Читаем первую запись из Channel (infinite timeout)
- Создаем TaskCompletionSource с таймаутом и пока он не кончился читаем записи из Channel (без ожидания)
- Собираем все собранные данные
- Записываем их в БД
- В каждый элемент записываем результат и уведомляем о готовности
Бесшовно добавить эту функциональность можно с помощью декоратора.
Единственная проблема в данном случае - ошибки вставки. Если делать это единой транзакцией, то ошибка в 1 инвалидирует все. Но это можно обойти fallback'ом - если произошла ошибка, то вставляем по одному и ошибку возвращаем только тем, у кого они возникли