
I am trying to access a table (ORC format) stored on a remote cluster using Spark's JDBC data source:

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", url)
  .option("dbtable", "metrics")
  .option("user", user)
  .option("password", password)
  .load()

However, no matter what I do, I keep getting this error:

Caused by: java.sql.SQLException: Cannot convert column 2 to long: java.lang.NumberFormatException: For input string: "metrics.t"
    at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:372)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:365)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$8.apply(JdbcUtils.scala:364)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:286)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:268)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NumberFormatException: For input string: "metrics.t"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at org.apache.hive.jdbc.HiveBaseResultSet.getLong(HiveBaseResultSet.java:368)
    ... 22 more

The input string "metrics.t" is the table name plus the name of the second column, "t", which holds timestamps as long values.

How do I skip past the headers with the JDBC format?

The CSV option ("header", true) has no effect in my case.

PS: Spark version 2.1.0

jharb

3 Answers


The code does not throw any exceptions with the following implementation:

val jdbcUrl = s"jdbc:hive2://$jdbcHostname:$jdbcPort/$jdbcDatabase"

val connectionProperties = new java.util.Properties()
connectionProperties.setProperty("user", jdbcUsername)
connectionProperties.setProperty("password", jdbcPassword)

val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties)

Weirdly though, if I remove the empty predicate Array(), the exception is back again.

jharb
    Which JDBC driver are you using? Because the Apache driver has many limitations (i.e. does not implement all expected JDBC features), and has cumbersome dependencies on Hadoop libraries (especially when Kerberos is involved); the Cloudera driver is more robust, JDBC-wise. And of course, if your Spark build has native integration with Hive, ditch JDBC and simply `sqlContext.sql("select * from mydb.mytbl")`... – Samson Scharfrichter May 04 '17 at 22:38
  • @SamsonScharfrichter It is the Apache Hive driver indeed. I will attempt to replicate my environment on the remote cluster so as to ditch JDBC. Thanks. – jharb May 05 '17 at 12:05
  • You don't need to run Spark _on_ the remote cluster to use native Hive integration -- it just means that Spark connects to the Metastore service (to get the HDFS location of data files for a given table) then connects to HDFS. Copy the relevant config files from a cluster node (from `/etc/hadoop/conf/` and `/etc/hive/conf/`) to a local directory then set `HADOOP_CONF_DIR` in `spark-env.sh`. _(plus a bit of debugging, otherwise it's no fun)_ – Samson Scharfrichter May 05 '17 at 21:47
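
A minimal sketch of the native-integration route described in the comment above, assuming the cluster's Hadoop and Hive config files have been copied locally and `HADOOP_CONF_DIR` points at them (the app name is illustrative; the table name is the one from the question):

import org.apache.spark.sql.SparkSession

// Assumes HADOOP_CONF_DIR (set e.g. in spark-env.sh) points at a directory
// containing the cluster's core-site.xml, hdfs-site.xml and hive-site.xml.
val spark = SparkSession.builder()
  .appName("read-metrics")
  .enableHiveSupport()   // talk to the Metastore directly, no JDBC involved
  .getOrCreate()

// Spark resolves the table through the Metastore and reads the ORC files from HDFS.
val metricsDF = spark.sql("SELECT * FROM metrics")
metricsDF.show()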

The root cause is that Spark's JdbcDialect uses double quotation marks as its quoteIdentifier, and Spark does not provide a Hive dialect (unlike, say, MySQL).

Therefore Spark sends SQL like select "some_column_name" from table to Hive over JDBC, and Hive interprets "some_column_name" as a string literal rather than a column name, which is why the literal "metrics.t" shows up where a long value was expected.

With val jdbcDF = spark.read.jdbc(jdbcUrl, "metrics", Array(), connectionProperties) you are telling Spark to build a JDBC DataFrame with no partitions at all, so no data-fetching SQL is ever sent to Hive and Spark simply returns an empty DataFrame, which is why the snippet in the first answer no longer throws.

The only correct way is to implement a corresponding dialect: How to specify sql dialect when creating spark dataframe from JDBC?
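
For reference, a minimal sketch of such a dialect (the object name HiveDialect is just illustrative; canHandle and quoteIdentifier are the JdbcDialect hooks involved):

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Quote identifiers with backticks, as HiveQL expects,
// instead of Spark's default double quotes.
object HiveDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:hive2")

  override def quoteIdentifier(colName: String): String =
    s"`$colName`"
}

// Register the dialect before calling spark.read.jdbc / spark.read.format("jdbc")
JdbcDialects.registerDialect(HiveDialect)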

lyman

Enabling Hive support when initializing the SparkSession worked for me:

// enableHiveSupport() gives Spark native Hive/Metastore integration,
// so the table can be read directly instead of over JDBC.
SparkSession spark = SparkSession.builder()
        .master("local")
        .appName("test")
        .enableHiveSupport()
        .getOrCreate();