I have written a Scala program that reads data from large tables in MS SQL Server and writes it to BigQuery. I could not get partitioning to work using the "partitionColumn" option in the JDBC reader (see my other post here: a link), so I have switched to using predicates instead. This throws an exception, though, which I can't figure out.
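For reference, this is roughly what the partitionColumn approach looked like before I switched (a simplified sketch; the bounds and partition count here are just example values, not the ones I actually used):

val partitionedRead = ss.read
  .format("jdbc")
  .option("url", getJdbcUrl(hostName, port, databaseName))
  .option("dbtable", tablename)
  .option("user", user)
  .option("password", password)
  .option("partitionColumn", "entityid") // numeric column Spark splits the reads on
  .option("lowerBound", "1")             // example bound only
  .option("upperBound", "200000")        // example bound only
  .option("numPartitions", "10")
  .load()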
This is the predicates-based code:
val predicates = Array[String]("entityid < 20000"
, "entityid >= 20000 && entityid < 40000"
, "entityid >= 40000 && entityid < 60000"
, "entityid >= 60000 && entityid < 80000"
, "entityid >= 80000 && entityid < 100000"
, "entityid >= 100000 && entityid < 120000"
, "entityid >= 120000 && entityid < 140000"
, "entityid >= 140000 && entityid < 160000"
, "entityid >= 160000 && entityid < 180000"
, "entityid >= 180000")
val result = ss.read.jdbc(
getJdbcUrl(hostName, port, databaseName)
, tablename
, predicates
, getConnection(user, password))
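getJdbcUrl and getConnection are small helpers, roughly like this (simplified; they just build the JDBC URL string and the connection Properties):

import java.util.Properties

def getJdbcUrl(hostName: String, port: Int, databaseName: String): String =
  s"jdbc:sqlserver://$hostName:$port;databaseName=$databaseName"

def getConnection(user: String, password: String): Properties = {
  val props = new Properties()
  props.setProperty("user", user)
  props.setProperty("password", password)
  props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  props
}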
This is how I run the job:
gcloud dataproc jobs submit spark \
--cluster my-cluster \
--region europe-north1 \
--jars gs://mybucket/mycode.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
--class Main \
--properties \
spark.executor.memory=19g,\
spark.executor.cores=4,\
spark.executor.instances=11 \
-- yarn
And this is the exception I get (repeated many times):
19/07/31 07:49:18 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 5.0 in stage 7.0 (TID 12, odsengine-cluster-w-0.europe-north1-b.c.velliv-dwh-development.internal, executor 4): UnknownReason
19/07/31 07:49:18 WARN org.apache.spark.ThrowableSerializationWrapper: Task exception could not be deserialized
java.io.InvalidClassException: com.microsoft.sqlserver.jdbc.SQLServerException; local class incompatible: stream classdesc serialVersionUID = 6734015607512574479, local class serialVersionUID = -2195310557661496761
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:193)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:136)
at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.scheduler.TaskResultGetter$$anon$4.run(TaskResultGetter.scala:132)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Any suggestions as to what the problem could be?