Как реализовать параллельные вычисления в Pandas с разными DF без последующего сведения?

Необходимо ускорить обработку большого набора данных в Pandas/Python с 8,5 секунд до 3-х.
Оптимизировав код Pandas, вижу следующие возможности:
1. Использовать несколько ядер процессора (Pandas использует только 1) и сделать параллельные вычисления. Пока для меня основная гипотеза, готов её пересмотреть.
2. Изменить процесс хранения и обработки данных (сейчас данные хранятся локально в Parquet и обрабатываются в Pandas)

Дано:
Сервер 4 ядра, 16 гб оперативки, возможно масштабирования (виртуальная машина)
1 большой набор данных (45 столбцов, 10 000 000 строк, хранение parquet), где лежат продажи по дням (2 года), по продуктам (500 позиций), по заказчикам (15 000 клиентов), по менеджерам (500 торговых), с дополнительной категоризацией и с разными мерами (шт, рубли, кг, скидка, наценка и т.п.).

Требуемый конечный результат:
более 20 маленьких таблиц для пользователей бизнес отчётов в разном разрезе.

Текущая скорость:
Шаг1. Чтение parquet с предварительной фильтрацией 2,5 секунды
pd.read_parquet('db.parquet', engine='pyarrow', filters=parquet_filter_list, columns=parquet_columns_list)

Шаг2. Подготовка 5 средних df в процессе Фильтрации -> Группировки -> Агрегации 4 секунды
df1= df1[columns].groupby(groupby_columns)[values_columns].sum().reset_index()
df2..

Шаг3. Для каждого из 5 средних df - подготовка по 3-5 финальных таблиц и передача их пользователю через fw Django 2 секунды
pivot_table_1 = pd.pvot_table..

Итого 8,5 секунд

Идея: сделать вычисление каждого из средних df, а также подготовку по ним маленьких таблиц, параллельно, используя несколько ядер процессора.

Поделитесь пожалуйста опытом параллельных вычислений с Pandas.
Если у Вас есть другие предложения по оптимизации - тоже пожалуйста поделитесь!
  • Вопрос задан
  • 222 просмотра
Пригласить эксперта
Ответы на вопрос 1
mayton2019
@mayton2019
Bigdata Engineer
Поскольку на вопрос никто не писал ответа - я напишу вариант.

Это не совсем ответ. Это скорее список сомнений.

Дело в том что BigData в принципе не работает онлайн. Тоесть нельзя запускать анализ в ответ на мышко-клик пользователя и ожидать что это будет меньше 2.5 секунд. Таких задач никто не ставит. Если хотят чтоб пользователь что-то быстро получал - то готовят DataMart, OLAP-cube, или некий Gold (золотой) уровень материализовнных таблиц в парадигме lakehouse. Тоесть грубо говоря готовят данные к выдаче. Что только взять по ключу и показать. И на этом уровне обычно не ставят parquet а берут CosmosDb+Redis (key-value) чтоб браузер не ждал долго. Никаких агрегаций здесь быть не должно.

Как подготавливать эти уровни серебрянного или золотого представления данных - это отдельная задача. Есть микро-батчи или стриминг но в общем и целом нельзя делать в ответ на действие пользователя. Это делает pipeliene.

Второе. Как уже верно было подмечено "Сервер 4 ядра, 16 гб оперативки" - это не конфигурация для Spark. Это просто какой-то сервак. Spark - это кластерная система и он имеет преимущества при работе в "импульсном
режиме в кластере" но ей нужен пул рабочих машин (хотя-бы штук 4-8) и файловая система наподобие hdfs или ее более современные вариации (s3/msBlobStorage). Это очень важно. Иначе не будет скейлится I/O.

Третье. Паркет (parquet) вообще имеет сильные преимущества когда вы выбираете 45 столбцов из 4500. В этом случае дисковая оптимизация сработает лучше чем у реляционных систем и выберет ровно столько IOPS сколько надо для публикации именно 45 столбцов. В остальных случаях паркет только фейлит и лучше вам брать AVRO например или реляционные БД.

Четвертое. Партишенинг. Очень часто бизнес запрашивает данные которые физически можно консолидировать по партишенам. Тоесть если ваш parquet-файл разрезать на периоды и допустим продукты то мы получим следующую физическую структуру файлов:
sales/year=2022/product=001/part-00000-a11a0ce2-ea20-4897-a713-130a6538cd9a-c000.snappy.parquet
sales/year=2022/product=002/part-00000-a11a0ce2-ea20-4897-a713-130a6538cd9a-c000.snappy.parquet
...
sales/year=2023/product=500/part-00000-a11a0ce2-ea20-4897-a713-130a6538cd9a-c000.snappy.parquet

Эту структуру я придумал с головы. Но у вас есть много вариантов как комбинировать продукты-менеджеры-заказчики-отчетные-периоды.

И есть предположение что в таком случае операции агрегации пройдут быстрее за счет естесвтенного физического партишеннга. (Это я говорю для Spark. Для его оптимизатора. Как будет в Panadas я не знаю.)
С точки зрения Spark - структура файлов в таком формате рассматривается как партицированная табличка в оракле. При этом надо конечно помнить основные правила HDFS и биг-дата. Никаких мелких файлов! Вы должны расчитывать в 128Мб как в самую мелкую единицу I/O.

Пятое. Я это не использовал. Но посмотрите в направление библиотеки Apache Arrow. Она имеет сильные оптимизации для векторной алгебры и знает в лицо parquet. Если вы все-таки хотите оставаться в той парадигме выдачи запросов Python/Django/Pandas - то исследуйте как arrow может ускорить ваши операции агрегаций. Только вам возможно придется отказаться от Pandas-DataFrame API а использовать что-то низкоуровневое типа С++ для векторизовнного API.

Как видите - векторов развитя много. Думайте.

UPD:
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы