Задать вопрос

Как нескольким worker'ам последовательно (в рамках topic) брать записи из таблицы сообщений?

Имеется таблица в которую приходят сообщения из RabbitMq.
Каждое сообщение содержит topic. Список topic'ов динамический.
CREATE TABLE messages (
	id bigserial PRIMARY KEY,
	message_id uuid NOT NULL,
	topic  character varying(128) NOT NULL,
	payload bytea NOT NULL,
	state int NOT NULL
);


Есть N worker'ов, которые должны обрабатывать эти сообщения параллельно.
НО, в рамках одного topic'а, сообщения должны обрабатываться последовательно в порядке поступления (ORDER BY id).

Я реализовал получение сообщения для обработки следующим запросом с использованием advisory locks:
WITH
h_topics AS (
	SELECT id, topic, hashtext(topic) AS h_topic 
	FROM (
		SELECT MIN(m.id) AS id, m.topic
		FROM messages m
		WHERE m.state = 0
		GROUP BY m.topic
	)
)

SELECT 
	m.id, m.message_id , m.topic , m.payload , m.state 
FROM 
(
	SELECT 
		pg_try_advisory_xact_lock(ht.h_topic, 0) AS lock_taken, ht.id
	FROM 
		h_topics ht
		LEFT JOIN pg_locks l_topics 
			ON 
				l_topics.locktype = 'advisory' 
				AND l_topics.classid = ht.h_topic 
				AND l_topics.objid = 0
	WHERE 
        l_topics.classid IS NULL
	ORDER BY ht.id
	LIMIT 1
) m_filter
JOIN messages m ON m.id  = m_filter.id AND m_filter.lock_taken = True;


Есть ли решение попроще?
  • Вопрос задан
  • 202 просмотра
Подписаться 2 Простой 5 комментариев
Пригласить эксперта
Ответы на вопрос 3
AshBlade
@AshBlade
Просто хочу быть счастливым
BEGIN;
SELECT ... FOR UPDATE SKIP LOCKED;
COMMIT;

дока
Ответ написан
@GTD01
У Вас есть RabbitMQ, зачем тут таблица?
Ответ написан
@Degot Автор вопроса
Решил сделать так:
1. Воркер блокирует топик для работы:
WITH
h_topics AS (
	SELECT 
		topic_order_by,
		topic,
		hashtext(topic) AS h_topic
	FROM 
	(
		SELECT 
			MIN(m.id) AS topic_order_by,
			m.topic
		FROM messages m
		WHERE m.state = 0
		GROUP BY m.topic
	)
)
SELECT 
	m_selected.topic 
FROM 
	h_topics ht_cte
	CROSS JOIN
	(
		SELECT 
			m_filter.topic
		FROM 
		(
			SELECT 
				pg_try_advisory_lock('messages'::regclass::oid::int, ht.h_topic) AS lock_taken,
				ht.topic_order_by,
				ht.topic
			FROM 
				h_topics ht
				LEFT JOIN pg_locks l_topics 
					ON 
						l_topics.locktype = 'advisory' 
						AND l_topics.classid = 'messages'::regclass::oid::int 
						AND l_topics.objid = ht.h_topic
			WHERE l_topics.classid IS NULL
			ORDER BY ht.topic_order_by
			LIMIT 1
		) m_filter
		WHERE m_filter.lock_taken = TRUE
	) m_selected
WHERE m_selected.topic IS NOT NULL
LIMIT 1


2. Потом в транзакции берёт сообщение на обработку:
SELECT 
	m.id,
	m.message_id,
	m.topic,
	m.payload
FROM messages m
WHERE 
	m.topic = @topic
	AND m.state = 0
ORDER BY m.id
LIMIT 1

3. Выполняет работу
4. Обновляет статус сообщения и "публикует" результаты обработки.
5. Коммитит транзакцию
6. Повторяет с п.2 х N-раз (например 100)
7. Разблокирует топик:
PERFORM pg_advisory_unlock('messages'::regclass::oid::int, hashtext(@topic))


В результате, если топиков больше чем worker'ов, то за счёт лимита N-сообщений за одну блокировку топика, не заставим "старые" сообщения простаивать...
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы