1

Can any one give me an example UDTF (eg; explode) written in scala which returns multiple row and use it as UDF in SparkSQL?

Table: table1

+------+----------+----------+
|userId|someString|      varA|
+------+----------+----------+
|     1|  example1| [0, 2, 5]|
|     2|  example2|[1, 20, 5]|
+------+----------+----------+

I'd like to create the following Scala code:

def exampleUDTF(var: Seq[Int]) = <Return Type???>  {
  // code to explode varA field ???
}

sqlContext.udf.register("exampleUDTF",exampleUDTF _)

sqlContext.sql("FROM table1 SELECT userId, someString, exampleUDTF(varA)").collect().foreach(println)

Expected output:

+------+----------+----+
|userId|someString|varA|
+------+----------+----+
|     1|  example1|   0|
|     1|  example1|   2|
|     1|  example1|   5|
|     2|  example2|   1|
|     2|  example2|  20|
|     2|  example2|   5|
+------+----------+----+
Subhajit Sen
  • 81
  • 3
  • 8
  • Honestly @zero323, I thought this was a duplicate at first too. But look at my amended answer below -- perhaps there is value in being able to pass a pre-defined function to `explode` instead of declaring one anonymously. – David Griffin Apr 11 '16 at 13:32
  • @DavidGriffin To be honest without type safety and with unchecked and non-exhaustive pattern matching I am not convinced :) Especially when `explode` is safe and doesn't require any additional code. But since OP posted another question which could be covered by your answer we can reopen this and close another one (http://stackoverflow.com/q/36549297/1560062) – zero323 Apr 11 '16 at 13:58
  • WTF -- how many times does this guy ask the same question over and over and over? – David Griffin Apr 11 '16 at 14:30
  • Hey, @zero323, I edited my answer below. I don't see how my two uses of `explode` are any different in terms of type safety. If they are, can you explain? – David Griffin Apr 11 '16 at 14:39
  • @DavidGriffin Try to execute it on for example `df.withColumn("varA", $"varA".cast("array"))` – zero323 Apr 11 '16 at 14:54
  • Get the same error either way -- runtime errors of `java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer` – David Griffin Apr 11 '16 at 15:49
  • My new question with slight modification: http://stackoverflow.com/questions/36549297 – Subhajit Sen Apr 12 '16 at 08:46

2 Answers2

1

You can't do this with a UDF. A UDF can only add a single column to a DataFrame. There is, however, a function called DataFrame.explode, which you can use instead. To do it with your example, you would do this:

import org.apache.spark.sql._

val df = Seq(
  (1,"example1", Array(0,2,5)),
  (2,"example2", Array(1,20,5))
).toDF("userId", "someString", "varA")

val explodedDf = df.explode($"varA"){
  case Row(arr: Seq[Int]) => arr.toArray.map(a => Tuple1(a))
}.drop($"varA").withColumnRenamed("_1", "varA")

+------+----------+-----+
|userId|someString| varA|
+------+----------+-----+
|     1|  example1|    0|
|     1|  example1|    2|
|     1|  example1|    5|
|     2|  example2|    1|
|     2|  example2|   20|
|     2|  example2|    5|
+------+----------+-----+

Note that explode takes a function as an argument. So even though you can't create a UDF to do what you want, you can create a function to pass to explode to do what you want. Like this:

def exploder(row: Row) : Array[Tuple1[Int]] = {
  row match { case Row(arr) => arr.toArray.map(v => Tuple1(v)) }
}

df.explode($"varA")(exploder)

That's about the best you are going to get in terms of recreating a UDTF.

David Griffin
  • 13,677
  • 5
  • 47
  • 65
  • This is about explode but I want a way to build my custom function to be used as UDTF. Basically my function should take one column of complex type (Map ,Struct or Array) as input from the table and should split the individual entries to single row object and return List or Sequence of Rows. – Subhajit Sen Apr 11 '16 at 12:21
  • I want a custom explode function in case I've to do operations on each sequesnce like: – Subhajit Sen Apr 11 '16 at 12:45
  • That's great, what you want. That's not how it works. You got `explode`. Use `explode`. Or don't. Up to you. – David Griffin Apr 11 '16 at 13:05
  • Here, I edited it slightly to maybe get closer to what you want. – David Griffin Apr 11 '16 at 13:29
  • My new question with slight modification: http://stackoverflow.com/questions/36549297/how-to-create-a-custom-udtf-like-function-in-scala-which-will-do-operations-and – Subhajit Sen Apr 12 '16 at 08:42
0

Hive Table:

name                                               id
["Subhajit Sen","Binoy Mondal","Shantanu Dutta"]   15
["Gobinathan SP","Harsh Gupta","Rahul Anand"]      16
  1. Creating a scala function :

    def toUpper(name: Seq[String]) = (name.map(a => a.toUpperCase)).toSeq

  2. Registering function as UDF : sqlContext.udf.register("toUpper",toUpper _)

  3. Calling the UDF using sqlContext and storing output as DataFrame object :

var df = sqlContext.sql("SELECT toUpper(name) FROM namelist").toDF("Name")

  1. Exploding the DataFrame : df.explode(df("Name")){case org.apache.spark.sql.Row(arr: Seq[String]) => arr.toSeq.map(v => Tuple1(v))}.drop(df("Name")).withColumnRenamed("_1","Name").show

Result:

+--------------+
|          Name|
+--------------+
|  SUBHAJIT SEN|
|  BINOY MONDAL|
|SHANTANU DUTTA|
| GOBINATHAN SP|
|   HARSH GUPTA|
|   RAHUL ANAND|
+--------------+
Ram Ghadiyaram
  • 28,239
  • 13
  • 95
  • 121
Subhajit Sen
  • 81
  • 3
  • 8