
I am currently trying to run an example script from the Packt book "TensorFlow Machine Learning Projects", and I am getting the error below...

Py4JJavaError: An error occurred while calling o99.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, spark-deeplearning-w-3.us-central1-a.c.deeplearnig-spark.internal, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/anaconda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 158, in resizeImageAsRow
  File "/opt/conda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 121, in imageStructToArray
    imType = imageType(imageRow)
  File "/opt/conda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 111, in imageType
    return sparkModeLookup[imageRow.mode]
KeyError: 16

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 83, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/opt/conda/anaconda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 158, in resizeImageAsRow
  File "/opt/conda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 121, in imageStructToArray
    imType = imageType(imageRow)
  File "/opt/conda/lib/python3.6/site-packages/sparkdl/image/imageIO.py", line 111, in imageType
    return sparkModeLookup[imageRow.mode]
KeyError: 16

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

This occurs when I run the following script on my GCP cluster...

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import sparkdl as dl
from pyspark.ml.image import ImageSchema
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

spark = SparkSession.builder \
      .appName("ImageClassification") \
      .config("spark.executor.memory", "70g") \
      .config("spark.driver.memory", "50g") \
      .config("spark.memory.offHeap.enabled",True) \
      .config("spark.memory.offHeap.size","16g") \
      .getOrCreate()


dfbuses = ImageSchema.readImages('gs://car-buses/data/buses/').withColumn('label', f.lit(0))
dfcars = ImageSchema.readImages('gs://car-buses/data/cars/').withColumn('label', f.lit(1))

trainDFbuses, testDFbuses = dfbuses.randomSplit([0.60,0.40], seed = 123)
trainDFcars, testDFcars = dfcars.randomSplit([0.60,0.40], seed = 122)

trainDF = trainDFbuses.unionAll(trainDFcars)
testDF = testDFbuses.unionAll(testDFcars)

vectorizer = dl.DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")

logreg = LogisticRegression(maxIter=30, labelCol="label")
pipeline = Pipeline(stages=[vectorizer, logreg])
pipeline_model = pipeline.fit(trainDF)

The last line of the script above (the pipeline.fit call) is where the error happens. The data being trained on is a Spark DataFrame of the following form, where the label column is a binary classification label, 0 (buses) and 1 (cars)...

+--------------------+-----+
|               image|label|
+--------------------+-----+
|[gs://car-buses/d...|    0|
|[gs://car-buses/d...|    0|
|[gs://car-buses/d...|    0|
|[gs://car-buses/d...|    0|
|[gs://car-buses/d...|    0|
+--------------------+-----+

and the image column contains rows of the form...

Row(image=Row(origin='gs://car-buses/data/buses/images.jpeg', height=84, width=126, nChannels=3, mode=16, data=bytearray(b'\xd6\xde\xde\xd6\...
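
To double-check which OpenCV mode codes are actually present, this is the kind of inspection I can run (a minimal sketch against the trainDF built above; the image struct and its mode field come from Spark's ImageSchema):

# Show the image struct schema and the distinct OpenCV mode codes in the data.
# Mode 16 is OpenCV's CV_8UC3 (8-bit unsigned, 3 channels), which matches the sample row above.
trainDF.printSchema()
trainDF.select("image.mode").distinct().show()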

The GCP cluster I am running this on has 5 nodes: 1 master and 4 workers. Below is the gcloud command I run in the GCP CLI to create the environment...

gcloud beta dataproc clusters create spark-deeplearning --image-version 1.4 --zone us-central1-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --worker-machine-type n1-standard-4 --num-workers 4 --worker-boot-disk-size 500 --metadata=MINICONDA_VERSION=4.3.30 --optional-components=ANACONDA,JUPYTER --enable-component-gateway --initialization-actions gs://initializations/creata_sparkdl_cluster.sh

The initialization action referenced in the gcloud command above is a shell script that installs the conda and pip packages needed to run sparkdl (I can post it in the comments if necessary), and "--metadata=MINICONDA_VERSION=4.3.30" keeps the Python version consistent across all the nodes.
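
As a sanity check on that consistency claim, something like the following sketch could be run from the driver to report the Python and sparkdl versions seen on the executors (the __version__ attribute is guarded with getattr, since I am not sure sparkdl exposes it):

# Report the Python and sparkdl versions visible to the executors.
def env_info(_):
    import sys
    import sparkdl
    return [(sys.version.split()[0], getattr(sparkdl, "__version__", "unknown"))]

rdd = spark.sparkContext.parallelize(range(8), 8)
print(rdd.mapPartitions(env_info).distinct().collect())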

I have searched all over for an error close to mine, but the only one I found is from a Stack Overflow thread that refers to the "overhead limit" being exceeded. The error I am getting is a bit different from that and only mentions a stage failure.

I suspect the error may be caused by the versions of the libraries I am using on the cluster, but I am truly unsure. I also tried this example on an Ubuntu 16 VM with similar dependencies, and the same error occurs.
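
If it is a version problem, this is the check I would run on the cluster to see which image modes the installed sparkdl can handle (a sketch, assuming sparkModeLookup is a module-level dict in sparkdl.image.imageIO, as the traceback suggests):

# List the OpenCV mode codes the installed sparkdl can translate.
# If 16 is missing from this dict, imageType() raises the KeyError shown above.
from sparkdl.image import imageIO
print(sorted(imageIO.sparkModeLookup.keys()))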

The goal I am trying to achieve is to fit an object detection model using sparkdl and TensorFlowOnSpark on top of the Inception V3 model.

Dylan Edmonds
  • Can you share the python stack traceback? – Amit Singh Jul 15 '19 at 15:03
  • @AmitSingh I edited question to include the full error – Dylan Edmonds Jul 15 '19 at 15:08
  • Also can you add a data sample from both the dfs. – Amit Singh Jul 15 '19 at 15:44
  • @AmitSingh yes I have added it under the training script – Dylan Edmonds Jul 15 '19 at 16:05
  • What version of sparkdl are you using? Because it seems that ordinal 16 is not supported by your version of `sparkdl`. However, the latest sparkdl does support it. You can verify at https://github.com/databricks/spark-deep-learning/blob/master/python/sparkdl/image/imageIO.py on line 43. – Amit Singh Jul 15 '19 at 17:40
  • Update your library and it should work – Amit Singh Jul 15 '19 at 17:42
  • @AmitSingh I have the latest version of pip but I do not believe sparkdl has been updated on pip. It seems the last version was 2.2 – Dylan Edmonds Jul 15 '19 at 23:54
  • @AmitSingh 0.2.2 – Dylan Edmonds Jul 16 '19 at 00:41
  • I don't know how you have set it up, but since you are using miniconda, you usually install packages through conda, right? So if you could check the version of sparkdl there, it will make things clear. – Amit Singh Jul 16 '19 at 04:18
  • @AmitSingh thank you for your help, but what I found out over the past couple hours is that pip only has 0.2.2 as the latest version and conda has no support for sparkdl. I will now be using BigDL instead as this has more documentation and support from pip. I can use sparkdl if I specify it as an included package when I enter the pyspark CLI. – Dylan Edmonds Jul 16 '19 at 04:25

0 Answers