
Spark 2.x here. My code:

val query = "SELECT * FROM some_big_table WHERE something > 1"

val df : DataFrame = spark.read
  .option("url",
    s"""jdbc:postgresql://${redshiftInfo.hostnameAndPort}/${redshiftInfo.database}?currentSchema=${redshiftInfo.schema}"""
  )
  .option("user", redshiftInfo.username)
  .option("password", redshiftInfo.password)
  .option("dbtable", query)
  .load()

Produces:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:183)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:183)
    at scala.Option.getOrElse(Option.scala:121)

I'm not reading anything from a Parquet file, I'm reading from a Redshift (RDBMS) table. So why am I getting this error?

hotmeatballsoup

1 Answer


If you use the generic load function, you should include the format as well:

// For the JDBC source, the query has to be a parenthesized subquery with an alias
val query = "(SELECT * FROM some_big_table WHERE something > 1) as tmp"

...
  .format("jdbc")
  .option("dbtable", query)
  .load()

Otherwise Spark assumes that you are using the default format, which, in the absence of any specific configuration, is Parquet.
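
Put together, a minimal sketch of the full read, reusing the connection values from the question; nothing is new here beyond the explicit format:

val query = "(SELECT * FROM some_big_table WHERE something > 1) AS tmp"

val df: DataFrame = spark.read
  .format("jdbc")  // without this, Spark falls back to its default source, Parquet
  .option("url",
    s"jdbc:postgresql://${redshiftInfo.hostnameAndPort}/${redshiftInfo.database}?currentSchema=${redshiftInfo.schema}"
  )
  .option("user", redshiftInfo.username)
  .option("password", redshiftInfo.password)
  .option("dbtable", query)
  .load()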

Also, nothing forces you to use dbtable.

spark.read.jdbc(
  s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
  query, 
  props 
)

variant is also valid.
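
Here props is just a java.util.Properties carrying the connection credentials, something like:

import java.util.Properties

val props = new Properties()
props.setProperty("user", redshiftInfo.username)
props.setProperty("password", redshiftInfo.password)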

And of course, with such a simple query, none of that is needed:

spark.read.jdbc(
  s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
  "some_big_table", 
  props 
).where("something > 1")

will work the same way. If you want to improve performance, consider parallel (partitioned) queries, along the lines of the sketch below,
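
This is a sketch of a partitioned read, using the jdbc overload that takes a partitioning column and bounds; the column name id and the bounds are made up and should be replaced with a numeric column from your table:

spark.read.jdbc(
  s"jdbc:postgresql://${hostnameAndPort}/${database}?currentSchema=${schema}",
  "some_big_table",
  "id",      // hypothetical numeric partition column
  1L,        // lowerBound
  1000000L,  // upperBound
  8,         // numPartitions
  props
).where("something > 1")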

or, even better, try the dedicated Redshift connector.
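
A rough sketch with the spark-redshift connector, assuming the connector is on the classpath and you have an S3 location it can use for staging; the bucket path and the credentials option below are placeholders:

spark.read
  .format("com.databricks.spark.redshift")
  .option("url", s"jdbc:redshift://${hostnameAndPort}/${database}")
  .option("dbtable", "some_big_table")
  .option("tempdir", "s3a://some-bucket/tmp/")       // placeholder staging path
  .option("forward_spark_s3_credentials", "true")    // or use aws_iam_role instead
  .load()
  .where("something > 1")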

Alper t. Turker