Имеется таблица в которую приходят сообщения из RabbitMq.
Каждое сообщение содержит topic. Список topic'ов динамический.
CREATE TABLE messages (
id bigserial PRIMARY KEY,
message_id uuid NOT NULL,
topic character varying(128) NOT NULL,
payload bytea NOT NULL,
state int NOT NULL
);
Есть N worker'ов, которые должны обрабатывать эти сообщения параллельно.
НО, в рамках одного topic'а, сообщения должны обрабатываться последовательно в порядке поступления (ORDER BY id).
Я реализовал получение сообщения для обработки следующим запросом с использованием advisory locks:
WITH
h_topics AS (
SELECT id, topic, hashtext(topic) AS h_topic
FROM (
SELECT MIN(m.id) AS id, m.topic
FROM messages m
WHERE m.state = 0
GROUP BY m.topic
)
)
SELECT
m.id, m.message_id , m.topic , m.payload , m.state
FROM
(
SELECT
pg_try_advisory_xact_lock(ht.h_topic, 0) AS lock_taken, ht.id
FROM
h_topics ht
LEFT JOIN pg_locks l_topics
ON
l_topics.locktype = 'advisory'
AND l_topics.classid = ht.h_topic
AND l_topics.objid = 0
WHERE
l_topics.classid IS NULL
ORDER BY ht.id
LIMIT 1
) m_filter
JOIN messages m ON m.id = m_filter.id AND m_filter.lock_taken = True;
Есть ли решение попроще?