
I have written a Scala program that reads data from large tables in MS SQL Server and writes it to BigQuery. I have not been able to get partitioning to work with the "partitionColumn" option of the JDBC driver (see my other post here: a link), so I have tried switching to predicates instead. That throws an exception, though, which I can't figure out.

This is the code:

    val predicates = Array[String](
      "entityid < 20000",
      "entityid >= 20000 && entityid < 40000",
      "entityid >= 40000 && entityid < 60000",
      "entityid >= 60000 && entityid < 80000",
      "entityid >= 80000 && entityid < 100000",
      "entityid >= 100000 && entityid < 120000",
      "entityid >= 120000 && entityid < 140000",
      "entityid >= 140000 && entityid < 160000",
      "entityid >= 160000 && entityid < 180000",
      "entityid >= 180000")

    val result = ss.read.jdbc(
      getJdbcUrl(hostName, port, databaseName),
      tablename,
      predicates,
      getConnection(user, password))
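
Each string in predicates becomes the WHERE clause of one partition, so this read runs as ten parallel JDBC queries. As an aside, the same ranges could be generated programmatically instead of written out by hand (a minimal sketch; note the strings are pasted verbatim into the generated SQL, so standard SQL AND is used below):

    // Sketch: build the same ten range predicates programmatically.
    // Each string ends up verbatim in one partition's SQL WHERE clause.
    val step   = 20000
    val bounds = (step until 200000 by step).toArray  // 20000, 40000, ..., 180000
    val middle = bounds.zip(bounds.tail).map {
      case (lo, hi) => s"entityid >= $lo AND entityid < $hi"
    }
    val predicates = (s"entityid < ${bounds.head}" +: middle) :+ s"entityid >= ${bounds.last}"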

This is how I run the job:

    gcloud dataproc jobs submit spark \
    --cluster my-cluster \
    --region europe-north1 \
    --jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
    --class Main \
    --properties \
    spark.executor.memory=19g,\
    spark.executor.cores=4,\
    spark.executor.instances=11 \
    -- yarn

And this is the exception I get, repeated many times:

    19/07/31 07:49:18 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 5.0 in stage 7.0 (TID 12, odsengine-cluster-w-0.europe-north1-b.c.velliv-dwh-development.internal, executor 4): UnknownReason
    19/07/31 07:49:18 WARN org.apache.spark.ThrowableSerializationWrapper: Task exception could not be deserialized
    java.io.InvalidClassException: com.microsoft.sqlserver.jdbc.SQLServerException; local class incompatible: stream classdesc serialVersionUID = 6734015607512574479, local class serialVersionUID = -2195310557661496761
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:193)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:136)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$4.run(TaskResultGetter.scala:132)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Any suggestions as to what the problem could be?

Bjoern

1 Answer


This could be a shading problem. Are you marking the bigquery-connector jar as provided when building mycode.jar? You can refer to this for some examples.
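
For example, with sbt the split would look roughly like this (a sketch only; the artifact coordinates and versions are illustrative, so substitute whatever your project actually uses):

    // build.sbt (sketch; coordinates and versions are illustrative)
    libraryDependencies ++= Seq(
      // Supplied on the cluster via --jars, so keep it out of mycode.jar:
      "com.google.cloud.bigdataoss" % "bigquery-connector" % "hadoop3-1.0.0" % Provided,
      // Bundled into mycode.jar:
      "com.microsoft.sqlserver" % "mssql-jdbc" % "7.4.1.jre8"
    )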

  • Good suggestion. However, the code works fine if I just don't use the predicates parameter, so I don't think it's a shading problem. I got partitioning working using the partitionColumn, so I've left the predicates route. – Bjoern Aug 08 '19 at 06:10
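
For reference, the partitionColumn route mentioned in the comment uses the seven-argument jdbc overload, roughly like this (a minimal sketch; the bounds and partition count are illustrative, and getJdbcUrl/getConnection are the question's own helpers):

    // Sketch: partitionColumn-based read. Spark splits [lowerBound, upperBound)
    // into numPartitions stride ranges on the given column; note the bounds
    // only control the stride, they do not filter out any rows.
    val result = ss.read.jdbc(
      getJdbcUrl(hostName, port, databaseName),
      tablename,
      "entityid",   // partitionColumn: a numeric, date, or timestamp column
      0L,           // lowerBound (illustrative)
      200000L,      // upperBound (illustrative)
      10,           // numPartitions
      getConnection(user, password))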