I have the following RDD and many just like it:

val csv = sc.parallelize(Array(
  "col1, col2, col3",
  "1, cat, dog",
  "2, bird, bee"))

I would like to convert the RDD into a dataframe where the schema is created dynamically/programmatically based on the first row of the RDD.

I would like to apply the same logic to multiple similar RDDs, so I cannot specify the schema using a case class, nor can I use spark-csv to load the data in as a dataframe from the start.

I've created a flattened dataframe, but I'm wondering how to break out the respective columns when creating the dataframe.

Current code:

val header = csv.first()
val data = csv.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}.toDF(header).show()

Current output:

+----------------+
|col1, col2, col3|
+----------------+
|     1, cat, dog|
|    2, bird, bee|
+----------------+
  • If this is from a csv file then read it directly as a dataframe: https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe – Shaido Nov 19 '19 at 03:34
  • I cannot read it directly into a dataframe using spark-csv – Cookie Monster Nov 19 '19 at 03:35
  • How come? Also note that in newer spark version there is no need to use spark-csv as it can be used directly without additional packages, e.g.: `spark.read.format("csv").option("header", "true").load("csvfile.csv")`. – Shaido Nov 19 '19 at 04:15
  • Yes, that is what I meant. There are restrictions in place where I am not able to do so – Cookie Monster Nov 19 '19 at 04:23

2 Answers

In most cases it's preferable to read CSV files directly as a dataframe, see e.g.: Spark - load CSV file as DataFrame?
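For reference, a minimal sketch of that direct approach (the path data.csv here is hypothetical):

// Spark picks up the column names from the header row when the header option is set
val df = spark.read
  .option("header", "true")
  .csv("data.csv")  // hypothetical path

df.show()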


First you need to split the data into arrays; this applies to both the header and the RDD itself:

import spark.implicits._  // needed for toDF on an RDD

val header = csv.first().split(", ")

val data = csv.mapPartitionsWithIndex {
  (idx, iter) => if (idx == 0) iter.drop(1) else iter
}.map(_.split(", ")).toDF("arr")

Note that the above code will convert the RDD to a dataframe; however, it will only have a single column called arr. On the other hand, header will be an Array[String].
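If you want to check the intermediate shape, printSchema should show a single array column (a quick sanity check, assuming the snippet above ran as-is):

data.printSchema()
// root
//  |-- arr: array (nullable = true)
//  |    |-- element: string (containsNull = true)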

The next step is to convert the dataframe with a single array column into one with the correct number of columns, named according to header:

import org.apache.spark.sql.functions.col

data.select((0 until header.size).map(i => col("arr")(i).alias(header(i))): _*)
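The same projection can also be written by zipping the header names with their positions; this is just a stylistic alternative, not a different technique:

data.select(header.zipWithIndex.map { case (name, i) => col("arr")(i).alias(name) }: _*)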

This results in the following dataframe (note that the header row can survive as a data row: as the comment under the second answer points out, mapPartitionsWithIndex indexes partitions rather than records, so drop(1) only removes the header when it lands in partition 0):

+----+----+----+
|col1|col2|col3|
+----+----+----+
|col1|col2|col3|
|   1| cat| dog|
|   2|bird| bee|
+----+----+----+
– Shaido

You can use this code:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

val csv = spark.sparkContext.parallelize(Array(
    "col1, col2, col3",
    "1, cat, dog",
    "2, bird, bee"))

// Split the header row into trimmed column names
val header = csv.first.split(",").map(_.trim)

val resultDF = spark.createDataFrame(
    // zipWithIndex assigns a global index to every record, so
    // filtering on index > 0 reliably drops the header row
    csv.zipWithIndex
      .filter(_._2 > 0)
      .map{ case (str, _) => Row.fromSeq(str.split(",").map(_.trim)) },
    // Build the schema dynamically from the header names (all StringType)
    StructType(header.map(c => StructField(c, StringType)))
  )

resultDF.show(false)

Output:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |cat |dog |
|2   |bird|bee |
+----+----+----+
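Since the schema built from the header makes every column a StringType, a natural follow-up is casting to proper types afterwards; a minimal sketch (the choice of int for col1 is just an assumption based on the sample data):

import org.apache.spark.sql.functions.col

// Cast the numeric-looking column; the others stay as strings
val typedDF = resultDF.withColumn("col1", col("col1").cast("int"))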
– baitmbarek
  • Note that you need zipWithIndex to filter out the header; mapPartitionsWithIndex will assign an index per partition, not per record. – baitmbarek Nov 19 '19 at 13:36
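To illustrate the difference the comment describes, a small sketch (run against the csv RDD defined in the answer):

// zipWithIndex assigns a global, ordered index per record,
// so the header is always index 0 regardless of partitioning:
csv.zipWithIndex.collect.foreach(println)
// (col1, col2, col3,0)
// (1, cat, dog,1)
// (2, bird, bee,2)

// mapPartitionsWithIndex indexes partitions, not records, so dropping
// the first element of partition 0 only removes the header if the
// header happens to land in that partition.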