Задать вопрос
@ArtKiss123

Как правильно реализовать перевод данных из STG в ODS при помощи SCD2 в PostgreSQL?

Есть задание: нужно сделать task в Airflow, чтобы данные из БД STG переносились в БД ODS с реализацией SCD2. Хочу сделать универсальную процедуру, которую мы будем вызывать в нашем dag. Проблема с написанием самой функции. Нужно учесть, что таблицы имеют разное количество столбцов и разные названия.
Как основа у меня есть процедура для одной таблицы:
CREATE TABLE товары_scd2 (
    id SERIAL PRIMARY KEY,
    название TEXT,
    код_товара TEXT NOT NULL,
    группа_товаров_id INT NOT NULL,
    бренд_id INT NOT NULL,
    линейка_товара_id INT NOT NULL,
    производитель_id INT NOT NULL,
    дата_начала DATE NOT NULL,
    дата_окончания DATE,
    признак_актуальности BOOLEAN NOT NULL
);

CREATE OR REPLACE PROCEDURE update_товары_scd2()
LANGUAGE plpgsql
AS $$
DECLARE
    updated_old_versions INT := 0;
    inserted_new_versions INT := 0;
    inserted_new_products INT := 0;
BEGIN
    DROP TABLE IF EXISTS tmp_source_data;

    CREATE TEMP TABLE tmp_source_data AS
    SELECT 
        t.Товар AS название,
        t.КодТовара AS код_товара,
        g.id AS группа_товаров_id,
        b.id AS бренд_id,
        l.id AS линейка_товара_id,
        p.id AS производитель_id
    FROM dblink(
        'host=localhost port=5432 dbname=postgres user=postgres password=1',
        'SELECT DISTINCT Товар, КодТовара, ГруппаТовара, Бренд, ЛинейкаТовара, Производитель FROM товары'
    ) AS t(
        Товар VARCHAR,
        КодТовара VARCHAR,
        ГруппаТовара VARCHAR,
        Бренд VARCHAR,
        ЛинейкаТовара VARCHAR,
        Производитель VARCHAR
    )
    JOIN бренды b ON b.название = t.Бренд
    JOIN группы_товаров g ON g.группа = t.ГруппаТовара AND g.бренд_id = b.id
    JOIN линейки_товара l ON l.название = t.ЛинейкаТовара
    JOIN производители p ON p.имя_производителя = t.Производитель
    WHERE 
        t.Товар IS NOT NULL AND 
        t.КодТовара IS NOT NULL AND 
        t.ГруппаТовара IS NOT NULL AND 
        t.Бренд IS NOT NULL AND 
        t.ЛинейкаТовара IS NOT NULL AND 
        t.Производитель IS NOT NULL;

    -- Закрываем старые версии
    UPDATE товары_scd2 t
    SET 
        дата_окончания = CURRENT_DATE - 1,
        признак_актуальности = FALSE
    WHERE t.признак_актуальности = TRUE
      AND EXISTS (
        SELECT 1 FROM tmp_source_data s
        WHERE s.код_товара = t.код_товара
          AND s.группа_товаров_id = t.группа_товаров_id
          AND (
              s.название IS DISTINCT FROM t.название OR
              s.бренд_id IS DISTINCT FROM t.бренд_id OR
              s.линейка_товара_id IS DISTINCT FROM t.линейка_товара_id OR
              s.производитель_id IS DISTINCT FROM t.производитель_id
          )
      );

    GET DIAGNOSTICS updated_old_versions = ROW_COUNT;
    RAISE NOTICE 'Закрыто устаревших версий: %', updated_old_versions;

    -- Вставка новых версий
    INSERT INTO товары_scd2 (
        название, код_товара, группа_товаров_id,
        бренд_id, линейка_товара_id, производитель_id,
        дата_начала, дата_окончания, признак_актуальности
    )
    SELECT
        s.название, s.код_товара, s.группа_товаров_id,
        s.бренд_id, s.линейка_товара_id, s.производитель_id,
        CURRENT_DATE, NULL, TRUE
    FROM tmp_source_data s
    JOIN товары_scd2 t ON
        t.код_товара = s.код_товара AND
        t.группа_товаров_id = s.группа_товаров_id
    WHERE t.признак_актуальности = FALSE
      AND NOT EXISTS (
        SELECT 1 FROM товары_scd2 t2
        WHERE t2.код_товара = s.код_товара
          AND t2.группа_товаров_id = s.группа_товаров_id
          AND t2.название = s.название
          AND t2.бренд_id = s.бренд_id
          AND t2.линейка_товара_id = s.линейка_товара_id
          AND t2.производитель_id = s.производитель_id
          AND t2.признак_актуальности = TRUE
      );

    GET DIAGNOSTICS inserted_new_versions = ROW_COUNT;
    RAISE NOTICE 'Добавлено новых версий: %', inserted_new_versions;

    -- Вставка новых товаров
    INSERT INTO товары_scd2 (
        название, код_товара, группа_товаров_id,
        бренд_id, линейка_товара_id, производитель_id,
        дата_начала, дата_окончания, признак_актуальности
    )
    SELECT
        s.название, s.код_товара, s.группа_товаров_id,
        s.бренд_id, s.линейка_товара_id, s.производитель_id,
        CURRENT_DATE, NULL, TRUE
    FROM tmp_source_data s
    LEFT JOIN товары_scd2 t ON 
        s.код_товара = t.код_товара AND
        s.группа_товаров_id = t.группа_товаров_id
    WHERE t.id IS NULL;

    GET DIAGNOSTICS inserted_new_products = ROW_COUNT;
    RAISE NOTICE 'Добавлено новых товаров: %', inserted_new_products;

    RAISE NOTICE 'SCD2: Обновление завершено.';
END;
$$;
  • Вопрос задан
  • 24 просмотра
Подписаться 1 Простой Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы