@NIKA_R

Scala Spark Missing type parameter for List / Почему возникает данная ошибка?

spoiler
631381677a567620878086.png
Код с ошибкой
def read(resource: String): (List[String], DataFrame) = {
    val rdd = spark.sparkContext.textFile(fsPath(resource))

    val headerColumns = rdd.first().split(",").to[List]
    // Compute the schema based on the first line of the CSV file
    val schema = dfSchema(headerColumns)
    val data =
      rdd
        .mapPartitionsWithIndex((i, it) => if (i == 0) it.drop(1) else it) // skip the header line
        .map(_.split(",").to[List])
        .map(row)

    val dataFrame =
      spark.createDataFrame(data, schema)

    (headerColumns, dataFrame)
  }


Сам проект
https://disk.yandex.ru/d/8S5i3CfKj5z0SA

Датасет с описанием https://www.kaggle.com/bls/american-time-use-survey
https://moocs.scala-lang.org/~dockermoocs/bigdata/... (164 Мб)

Цель .
Сколько времени занятые люди тратят на досуг по сравнению с безработными? (см. датасет)

Пошагово:
1. Прочитаем набор данных с помощью Spark;
CSV-файл в Spark-sql
https://spark.apache.org/docs/3.2.0/api/scala/org/...

def row(line: List[String]): Row = {
    Row((line.head :: line.tail.map(_.toDouble)): _*)
  }

2. Преобразуем его в промежуточный набор данных;

def classifiedColumns(columnNames: List[String]): (List[Column], List[Column], List[Column]) = {
    def isPrimary(col: String): Boolean =
        col.startsWith("t01") ||
        col.startsWith("t03") ||
        col.startsWith("t11") ||
        col.startsWith("t1801") ||
        col.startsWith("t1803")

    def isWork(col: String): Boolean =
        col.startsWith("t05") ||
        col.startsWith("t1805")

    def isOther(col: String): Boolean =
        col.startsWith("t02") ||
        col.startsWith("t04") ||
        col.startsWith("t06") ||
        col.startsWith("t07") ||
        col.startsWith("t08") ||
        col.startsWith("t09") ||
        col.startsWith("t10") ||
        col.startsWith("t12") ||
        col.startsWith("t13") ||
        col.startsWith("t14") ||
        col.startsWith("t15") ||
        col.startsWith("t16") ||
        (col.startsWith("t18") && !(col.startsWith("t1801") || col.startsWith("t1803") || col.startsWith("t1805")))

    (columnNames.filter(isPrimary).map(column), columnNames.filter(isWork).map(column), columnNames.filter(isOther).map(column))
  }

3. Посчитать параметры для ответа на вопрос.
Реализуем метод timeUsageSummary, который сопоставит сформированный выше набор данных и общий набор данных. Получим 6 столбцов:
  1. workingStatusProjection рабочий статус,
  2. sexProjection пол,
  3. ageProjection возраст,
  4. workProjection количество ежедневных часов, затрачиваемых на основные виды деятельности,
  5. otherProjection количество ежедневных часов, затрачиваемых на работу, и количество ежедневных часов, затрачиваемых на иное


def timeUsageSummary(
    primaryNeedsColumns: List[Column],
    workColumns: List[Column],
    otherColumns: List[Column],
    df: DataFrame
  ): DataFrame = {
    val workingStatusProjection: Column = when($"telfs" >= 1 && $"telfs" < 3, "working").otherwise("not working").as("working")
    val sexProjection: Column = when($"tesex" === 1, "male").otherwise("female").as("sex")
    val ageProjection: Column = when($"teage" >= 15 && $"teage" <= 22, "young").when($"teage" >= 23 && $"teage" <= 55, "active").otherwise("elder").as("age")
    val primaryNeedsProjection: Column = primaryNeedsColumns.reduce(_ + _).divide(60).as("primaryNeeds")
    val workProjection: Column = workColumns.reduce(_ + _).divide(60).as("work")
    val otherProjection: Column = otherColumns.reduce(_ + _).divide(60).as("other")
    df
      .select(workingStatusProjection, sexProjection, ageProjection, primaryNeedsProjection, workProjection, otherProjection)
      .where($"telfs" <= 4) 
  }


Посчитаем среднее значение по группировке в DataFrame Spark
def timeUsageGrouped(summed: DataFrame): DataFrame = {
    summed.groupBy($"working", $"sex", $"age")
      .agg(
        round(avg("primaryNeeds"),1).as("primaryNeeds"),
        round(avg("work"),1).as("work"),
        round(avg("other"),1).as("other")
      ).orderBy($"working", $"sex", $"age")
  }


Если аккурат убрать всю обработку.... 0_о не хватает памяти?
spoiler
6313978e8bdb1899988918.png
  • Вопрос задан
  • 89 просмотров
Пригласить эксперта
Ответы на вопрос 2
mayton2019
@mayton2019
Bigdata Engineer
Я долго подбирал те слова которые ругательные, но которые цензура все таки пропускает. Вобщем выходило очень по старо-словянски. Вобщем лихо. Сиречь молвить совсем тяжко. Бесовщина... Все неправильно.

Во первых для чтения и обработки CSV файла тебе не нужен RDD. Это очень сырой тип данных который щас никто почти не использует. Его почти всегда заменяют на DataFrame/DataSet. Он остался только в легаси коде и в тех кейсах когда нужно обработать текст (natural language).

Во вторых желательно разделить просто Scala-функции и протестировать их отдельно.

В третьих. Тебе надо поработать со Schema и DataFrames. Тебе дано 9 comma-separated файлов. Получи из них сначала 9 датафреймов. При чтении используй следующий шаблон.
val atusact = spark.read.format("csv")
                .option("header","true")
                .option("inferSchema", "true")
                .option("delimiter", ",")
                .load("/data/atusact.csv")


После того как все датафреймы будут загружены - ты работаешь с данными которые имеют схему. (опция infer).
И из каждого датафрейма можешь получить колонки и типы как коллекции объектов.

Ошибки implicite conversions имеют отношение к языку Scala больше чем к Spark. Если перепишешь все на PySpark то ошибка сама собой уйдет. Я не хотел-бы на ней останавливаться. Отдельным топиком задай по Scala.

P.S. Я вообще игнорировал твоё задание. Мне безразлично что там надо сделать. Я просто смотрел по стилю.

P.P.S Не пиши флуд с картинками. Это не помогает в решении вопроса а только запутывает и раздражает читателя.
Ответ написан
angrySCV
@angrySCV
machine learning, programming, startuping
попробую прочитать датафрейм как тебе посоветовали выше, а потом после каждой операции вызывай на датафрейме show() чтоб посмотреть промежуточный результат, и так постепенно дойдешь до требуемого результата
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы