Как нескольким worker'ам последовательно (в рамках topic) брать записи из таблицы сообщений?

Имеется таблица в которую приходят сообщения из RabbitMq.
Каждое сообщение содержит topic. Список topic'ов динамический.
CREATE TABLE messages (
	id bigserial PRIMARY KEY,
	message_id uuid NOT NULL,
	topic  character varying(128) NOT NULL,
	payload bytea NOT NULL,
	state int NOT NULL
);


Есть N worker'ов, которые должны обрабатывать эти сообщения параллельно.
НО, в рамках одного topic'а, сообщения должны обрабатываться последовательно в порядке поступления (ORDER BY id).

Я реализовал получение сообщения для обработки следующим запросом с использованием advisory locks:
WITH
h_topics AS (
	SELECT id, topic, hashtext(topic) AS h_topic 
	FROM (
		SELECT MIN(m.id) AS id, m.topic
		FROM messages m
		WHERE m.state = 0
		GROUP BY m.topic
	)
)

SELECT 
	m.id, m.message_id , m.topic , m.payload , m.state 
FROM 
(
	SELECT 
		pg_try_advisory_xact_lock(ht.h_topic, 0) AS lock_taken, ht.id
	FROM 
		h_topics ht
		LEFT JOIN pg_locks l_topics 
			ON 
				l_topics.locktype = 'advisory' 
				AND l_topics.classid = ht.h_topic 
				AND l_topics.objid = 0
	WHERE 
        l_topics.classid IS NULL
	ORDER BY ht.id
	LIMIT 1
) m_filter
JOIN messages m ON m.id  = m_filter.id AND m_filter.lock_taken = True;


Есть ли решение попроще?
  • Вопрос задан
  • 81 просмотр
Пригласить эксперта
Ответы на вопрос 2
AshBlade
@AshBlade
Просто хочу быть счастливым
BEGIN;
SELECT ... FOR UPDATE SKIP LOCKED;
COMMIT;

дока
Ответ написан
@GTD01
У Вас есть RabbitMQ, зачем тут таблица?
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы