
I am trying to move data from GP (Greenplum) to HDFS using Scala & Spark.

val execQuery    = "select * from schema.tablename"
val yearDF       = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", s"(${execQuery}) as year2016")
  .option("user", devUserName)
  .option("password", devPassword)
  .option("partitionColumn", "header_id")
  .option("lowerBound", 19919927)
  .option("upperBound", 28684058)
  .option("numPartitions", 30)
  .load()
val yearDFSchema = yearDF.schema

The schema for yearDF is:

root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

The schema of the same table in Hive, as given by our project, is:

val hiveColumns = "source_system_name:String|description:String|creation_date:Timestamp|status:String|status_date:Timestamp|table_refresh_delay_min:Timestamp|release_number:Double|change_number:Double|interface_queue_enabled_flag:String|rework_enabled_flag:String|fdm_application_id:Bigint|history_enabled_flag:String"

So I took hiveColumns and created a new StructType as given below:

import org.apache.spark.sql.types._

def convertDatatype(datatype: String): DataType = {
  // The type names in hiveColumns are capitalized (String, Bigint, ...), so normalize first.
  datatype.toLowerCase match {
    case "string"     => StringType
    case "bigint"     => LongType
    case "int"        => IntegerType
    case "double"     => DoubleType
    case "date"       => TimestampType
    case "boolean"    => BooleanType
    case "timestamp"  => TimestampType
  }
}


val schemaList = hiveColumns.split("\\|")
val newSchema  = new StructType(schemaList.map(col => col.split(":")).map(e => StructField(e(0), convertDatatype(e(1)), true)))
newSchema.printTreeString()
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

When I try to apply my new schema newSchema to yearDF, I get the exception:

 Caused by: java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double
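The application step itself is essentially the following (a sketch, with the surrounding code trimmed):

// Sketch: rebuild the DataFrame from yearDF's rows using the hand-built newSchema.
val castedDF = spark.createDataFrame(yearDF.rdd, newSchema)
castedDF.show()   // the exception above is thrown here, at evaluation time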

The exception occurs because of the decimal-to-double conversion. What I don't understand is how I can convert the datatypes of the columns table_refresh_delay_min, release_number, change_number and fdm_application_id in the StructType newSchema from DoubleType/LongType to the corresponding datatypes present in yearDF's schema, i.e.:

If a column in yearDFSchema has a decimal datatype with a scale greater than zero, in this case decimal(38,30), I need to change that same column's datatype in newSchema to DecimalType(38,30).
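Conceptually, what I want is something like this (just a rough sketch of the intent):

import org.apache.spark.sql.types._

// Wherever yearDFSchema has a DecimalType with a non-zero scale, keep that
// DecimalType in newSchema instead of the type derived from hiveColumns.
val adjustedSchema = StructType(newSchema.map { field =>
  yearDFSchema.find(_.name == field.name) match {
    case Some(StructField(_, dt: DecimalType, _, _)) if dt.scale > 0 =>
      field.copy(dataType = dt)
    case _ => field
  }
})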

Could anyone let me know how I can achieve this?


1 Answer


Errors like this occur when you try to apply a schema to an RDD[Row] using the developer API createDataFrame functions:

def createDataFrame(rows: List[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

In such cases the stored values have to match the external (i.e. Scala value) types exactly, as listed in the official Spark SQL type mapping, and no type casting or coercion is applied.

Therefore it is your responsibility as a user to ensure that the data and the schema are compatible.
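A minimal illustration (with made-up data, not your table): a java.math.BigDecimal value under a DoubleType field produces exactly this error once the rows are evaluated:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// A DoubleType field expects a Scala Double as its external type; feeding it a
// java.math.BigDecimal fails validation when the rows are actually evaluated.
val rows = sc.parallelize(Seq(Row(new java.math.BigDecimal("1.5"))))
val df   = spark.createDataFrame(rows, StructType(Seq(StructField("x", DoubleType))))
df.show()
// java.lang.RuntimeException: java.math.BigDecimal is not a valid external type for schema of double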

The problem you've described indicates a rather different scenario, one that calls for a CAST. Let's create a dataset with exactly the same schema as in your example:

val yearDF = spark.createDataFrame(
  sc.parallelize(Seq[Row]()),
  StructType(Seq(
    StructField("source_system_name", StringType),
    StructField("table_refresh_delay_min", DecimalType(38, 30)),
    StructField("release_number", DecimalType(38, 30)),
    StructField("change_number", DecimalType(38, 30)),
    StructField("interface_queue_enabled_flag", StringType),
    StructField("rework_enabled_flag", StringType),
    StructField("fdm_application_id", DecimalType(15, 0)),
    StructField("history_enabled_flag", StringType)
)))

yearDF.printSchema
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: decimal(38,30) (nullable = true)
 |-- release_number: decimal(38,30) (nullable = true)
 |-- change_number: decimal(38,30) (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: decimal(15,0) (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

and desired types like

val dtypes = Seq(
  "source_system_name" -> "string",
  "table_refresh_delay_min" -> "double",
  "release_number" -> "double",
  "change_number" -> "double",
  "interface_queue_enabled_flag" -> "string",
  "rework_enabled_flag" -> "string",
  "fdm_application_id" -> "long",
  "history_enabled_flag" -> "string"
)

then you can just map:

import org.apache.spark.sql.functions.col

val mapping = dtypes.toMap

yearDF.select(yearDF.columns.map { c => col(c).cast(mapping(c)) }: _*).printSchema
root
 |-- source_system_name: string (nullable = true)
 |-- table_refresh_delay_min: double (nullable = true)
 |-- release_number: double (nullable = true)
 |-- change_number: double (nullable = true)
 |-- interface_queue_enabled_flag: string (nullable = true)
 |-- rework_enabled_flag: string (nullable = true)
 |-- fdm_application_id: long (nullable = true)
 |-- history_enabled_flag: string (nullable = true)

This of course assumes that the actual and desired types are compatible and that a CAST is allowed.
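Since the end goal here is HDFS, the casted frame can then simply be written out, for example (the output path and Parquet format below are placeholders, not taken from your setup):

// Placeholder HDFS path and format; use whatever your project actually requires.
val castedDF = yearDF.select(yearDF.columns.map(c => col(c).cast(mapping(c))): _*)
castedDF.write.mode("overwrite").parquet("hdfs:///tmp/year2016")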

If you still experience problems due to the peculiarities of a specific JDBC driver, you should consider placing the cast directly in the query, either manually (see In Apache Spark 2.0.0, is it possible to fetch a query from an external database (rather than grab the whole table)?):

// Database-side (Greenplum / PostgreSQL) type names used inside the pushed-down query.
val externalDtypes = Seq(
  "source_system_name" -> "text",
  "table_refresh_delay_min" -> "double precision",
  "release_number" -> "float8",
  "change_number" -> "float8",
  "interface_queue_enabled_flag" -> "text",
  "rework_enabled_flag" -> "text",
  "fdm_application_id" -> "bigint",
  "history_enabled_flag" -> "text"
)

val fields = externalDtypes.map {
  case (c, t) => s"CAST($c AS $t)"
}.mkString(", ")

val dbTable = s"""(select $fields from schema.tablename) as tmp"""
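and then read through that subquery instead of the raw table, reusing the connection options from the question:

// dbTable wraps the casting subquery built above; credentials come from the question.
val castedYearDF = spark.read.format("jdbc")
  .option("url", connectionUrl)
  .option("dbtable", dbTable)
  .option("user", devUserName)
  .option("password", devPassword)
  .load()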

or through a custom schema:

spark.read
  .format("jdbc")
  .option(
    "customSchema",
    dtypes.map { case (c, t) => s"`$c` $t" } .mkString(", "))
  ...
  .load()
user10938362