Мне нужно сделать очередь на подобии AWS SQS FIFO. У очереди следующие требования:
1. В очереди может находится до 10 000 000 задач
2. В очереди есть разбиение тасков на группы (Так же как в SQS FIFO)
3. Не должно быть ограничения на поиск групп только в 300 000 первых тасков (такое есть в SQS FIFO)
4. Если в группе есть хотя-бы один выполняющийся таск, то новые на обработку не отправляются (так же как и в SQS FIFO)
5. Таски должны выполняться в той последовательности в которой они поступили в систему (собственно то для чего и нужен FIFO)
Я реализовал такую очередь, но когда тасков в коллекции больше миллиона запрос поиска новых тасков начинает сильно тормозить (5-10 секунд), если в очереди параллельно с запросом на поиск тасков добавляют/удаляют исполненные/продлевают время видимости существующих, то этот запрос начинает выполняться больше минуты.
Примерно вот такой запрос я выполняю для поиска тасков:
db.tasks_grpc.aggregate([
{ // Находим все группы и берём первые 10 тасков для каждой найденной группы
"$group": {
"_id": {
"group": "$group"
},
"maxVisibilityExpireAt": {"$max": "$visibilityExpireAt"},
"minCreatedAt": {"$min": "$createdAt"},
"tasks": {
"$bottomN": {
"n": 10,
"sortBy": {"createdAt": -1},
"output": ["$$ROOT"],
}
}
}
},
{ // Отсеиваем группы в которых есть задачи которые прямо сейчас выполняются
$match: { "maxVisibilityExpireAt": { "$lte": new Date() } }
},
{ // Сначала должны выполниться таски из групп в которых сообщения были созданы раньше
$sort: { "minCreatedAt": 1 }
},
{ // Оставляем 10 групп
$limit: 10
},
// Убираем лишние поля
{ $project: { "_id": 0, "maxVisibilityExpireAt": 0, "minCreatedAt": 0 } },
// Извлекаем таски и делаем для них $replaceRoot
{ $unwind: "$tasks" },
{ $unwind: "$tasks" },
{ $replaceRoot: { newRoot: "$tasks" }}
])
Понятно что это не может работать быстро по тому что я сморю всю коллекцию когда группирую.
У меня есть реализация для postgesql в которой всё работает быстро но там немного другой подход с использованием WITH:
1. Я нахожу группы в которых есть таски которые сейчас выполняются (для этого есть индекс)
2. Нахожу нужное количество групп
3. Нахожу по 10 тасков для каждой найденной группы (для этого есть индекс)
4. Нахожу и обновляю visibility_expire_at для найденных тасков
5. Нахожу в таблице OneToOne дополнительные поля тасков (вынес их что-бы ускорить запросы)
По итогу в postgesql запрос на поиски тасков даже под нагрузкой связанной с обновлением и удалением тасков отрабатывает меньше чем за 3 секунды на таблице с 10 миллионами тасков.
Вот примерный запрос:
WITH exclude_groups AS ( # Находим группы которые прямо сейчас выполняются
SELECT DISTINCT "group"
FROM "tasks_grpc" AS t0
WHERE "visibility_expire_at" > NOW()
), target_group AS ( # Находим 10 групп которые сейчас не исполняются, сортируем и оставляем 10 штук
SELECT "group", MIN(id) AS min_id
FROM "tasks_grpc" AS t1
WHERE "group" NOT IN (SELECT "group" FROM exclude_groups AS eg)
GROUP BY "group"
ORDER BY min_id
LIMIT 10
), target_ids AS ( # находим таски для каждой из группы
SELECT
tg.min_id,
array( # Складываем их id в массив
SELECT id
FROM "tasks_grpc"
WHERE "group" = tg."group"
ORDER BY id
LIMIT 10
) AS ids
FROM target_group AS tg
), included_ids AS ( # Разворачиваем id тасков в плоскую таблицу
SELECT ti.min_id, t.t AS id
FROM target_ids AS ti
LEFT JOIN unnest(ti.ids) t ON TRUE
ORDER BY ti.min_id, id
), updated AS ( # обновляем всем таскам visibility_expire_at
UPDATE "tasks_grpc" AS tu
SET visibility_expire_at = (NOW() + (15*60*1000 || ' milliseconds')::INTERVAL)
FROM included_ids AS ii
WHERE ii.id = tu.id
RETURNING tu.id, ii.min_id, tu.visibility_expire_at
)
# Запрашиваем дополнительную информацию о тасках из соседней таблицы
SELECT updated.visibility_expire_at, info.*
FROM "tasks_grpc_info" AS info
INNER JOIN updated ON updated.id = info.id
ORDER BY updated.min_id, updated.id;
Что я пытался сделать:
1. Добавлял всевозможные индексы, но монга их не использовала т.к. на первом шаге я я группирую всё таски по группе.
2. Пытался сделать похожий с postgesql запрос (на первом шаге искал группы которые мне не нужны, а на втором все остальные), но это не помогло, запрос стал отжирать всю оперативку.
Может ли кто-то помочь с оптимизацией запроса к mongodb?