
I have a DataFrame in Spark, using Scala, with a column that I need to split.

scala> test.show
+-------------+
|columnToSplit|
+-------------+
|        a.b.c|
|        d.e.f|
+-------------+
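For reference, the input DataFrame can be reproduced with something like the following (assuming a SparkSession named spark with its implicits in scope):

import spark.implicits._

val test = Seq("a.b.c", "d.e.f").toDF("columnToSplit")
test.show()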

I need this column split out to look like this:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

I'm using Spark 2.0.0.

Thanks

Matt Maurer

7 Answers


Try:

import sparkObject.spark.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\\.")).select(
  $"_tmp".getItem(0).as("col1"),
  $"_tmp".getItem(1).as("col2"),
  $"_tmp".getItem(2).as("col3")
)

The important point to note here is that sparkObject is the object holding the SparkSession (spark) you have already initialized. Because the import depends on that live value, it has to be placed inline within the code, after the SparkSession is created, rather than at the top of the file before the class definition.
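For context, a minimal sketch of that placement follows; the object, app, and method names are only placeholders, and it imports spark.implicits._ directly rather than through an enclosing sparkObject:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

object SplitExample {                      // placeholder enclosing object
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")                  // placeholder master, for local testing
      .appName("split-example")
      .getOrCreate()

    // the implicits import depends on the spark value, so it sits inside the
    // method body, after the SparkSession is created, not at the top of the file
    import spark.implicits._

    val df = Seq("a.b.c", "d.e.f").toDF("columnToSplit")

    df.withColumn("_tmp", split($"columnToSplit", "\\."))
      .select(
        $"_tmp".getItem(0).as("col1"),
        $"_tmp".getItem(1).as("col2"),
        $"_tmp".getItem(2).as("col3"))
      .show()

    spark.stop()
  }
}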

Manindar

To do this programmatically, you can create a sequence of expressions with (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")) (assuming you need 3 columns as the result) and then apply it to select with the : _* syntax:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+

To keep all columns:

df.withColumn("temp", split(col("columnToSplit"), "\\.")).select(
    col("*") +: (0 until 3).map(i => col("temp").getItem(i).as(s"col$i")): _*
).show
+-------------+---------+----+----+----+
|columnToSplit|     temp|col0|col1|col2|
+-------------+---------+----+----+----+
|        a.b.c|[a, b, c]|   a|   b|   c|
|        d.e.f|[d, e, f]|   d|   e|   f|
+-------------+---------+----+----+----+

If you are using pyspark, use a generator expression (or list comprehension) in place of the Scala map:

from pyspark.sql.functions import col, split

df = spark.createDataFrame([['a.b.c'], ['d.e.f']], ['columnToSplit'])

(df.withColumn('temp', split('columnToSplit', '\\.'))
   .select(*(col('temp').getItem(i).alias(f'col{i}') for i in range(3)))
   .show())
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   a|   b|   c|
|   d|   e|   f|
+----+----+----+
Psidom
  • Can we split to an arbitrary number of columns? Why does 3 have to be hardcoded? – Ivan Balashov Oct 08 '18 at 11:13
  • I have a similar scenario, but the number of "."-separated values is unknown. How can we split the rows dynamically? – Jeevan Jan 10 '19 at 17:09
  • There are multiple nice answers here because they each break things down a little further, but if you have a lot of columns or a lot of dataframes to do this with (for example reading many topics from Kafka), then this answer is the way to go. – NYCeyes Jan 18 '19 at 00:42
  • How does the first answer (the not-all-columns version) translate to pyspark/python? – NYCeyes Jan 18 '19 at 00:46
  • I created another answer to show how to implement this approach without hardcoding the number of columns. – Powers Mar 17 '19 at 10:36

A solution which avoids the select part. This is helpful when you just want to append the new columns:

import org.apache.spark.sql.functions.{col, split}

case class Message(others: String, text: String)

val r1 = Message("foo1", "a.b.c")
val r2 = Message("foo2", "d.e.f")

val records = Seq(r1, r2)
val df = spark.createDataFrame(records)

df.withColumn("col1", split(col("text"), "\\.").getItem(0))
  .withColumn("col2", split(col("text"), "\\.").getItem(1))
  .withColumn("col3", split(col("text"), "\\.").getItem(2))
  .show(false)

+------+-----+----+----+----+
|others|text |col1|col2|col3|
+------+-----+----+----+----+
|foo1  |a.b.c|a   |b   |c   |
|foo2  |d.e.f|d   |e   |f   |
+------+-----+----+----+----+

Update: I highly recommend using Psidom's implementation to avoid splitting three times.

Sascha Vetter

This appends the new columns to the original DataFrame without using select, and splits only once by using a temporary column:

import spark.implicits._
import org.apache.spark.sql.functions.split

df.withColumn("_tmp", split($"columnToSplit", "\\."))
  .withColumn("col1", $"_tmp".getItem(0))
  .withColumn("col2", $"_tmp".getItem(1))
  .withColumn("col3", $"_tmp".getItem(2))
  .drop("_tmp")
soaptree

This expands on Psidom's answer and shows how to do the split dynamically, without hardcoding the number of columns. This answer runs a query to calculate the number of columns.

import spark.implicits._
import org.apache.spark.sql.functions.{col, max, size, split}

val df = Seq(
  "a.b.c",
  "d.e.f"
).toDF("my_str")
.withColumn("letters", split(col("my_str"), "\\."))

val numCols = df
  .withColumn("letters_size", size($"letters"))
  .agg(max($"letters_size"))
  .head()
  .getInt(0)

df
  .select(
    (0 until numCols).map(i => $"letters".getItem(i).as(s"col$i")): _*
  )
  .show()
Powers

We can write this using for with yield in Scala:

If you need more columns, just add them to the desiredColumn sequence and adjust accordingly. :)

import spark.implicits._
import org.apache.spark.sql.functions.{col, split}

val aDF = Seq("Deepak.Singh.Delhi").toDF("name")
val desiredColumn = Seq("name", "Lname", "City")
val colsize = desiredColumn.size

val columList = for (i <- 0 until colsize)
  yield split(col("name"), "\\.").getItem(i).alias(desiredColumn(i))

aDF.select(columList: _*).show(false)

Output:

+------+-----+-----+
|name  |Lname|City |
+------+-----+-----+
|Deepak|Singh|Delhi|
+------+-----+-----+

If you don't need the name column, drop it and just use withColumn, as sketched below.
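For example, a minimal sketch of that withColumn variant, reusing aDF from above; the temporary "_parts" column and the "FirstName" label are only illustrative names:

// split once into a temporary array column, add one column per piece,
// then drop both the original column and the temporary one
aDF
  .withColumn("_parts", split(col("name"), "\\."))
  .withColumn("FirstName", col("_parts").getItem(0))
  .withColumn("Lname", col("_parts").getItem(1))
  .withColumn("City", col("_parts").getItem(2))
  .drop("name", "_parts")
  .show(false)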

Deepak

Example: without using the select statement.

Let's assume we have a DataFrame with a set of columns, and we want to split the column named name.

import spark.implicits._

val columns = Seq("name","age","address")

val data = Seq(("Amit.Mehta", 25, "1 Main st, Newark, NJ, 92537"),
             ("Rituraj.Mehta", 28,"3456 Walnut st, Newark, NJ, 94732"))

val dfFromData = spark.createDataFrame(data).toDF(columns: _*)
dfFromData.printSchema()

// split the name column on "." and build a new tuple per row
val newDF = dfFromData.map(f => {
  val nameSplit = f.getAs[String](0).split("\\.").map(_.trim)
  (nameSplit(0), nameSplit(1), f.getAs[Int](1), f.getAs[String](2))
})

val finalDF = newDF.toDF("First Name","Last Name", "Age","Address")

finalDF.printSchema()

finalDF.show(false)

Output:

+----------+---------+---+---------------------------------+
|First Name|Last Name|Age|Address                          |
+----------+---------+---+---------------------------------+
|Amit      |Mehta    |25 |1 Main st, Newark, NJ, 92537     |
|Rituraj   |Mehta    |28 |3456 Walnut st, Newark, NJ, 94732|
+----------+---------+---+---------------------------------+

venus