
For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by converting the dataframe to an RDD (df.rdd), that conversion is expensive when the next step is simply to convert the RDD back to a dataframe.

So is there a way to do a udf along the following lines:

val wholeRowUdf = udf((row: Row) => computeHash(row))

Row is not a spark sql datatype of course - so this would not work as shown.

Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a Spark SQL statement:

val featurizedDf = spark.sql(
  "select wholeRowUdf( what goes here? ) as features from mytable")

2 Answers


Row is not a spark sql datatype of course - so this would not work as shown.

I am going to show that you can use Row to pass all the columns, or selected columns, to a udf function by using the built-in struct function.

First I define a dataframe

val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+

Then I define a function that joins all the elements in a row into one comma-separated string (a stand-in for your computeHash function):

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")

Then I wrap it in a udf function:

import org.apache.spark.sql.functions._
def combineUdf = udf((row: Row) => concatFunc(row))

Finally I call the udf with withColumn, using the built-in struct function to combine the selected columns into a single struct column that is passed to the udf:

df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+------------+
//    |col1|col2|col3|concatenated|
//    +----+----+----+------------+
//    |a   |b   |c   |a, b, c     |
//    |a1  |b1  |c1  |a1, b1, c1  |
//    +----+----+----+------------+

So you can see that Row can be used to pass the whole row as an argument.

You can even pass all the columns in a row at once:

val columns = df.columns
df.withColumn("concatenated", combineUdf(struct(columns.map(col): _*)))

Updated

You can achieve the same with SQL queries too; you just need to register the udf function:

df.createOrReplaceTempView("tempview")
sqlContext.udf.register("combineUdf", combineUdf)
sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")

It will give you the same result as above

Now, if you don't want to hardcode the column names, you can select the column names you need and build them into a string:

val columns = df.columns.map(x => "`"+x+"`").mkString(",")
sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
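As a quick sanity check of the string being assembled, here is the same construction in plain Scala with hypothetical column names (no Spark session needed to see the generated SQL):

```scala
// Build the backtick-quoted, comma-separated column list the same way
// the snippet above does, using example column names.
val exampleColumns = Array("col1", "col2", "col3")
val columnList = exampleColumns.map(x => "`" + x + "`").mkString(",")
println(columnList)  // `col1`,`col2`,`col3`

// The query string handed to sqlContext.sql would then look like:
val query = s"select *, combineUdf(struct($columnList)) as concatenated from tempview"
println(query)
```

The backticks matter when column names contain spaces or reserved words; for plain identifiers they are harmless.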

I hope the answer is helpful

  • I am looking for a construct that supports a *sql statement*. Will update the OP – WestCoastProjects Mar 22 '18 at 19:05
  • What if we have two dozen columns that may be changing .. i'd rather not do it the way you showed that hard codes the columns : better to "discover" the column names in some way. If you can provide that then I'll award. – WestCoastProjects Mar 22 '18 at 19:35
  • I had already given you hints in my previous update on choosing variable column names instead of hard coding. Look at my updates; I have fulfilled your every challenge. :) . What award would you give me for my hard work ;) 25 points ? hahaha. let me know if you have further challenges :P – Ramesh Maharjan Mar 23 '18 at 03:32
  • My last comment was not answered. – WestCoastProjects Mar 23 '18 at 17:29
  • Oh I did not notice you had updated to include the `combineUdf` . I will upvote as potentially useful. Do you think your approach were simpler than what I had posted? – WestCoastProjects Mar 24 '18 at 00:18
  • It would be the same . I answered just to show you that it is possible to pass Row as a datatype to udf. thats all. – Ramesh Maharjan Mar 24 '18 at 05:17
  • my approach/answer was more succinct/simple and just as powerful, but usually I prefer to award to someone else – WestCoastProjects Sep 22 '21 at 05:02

I came up with a workaround: drop the column names into any existing spark sql function to generate a new output column:

s"concat(${df.columns.tail.mkString(",'-',")}) as Features"

In this case the first column in the dataframe is a target and was excluded. That is another advantage of this approach: the actual list of columns may be manipulated.

This approach avoids unnecessary restructuring of the RDD/dataframes.
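For illustration, here is how that fragment expands, in plain Scala with hypothetical column names where the first column (`target`) is the excluded label — no Spark needed to see the generated SQL:

```scala
// Hypothetical column names; the first ("target") is the label and is skipped.
val cols = Array("target", "f1", "f2", "f3")
// Same construction as the snippet above: join the tail with ,'-', so that
// the SQL concat inserts a '-' separator between the column values.
val features = s"concat(${cols.tail.mkString(",'-',")}) as Features"
println(features)  // concat(f1,'-',f2,'-',f3) as Features
```

The resulting string can then be dropped into a full query, e.g. `spark.sql(s"select $features from mytable")`.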
