Всем привет!
Больше месяца назад я решал проблему описанную в другом
вопросе, но, к сожалению, так и не нашел того решения которого хотел, а именно "повесить тригер на устаревание даты и не использовать селекты". Подробно логика поллинга описана по ссылке в вопросе и тут дублировать задачу не буду. В этом же вопросе в секции UPD я указал, что вижу другое решение, через хранимую процедуру с слектованием, по этому пути я и пошел. У меня за одни выходные получилось сделать то, что я хотел, но это мой первый опыт написания хранимых процедур и у меня нет увернности, что я сделал все правильно и вообще это будет быстро работать.
Дано:
- Три таблицы очередей с записями, по которым надо делать поллинг: second_intervals_polling, minute_intervals_polling, hour_intervals_polling. Таблицы имеют идентичную структуру, одинаковые индексы, все что их отличает - частота селектования из этих таблиц, т.е. интервал между селектами. В этих таблицах я по полной использую типы данных INTERVAL и ARRAY. ;
- Таблица выведенных из очереди записей terminated. Тут все понятно, сюда складываю все записи, которые были успешно доведены до терминального статуса, удаляя их из трех предыдущих таблиц;
- Таблица с базовыми настройками очередей для каждого типа поллера. Эта таблица используется на уровне приложения, чтобы считать настройки перед постановкой в очередь очередной записи и далее в объянии фигурировать не будет;
- Три хранимых функции для получения записей с истекшей временной меткой из соответствующей ей таблицы poll_second_intervals, poll_minute_intervals, poll_hour_intervals. Код функций приведу ниже, но логика не сложная. Каждая функция селектует записи из связанной с ней таблицы по которым пора запросить статус и для всех таких записей, если они есть, на основании текущей итерации происходит вычисление следующего интервала. Если, например, текущая таблица исчисляется секундными интервалами, а вычисленный интервал в минутах, то запись будет перенесена из секундной таблицы в минутную. В конечном итоге функция фозвращает список строк, по которым пора опрашивать статус;
- Одна хранимая функция для выведения из очередей полинга terminate_polling;
- Одна хранимая функция для получения последного элемента массива, добавленная просто для семантики array_last.
Как это используется:
На уровне приложения, в зависимости от типа поллера (точнее от требований к частоте поллинга) создается новая запись в одну из трех таблиц second_intervals_polling, minute_intervals_polling, hour_intervals_polling. Допустим, что запись была записана в second_intervals_polling. Есть расписание (например крон), которое, через приложение, раз в секунду вызвает функцию poll_second_intervals. В результате работы этой функции приложение получает все данные, которые пора опросить, а на уровне БД записи присваивается номер итерации и на основании интервала этой итерации (intervals[iteration]) вычисляется следующая дата next_poll_at. При повторномы вызове функции poll_second_intervals итерация будет инкрементирована, и опять произойдет на основании интервала (intervals[iteration + 1]) вычисление следующей даты next_poll_at. Если интервал для текущей итерации будет исчисляться не секундами (больше чем 59 секунд), то запись будет перенесена в другую таблицу minute_intervals_polling. Для данных в таблице minute_intervals_polling справедлива ровно таже самая логика, что и для second_intervals_polling, только расписание вызова связанной с ней функции poll_minute_intervals вызывается раз в минуту сооветственно, а записи с интервалом выходящие за допустимый диапазон интервалов переносятся уже в след. таблицу hour_intervals_polling, где вся начинается заново. После получения терминального статуса вызывается функция terminate_polling, которая удаляет запись из любой из трех таблиц и сохраняет их в таблицу terminated.
Структура табдиц очередей на примере second_intervals_polling:
create table second_intervals_polling
(
id uuid default gen_random_uuid() not null constraint second_intervals_polling_pkey primary key,
poller_type varchar not null,
created_at timestamp default now() not null,
next_poll_at timestamp not null,
iteration integer default 0 not null,
iteration_interval interval not null,
intervals interval[] not null,
technical_name varchar not null,
pollable jsonb
);
Код функций:
CREATE OR REPLACE FUNCTION array_last(arr anyarray) RETURNS anyelement AS $$
SELECT arr[array_upper(arr, 1)];
$$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION poll_second_intervals() RETURNS SETOF second_intervals_polling AS $$
WITH
deleted AS (
DELETE FROM second_intervals_polling
WHERE iteration_interval >= '1 MINUTE'
RETURNING *
),
moved AS (
INSERT INTO minute_intervals_polling SELECT * FROM deleted RETURNING *
),
updated AS (
UPDATE second_intervals_polling
SET iteration = iteration + 1,
next_poll_at = NOW() + iteration_interval,
iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
WHERE second_intervals_polling.id NOT IN (SELECT id FROM moved)
AND next_poll_at <= NOW()
RETURNING *
)
SELECT * FROM updated;
$$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION poll_minute_intervals() RETURNS SETOF minute_intervals_polling AS $$
WITH
deleted AS (
DELETE FROM minute_intervals_polling
WHERE iteration_interval >= '1 HOUR'
RETURNING *
),
moved AS (
INSERT INTO hour_intervals_polling SELECT * FROM deleted RETURNING *
),
updated AS (
UPDATE minute_intervals_polling
SET iteration = iteration + 1,
next_poll_at = NOW() + iteration_interval,
iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
WHERE minute_intervals_polling.id NOT IN (SELECT id FROM moved)
AND next_poll_at <= NOW()
RETURNING *
)
SELECT * FROM updated;
$$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION poll_hour_intervals() RETURNS SETOF hour_intervals_polling AS $$
WITH
updated AS (
UPDATE hour_intervals_polling
SET iteration = iteration + 1,
next_poll_at = NOW() + COALESCE(intervals[iteration + 1], array_last(intervals))
WHERE next_poll_at <= NOW()
RETURNING *
)
SELECT * FROM updated;
$$ LANGUAGE sql;
CREATE OR REPLACE FUNCTION terminate_polling(ptype varchar, pkey jsonb)
RETURNS TABLE(
id uuid,
poller_type varchar,
created_at timestamp,
terminated_at timestamp,
iteration integer,
intervals interval[],
technical_name varchar,
pollable jsonb
) AS $$
WITH deleted_per_seconds AS ( DELETE FROM second_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING * ),
deleted_per_minutes AS ( DELETE FROM minute_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING * ),
deleted_per_hours AS ( DELETE FROM hour_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING * ),
moved AS (
INSERT INTO terminated
( id, created_at, poller_type, intervals, iteration, technical_name, pollable )
SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_seconds UNION
SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_minutes UNION
SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_hours
RETURNING *
)
SELECT * FROM moved;
$$ LANGUAGE sql;
Протестил локально и на тестовом стенде. Работает как ожидал, но возможно где-то ошибся, т.к. мой первый опыт. Сейчас меня больше всего интересует оценка людей более сведущих, другими словами мне нужна критика. Есть ли очевидные недостатки?
PS На три таблицы разделил по причине того, что по требованиям реально нужно селектовать каждую секунду и я не хочу, чтобы "несрочные" данные лежали вместе со "срочными", мне кажется, что это должно работать быстрее.