@ksimmi

Насколько хорошо/оптимально использовать хранимую процедуру для полинга?

Всем привет!

Больше месяца назад я решал проблему описанную в другом вопросе, но, к сожалению, так и не нашел того решения которого хотел, а именно "повесить тригер на устаревание даты и не использовать селекты". Подробно логика поллинга описана по ссылке в вопросе и тут дублировать задачу не буду. В этом же вопросе в секции UPD я указал, что вижу другое решение, через хранимую процедуру с слектованием, по этому пути я и пошел. У меня за одни выходные получилось сделать то, что я хотел, но это мой первый опыт написания хранимых процедур и у меня нет увернности, что я сделал все правильно и вообще это будет быстро работать.

Дано:
  • Три таблицы очередей с записями, по которым надо делать поллинг: second_intervals_polling, minute_intervals_polling, hour_intervals_polling. Таблицы имеют идентичную структуру, одинаковые индексы, все что их отличает - частота селектования из этих таблиц, т.е. интервал между селектами. В этих таблицах я по полной использую типы данных INTERVAL и ARRAY. ;
  • Таблица выведенных из очереди записей terminated. Тут все понятно, сюда складываю все записи, которые были успешно доведены до терминального статуса, удаляя их из трех предыдущих таблиц;
  • Таблица с базовыми настройками очередей для каждого типа поллера. Эта таблица используется на уровне приложения, чтобы считать настройки перед постановкой в очередь очередной записи и далее в объянии фигурировать не будет;
  • Три хранимых функции для получения записей с истекшей временной меткой из соответствующей ей таблицы poll_second_intervals, poll_minute_intervals, poll_hour_intervals. Код функций приведу ниже, но логика не сложная. Каждая функция селектует записи из связанной с ней таблицы по которым пора запросить статус и для всех таких записей, если они есть, на основании текущей итерации происходит вычисление следующего интервала. Если, например, текущая таблица исчисляется секундными интервалами, а вычисленный интервал в минутах, то запись будет перенесена из секундной таблицы в минутную. В конечном итоге функция фозвращает список строк, по которым пора опрашивать статус;
  • Одна хранимая функция для выведения из очередей полинга terminate_polling;
  • Одна хранимая функция для получения последного элемента массива, добавленная просто для семантики array_last.


Как это используется:
На уровне приложения, в зависимости от типа поллера (точнее от требований к частоте поллинга) создается новая запись в одну из трех таблиц second_intervals_polling, minute_intervals_polling, hour_intervals_polling. Допустим, что запись была записана в second_intervals_polling. Есть расписание (например крон), которое, через приложение, раз в секунду вызвает функцию poll_second_intervals. В результате работы этой функции приложение получает все данные, которые пора опросить, а на уровне БД записи присваивается номер итерации и на основании интервала этой итерации (intervals[iteration]) вычисляется следующая дата next_poll_at. При повторномы вызове функции poll_second_intervals итерация будет инкрементирована, и опять произойдет на основании интервала (intervals[iteration + 1]) вычисление следующей даты next_poll_at. Если интервал для текущей итерации будет исчисляться не секундами (больше чем 59 секунд), то запись будет перенесена в другую таблицу minute_intervals_polling. Для данных в таблице minute_intervals_polling справедлива ровно таже самая логика, что и для second_intervals_polling, только расписание вызова связанной с ней функции poll_minute_intervals вызывается раз в минуту сооветственно, а записи с интервалом выходящие за допустимый диапазон интервалов переносятся уже в след. таблицу hour_intervals_polling, где вся начинается заново. После получения терминального статуса вызывается функция terminate_polling, которая удаляет запись из любой из трех таблиц и сохраняет их в таблицу terminated.

Структура табдиц очередей на примере second_intervals_polling:
create table second_intervals_polling
(
    id                 uuid default gen_random_uuid() not null constraint second_intervals_polling_pkey primary key,
    poller_type        varchar                             not null,
    created_at         timestamp default now()             not null,
    next_poll_at       timestamp                           not null,
    iteration          integer   default 0                 not null,
    iteration_interval interval                            not null,
    intervals          interval[]                          not null,
    technical_name     varchar                             not null,
    pollable           jsonb
);


Код функций:
CREATE OR REPLACE FUNCTION array_last(arr anyarray) RETURNS anyelement AS $$
    SELECT arr[array_upper(arr, 1)];
$$ LANGUAGE sql;

