
I'm new to the Scala/Spark world.

I have a Spark Dataset (created from a case class) called person.

scala> val person_with_contact = person.map(r => (
     | r.id,
     | r.name,
     | r.age
     | )).toDF()

Now I want to add a list of address attributes (such as apt_no, street, city, zip) to each record of this dataset. To get the address attributes, I have a function that takes a person's id as input and returns a map containing all the address attributes and their corresponding values.

I tried the following, along with a few other approaches suggested on Stack Overflow, but I haven't been able to solve it yet. (For reference, a static-column example: "Spark, add new Column with the same value in Scala".)

scala> val person_with_contact = person.map(r => (
    | r.id,
    | r.name,
    | r.age,
    | getAddress(r.id) 
    | )).toDF()

The final dataframe should have the following columns.

id, name, age, apt_no, street, city, zip

halfer
Manas Mukherjee
  • Does this answer your question? [Spark Build Custom Column Function, user defined function](https://stackoverflow.com/questions/36546456/spark-build-custom-column-function-user-defined-function) – Shaido Nov 01 '19 at 07:35
  • @Shaido, Thanks for your reply. I have a UDF function already. I'm not sure how to return the list of address attributes from this UDF, so that those will be added as an individual column to the new dataframe. – Manas Mukherjee Nov 01 '19 at 07:54
  • @HristoIliev, thanks for your reply. Each person has only one address, which is represented by 4 attributes. I have a UDF that takes a person's id as input and returns the 4 attributes as a map. I would like to join `id, name, age` with the address fields, i.e. `apt_no, street, city, zip`. The result should be a single dataframe with all 7 attributes. – Manas Mukherjee Nov 01 '19 at 08:51
  • @ManasMukherjee, on a second read of your question I realized that you are adding a *list of attributes*, which is why I deleted my comment. Is `person` a DataFrame or an RDD? – Hristo Iliev Nov 01 '19 at 08:58
  • `person` is a Dataset created from a `case class` with id, name, and age as attributes. – Manas Mukherjee Nov 01 '19 at 09:04

2 Answers

1

Use a UDF for each additional column:

package yourpackage

import org.apache.spark.sql.SparkSession
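import org.apache.spark.sql.functions._

// The question's case class (id, name, age), declared here so the example compiles.
case class Person(id: Int, name: String, age: Int)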


object MainDemo {

  def getAddress(id: Int): String = {
    //do your things
    "address id:" + id
  }

  def getCity(id: String): String = {
    //do your things
    "your city :" + id
  }

  def getZip(id: String): String = {
    //do your things
    "your zip :" + id
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[3]").enableHiveSupport().getOrCreate()
    val person = Seq(Person(1, "name_m", 21), Person(2, "name_w", 40))
    import spark.implicits._
    val person_with_contact = person.map(r => (r.id, r.name, r.age, getAddress(r.id))).toDF("id", "name", "age", "street")
    person_with_contact.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    val result = person_with_contact.select(
      col("id"),
      col("name"),
      col("age"),
      col("street"),
      udf { id: String =>
        val city = getCity(id)
        city
      }.apply(col("id")).as("city"),
      udf { id: String =>
        val zip = getZip(id)
        zip
      }.apply(col("id")).as("zip")
    )
    result.printSchema()
    //root
    // |-- id: integer (nullable = false)
    // |-- name: string (nullable = true)
    // |-- age: integer (nullable = false)
    // |-- street: string (nullable = true)
    // |-- city: string (nullable = true)
    // |-- zip: string (nullable = true)
    result.show()
    //+---+------+---+------------+------------+-----------+
    //| id|  name|age|      street|        city|        zip|
    //+---+------+---+------------+------------+-----------+
    //|  1|name_m| 21|address id:1|your city :1|your zip :1|
    //|  2|name_w| 40|address id:2|your city :2|your zip :2|
    //+---+------+---+------------+------------+-----------+
  }
}

wenjiangFu
  • Thank you for writing and sharing the custom code and solution. Instead of maintaining a separate UDF for each component of the Address, I would prefer a more unified UDF like the one Hristo shared. I learned something new today. Thanks again. – Manas Mukherjee Nov 02 '19 at 01:40
0

Given that you already have a function that returns the address as a map, you can create a UDF that converts that map to a struct and then select all map fields:

import org.apache.spark.sql.functions._

// For demo only
def getAddress(id: Int): Option[Map[String, String]] = {
  id match {
    case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
    case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
    case _ => None
  }
}

case class Address(apt_no: String, street: String, city: String, zip: String)

def getAddressUdf = udf((id: Int) => {
  getAddress(id) map (m =>
    Address(m("apt_no"), m("street"), m("city"), m("zip"))
  )
})

udf() transforms functions that return case class instances into UDFs that return struct columns with the corresponding schema. Option[_] return types are automatically resolved to nullable data types. The fields of the struct column can then be expanded into multiple columns by selecting them with $"struct_col_name.*":

scala> val df = Seq(Person(1, "John", 32), Person(2, "Cloe", 27), Person(3, "Pete", 55)).toDS()
df: org.apache.spark.sql.Dataset[Person] = [id: int, name: string ... 1 more field]

scala> df.show()
+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 32|
|  2|Cloe| 27|
|  3|Pete| 55|
+---+----+---+

scala> df
     | .withColumn("addr", getAddressUdf($"id"))
     | .select($"id", $"name", $"age", $"addr.*")
     | .show()
+---+----+---+------+------------+------+-----+
| id|name|age|apt_no|      street|  city|  zip|
+---+----+---+------+------------+------+-----+
|  1|John| 32|    12| Main Street|    NY| 1234|
|  2|Cloe| 27|     1| Back Street|Gotham| G123|
|  3|Pete| 55|  null|        null|  null| null|
+---+----+---+------+------------+------+-----+
Hristo Iliev
  • One related question: is there any way to produce the 7 columns without knowing the structure of the Address? In my case, I don't control the elements of the Address. The client can add or delete attributes. I just want to take the top-level elements of the Address component without knowing its structure. – Manas Mukherjee Nov 02 '19 at 02:29
  • brilliant. Thank you. I really appreciate your help – Manas Mukherjee Nov 02 '19 at 02:34
  • @ManasMukherjee, it is possible to return `org.apache.spark.sql.Row` instead of a case class, but then you have to build an `org.apache.spark.sql.types.StructType` instance that matches the row schema and give it as the second argument to `udf()`. You should probably call `getAddress` once with a valid `id`, examine the keys of the returned mapping and store them in an array. Then use the keys in the exact same order to both construct the fields of the `StructType` and to build the argument list for the `Row` constructor. – Hristo Iliev Nov 02 '19 at 12:55
  • Thanks again, https://stackoverflow.com/users/1374437/hristo-iliev – Manas Mukherjee Nov 03 '19 at 20:15
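
The `Row` plus `StructType` approach described in the comment above could look roughly like the sketch below. It is only an illustration under several assumptions: the object name `DynamicAddressColumns` and the helpers `addressKeys`, `addressSchema`, and `addressRow` are hypothetical, `getAddress` is the demo stand-in from the answer, every address value is assumed to be a string, and the keys are sorted only to get a stable column order. Also note that in Spark 3.x the two-argument `udf(f, dataType)` is rejected by default, so the sketch sets `spark.sql.legacy.allowUntypedScalaUDF`; in Spark 2.x that setting has no effect.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DynamicAddressColumns {

  // Stand-in for the real lookup (assumption: same shape as the demo getAddress in the answer).
  def getAddress(id: Int): Option[Map[String, String]] = id match {
    case 1 => Some(Map("apt_no" -> "12", "street" -> "Main Street", "city" -> "NY", "zip" -> "1234"))
    case 2 => Some(Map("apt_no" -> "1", "street" -> "Back Street", "city" -> "Gotham", "zip" -> "G123"))
    case _ => None
  }

  // Probe one known-valid id and fix the key order once; sorting is an arbitrary but stable choice.
  def addressKeys(sampleId: Int): Seq[String] =
    getAddress(sampleId).map(_.keys.toSeq.sorted).getOrElse(Seq.empty)

  // One nullable string field per key, in the same order as `keys`.
  def addressSchema(keys: Seq[String]): StructType =
    StructType(keys.map(k => StructField(k, StringType, nullable = true)))

  // Values in the same order as `keys`; null where a key is missing for this address.
  def addressRow(keys: Seq[String], address: Map[String, String]): Row =
    Row.fromSeq(keys.map(k => address.get(k).orNull))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dynamic-address-columns")
      .master("local[1]")
      // Needed on Spark 3.x for udf(f, dataType); has no effect on Spark 2.x.
      .config("spark.sql.legacy.allowUntypedScalaUDF", "true")
      .getOrCreate()
    import spark.implicits._

    val keys = addressKeys(sampleId = 1)
    val schema = addressSchema(keys)

    // Returns a Row matching `schema`, or null when no address is known for the id.
    val lookup: Int => Row = id => getAddress(id).map(addressRow(keys, _)).orNull

    // Untyped UDF: the return schema is passed explicitly as the second argument.
    val getAddressRow = udf(lookup, schema)

    val people = Seq((1, "John", 32), (2, "Cloe", 27), (3, "Pete", 55)).toDF("id", "name", "age")

    people
      .withColumn("addr", getAddressRow(col("id")))
      .select(col("id"), col("name"), col("age"), col("addr.*"))
      .show()

    spark.stop()
  }
}
```

Because the schema is derived from a single sample call, attributes that appear only for other ids would be dropped silently; if the set of keys can vary per person, collecting the union of keys first (or keeping the address as a map column) may be a better fit.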