Как загрузить большой датасет в память сервера (+ пара сопутсвующих проблем)?
Есть датасет (формат «raw», но отформатированный правильно для чтения pandas’ом) размерностью (4000, 20 000 000), который весит примерно 550гб. А также есть сервер со 112 ядрами, оперативной памятью 1.7 Тб и свопом 3 Тб.
Данные имеют следующий вид: Первые 6 столбцов - мусорные и не важны для обработки, оставшиеся содержат только 2, 1, 0 и Nan. Вопрос в следующем: возможно ли вообще загрузить данные в память сервера не используя инструментарий Big Data?
Мой путь и попытки решения вопроса:
Прямая попытка загрузить данные pandas’ом не увенчалась успехом. Как я понял, пандас плохо работает с данными, где количество столбцов исчисляется миллионами.
Через внешний инструмент транспонировал датасет. Данные теперь имеют вид (20 000 000, 4000), что по идее должно легко считаться пандасом. Далее, попытался вновь считать данные. Процесс пошел, но это привело к переполнению памяти сервера.
После загрузки в память, необходимо сократить размерность с помощью PCA. Думал разбить файл на куски по несколько миллионов, чтобы работать частями. Части действительно загружаются довольно быстро и работать с ними вполне возможно. Но, как я понимаю с математической точки зрения это сделать нельзя, так как компонентный анализ нужно производить именно на всем датасете, а не на его частях.
Ещё думал почистить данные от столбцов с нулевой и околонулевой дисперсией с помощью «VarianceThreshold» из библиотеки sklearn, чтобы уменьшить количество столбцов. Взял порог в 20% - т.е VarianceThreshold(0.2). Столкнулся со следующей ошибкой: «ValueError: No feature in X meets the variance threshold 0.20000». Как я понимаю, это означает, что все столбцы должны быть удалены?
Должен быть какой-то инструмент(скорее всего для какой-нить бд) чтоб перегнал эти 500гб сразу в рабочий формат, который можно взять и положить в память как есть, после чего с ним напрямую и работать. Но я далёк от темы, так что ничего конкретного не скажу.)
Спасибо. Скажу честно, что я никогда не разворачивал ни кликхаус, ни даск. С даском встречался только в виде питоновской библиотеки "modin".
Только начал работать дата саентистом и не предполагал, что нужно будет самому разворачивать такую инфраструктуру...
На счет даска, как я понимаю, нужно настраивать какие-то кластерные решения? Это же не просто питоновская библиотека. А на счет СУБД у вас нет какого-нибудь мануала, чтобы как-нибудь разобраться в том, как это все самому организовать?
p.s. На предприятии нет развернутой под это дело бд или кластера с даском. Поэтому я и ищу обходные варианты.
За эти два дня я попробовал Dask. На игрушечных данных всё работает гладко. Но как только перехожу к реальным данным, так начинаются проблемы. Реальные данные даже не пошли в память (следил через htop и дашборду даска) за пол дня работы. Кол-во потребляемой памяти оставалось неизменным, но пара ядер было активно.
В середине рабочего дня отрезал кусок в 27Гб от основного датасета. И даск пять часов пытался сделать PCA - в итоге выдал ошибку, что значение в ячейке nan, бесконечность или не помещается в int64. Хотя, как я и говорил, данные состоят из 2,1 и 0.
Сыпятся ошибки, связанные с тем, что воркеры перестают отвечать, какие-то ошибки лямбда где-то внутри даска (забыл заскринить, уже не на работе). Как я понял, даск наследует всё слабые стороны пандаса и принципиально не может работать с данными, в которых количество столбцов исчисляется миллионами.
ScriptKiddo,
Если пытаться загрузить датасет без транспонирования, то так:
import dask.dataframe as dd
data1 = dd.read_csv('/mnt/data1/MAF005.csv', sep=' ', header=None)
И больше ничего не происходит. Загружено только одно ядро, объем занимаемой оперативной памяти не увеличивается.
Вчера попробовал загрузить кусок 27Гб в транспонированном виде (чтобы строк было больше, чем столбцов). Но даск в его исходной реализации не умеет транспонировать обратно. Так что пришлось использовать modin, который тоже имеет даск под капотом.
#запуск локального кластера
from distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
#библиотеки
import os
os.environ["MODIN_ENGINE"] = "dask" # Modin will use Dask
import modin.pandas as pd
import numpy as np
from sklearn.feature_selection import VarianceThreshold
import glob
#загрузка транспонированного файла и обратная трансформация
data = pd.read_csv('/mnt/data1/MAF005t.csv', sep=' ', header=None)
data = data.T
data.columns = data.iloc[0]
data = data[1:]
data = data.drop(columns=['FID','IID','PAT','MAT','SEX',"PHENOTYPE"], axis=1)
#попытка очистить датасет от столбцов с нулевой и околонулевой дисперсией
try:
sel = VarianceThreshold(threshold=0.2)
sel.fit(data)
concol = [column for column in data.columns
if column not in data.columns[sel.get_support()]]
data.drop(concol, axis=1, inplace = True)
data.to_csv(f'/mnt/data1/MAFIA005_output.csv')
except ValueError:
print('не подошел по 0.2')
#попытка сделать PCA
from sklearn.decomposition import PCA
pca = PCA(0.99)
pca.fit(data)
dfpca = pca.transform(data)
data.head()
dfpca.head()
Даск в чистом виде просто не загружает данные, а в реализации modin сыпятся ошибки.
Ошибки примерно такого рода в связи с потерей коннекта к воркерам:
"OSError: Timed out during handshake while connecting to tcp://127.0.0.1:36643 after 30 s
distributed.worker - ERROR - Worker stream died during communication: tcp://127.0.0.1:44957"
Хотя, судя по дашборде даска они все равно дальше работают.
Ещё я писал про ошибки лямбда. Они выглядят вот так: "CancelledError: lambda-3a6d8a1b-bccd-486a-9938-337ed9d52be4"
4000 колонок * 20кк строк * 1 байт значение (даже не надо в биты упаковывать, та ккак у вас 4 варианта значений)
это 80,000,000,000 байт, т.е. считанные 80 гигабайт данных в виде матрицы
какого вида нужна обработка? с линейным чтением справится просто массив