Решил сделать так:
1. Воркер блокирует топик для работы:
WITH
h_topics AS (
SELECT
topic_order_by,
topic,
hashtext(topic) AS h_topic
FROM
(
SELECT
MIN(m.id) AS topic_order_by,
m.topic
FROM messages m
WHERE m.state = 0
GROUP BY m.topic
)
)
SELECT
m_selected.topic
FROM
h_topics ht_cte
CROSS JOIN
(
SELECT
m_filter.topic
FROM
(
SELECT
pg_try_advisory_lock('messages'::regclass::oid::int, ht.h_topic) AS lock_taken,
ht.topic_order_by,
ht.topic
FROM
h_topics ht
LEFT JOIN pg_locks l_topics
ON
l_topics.locktype = 'advisory'
AND l_topics.classid = 'messages'::regclass::oid::int
AND l_topics.objid = ht.h_topic
WHERE l_topics.classid IS NULL
ORDER BY ht.topic_order_by
LIMIT 1
) m_filter
WHERE m_filter.lock_taken = TRUE
) m_selected
WHERE m_selected.topic IS NOT NULL
LIMIT 1
2. Потом в транзакции берёт сообщение на обработку:
SELECT
m.id,
m.message_id,
m.topic,
m.payload
FROM messages m
WHERE
m.topic = @topic
AND m.state = 0
ORDER BY m.id
LIMIT 1
3. Выполняет работу
4. Обновляет статус сообщения и "публикует" результаты обработки.
5. Коммитит транзакцию
6. Повторяет с п.2 х N-раз (например 100)
7. Разблокирует топик:
PERFORM pg_advisory_unlock('messages'::regclass::oid::int, hashtext(@topic))
В результате, если топиков больше чем worker'ов, то за счёт лимита N-сообщений за одну блокировку топика, не заставим "старые" сообщения простаивать...