@nurzhannogerbek

Как отфильтровать данные за определенный период в Spark?

Здравствуйте, товарищи! Помогите пожалуйста разобраться.

Раньше со Spark не работал. Пытаюсь разобраться с ним на простом примере. Предположим есть большой файл со следующей структурой (см. ниже). В ней хранится дата, мобильный номер и его статус в это время.

| CREATE_DATE         | MOBILE_KEY | STATUS |
|---------------------|------------|--------|
| 2018-11-28 00:00:00 | 8792548575 | IN     |
| 2018-11-29 20:00:00 | 7052548575 | OUT    |
| 2018-11-30 07:30:00 | 7772548575 | IN     |


Как правильно отфильтровать все данные за указанный период для определенных мобильных номеров? К примеру в качестве входящих данных я получаю такие данные:

val dateFrom = "2018-10-01"
val dateTo = "2018-11-05"
val numbers = "7778529636,745128598,7777533575"

val arr = numbers.split(",") // Создать массив из мобильных номеров

spark.read.parquet("fs://path/file.parquet").filter(???)
  • Вопрос задан
  • 159 просмотров
Пригласить эксперта
Ответы на вопрос 2
angrySCV
@angrySCV
machine learning, programming, startuping
можно просто попытаться как вы пишете отфильтровать, для этого в начале получить определенную структуру и тип данных:

источникДанных
  .мап(созданиеСтруктуры)
  .фильтр(текущаяЗапись => СписокТребуемыхНомеров.содержит(текущаяЗапись.телефон) 
    && текущаяЗапись.дата<>требуемыйИнтервал)


так будет работать, но очень долго, медленно и сожрет кучу ресурсов на одной машине - это не то ради чего спарк используют, спарк - это движек для распределенных вычислений. А чтоб запустить распределенные вычисления, нужно в начале создать пару "ключ"->"значение" (где ключ номер телефона, а значение все остальные данные), эти пары распределятся по узлам, где будут параллельно обрабатываться, а потом результат паралельной обработки агрегировать в один общий результат, и для этого не фильтр использовать а reduceByKey с aggregate, для паралельного сбора ключей и значений для этих ключей.
Ответ написан
@potan
Функциональный программист
Дату в формате ISO можно сравнивать как строки. Список телефонов оформить как множество.
Будет что-то типа

val arr = numbers.split(",").toSet

spark.read.parquet("fs://path/file.parquet").filter(t => t("CREATE_DATE") < dateTo && t("CREATE_DATE") > dateFrom && arr(t("MOBILE_KEY")))

Точно не знаю как к полям записи в SPARC обращаться, может быть надо будет немного переделать.
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы