I have a DataFrame with a single column whose value is a string concatenated with a delimiter. I want to split it into multiple columns, potentially 1000-2000 of them, and the number of records can be around 60 million. I am trying to find the best approach so that performance is not impacted.

I have the approach below, but can anyone suggest a better way to achieve this?

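import org.apache.spark.sql.functions.{col, split}
import spark.implicits._

val df = Seq("1|2|3|4|5|6|7|8|9").toDF("data")

// Split the delimited string once into an array column
val df2 = df.withColumn("_tmp", split(col("data"), "\\|"))

// _tmp is not part of the select, so no explicit drop is needed
df2.select(
  $"_tmp".getItem(0).as("col1"),
  $"_tmp".getItem(1).as("col2"),
  $"_tmp".getItem(2).as("col3"),
  $"_tmp".getItem(3).as("col4"))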

Thanks a lot in advance.

Babu
  • Off the top of my head: since you mentioned only one column, why not apply the same split logic while loading the data, so that the values are read as columns and you can define a schema for them? – Girish501 Dec 12 '19 at 15:19
  • I am reading the data from an HBase table, where each dataset is stored as one column qualifier, so either way there will be a performance issue. I am trying to find the best approach. – Babu Dec 12 '19 at 15:21
  • Does this answer your question? [How to explode an array into multiple columns in Spark](https://stackoverflow.com/questions/49499263/how-to-explode-an-array-into-multiple-columns-in-spark) – blackbishop Dec 12 '19 at 18:14

2 Answers

0

If your data is stored on disk, you could simply read it according to:

val df = spark.read.format("csv").option("delimiter", "|").load(<path_to_data>)

If the file doesn't have a header (which I assume is the case here) and you don't like the default column names (_c0, _c1, ... , _cN), you can rename them with:

val newColumnNames = (1 to df.columns.size).map("col" + _)
val df2 = df.toDF(newColumnNames: _*)
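
As a variation on the read-then-rename steps above, the column names can also be supplied at read time through an explicit schema. This is only a minimal sketch: the temporary sample file, the local SparkSession, and the assumption that the column count (`ncols`) is known in advance are all made up for illustration and are not part of the original question.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Local session, assumed here only so the sketch is self-contained
val spark = SparkSession.builder().master("local[*]").appName("read-delimited").getOrCreate()

// Hypothetical sample file standing in for the real data on disk
val path = Files.createTempFile("sample", ".txt")
Files.write(path, "1|2|3\n4|5|6\n".getBytes("UTF-8"))

// Assumption: the number of columns is known in advance
val ncols = 3
val schema = StructType((1 to ncols).map(i => StructField(s"col$i", StringType, nullable = true)))

// Passing the schema names the columns directly, so no toDF rename is needed
val df = spark.read
  .format("csv")
  .option("delimiter", "|")
  .schema(schema)
  .load(path.toString)
```

Every column is read as a string here; other types could be declared in the schema if the fields are known to hold them.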

If your data is instead held in memory, say as a String s, and all delimited values are of the same type, you could first parallelize your data into an RDD and then convert it to a DataFrame by supplying an explicit schema:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val data = s.split("\\|")
val schema = StructType((1 to data.size).map(c => StructField("col" + c, StringType, false)))
val dataRDD = spark.sparkContext.makeRDD(Seq(Row(data: _*)))
val df = spark.createDataFrame(dataRDD, schema)
edaljo
-1

If you know the number of columns, you can automate this a bit:

val df = Seq(("1|2|3|4|5|6|7|8|9")).toDF("data")

val ncols = 9
// Build every column expression up front so they are applied in one select
val selectExpr = (0 until ncols).map(i => $"tmp"(i).as(s"col${i + 1}"))
df
  .withColumn("tmp", split(col("data"), "\\|"))
  .select(selectExpr:_*)
  .show()

gives:

+----+----+----+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|col7|col8|col9|
+----+----+----+----+----+----+----+----+----+
|   1|   2|   3|   4|   5|   6|   7|   8|   9|
+----+----+----+----+----+----+----+----+----+
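
If the number of columns is not known up front, one possible variation is to derive it from the data with an extra aggregation before building the select. The following is a rough sketch under that assumption, not a benchmarked recommendation: it costs one additional pass over the data, and the local SparkSession and sample rows are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, size, split}

// Local session, assumed here only so the sketch is self-contained
val spark = SparkSession.builder().master("local[*]").appName("split-columns").getOrCreate()
import spark.implicits._

// Illustrative sample rows; both have the same number of fields
val df = Seq("1|2|3|4|5|6|7|8|9", "a|b|c|d|e|f|g|h|i").toDF("data")

// Split once and keep the resulting array column
val withArr = df.withColumn("tmp", split(col("data"), "\\|"))

// Extra pass over the data: find the largest number of fields in any row
val ncols = withArr.agg(max(size(col("tmp")))).head().getInt(0)

// Build all column expressions up front and apply them in a single select
val selectExpr = (0 until ncols).map(i => col("tmp")(i).as(s"col${i + 1}"))
val result = withArr.select(selectExpr: _*)
```

If rows can have different numbers of fields, what happens for out-of-range indices depends on the Spark version and its ANSI setting, so that case is worth checking before relying on this.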
Raphael Roth