Есть ли структура данных для многопоточной обработки с лимитами не-параллельности по ID?
Пока собираю инфу - есть ли такая структура данных - ее название, возможно и имплементация в какой-то из открытых библиотек.
Должна быть такой:
1) Очередь сообщений (возможно несколько), при этом у каждого сообщения есть SubjectId (скажем ID пользователя и т.п.)
2) Пул потоков (т.е. больше 1 потока в общем виде), которые берут сообщения из очереди и начинают их исполнять (consumer)
3) При этом обязательно запрещено начинать исполнение запроса с SubjectId который уже исполняется, т.е.
можно начать исполнять в 1м потоке Message с SubjectId == 1,
можно во 2м потоке Message с SubjectId == 2,
но если условно у нас 3 потока, то нельзя начать исполнять в 3м потоке Message с SubjectId == 1 или 2, это должен быть любой другой SubjectId
* как только один из потоков закончил работу, тот SubjectId становится свободным, скажем == 1, и тогда новый поток может взять Message с SubjectId == 1 из очереди (структуры)
Для чего нужна такая структура - чтобы не требовались дополнительные критические секции (lock \ synchronized) в разрезе одинаковых объектов, т.к. когда мы работаем в параллель с объектом с SubjectId == 1 из 2х потоков то мы обязаны lock-ть обращения к его структурам, а в случае же такой структуры - lock-ть не нужно на сам объект (с SubjectId) (хотя возможны lock-и на другие общие структуры - не суть вопроса)
В отличие от однопоточного исполнения перфоманс будет выше т.к. потоков несколько, целый пул, т.е. сколько угодно ядер можно загрузить, если запросы в большинстве случаев будут с разными SubjectId.
Главное чтобы тут все не было покрыто lock-ами или их аналогами, что сильно порежет перфоманс изза простоя ядер CPU.
Как мне видится на первый взгляд тут нужен HashMap с ключем SubjectId и значение List т.е. все сообщения которые по этому SubjectId.
И куча обвязки в виде начал и конца обработки, выбора нужного SubjectId и т.п.
Тут есть сходство с акторами из Akka, но я не эксперт в них. Но самое главное что в них не устраивает на первый взгляд - каждый actor это инстанс через new и со своей очередью, а если в большинстве случаев у нас разные SubjectId то расточительно по памяти иметь по инстансу класса со своей очередью на один-два Message да плюс не известно как оно по перфомансу будет в таком кол-ве акторов. Например если у нас SubjectId от 1 до 1_000_000 то создастся 1_000_000 акторов, у каждого своя очередь, что вероятно траблы (надо профайлить).
__ update
глобальный порядок не нужен (действительно если бы нужен был - только в однопотоке можно это сделать), а вот локальный желателен
и если получится - глобальный хоть сколько-нибудь нужен в том плане чтобы не висели задачи в очереди вечно, например если придет 1я задача, 2я .. 1000я, не надо чтобы 1я висела до окончания веков, потому что новые задачи будут появляться (1000я-2000я и т.п)
Выглядит как обычный striped lock. Берёте обычную очередь, обычный пул потоков и заполняете массив экземплярами Lock. Поток в начале работы берёт Message из очереди, получает из него SubjectId, вычисляет его хэш и пытается захватить блокировку из соответствующего хэшу элемента массива. Если блокировка удалась, поток выполняет свою работу. Если нет, возвращает Message в конец очереди и берёт следующий из начала. Остаётся только подобрать эффективный размер массива блокировок.
о максимально возможной производительности придётся забыть.
Сергей Горностаев, чего это? можно, например, заводить промежуточную очередь ассоциированную с ID и складывать все сообщения с этим ID вынутые из основной очереди в неё.
jcmvbkbc, это гарантирует порядок обработки в рамках одного идентификатора, но не всех. Является ли это достаточным условием известно только автору вопроса. Я обычно сталкивался с ситуациями когда нужна либо глабальная гарантия упорядоченности, либо ненужна вовсе.
jcmvbkbc, глобальный порядок не нужен (действительно если бы нужен был - только в однопотоке можно это сделать), а вот локальный желателен
и если получится - глобальный хоть сколько-нибудь нужен в том плане чтобы не висели задачи в очереди вечно, например если придет 1я задача, 2я .. 1000я, не надо чтобы 1я висела до окончания веков, потому что новые задачи будут появляться (1000я-2000я и т.п)
заводить промежуточную очередь ассоциированную с ID и складывать все сообщения с этим ID вынутые из основной очереди в неё.
да я тоже первоначально подумал что придется иметь несколько очередей
1) та что исполняется - возможно в виде статусов, или не очередь вовсе а HashSet[SubjectId]
2) та что отложена - HashMap[SubjectId, List[Message]] изза того что уже исполняется с таким же
.. но вот думаю есть ли такие имплементации, чтобы уже отлажено все было
вроде задача разумная, всмысле с многопотоком связана, неужто никто так не делает
Если блокировка удалась, поток выполняет свою работу. Если нет, возвращает Message в конец очереди и берёт следующий из начала. Остаётся только подобрать эффективный размер массива блокировок.
интересный подход, да только тут нюанс с локальным порядком все портит - он полностью реверсным станет или вообще хаотичным
а скажите что имели ввиду под размером массива блокировок.. разьве массив не должен быть равен кол-ву SubjectId ?
AlexHell, гарантия упорядоченности всё очень сильно усложняет. Готовое решение вы вряд ли найдёте, придётся городить свои структуры данных.
а скажите что имели ввиду под размером массива блокировок.. разьве массив не должен быть равен кол-ву SubjectId ?
Это был бы перерасход ресурсов. Хорошая хэш-функция позволяет сводить огромные диапазоны возможных значений к небольшим. Например striped lock в ConcurrentHashMap по умолчанию содержит всего 16 блокировок, хотя количество уникальных ключей теоретически бесконечно.
спасибо, но я представляю что такое очередь сообщений, в нее можно положить Message, и потом взять Message, только в общем случае оттуда достанется произвольный Message с SubjectId который уже исполняется каким-то потоком, а мне нужно чтобы доставался не с произвольным SubjectId