
I'm using Spark 1.3.1, where joining two DataFrames repeats the column(s) being joined on. I'm left outer joining two DataFrames and want to send the resulting DataFrame to the na().fill() method to convert nulls to known values based on the data type of the column. I've built a map of "table.column" -> "value" and pass that to the fill method, but I get an exception instead of success :(. What are my options? I see that there is a dataFrame.withColumnRenamed method, but it can only rename one column at a time, and I have joins that involve more than one column. Do I just have to ensure that the set of column names is unique, regardless of table aliases, in the DataFrame where I apply na().fill()?

Given:

scala> val df1 = sqlContext.jsonFile("people.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [first: string, last: string]

scala> val df2 = sqlContext.jsonFile("people.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [first: string, last: string]

I can join them together with

val df3 = df1.join(df2, df1("first") === df2("first"), "left_outer")

And I have a map that converts data type to value.

scala> val map = Map("df1.first"->"unknown", "df1.last" -> "unknown",
"df2.first" -> "unknown", "df2.last" -> "unknown")

But executing fill(map) results in an exception.

scala> df3.na.fill(map)

org.apache.spark.sql.AnalysisException: Reference 'first' is ambiguous,
could be: first#6, first#8.;
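
The closest I've come to a workaround is folding withColumnRenamed over every column so that all names are unique before the join. A sketch (the df1_ / df2_ prefixes are just my own naming choice):

val df1Renamed = df1.columns.foldLeft(df1)((df, c) => df.withColumnRenamed(c, "df1_" + c))
val df2Renamed = df2.columns.foldLeft(df2)((df, c) => df.withColumnRenamed(c, "df2_" + c))
val joined = df1Renamed.join(df2Renamed, df1Renamed("df1_first") === df2Renamed("df2_first"), "left_outer")

But that feels heavy-handed, so I'm hoping there is a cleaner option.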
  • I'm pretty sure that you can only replace values with values, e.g. "Alberto" -> "unknown", "Bruce" -> "unknown", not refer to them by position in your `DataFrame`. Take a look at [DataFrameNaFunctions](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions). Also, I don't know exactly what you want to do: replace values, or rename columns? Could you be more specific? – Alberto Bonsanto Feb 28 '16 at 12:12
  • I see my map doesn't need the df1 references, which might be causing the confusion. After the join, I want to replace any nulls that might be present in df3 with "unknown". The na.fill() method takes a map of column -> value pairs, `def fill(valueMap: immutable.Map[String,Any]): DataFrame`, where the String key references the column name. – bruce szalwinski Feb 28 '16 at 13:50

1 Answer


Here is what I came up with. In my original example, there is nothing interesting left in df2 after the join, so I changed it to the classic department / employee example.

department.json

{"department": 2, "name":"accounting"}
{"department": 1, "name":"engineering"}

person.json

{"department": 1, "first":"Bruce", "last": "szalwinski"}

And now I can join the dataframes, build the map, and replace nulls with unknowns.

scala> val df1 = sqlContext.jsonFile("department.json").as("df1")
df1: org.apache.spark.sql.DataFrame = [department: bigint, name: string]

scala> val df2 = sqlContext.jsonFile("person.json").as("df2")
df2: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]

scala> val df3 = df1.join(df2, df1("department") === df2("department"), "left_outer")
df3: org.apache.spark.sql.DataFrame = [department: bigint, name: string, department: bigint, first: string, last: string]

scala> val map = Map("first" -> "unknown", "last" -> "unknown")
map: scala.collection.immutable.Map[String,String] = Map(first -> unknown, last -> unknown)

scala> val df4 = df3.select("df1.department", "df2.first", "df2.last").na.fill(map)
df4: org.apache.spark.sql.DataFrame = [department: bigint, first: string, last: string]

scala> df4.show()
+----------+-------+----------+
|department|  first|      last|
+----------+-------+----------+
|         2|unknown|   unknown|
|         1|  Bruce|szalwinski|
+----------+-------+----------+
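
If you'd rather not lean on the string aliases, I believe the same select can be written with Column references from the original DataFrames, which resolves the ambiguity the same way (a sketch of the equivalent rewrite, not something I've tested extensively on 1.3.1):

scala> val df4 = df3.select(df1("department"), df2("first"), df2("last")).na.fill(map)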