CREATE OR REPLACE FUNCTION poll_second_intervals() RETURNS SETOF second_intervals_polling AS $$
WITH 
    deleted AS (
        DELETE FROM second_intervals_polling
        WHERE iteration_interval >= '1 MINUTE'
        RETURNING  *
    ),
    moved AS (
        INSERT INTO minute_intervals_polling SELECT * FROM deleted RETURNING *
    ),
    updated AS (
        UPDATE second_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + iteration_interval,
            iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
        WHERE second_intervals_polling.id NOT IN (SELECT id FROM moved)
          AND next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION poll_minute_intervals() RETURNS SETOF minute_intervals_polling AS $$
WITH
    deleted AS (
        DELETE FROM minute_intervals_polling
        WHERE iteration_interval >= '1 HOUR'
        RETURNING  *
    ),
    moved AS (
        INSERT INTO hour_intervals_polling SELECT * FROM deleted RETURNING *
    ),
    updated AS (
        UPDATE minute_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + iteration_interval,
            iteration_interval = COALESCE(intervals[iteration + 1], iteration_interval)
        WHERE minute_intervals_polling.id NOT IN (SELECT id FROM moved)
          AND next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION poll_hour_intervals() RETURNS SETOF hour_intervals_polling AS $$
WITH
    updated  AS (
        UPDATE hour_intervals_polling
        SET iteration = iteration + 1,
            next_poll_at = NOW() + COALESCE(intervals[iteration + 1], array_last(intervals))
        WHERE next_poll_at <= NOW()
        RETURNING *
    )
    SELECT * FROM updated;
$$ LANGUAGE sql;


CREATE OR REPLACE FUNCTION terminate_polling(ptype varchar, pkey jsonb)
  RETURNS TABLE(
      id uuid,
      poller_type varchar,
      created_at timestamp,
      terminated_at timestamp,
      iteration integer,
      intervals interval[],
      technical_name varchar,
      pollable jsonb
  ) AS $$
WITH deleted_per_seconds AS ( DELETE FROM second_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     deleted_per_minutes AS ( DELETE FROM minute_intervals_polling WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     deleted_per_hours   AS ( DELETE FROM hour_intervals_polling   WHERE pkey = pollable AND ptype = poller_type RETURNING  * ),
     moved AS (
          INSERT INTO terminated
               ( id, created_at, poller_type, intervals, iteration, technical_name, pollable )
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_seconds UNION
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_minutes UNION
          SELECT id, created_at, poller_type, intervals, iteration, technical_name, pollable FROM deleted_per_hours
          RETURNING *
     )
SELECT * FROM moved;
$$ LANGUAGE sql;


Протестил локально и на тестовом стенде. Работает как ожидал, но возможно где-то ошибся, т.к. мой первый опыт. Сейчас меня больше всего интересует оценка людей более сведущих, другими словами мне нужна критика. Есть ли очевидные недостатки?

PS На три таблицы разделил по причине того, что по требованиям реально нужно селектовать каждую секунду и я не хочу, чтобы "несрочные" данные лежали вместе со "срочными", мне кажется, что это должно работать быстрее.
  • Вопрос задан
  • 45 просмотров
Пригласить эксперта
Ответы на вопрос 1
@rPman
Изначально неправильный подход к реализации, у которого будут глюки в проблемных местах. К тому же слишком сложный у вас получился вариант.

Если база данных в своей основе не позволяет нужны вам функционал, как бы вы не извращались, делать его придется снаружи. И лучше чтобы это было 'userspace' а не ядро базы данных или ее расширение, так как стоимость поддержки результата растет экспоненциально.

Правильный подход - у вас работает ваше приложение (не крон), которое в момент изменения/добавления временных интервалов будет вычислять какой следующий интервал в ближайшее время должен тригериться, ждать соответствующее время и исполнять его.

В базе данных (таблица задач), в зависимости от необходимости, у вас должны быть следующие поля:
* таймстамп создания интервала (нужен чтобы вычислять сколько раз должен исполниться и уже исполнился интервал)
Уже в этот момент появляется важное замечание, нужно понимать что время на машине может быть изменено, скорректировано назад например и в зависимости от того на сколько вам важно не пропустить или не получить лишнее исполнение, можно использовать синтетические значения, в т.ч. использовать специальные процессорные счетчики, значение которых не меняется при смене времени, конечно придется добавлять инструменты корректировки/инициализации при перезапуске службы, отдельный разговор и задача.
* временной интервал между повторениями
* лимит количества исполнений - здесь указывается максимальное количество раз исполнений задачи
* количество исполнений - счетчик должен увеличиваться при исполнении задачи
еще нужно учесть что есть просто попытки исполнения и есть успешно завершенные, бывают задачи, при которых не успешные попытки тоже должны быть учтены, так как контроль за ошибками системы важен.

Все изменения интервалов должны проходить через вашу службу или вы должны собирать нотификации базы данных о таких изменениях (некоторые базы данных дают такой инструментарий, позволяют передавать события через отдельные сокеты или даже сессию подключения клиента), правда вам придется учитывать что записи в базе транзакционные и если вдруг произойдет откат записи об интервале а вы уже обработали событие (или наоборот в базе транзакция прошла а у вас из-за ошибки событие не обработалось), поэтому проще управлять интервалами через свое приложение.

В момент когда приложение получает новую ситуацию по интервалам, оно вычисляет какой ближайший интервал и когда закончится, и запускает счетчик времени, который должен быть отменен при получении новых изменений.

По окончанию работы счетчика вы должны снова запросить из базы данных список задач которые опоздали или должны быть исполнены сейчас, вычисляя разницу между расчетным количеством исполнений (now - creation_time)/exec_interval и счетчиком исполнений (тут же проверяем лимит количества запусков, и при его превышении задачу удаляем, не забыв доделать нужное количество запусков). Для каждой задачи получаем количество исполнений - запускаем эти задачи и по окончанию каждой итерации увеличиваем счетчик исполнений.

p.s. разделение задач по разным таблицам в зависимости от длины интервала никак на производительность не повлияют, только усложнят алгоритм
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы