Как хранить в базе исторические данные и удалять дубликаты?
Есть 500 миллионов датчиков (проект под NDA, поэтому немного меняю названия элементов задачи, но суть остается какой нужно), у каждого датчика есть такой параметр как "текущие показания". Показания меняются редко, где-то раз в 1-2 месяца.
Раз в 3-4 дня приходят данные о показаниях в виде огромного csv файла, которы выглядит вот так
sensor_id,timestamp,value
Нужно хранить историю изменения показаний.
Если хранить в лоб, то будет много дубликатов, которые отличаются только датой. хочется хранить только те строки, которые отличаются от предыдущей
то хотелось бы оставить только строки
sensor_N1,2021-02-01,100
sensor_N1,2021-02-21,115
sensor_N1,2021-03-11,100
В какую сторону смотреть, чтобы такое реализовать? .
Пока придумались такие варианты
1) Если при инсерте делать select на предыдущую строку - то это ужасно медленно (500 миллионов датчиков)
2) Перед импортом данных, делать экспорт последней записи для всех датчиков, скриптами отпределять данные которые изменились и заливать только измененные данные
3) Что-то еще?
Смотрим любую базу которая позволит решить эту задачу.
ReplacingMergeTree тут тоже не поможет, хранить полную историю я не хочу. ну и запросы вида select * from xxx where sensorID = sensor_N1 - это тоже не лучший тип запроса кликхауса (он читает всю гранулу)
Тогда, мне кажется, второй вариант нормальный. Возможно, это лучше будет даже делать в БД: загрузили во временную таблицу, туда же добавили последние показания, одним запросом строки с изменениями и вставили. Запрос типа:
SELECT ... FROM (
SELECT ..., value, LAG(value) OVER (PARTITION BY sensor_id ORDER BY timestamp) AS prev
) WHERE value <> prev
Можно для каждого датчика в оперативной таблице хранить две даты: дата начала интервала постоянства и дата окончания этого интервала. Это такой интервал в котором значение датчика не меняется.
Т. е. грузим текущие данные и если по датчику значение не поменялось, то просто изменяем дату окончания интервала на дату текущей загрузки, если значение датчика поменялось(отличается от сохраненного в оперативной таблице) , то текущий интервал выгружаем в архивную таблицу, а в оперативной добавляем новую(изменяем существующую) запись для датчика у которой дата начала и дата окончания будет равна дате текущей загрузки, а значение текущему загружаемому значению датчика.
Т. е. в оперативной таблице всегда количество записей равно количеству датчиков, а в исторической весь скоп предыдущих значений.
Это должно защитить от того, что с течением времени скорость работы с оперативной таблицей будет деградировать, от того что там будет расти число записей.
Если же нужно какой-то отчёт строить или выгрузку за период или за прошлые даты, то тут уже нужно будет работать с исторической таблицей и это будет уже не очень быстро, но такие операции обычно не требуется часто выполнять.
Так как данные грузятся раз в 3-4 дня, наверное не очень критично, что загрузка будет выполнять не мгновенно. Тут наверное проще в несколько этапов сделать: на первом проходе построить список идентификатор датчиков по которым значение не поменялось, затем по этому списку проапдейтить дату окончания интервала на дату текущей загрузки, затем по датчикам которые не попали в список перелить строки в архивную таблицу и наконец поменять для изменённый датчиков в оперативной таблице значение счётчика и даты начала и окончания интервала.
Всё шаги можно делать массово.
Если хранить в лоб, то будет много дубликатов, которые отличаются только датой. хочется хранить только те строки, которые отличаются от предыдущей
Мне кажется, это плохой подход
Если с какого-то датчика приходят данные всегда одинаковые в течение месяца, то это подтверждает, что канал связи с ним работает.
Имхо, все данные нужно хранить.
В вашем случае использовать Time series database - InfluxDB, TimescaleDB и прочие, которые заточены для работы с временными рядами.
А в чём сложность-то? К вам приходит огромный CSV-файл, берёте - и вычищаете из него строчки с одинаковыми показаниями, оставляя только первую, после чего инсертите получившееся в базу.
Засовывать кучу лишней информации в базу, и только потом вычищать - контрпродуктивно. С этим справится простейший скрипт, пробегающий по всем строчкам.
А в чём сложность-то? К вам приходит огромный CSV-файл, берёте - и вычищаете из него строчки с одинаковыми показаниями, оставляя только первую, после чего инсертите получившееся в базу.
И как вы собираетесь "вычищать" из огромного CSV? Сначала записать результат в промежуточный файл, и аж потом в БД?
vgray, это просто дополнительный шаг: посмотреть последнее показание каждого счётчика в базе, и если в "вычищенном" CSV показание ровно одно и не отличается от "базового" - ничего не добавлять.
vgray, непременно. Вы тоже держите в курсе - каким магическим способом избавитесь от девятизначного числа операций, не засирая базу кучей лишней информации.
1) Если при инсерте делать select на предыдущую строку - то это ужасно медленно (500 миллионов датчиков)
А вы пробовали? Если делать умеючи, то нормально.
Заливаете порцию данных, скажем, в 100 тыс. строк в промежуточную таблицу. Делаете INSERT ... SELECT ... JOIN. Ну и индексы правльно настроить.
3) Что-то еще?
Например. Вместо INSERT делаете UPDATE. На UPDATE вешаете триггер, который делает копию текущего значения, если оно отличается.
Предложу свой вариант обработчик через временную таблицу, на сколько он быстр не проверял, но его можно оптимизировать:
WITH t AS (SELECT *, row_number() over (PARTITION BY id ORDER BY created_at) as row FROM temporary_data),
d AS (SELECT DISTINCT ON (id) * FROM current_data ORDER BY id, created_at DESC)
SELECT t.*
FROM t
LEFT JOIN t t2 ON t.id = t2.id AND t.row = t2.row+1
LEFT JOIN d ON t.row = 1 AND t.id = d.id
WHERE t.value <> coalesce(t2.value, d.value);