Задать вопрос
@rjsem

Как Apache Spark будет параллельно(или не) брать и обрабатывать данные?

Здравствуйте,

Я решил попытаться разобраться с Apache Spark, и в ходе знакомства с документацией и примерами у меня возник следующий вопрос:

Как спарк будет параллельно(или не) брать и обрабатывать данные?
1. В документации существует куча примеров с sc.textFile(“example.txt”), но нет примеров с parallelize, получается все это будет обрабатываться в 1 потоке(для каждого spark-submit)?

2. Есть примеры с HBase, HDFS, скажите, а как будут браться данные из hdfs, по 1 куску или сразу пачкой(и будут как-то распределятся и суммироваться)? и как всё это будет обрабатываться? параллельно(распределено различными воркерами)?
Что будет в случае использования hbase? а в случае с JDBC(POSTGRES)? Как распределять задания в таком случае?

В дополнение:
Как отправлять данные в spark? я вижу только spark-submit, а существуют ли другие способы и как получить только результат, а не весь мусор?
  • Вопрос задан
  • 256 просмотров
Подписаться 1 Средний Комментировать
Пригласить эксперта
Ответы на вопрос 2
@Miron11
Пишу sql 20 лет. Срок :)
Ууу... три года назад вопрос задан.
Так не интересно :)
Но попробую ответить, все-равно. Даже зная, что автор наверное поднаторел и возможно может ответить на свой вопрос сам, намного лучше.
Итак.
1. Спарк все и всегда делает параллельно. Вопрос лишь в том, дает ему пользователь выполнить параллельный запрос используя 2 или больше, так называемых "executor(s)", по русски наверное "выполнимых".
Выполнимый это просто java.exe выполнимый. Кроме того, что это выполнимый, это тот ( или те ) выполнимый(ые), которые и выполняют такие операции, как считывание файлов, запись файлов, jdbc, spark sql и другие запросы.
В зависимости от конфигурации Spark их может быть один или больше.
Как правило эти executors запускаются на машинах, которые распределены в сети, и физически отделены друг от друга. Есть настройки, которые устанавливают сколько именно executors могут быть запущены на этом массиве машин.
Машины эти, в свою очередь, называются "worker nodes".
Кроме worker nodes несущих executors Spark требуется driver. И хотя на этом драйвере так же выполняется java.exe, в свою очередь этот выполнимый не входит в счет executors, за исключением того случая, когда Вы работаете с Spark в режиме локальной консоли "spark-shell". При запуске Spark в режиме локальной консоли все возможности Spark по параллельному распределенному выполнению заданий сводятся к немного больше, чем нулю.
Да, никто не мешает создать Thread и запустить её параллельно, но это уже не совсем Spark, но собственный код.
Итак, как же добиться, чтобы Spark выполнил задание параллельно, пользуясь встроенными в Spark возможностями.
Часть ответа уже прояснилась - надо запустить Spark кластер, по крайней мере с одним worker на котором надо отконфигурировать и запустить 2 или больше Executors. После чего надо выполнить задание, которое Spark умеет выполнить параллельно.
При этом надо всегда помнить, что Spark попытается оттянуть выполнение задание как можно больше, а вместо выполнения задания, при вызове той или иной функции, запомнить некий план выполнения, который будет выполнен в будущем. Такой план называется DAG ( direct acyclic graph ).
Хорошим примером операции, которую Spark выполняет одновременно несколькими потоками, это считывание файлов из директории
spark.read.format("json").load("my_path_to_json_files/files_subdirectory/*.json")
хотя и не прочитает файлы, как таковые, но создаст DAG, при этом воспользовавшись thread равным по количеству executors. То есть уже некоторые аспекты параллельного выполнения появляется.
Далее, если Вы выполните следующую линию, запомнив этот DAG в неизвестном
val df = spark.read.format("json").load("my_path_to_json_files/files_subdirectory/*.json")
а следующим шагом, допустим, запишете этот же список файлов в формате parquet в другой файловой системе, например

val df = spark.read.format("json").load("my_path_to_json_files/files_subdirectory/*.json")
df.write.format("parquet").save("my_path_to_parquet_files/files_subdirectory")

Spark воспримет эти две линии как команду создать, так называемую job, в рамках этой job Spark задаст executors их собственные Tasks в рамках этой самой job, и по мере выполнения Task на executors будет продвигаться к тому, чтобы выполнить всю Job, при этом заметьте
задание будет выполнено параллельно
и
независимо друг от друга, теми самыми Тhreads.
Стоит заметить, что если Job задает driver, то распределением Tasks, и отслеживанием стадий их выполнения ( Stages ) уже занимается Spark Scheduler.
Так выглядит архитектура Spark при выполнении некоего задания.
Есть ещё несколько способов обратиться к подсистеме executors с целью параллельного и одновременного выполнения задания, это
df.edd.foreachPartitionAsync()
но в этом случае Вам придется написать код, который уже будет выполняться настолько параллельно, насколько умело Вы владеете языком описания заданий для Spark, scala и где то java.
Кроме параллельного выполнения заданий Spark поддерживает, через свой язык, scala, те же Threads и параллельные коллекции, которые так же способны поддерживать параллельное выполнение потоков. Но, по большей части, эти потоки будут выполняться на driver, то есть сами по себе они не будут выполняться распределено. А это уже сильно уменьшает возможности Spark использовать параллельное выполнение заданий, поскольку ресурсы одного отдельно взятого driver конечны, и при параллельном выполнении заданий driver довольно быстро их, эти ресурсы, исчерпает, если Вы попытаетесь считывать, допустим, файлы большого размера, или выполнять задания на данных большого размера.
Я попытался найти в документации описание распределенного параллельного выполнения запросов, но как - то быстро у меня найти не получилось.
Не поленитесь, и поищите Spark документацию. Там все эти детали досконально описаны.
Ответ написан
Комментировать
angrySCV
@angrySCV
machine learning, programming, startuping
данные из внешних источников грузятся в датаСеты (специальный интерфес над RDD) - поэтому вы там не увидели parallelize, вместо этого используется метод toDF или toDS.
в любом случае спарк работает только с RDD и только паралельно/распределенно (используя или нет дополнительные интерфейсы)
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы